Postgresql源码(117)libpq的两套实现(socket/shm_mq)

libpq的通信方式

libpq提供了两套通信方式

  • socket
  • shm_mq

分别实现在下面两个文件中

  • pqcomm.c
  • pqmq.c

什么时候用socket通信?

除了下述并行场景,其他场景全部使用socket通信。

static const PQcommMethods PqCommSocketMethods = {.comm_reset = socket_comm_reset,.flush = socket_flush,.flush_if_writable = socket_flush_if_writable,.is_send_pending = socket_is_send_pending,.putmessage = socket_putmessage,.putmessage_noblock = socket_putmessage_noblock
};

什么时候使用mq通信?

并行框架中会将子进程的libpq的通信改成mq通信,用于子进程给父进程发送错误信息。

static const PQcommMethods PqCommMqMethods = {.comm_reset = mq_comm_reset,.flush = mq_flush,.flush_if_writable = mq_flush_if_writable,.is_send_pending = mq_is_send_pending,.putmessage = mq_putmessage,.putmessage_noblock = mq_putmessage_noblock
};

使用MQ通信需要用pq_redirect_to_shm_mq函数指定使用的dsm和mq。

注意这个pq_mq_handle是申请在dsm上的,专门用于并行框架。

void
pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
{PqCommMethods = &PqCommMqMethods;pq_mq_handle = mqh;whereToSendOutput = DestRemote;FrontendProtocol = PG_PROTOCOL_LATEST;on_dsm_detach(seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0);
}

使用位置在并行框架子进程入口

void ParallelWorkerMain(...)
{......// 拿到父进程在共享内存中申请mq的内存其实地址error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);// 用自己的woker num偏移得到自己的mqmq = (shm_mq *) (error_queue_space +ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);// 配置PGPROC到mq上shm_mq_set_sender(mq, MyProc);// mq是单纯的mq抽象,用的时候一般使用mq handle,在这里包装一层成为mqhmqh = shm_mq_attach(mq, seg, NULL);// 配置libpq的消息队列为mqhpq_redirect_to_shm_mq(seg, mqh);// 记录父进程的pid为leader pidpq_set_parallel_leader(fps->parallel_leader_pid,fps->parallel_leader_backend_id);
}

配置好后,子进程已经记录了父进程的pid,在子进程中需要发送消息时:

int
mq_putmessage(...)
{...for (;;){// 先把书库放入mq中,flush到共享内存result = shm_mq_sendv(pq_mq_handle, iov, 2, true, true);if (pq_mq_parallel_leader_pid != 0){... // 这里只有子进程能走进来,通知父进程读取SendProcSignal(pq_mq_parallel_leader_pid,PROCSIG_PARALLEL_MESSAGE,pq_mq_parallel_leader_backend_id);...}}if (result != SHM_MQ_WOULD_BLOCK)break;(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,WAIT_EVENT_MESSAGE_QUEUE_PUT_MESSAGE);ResetLatch(MyLatch);CHECK_FOR_INTERRUPTS();}...
}

子进程发完了,信息会留存在mq中。然后给父进程发信号。

父进程收到kill过来的信号,进入信号处理函数(函数已经绑定sigusr1了),标记ParallelMessagePending

void
procsignal_sigusr1_handler(SIGNAL_ARGS)
{...if (CheckProcSignal(PROCSIG_PARALLEL_MESSAGE))HandleParallelMessageInterrupt();...
}void
HandleParallelMessageInterrupt(void)
{InterruptPending = true;ParallelMessagePending = true;SetLatch(MyLatch);
}

等下次调用CHECK_FOR_INTERRUPTS宏,执行ProcessInterrupts时处理具体的消息。

在函数中会shm_mq_receive接受子进程发到mq中的消息。

void
HandleParallelMessages(void)
{...HOLD_INTERRUPTS();......res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,&data, true);if (res == SHM_MQ_WOULD_BLOCK)break;else if (res == SHM_MQ_SUCCESS){StringInfoData msg;initStringInfo(&msg);appendBinaryStringInfo(&msg, data, nbytes);HandleParallelMessage(pcxt, i, &msg);pfree(msg.data);}elseereport(ERROR,(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),errmsg("lost connection to parallel worker")));}}}...RESUME_INTERRUPTS();
}

