Postgresql源码(122)Listen / Notify与事务的联动机制

前言

Notify和Listen是Postgresql提供的不同会话间异步消息通信功能,例子:

LISTEN virtual;
NOTIFY virtual;
Asynchronous notification "virtual" received from server process with PID 8448.
NOTIFY virtual, 'This is the payload';
Asynchronous notification "virtual" with payload "This is the payload" received from server process with PID 8448.LISTEN foo;
SELECT pg_notify('fo' || 'o', 'pay' || 'load');
Asynchronous notification "foo" with payload "payload" received from server process with PID 14728.

功能使用PG的基础设施shm_mq + 信号机制拼装实现。

监听、通知的行为也兼容了数据库的事务的功能,事务回滚会删除监听、事务提交会触发通知。

本文对异步消息队列与事务的联动机制做一些分析。

事务提交触发

NOTIFY的功能必须等到事务提交才会触发:

postgres=# listen a1;
LISTEN
postgres=# begin;
BEGIN
postgres=*# notify a1;
NOTIFY
postgres=*# notify a1;
NOTIFY
postgres=*# commit;
COMMIT
Asynchronous notification "a1" received from server process with PID 17111.

流程比较简单,先从pendingActions中注册监听。再发信号触发异步notify。

void
AtCommit_Notify(void)
{...if (pendingActions != NULL){foreach(p, pendingActions->actions){ListenAction *actrec = (ListenAction *) lfirst(p);switch (actrec->action){case LISTEN_LISTEN:Exec_ListenCommit(actrec->channel);break;case LISTEN_UNLISTEN:Exec_UnlistenCommit(actrec->channel);break;case LISTEN_UNLISTEN_ALL:Exec_UnlistenAllCommit();break;}}}...if (pendingNotifies != NULL)SignalBackends();...
}

事务回滚清理

回滚后监听和通知都会清理:

postgres=# begin;
BEGIN
postgres=*# listen k123;
LISTEN
postgres=*# notify k123;
NOTIFY
postgres=*# abort;
ROLLBACK
postgres=# notify k123;
NOTIFY
postgres=#

事务回滚时执行清理动作:

void
AtAbort_Notify(void)
{if (amRegisteredListener && listenChannels == NIL)asyncQueueUnregister();pendingActions = NULL;pendingNotifies = NULL;
}

全部清理干净。

子事务提交不触发,交接给上一层事务

提交的子事务将notify交接给上一层事务。

postgres=# listen k000;
LISTEN
postgres=# begin;
BEGIN
postgres=*# savepoint sp1;
SAVEPOINT
postgres=*# savepoint sp2;
SAVEPOINT
postgres=*# notify k000;
NOTIFY
postgres=*# release sp2;
RELEASE
postgres=*# commit;
COMMIT
Asynchronous notification "k000" received from server process with PID 18902.

实现:

void
AtSubCommit_Notify(void)
{int			my_level = GetCurrentTransactionNestLevel();if (pendingActions != NULL &&pendingActions->nestingLevel >= my_level){if (pendingActions->upper == NULL ||pendingActions->upper->nestingLevel < my_level - 1){--pendingActions->nestingLevel;}else{ActionList *childPendingActions = pendingActions;pendingActions = pendingActions->upper;pendingActions->actions =list_concat(pendingActions->actions,childPendingActions->actions);pfree(childPendingActions);}}if (pendingNotifies != NULL &&pendingNotifies->nestingLevel >= my_level){Assert(pendingNotifies->nestingLevel == my_level);if (pendingNotifies->upper == NULL ||pendingNotifies->upper->nestingLevel < my_level - 1){--pendingNotifies->nestingLevel;}else{NotificationList *childPendingNotifies = pendingNotifies;ListCell   *l;pendingNotifies = pendingNotifies->upper;foreach(l, childPendingNotifies->events){Notification *childn = (Notification *) lfirst(l);if (!AsyncExistsPendingNotify(childn))AddEventToPendingNotifies(childn);}pfree(childPendingNotifies);}}
}
  • pendingActions:用于保存channel信息(LISTEN命令使用,Async_Listen中配置)

  • pendingNotifies:用于保存channel和payload信息(NOTIFY命令使用,Async_Notify中配置)

子事务提交时,notify并不会真正触发,也是和其他资源一样,将自己绑定的nestingLevel转移到上一层(注意这里是绑的nestingLevel不是xid比较合理)。

整体上会有两种情况:

情况一:子事务有间隔,走这个分支pendingActions->upper->nestingLevel < my_level - 1

begin;
savepoint sp1;
notify ch123;
savepoint sp2;
savepoint sp3;
notify ch789;
release sp3;

情况二:子事务无间隔,走else分支

begin;
savepoint sp1;
notify ch123;
savepoint sp2;
notify ch456;
savepoint sp3;
notify ch789;
release sp3;