elog如何发送错误日志?

无论是并发时的子进程,还是普通进程,调用elog发送日志都会经过两步:

errstart(int elevel, const char *domain)    做一些初始化和配置errfinish(const char *filename, int lineno, const char *funcname)调用EmitErrorReport发送错误

EmitErrorReport负责将日志发送到client和server log

EmitErrorReport/* Send to server log, if enabled */if (edata->output_to_server)send_message_to_server_log(edata);/* Send to client, if enabled */if (edata->output_to_client)send_message_to_frontend(edata);

这里发送到client的日志send_message_to_frontend中,会走libpq的逻辑:

  1. 普通场景libpq使用PqCommSocketMethods的实现,将日志发送给客户端。
  2. 并发场景子进程中libpq使用PqCommMqMethods的实现,将日志发送给父进程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/230621.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring boot 3.2 新特性介绍

1.增加了对Apache Pulsar的支持 使用方式参考 官网Messaging 2.增加了对虚拟线程的支持 需要jdk 21 版本 在spring-boot 项目中 通过设置 spring.threads.virtual.enabledtrue 即可开启虚拟线程。虚拟线程开启后作用域如下 1.Servlet Web Servers 当启用虚拟线程时&…

174.【2023年华为OD机试真题(C卷)】开源项目热榜(一般排序算法实现JavaPythonC++JS)

🚀你的旅程将在这里启航!本专栏所有题目均包含优质解题思路,高质量解题代码,详细代码讲解,助你深入学习,深度掌握! 文章目录 【华为OD机试AB必刷题目】题目描述解题思路Python题解代码JAVA题解代码C/C++题解代码JS题解代码代码OJ评判结果代码讲解Python题解代码讲解JAV…

【Animatediff】制作 玫瑰,鲜花, 香水, 动态LOGO (结尾》图片停留)

提示:也可以后期加入文字。 电商\lofi_v4.safetensors [9462506675] 》制作初始图片 1:输入提示词 流动的烟雾,飘落的花瓣,优雅的香水瓶周围环绕着柔软的钻石,烟,红色浪漫的玫瑰:1.5,柔和的背光营造梦幻的效…

解决el-table组件中,分页后数据的勾选、回显问题?

问题描述: 1、记录一个弹窗点击确定按钮后,table列表所有勾选的数据信息2、再次打开弹窗,回显勾选所有保存的数据信息3、遇到的bug:切换分页,其他页面勾选的数据丢失;点击确认只保存当前页的数据&#xff1…

VUE中监听企业开发实践

背景:我干哦!最近需求是让中英文翻译vue页面,我这个后端哪里会哦,这不遇见了一个棘手的问题,我描述下:上面是一个list 根据查询到的值进行判断显示,如果是z就显示主信息,其他的ABC正…

动能方案 | 技术引领未来:两轮电动车遥控解锁方案探秘

随着电动交通工具的快速普及,创新性的智能解锁系统正在为两轮电动车带来更便捷、安全的使用体验。本文将深入介绍一种先进的两轮电动车遥控解锁方案,探讨其优势,并推荐一款先进的芯片技术,引领行业未来。 01方案介绍 1、技术原…

短剧成为今年最火赛道,短剧分销系统怎么开发?

近两年来是短剧的爆发期,迎来了飞速发展阶段,也成为了2023年最赚钱的赛道。再这样的发展下,短剧行业吸引了无数人进入市场。 目前,短剧变现的方式主要有两个,一种是拍短剧,就是成为导演或者演员&#xff1…

分享5款简单而高效的小工具

​ 在这个繁忙的时代,简单而高效的工具成为生活和工作中的宝贵助手。以下是五款小巧而实用的小工具,或许正是你所需的生活小搭档。 1.远程终端——MobaXTerm ​ MobaXTerm是一款集成了多种网络工具的远程终端软件,可以通过SSH、Telnet、RDP…

解读远程工作设计师之未来与发展

引言 在数字化的浪潮下,“远程工作”已经成为现代职场的一个重要趋势。对于设计师来说,这不仅是一种工作方式的转变,更是职业发展的新机遇。在这篇文章中,我将从以下9个方面,深入探讨远程工作设计师的机会、市场和职位…