pendingActions和pendingNotifies都有自己的upper指针形成链式结构,两种数据结构在子事务提交时的行为都是将信息转移到上一层中,区别是pendingActions直接挂到上一层的actions链表;pendingNotifies调用AddEventToPendingNotifies接口完成同样的动作。

子事务回滚不触发,清理属于子事务的pendings

回滚的子事务会删除监听。

postgres=# begin;
BEGIN
postgres=*# savepoint sp1;
SAVEPOINT
postgres=*# listen k123;
LISTEN
postgres=*# savepoint sp2;
SAVEPOINT
postgres=*# listen k000;
LISTEN
postgres=*# rollback to sp2;
ROLLBACK
postgres=*# notify k123;
NOTIFY
postgres=*# notify k000;
NOTIFY
postgres=*# commit;
COMMIT
Asynchronous notification "k123" received from server process with PID 18098.
postgres=#
void
AtSubAbort_Notify(void)
{int			my_level = GetCurrentTransactionNestLevel();...while (pendingActions != NULL &&pendingActions->nestingLevel >= my_level){ActionList *childPendingActions = pendingActions;pendingActions = pendingActions->upper;pfree(childPendingActions);}while (pendingNotifies != NULL &&pendingNotifies->nestingLevel >= my_level){NotificationList *childPendingNotifies = pendingNotifies;pendingNotifies = pendingNotifies->upper;pfree(childPendingNotifies);}
}

子事务回滚的话,全部是直接删除,不在做向上归属。

Listen/Notify的实现原理

(This content is a summary derived from code comments.)

  1. 同一台机器上有多个后端进程。多个后端进程监听多个通道。(在代码的其他部分,通道也被称为“conditions”。)
  2. 在基于磁盘的存储中有一个中央队列(目录 pg_notify/),通过 slru.c 模块将活跃使用的页面映射到共享内存中。所有的通知消息都被放置在队列中,稍后由监听的后端进程读取。没有集中的信息知道哪个后端进程监听哪个通道;每个后端进程都有自己感兴趣的通道列表。虽然只有一个队列,但通知被视为数据库本地的;这是通过在每个通知消息中包含发送者的数据库 OID 来实现的。监听的后端进程会忽略不匹配其数据库 OID 的消息。这一点很重要,因为它确保了发送者和接收者有相同的数据库编码,不会错误解释通道名称或有效载荷字符串中的非 ASCII 文本。由于通知不期望在数据库崩溃后存活,我们可以在任何重启时简单地清除 pg_notify 数据,并且不需要 WAL 支持或 fsync。
  3. 每个至少监听一个频道的后端进程都会通过将其进程ID注册到AsyncQueueControl的数组中来进行注册。然后,它会扫描中央队列中的所有传入通知,首先将通知的数据库OID与自身的数据库OID进行比较,然后将通知的频道与其监听的频道列表进行比较。如果匹配成功,它会将通知事件传递给前端。不匹配的事件将被简单地跳过。
  4. NOTIFY语句(Async_Notify例程)将通知存储在后端本地列表中,直到事务结束才会处理。来自同一事务的重复通知只发送一次通知。这样做是为了节省工作量,例如,当触发器在一个200万行的表上触发时,会为每一行的更改发送一个通知。如果应用程序需要接收每个已发送的单个通知,可以在额外的有效负载参数中轻松添加一些唯一的字符串。当事务准备提交时,PreCommit_Notify()将待处理的通知添加到队列的头部。队列的头指针始终指向下一个空闲位置,而位置只是一个页号和该页中的偏移量。这是在将事务标记为已提交之前完成的。如果在写入通知时遇到问题,我们仍然可以调用elog(ERROR, …),事务将回滚。一旦我们将所有通知放入队列中,我们将返回到CommitTransaction(),然后执行实际的事务提交。在提交后,我们会再次被调用(AtCommit_Notify())。在这里,我们对有效的监听状态(listenChannels)进行任何实际的更新。然后,我们向可能对我们的消息感兴趣的后端进程发送信号(包括我们自己的后端进程,如果正在监听)。这是通过SignalBackends()完成的,它会扫描监听后端进程的列表,并向每个监听后端进程发送一个PROCSIG_NOTIFY_INTERRUPT信号(我们不知道哪个后端进程在监听哪个频道,因此必须向它们全部发送信号)。但是,我们可以排除那些已经是最新状态的后端进程,并且还可以排除其他数据库中的后端进程(除非它们远远落后,应该被踢出以使其前进指针)。最后,在完全退出事务并即将进入空闲状态时,我们会扫描队列中需要发送到前端的消息(可能是来自其他后端的通知,或者是自己发送的通知)。这一步骤不是CommitTransaction序列的一部分,有两个重要原因。首先,我们在向前端发送数据时可能会出现错误,而在事务提交后进行清理时出现错误是非常糟糕的。其次,在某些情况下,一个过程在单个前端命令中发出多个提交,我们不希望在命令完成之前向前端发送通知;但是对于其他后端来说,每次提交后的通知应该立即发送出去。
  5. 收到PROCSIG_NOTIFY_INTERRUPT信号后,信号处理程序会设置进程的latch,如果该后端处于空闲状态(即等待前端命令并且不在事务块内),则会立即触发事件处理(参见ProcessClientReadInterrupt())。否则,处理程序可能只设置一个标志,在下次进入空闲状态之前进行处理。入站通知处理包括读取自上次扫描以来到达的所有通知。我们读取每个通知,直到达到未提交事务的通知或者头指针的位置。
  6. 为了限制磁盘空间的消耗,需要推进尾指针,以便可以截断旧的页面。这是相对昂贵的操作(特别是,它需要一个独占锁),因此我们不希望经常执行。如果发送后端将队列头推进到新页面,则会执行此操作,但每QUEUE_CLEANUP_DELAY页只执行一次。

一个在相同频道上监听的应用程序将会收到自己发送的NOTIFY消息。如果这些消息对应用程序没有用处,可以通过将NOTIFY消息中的be_pid与应用程序自身后端的PID进行比较来忽略它们。(从FE/BE协议2.0开始,在启动期间,后端的PID会提供给前端。)上述设计确保通过忽略自我通知,不会错过来自其他后端的通知。用于通知管理的共享内存使用量(NUM_NOTIFY_BUFFERS)可以根据需要进行调整,而不会影响除性能之外的任何内容。可以同时排队的通知数据的最大量由max_notify_queue_pages GUC确定。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/689123.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Unity笔记:数据持久化的几种方式

正文 主要方法&#xff1a; ScriptableObjectPlayerPrefsJSONXML数据库&#xff08;如Sqlite&#xff09; 1. PlayerPerfs PlayerPrefs 存储的数据是全局共享的&#xff0c;它们存储在用户设备的本地存储中&#xff0c;并且可以被应用程序的所有部分访问。这意味着&#xf…

深入浅出熟悉OpenAI最新大作Sora文生视频大模型

蠢蠢欲动&#xff0c;惴惴不安&#xff0c;朋友们我又来了&#xff0c;这个春节真的过的是像过山车&#xff0c;Gemini1.5 PRO还没过劲&#xff0c;OpenAI又放大招&#xff0c;人类真的要认输了吗&#xff0c;让我忍不住想要再探究竟&#xff0c;到底是什么让文生视频发生了质的…

头歌C++语言之选择排序练习题

目录 第1关:第二统计数字 任务描述 相关知识 数组声明: 初始化数组: 访问数组元素 选择排序 编程要求 第2关:运动会排名 任务描述 相关知识 多维数组 访问二维数组 编程要求 第3关:单词排序 任务描述 相关知识 strcmp()函数 编程要求

流星蝴蝶剑之七夜听雪中文版下载

软件介绍&#xff1a; 中文名称: 流星蝴蝶剑七夜听雪 英文名称: Meteor 游戏类型: 3D武侠格斗 发行时间: 2002年08月 制作发行: 流星江湖悠悠客栈 语言 :中文 配置要求: 操作系统&#xff1a;Windows 95 / 98 / Me / 2000 / XP 最低配置 CPU&#xff1a;Pentium II 450MHz 以上…

记录 | git win C://User/Administrator/.ssh下没有id_rsa.pub找不到

在用 ssh-keygen -t rsa -C "xxx163.com”生成后&#xff0c;在 C://User/Administrator/.ssh 下找不到 id_rsa.pub 文件 在这个下面找找&#xff1a; C:\Users\Administrator\AppData\Roaming\SPB_Data\.ssh 或者直接看 ssh-keygen 生成的终端日志&#xff0c;上面有说…

单向/双向V2G环境下分布式电源与电动汽车充电站联合配置方法(matlab代码)

目录 1 主要内容 目标函数 电动汽车负荷建模 算例系统图 程序亮点 2 部分代码 3 程序结果 4 下载链接 1 主要内容 该程序复现博士文章《互动环境下分布式电源与电动汽车充电站的优化配置方法研究》第五章《单向/双向V2G环境下分布式电源与电动汽车充电站联合配置方法》…

0206-1-网络层

第 4 章 网络层 网络层提供的两种服务 虚电路服务 数据报服务 概要: 虚电路服务与数据报服务的对比 网际协议 IP 网际协议 IP 是 TCP/IP 体系中两个最主要的协议之一。与 IP 协议配套使用的还有四个协议&#xff1a; 地址解析协议 ARP (Address Resolution Protocol)逆地…

【鸿蒙系统学习笔记】TypeScript开发语言