WordPress主题 响应式个人博客主题Kratos源码

Kratos 是一款专注于用户阅读体验的响应式 WordPress 主题,整体布局简洁大方,针对资源加载进行了优化。 Kratos主题基于Bootstrap和Font Awesome的WordPress一个干净,简单且响应迅速的博客主题,Vtrois创建和维护, 主…

RHEL8_Linux_Ansible常用模块的使用

本章主要介绍Ansible中最常见模块的使用 shell模块文件管理模块软件包管理模块服务管理模块磁盘管理模块用户管理模块防火墙管理模块 ansible的基本用法如下。 ansible 机器名 -m 模块x -a "模块的参数" 对被管理机器执行不同的操作,只需要调用不同的模块…

Python三级 每周练习题31

如果你感觉有收获,欢迎给我微信扫打赏码 ———— 以激励我输出更多优质内容 练习一: 作业1:编写程序,在下面的字典中找出身高137的同学并输出姓名,如果没找到, 输出没有 a{‘小赵’:136,‘小钱’:141,‘小孙’:146,‘小李’:13…

Gateway网关-路由的过滤器配置

目录 一、路由过滤器 GatewayFilter 1.1 过滤器工厂GatewayFilterFactory 1.2 案例给所有进入userservice的请求添加一个请求头 Truthitcastis freaking awesome! 1.3 案例给所有请求添加一个请求头 Truthitcastis freaking awesome! 一、路由过滤器 …

一级浪涌保护器的行业应用解决方案

一级浪涌保护器是防雷系统中最重要的一环,它主要用于建筑物总配电柜、低压变压器进线柜等位置,防止浪涌电压直接从外部传导进入内部,使系统设备免遭雷击损坏。一级浪涌保护器的规范要求、应用、作用和原理以及国标,本文将分别进行…

C++——STL标准模板库——容器详解——string

一、基本概念 string本质是一个类,封装了c风格字符串(以\0结尾的字符数组),具备自动管理内存功能,提供了多种构造函数和多种删查增改的成员方法。string的本质特点归结以下几点: 1、动态数组:…

ubuntu上strace下载编译

下载 Releases strace/strace GitHub 编译 ./configure \--enable-mpersnomake sudo make install

OpenShift与Rancher

Rancher的部署 一、系统初始化 1&#xff09;设置IP地址和主机名称 hostnamectl set-hostname rancher 2&#xff09;添加地址解析和开启路由转发 cat >>/etc/hosts<<EOF 192.168.180.210 rancher 192.168.180.200 node1 192.168.180.190 node2 EOF vim/et…

完整的vite + ts + vue3项目,克隆就能用,傻瓜式保姆教程(第二篇)

目录 前言 一、基础知识准备 1.1 接口请求 &#xff08;本篇重点内容&#xff09; 1.1.1 Fetch API 1.1.2 XMLHttpRequest 1.1.3 axios&#xff08;推荐&#xff09; 1.1.4 EventSource 1.1.5 WebSocket 1.2 ts 类型定义 &#xff08;本篇内容&#xff09; 1.3 svg 雪…

java 图片裁剪与合并

前言 在使用阿里云人数检测时&#xff0c;为降低成本&#xff0c;我们需要将两个图片合并成一张图片&#xff0c;提交给阿里云图像识别&#xff0c;但我发现识别时由于一些感染因素&#xff0c;会有一定的错误率&#xff0c;所以就需要将图片进行裁剪后再拼接。 具体操作逻辑…

BearPi Std 板从入门到放弃 - 先天神魂篇(3)(RT-Thread I2C设备 读取光照强度BH1750)

简介 使用BearPi IOT Std开发板及其扩展板E53_SC1&#xff0c; SC1上有I2C1 的光照强度传感器BH1750 和 EEPROM AT24C02&#xff0c; 本次主要就是读取光照强度; 主板: 主芯片: STM32L431RCT6LED : PC13 \ 推挽输出\ 高电平点亮串口: Usart1I2C使用 : I2C1E53_SC1扩展板 : LE…