一、背景 HarmonyOS 应用的主要开发语言是 ArkTS&#xff0c;它由 TypeScript&#xff08;简称TS&#xff09;扩展而来&#xff0c;在继承TypeScript语法的基础上进行了一系列优化&#xff0c;使开发者能够以更简洁、更自然的方式开发应用。值得注意的是&#xff0c;TypeScrip…

34、商城系统(十五):认证服务,短信验证码,密码加盐,OAuth2.0社交登录,SpringSession认证功能,单点登录

目录 一、新建认证服务 1.后端项目启动 2.前端页面复制 3.配置域名 4.配置gateway

在 CentOS 上安装 JDK 1.8

要在 CentOS 上安装 JDK 1.8&#xff0c;您可以按照以下步骤进行操作&#xff1a; 打开终端&#xff08;命令行界面&#xff09;。 检查您的系统是否已经配置了正确的软件源。可以执行以下命令来更新软件包索引&#xff1a; sudo yum update使用以下命令安装 OpenJDK 1.8&…

Spring整合Mybatis之DAO层、Service层开发

3. Spring整合Mybatis编程DAO层开发 1. 项目引入相关依赖spring mybatis mysql mybatis-spring druid2. 编写spring.xml整合&#xff1a;spring 接管 mybatis 中 SqlSessionFactory对象的创建<!--创建DataSource--><bean class"com.alibaba.druid.pool.…

8、内网安全-横向移动RDPKerberos攻击SPN扫描WinRMWinRS

用途&#xff1a;个人学习笔记&#xff0c;有所借鉴&#xff0c;欢迎指正 目录 一、域横向移动-RDP-明文&NTLM 1.探针服务&#xff1a; 2.探针连接&#xff1a; 3.连接执行&#xff1a; 二、域横向移动-WinRM&WinRS-明文&NTLM 1.探针可用&#xff1a; 2.连接…

【PyQt】14-绘图-QPainter

文章目录 前言一、QPainter二、绘制文本-drawTextQt里面的文本对齐方式 运行结果 三、像素点总结 前言 1、学会画图方法 一、QPainter 通常可以绘制文本、各种图形&#xff08;点、线、椭圆、弧、扇形、多边形等等&#xff09;、图像。 必须在painrEvent事件方法中绘制各种元…

Eclipse 分栏显示同一文件

Eclipse 分栏显示同一文件 1. Window -> EditorReferences 1. Window -> Editor Toggle Split Editor (Horizontal) &#xff1a;取消或设置水平分栏显示 Toggle Split Editor (Vertical) &#xff1a;取消或设置垂直分栏显示 References [1] Yongqiang Cheng, https:/…

Django后端开发——mysql数据库连接遇到的问题及解决

文章目录 参考资料问题描述情况描述解决方案step1&#xff1a;管理员权限进入mysql&#xff0c;重置root密码step2&#xff1a;重启mysql服务器 参考资料 stackflow帖子&#xff1a;https://stackoverflow.com/questions/39281594/error-1698-28000-access-denied-for-user-ro…

Unity ScreenPointToRay 获取到的坐标不准确

&#x1f47e;奇奇怪怪的 &#x1f959;问题描述&#x1f96a;解决方案&#x1f37f;验证代码 &#x1f959;问题描述 使用&#xff1a;Camera.main.ScreenPointToRay 将鼠标坐标转换成射线&#xff0c;然后通过&#xff1a;Physics.Raycast 获取到射线碰撞到的坐标&#xff0…

kubeasz部署k8s:v1.27.5集群

安装k8s集群相关系统及组件的详细版本号 Ubuntu 22.04.3 LTS k8s: v1.27.5 containerd: 1.6.23 etcd: v3.5.9 coredns: 1.11.1 calico: v3.24.6 安装步骤清单&#xff1a; 1.deploy机器做好对所有k8s node节点的免密登陆操作 2.deploy机器安装好python2版本以及pip&#xff0c;…

windows安装Mysql解压版

windows安装Mysql解压版 一、下载mysql-8.0.36-winx64.zip二、解压三、配置3.1. 添加环境变量&#xff1a;新建MYSQL_HOME3.2.如何验证是否添加成功&#xff1a;必须以管理员身份启动3.3. 初始化MySQL&#xff1a;必须以管理员身份启动3.4. 注册MySQL服务&#xff1a;必须以管理…

OpenAI视频生成模型Sora背后的技术及其深远的影响

前言 Sora的视频生成技术在保真度、长度、稳定性、一致性、分辨率和文字理解等方面都达到了当前最优水平。其核心技术包括使用视觉块编码将不同格式的视频统一编码成Transformer可训练的嵌入向量&#xff0c;以及类似于扩散过程的UNet方法进行降维和升维的加噪与去噪操作。通过…