Postgresql源码(110)分析dsm动态共享内存分配与共享内存mq实例(dsm/toc接口备忘录)

相关
《Postgresql源码(90)共享内存申请CreateSharedMemoryAndSemaphores》
《Linux内存映射函数mmap与匿名内存块》
《Linux共享内存与子进程继承》

dsm/toc使用备忘

用dsm框架的流程

  1. 评估共享内存大小:多次用shm_toc_estimate_chunk、shm_toc_estimate_keys向estimate中增加数据结构,最后用shm_toc_estimate得出刚才增加的总大小。
  2. 申请共享内存:dsm_create
  3. 共享内存头部放kv管理结构toc:shm_toc_create,现在共享内存中可以使用toc接口,以kv的形式存放数据结构了。
  4. 例如需要存放FixedParallelState结构
    1. 第一步:在刚才申请的共享内存段中,申请一个起始地址:shm_toc_allocate
    2. 第二步:拿到地址后,转为FixedParallelState开始赋值。
    3. 第三步:最后调用toc接口shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps),将刚才的数据结构地址关联一个PARALLEL_KEY_FIXED,使用的时候用PARALLEL_KEY_FIXED查找即可fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false)

0 概念

数据结构含义:

  1. dsm_segment(动态共享内存段):
    • 每个后端进程可以通过使用dsm_segment来访问共享内存。dsm_segment包含了内存分配和释放等操作所需的信息,并提供了一组函数来管理和操作共享内存。每个会话(session)都可以有一个或多个dsm_segment。
    • 提供共享内存,dsm_segment表示申请的一个共享内存段,对于dsm api来说这是代表共享内存的最小单位。
  2. shm_mq_handle(共享内存消息队列句柄)
    • shm_mq_handle则是用于在共享内存中进行消息传递的句柄。它提供了一组函数来发送和接收消息,并提供了同步和互斥机制,确保多个进程之间的顺序和一致性。
    • 使用dsm_segment提供共享内存段做进程通信。

在实际使用中,通常会将它们组合在一起,以实现共享内存中的消息传递机制。

1 shm_toc初始化一段共享内存,共享内存是从哪来的?

在PG代码中可以看到shm_toc初始化一段内存,在头部放置shm_toc,这块内存叫做一个内存段,shm_toc_create函数接受已经申请好的内存地址,在头部初始化shm_toc(表示内存段头),后面可以存放用户自定义数据,比如message queue所需的结构体、数据等。

初始化一段共享内存,在头上放shm_toc结构,明显是用来记录内存段管理的一些数据。

struct shm_toc
{uint64		toc_magic;		/* Magic number identifying this TOC */slock_t		toc_mutex;		/* Spinlock for mutual exclusion */Size		toc_total_bytes;	/* Bytes managed by this TOC */Size		toc_allocated_bytes;	/* Bytes allocated of those managed */uint32		toc_nentry;		/* Number of entries in TOC */shm_toc_entry toc_entry[FLEXIBLE_ARRAY_MEMBER];
};

初始化

shm_toc *
shm_toc_create(uint64 magic, void *address, Size nbytes)
{shm_toc    *toc = (shm_toc *) address;Assert(nbytes > offsetof(shm_toc, toc_entry));toc->toc_magic = magic;SpinLockInit(&toc->toc_mutex);/** The alignment code in shm_toc_allocate() assumes that the starting* value is buffer-aligned.*/toc->toc_total_bytes = BUFFERALIGN_DOWN(nbytes);toc->toc_allocated_bytes = 0;toc->toc_nentry = 0;return toc;
}

那么shm_toc_create用的内存是从哪来的?

2 动态mmap一段新的共享内存(dsm机制)

  1. Postgresql能看到很多dsm开头的函数,这类函数属于运行时动态申请共享内存模块( dynamic shared memory)。
  2. 《Postgresql源码(90)共享内存申请CreateSharedMemoryAndSemaphores》介绍过,PG的共享内存是在启动时,直接用mmap 一次性申请大匿名块,然后自己切分使用的。但如果运行时需要申请新的、不定大小的共享内存块,肯定无法再启动时预先申请,这就引入了dsm模块。

3 dsm申请共享内存应用实例:mq

在源码中有一个非常好的例子,可以用来分析dsm申请内存用作mq的方法:

src/test/modules/test_shm_mq

下面对代码流程做一些分析,主要分析动态申请共享内存的过程,不涉及初始化后的使用流程。

3.1 内存初始化:test_shm_mq_setup

void
test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,shm_mq_handle **output, shm_mq_handle **input)
{dsm_segment *seg;test_shm_mq_header *hdr;shm_mq	   *outq = NULL;	/* placate compiler */shm_mq	   *inq = NULL;		/* placate compiler */worker_state *wstate;/* Set up a dynamic shared memory segment. */setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);*segp = seg;/* Register background workers. */wstate = setup_background_workers(nworkers, seg);/* Attach the queues. */*output = shm_mq_attach(outq, seg, wstate->handle[0]);*input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);/* Wait for workers to become ready. */wait_for_workers_to_become_ready(wstate, hdr);/** Once we reach this point, all workers are ready.  We no longer need to* kill them if we die; they'll die on their own as the message queues* shut down.*/cancel_on_dsm_detach(seg, cleanup_background_workers,PointerGetDatum(wstate));pfree(wstate);
}

第一步:申请共享内存setup_dynamic_shared_memory

static void
setup_dynamic_shared_memory(int64 queue_size, int nworkers,dsm_segment **segp, test_shm_mq_header **hdrp,shm_mq **outp, shm_mq **inp)
{shm_toc_estimator e;int			i;Size		segsize;dsm_segment *seg;shm_toc    *toc;test_shm_mq_header *hdr;

用estimator来评估需要多少共享内存(可以不用自己算了:)
评估器的接口:

  • shm_toc_initialize_estimator:初始化记录(key的个数和chunk的总大小)。
  • shm_toc_estimate_chunk:需要多大的chunk?调用一次记录一个指定大小的chunk,对齐到32字节上。
  • shm_toc_estimate_keys:需要几个key?这里记录一下
  • shm_toc_estimate:把之前记录的结构换算成大小

评估器的原理:记录key的个数和chunk的总大小即可。

	shm_toc_initialize_estimator(&e);shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header));for (i = 0; i <= nworkers; ++i)shm_toc_estimate_chunk(&e, (Size) queue_size);shm_toc_estimate_keys(&e, 2 + nworkers);segsize = shm_toc_estimate(&e);

评估完了开始用dsm_create申请新的共享内存了!

dsm_impl_op→dsm_impl_op→dsm_impl_mmap来具体从os申请共享内存,不深入了。

现在拿到seg就是dsm_segment动态共享内存段,代表一段共享内存(段内部有dsm_control一套管理机制,需要深入的话可以看看dsm_create代码),这里只需要指导这是一段按需要大小mmap出来的共享内存就好了。

	/* Create the shared memory segment and establish a table of contents. */seg = dsm_create(shm_toc_estimate(&e), 0);

在共享内存其实的位置,放一个shm_toc结构,作为后面数据的表头和索引

---------------- 共享内存起始地址
struct shm_toc
{uint64		toc_magic;				/* Magic number identifying this TOC */slock_t		toc_mutex;				/* Spinlock for mutual exclusion */Size		toc_total_bytes;		/* Bytes managed by this TOC */Size		toc_allocated_bytes;	/* Bytes allocated of those managed */uint32		toc_nentry;				/* Number of entries in TOC */shm_toc_entry toc_entry[0];
};
---------------- 申请空间是从最高地址开始的用的,索引数组toc_entry从低到高,数据从高到低,和page一样
toc_entry[0]   -------------\
----------------             |
toc_entry[1]                 |
----------------             |
toc_entry[2]  ---------\     |
----------------       |     |
data <-----------------/     |
...                          |
data                         |
...                          |
data <----------------------/
----------------
	toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg),segsize);

申请一段用户自定义数据空间,放一个test_shm_mq_header结构。

	hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header));SpinLockInit(&hdr->mutex);hdr->workers_total = nworkers;hdr->workers_attached = 0;hdr->workers_ready = 0;

初始化好数据结构后,把数据插进去,位置前面shm_toc_allocate已经申请好了,这一步shm_toc_insert主要是配置toc_entry那个索引数组!具体就是toc_entry[0]的key和offset,下次用的时候用0就能找到offset了。

	shm_toc_insert(toc, 0, hdr);/* Set up one message queue per worker, plus one. */for (i = 0; i <= nworkers; ++i){shm_mq	   *mq;

每个worker申请一个queue_size,按上面方法allocate+insert。

		mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),(Size) queue_size);shm_toc_insert(toc, i + 1, mq);if (i == 0){

把mq用户需要的数据结构传进去进行需要的配置即可,shm_toc_insert完了也可以用,alloc出来的地址直接指向共享内存,可以在当前进程一直使用。

			/* We send messages to the first queue. */shm_mq_set_sender(mq, MyProc);*outp = mq;}if (i == nworkers){/* We receive messages from the last queue. */shm_mq_set_receiver(mq, MyProc);*inp = mq;}}/* Return results to caller. */*segp = seg;*hdrp = hdr;
}

第二步:配置bgworker:setup_background_workers

static worker_state *
setup_background_workers(int nworkers, dsm_segment *seg)
{MemoryContext oldcontext;BackgroundWorker worker;worker_state *wstate;int			i;oldcontext = MemoryContextSwitchTo(CurTransactionContext);

setup_background_workers函数目标是构造worker_state,下面是worker_state的内存结构:

------------------------------
typedef struct
{int			nworkers;BackgroundWorkerHandle *handle[];  //  flex数组,每个位置对应一个worker{int			slot;uint64		generation;}
} worker_state;
------------------------------
handle[0]    
------------------------------
handle[1]
------------------------------
handle[2]
------------------------------
	wstate = MemoryContextAlloc(TopTransactionContext,offsetof(worker_state, handle) +sizeof(BackgroundWorkerHandle *) * nworkers);wstate->nworkers = 0;on_dsm_detach(seg, cleanup_background_workers,PointerGetDatum(wstate));/* Configure a worker. */

配通用的worker通用部分:

	memset(&worker, 0, sizeof(worker));worker.bgw_flags = BGWORKER_SHMEM_ACCESS;worker.bgw_start_time = BgWorkerStart_ConsistentState;worker.bgw_restart_time = BGW_NEVER_RESTART;sprintf(worker.bgw_library_name, "test_shm_mq");sprintf(worker.bgw_function_name, "test_shm_mq_main");snprintf(worker.bgw_type, BGW_MAXLEN, "test_shm_mq");worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));/* set bgw_notify_pid, so we can detect if the worker stops */worker.bgw_notify_pid = MyProcPid;

把worker通用的部分传进去,拼每个worker特有的部分

	/* Register the workers. */for (i = 0; i < nworkers; ++i){

BackgroundWorkerData->slot数组(bgworker管理的全局变量记录使用所有使用中未使用的bgworker,每个worker一个slot)里面拿到一个没使用的slot,记录slot的id到handle[i]中并返回即可。

		if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i]))ereport(ERROR,(errcode(ERRCODE_INSUFFICIENT_RESOURCES),errmsg("could not register background process"),errhint("You may need to increase max_worker_processes.")));++wstate->nworkers;}/* All done. */MemoryContextSwitchTo(oldcontext);return wstate;
}

第三步:剩余流程

void
test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,shm_mq_handle **output, shm_mq_handle **input)
{dsm_segment *seg;test_shm_mq_header *hdr;shm_mq	   *outq = NULL;	/* placate compiler */shm_mq	   *inq = NULL;		/* placate compiler */worker_state *wstate;/* Set up a dynamic shared memory segment. */setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);*segp = seg;/* Register background workers. */wstate = setup_background_workers(nworkers, seg);

第三步到这里了,shm_mq_handle的作用是构建一个shm_mq_handle,记录dsm seg、mq、bgworker的handle

	/* Attach the queues. */*output = shm_mq_attach(outq, seg, wstate->handle[0]);*input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);

用取一个PID的方式检查worker是不是ready了。

	wait_for_workers_to_become_ready(wstate, hdr);cancel_on_dsm_detach(seg, cleanup_background_workers,PointerGetDatum(wstate));pfree(wstate);
}

到这里内存初始化的工作做完了。后面可以直接调用mq的api发送信息,本篇不在关注。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/29438.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Git 快速入门

Git 快速入门 文章目录 Git 快速入门一、代码托管平台&#xff08;远程仓库&#xff09;二、安装Git三、Git的命令实践Git 的四个区域Git 管理代码的3个场景Git 工作区的理念Git 工作区的生命周期Git 版本回退Git 文件重命名Git查看版本提交日志Git StashGit分支Git标签 四、创…

Nginx与docker配置安装

目录&#xff1a; Nginx的安装配置&#xff1a; 1、安装依赖包&#xff1a; 2、下载Nginx安装包&#xff1a; 3、解压Nginx压缩包&#xff1a; 4、配置Nginx编译环境&#xff1a; 5、编译并安装Nginx&#xff1a; 6、安装完Nginx后&#xff0c;可以切换到Nginx的安装目录…

【TypeScript】TS接口interface类型(三)

【TypeScript】TS接口interface类型&#xff08;三&#xff09; 【TypeScript】TS接口interface类型&#xff08;三&#xff09;一、接口类型二、实践使用2.1 常规类型2.2 设置属性只读 readonly2.3 设置索引签名2.4 设置可选属性2.5 函数类型接口 一、接口类型 TypeScript中的…

【肌电图信号分析】通道肌电图并查找收缩周期的数量、振幅、最大值和持续时间(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

arcgis宗地或者地块四至权利人信息提取教程

ARCGIS怎样将图斑四邻的名称及方位加入其属性表 以前曾发表过一篇《 如何把相邻图斑的属性添加在某个字段中》的个人心得,有些会员提出了进一步的要求,不但要相邻图斑的名称,还要求有方位,下面讲一下自己的做法。 基本思路是:连接相邻图斑质心,根据连线的角度确定相邻图斑…

dpkg: error: dpkg frontend lock is locked by another process

问题 在Ubuntu 系统终端用&#xff1a; dpkg -i xxxxxxxxxx.deb 安装软件包的时候&#xff0c;报错 dpkg: error: dpkg frontend lock is locked by another process 方案 找到这个锁文件的进程id lsof /var/lib/dpkg/lock-frontend 再kill 掉 sudo kill -9 PID 删除锁文件 …

SQL-每日一题【1164. 指定日期的产品价格】

题目 产品数据表: Products 写一段 SQL来查找在 2019-08-16 时全部产品的价格&#xff0c;假设所有产品在修改前的价格都是 10 。 以 任意顺序 返回结果表。 查询结果格式如下例所示。 示例 1: 解题思路 1.题目要求我们查找在 2019-08-16 时全部产品的价格&#xff0c;假设所…

处理nacos、tomcat、nginx日志增长过快问题

1.nacos日志清理 修改nacos-logback.xml 将日志级别改为error级&#xff0c;减少info级日志产生量 将<maxHistory>调整为2以下&#xff0c;将 <totalSizeCap>调整为2GB左右 比如&#xff1a; [rootiZ0jlapur4hqjezy8waee0Z logs]# ll -h total 2.1G -rw-r--r-…

[Securinets CTF Quals 2023] PolyLCG DigestiveV2

PolyLCG 第1个题是个LCG问题,通过一堆参数生成两个序列&#xff0c;如果flag位为1则输出x序列为0则输出 y序列 from random import randintxcoeff[2220881165502059873403292638352563283672047788097000474246472547036149880673794935190953317495413822516051735501183996…

设计实现数据库表扩展的7种方式

设计实现数据库表扩展的7种方式 在软件开发过程中&#xff0c;数据库是一项关键技术&#xff0c;用于存储、管理和检索数据。数据库表设计是构建健壮数据库系统的核心环节之一。然而&#xff0c;随着业务需求的不断演变和扩展&#xff0c;数据库表中的字段扩展变得至关重要。 …

[OnWork.Tools]系列 06-屏幕水印

简介 屏幕水印功能主要是在开会分享屏幕的时候在屏幕上增加水印 水印使用 水印启用和颜色设置 水印文字和大小设置 水印间距,透明度,角度调整

centos安装python3的多个版本

标题 原本安装了python3.6&#xff0c;但是又有一个项目需要py3.7&#xff0c;所以需要让两个版本共存 操作 1、下载py3.7 wget https://www.python.org/ftp/python/3.7.3/Python-3.7.3.tgz2、解压 tar -xzvf Python-3.7.3.tgz进入到文件夹 cd Python-3.7.33、安装 本人c…

day10 快速排序 方法重载 和 方法递推

方法重载 斐波拉契数列问题 使用重载思想解决 public static int method(int n){if (n 2 ){return 1 ;}return (n-1)*2method(n-1);}public static int f(int n){if (n 1){return 1;}if (n 2){return 2;}return f(n-1)f(n-2);} 快速排序 思维很简单&#xff0c;类似二…

【从零单排Golang】第十三话:使用WaitGroup等待多路并行的异步任务

在后端开发当中&#xff0c;经常会遇到这样的场景&#xff1a;请求给了批量的输入&#xff0c;对于每一个输入&#xff0c;我们都要给外部发请求等待返回&#xff0c;然后才能继续其它自己的业务逻辑。在这样的case下&#xff0c;如果每一个输入串行处理的话&#xff0c;那么很…

C语言进阶-4

1、常用位操作符 1.1、位与& (1)注意&#xff1a;位与符号是一个&&#xff0c;两个&&是逻辑与。 (2)真值表&#xff1a;1&00 1&11 0&00 0&10 (3)从真值表可以看出&#xff1a;位与操作的特点是&#xff0c;只有1和1位于结果为1&…

Zookeeper 面试题

一、ZooKeeper 基础题 1.1、Zookeeper 的典型应用场景 Zookeeper 是一个典型的发布/订阅模式的分布式数据管理与协调框架&#xff0c;开发人员可以使用它来进行分布式数据的发布和订阅。 通过对 Zookeeper 中丰富的数据节点进行交叉使用&#xff0c;配合 Watcher 事件通知机…

【java】【maven】【高级】MAVEN聚合继承属性等

目录 1、模块开发与设计 2、聚合 2、继承 3、属性 4、版本管理 5、资源配置 6、多环境配置 7、多环境开发配置 8、跳过测试 9、私服 前言&#xff1a;maven的高级使用包含分模块开发与设计、聚合、继承、属性、版本管理、资源配置、多环境配置、多环境开发配置、跳过…

Github 80 个键盘快捷键和一些搜索技巧的备忘清单

文章目录 键盘快捷键站点范围的快捷方式资料库源代码编辑源码浏览注释问题和拉取请求列表问题和拉取请求拉取请求的变化项目板通知网络图搜索技巧范围搜索文件路径完全符合布尔运算符基于仓库的条件基于 issue 的条件基于用户的条件参考网址键盘快捷键 站点范围的快捷方式 S …

Spring Cloud 的版本和SpringBoot的版本

Spring Cloud 的版本选择 Spring Cloud 和SpringBoot的版本存在对应关系 Spring Cloud 的版本和SpringBoot的版本&#xff0c;存在对应关系。最新的SpringCloud版本&#xff08;发布文章时为2022.0.3&#xff09;&#xff0c;需要SpringBoot&#xff08;3.0.9&#xff09; 的…

数据分析DAY1

数据分析 引言 这一周&#xff1a;学习了python的numpy和matplotlib以及在飞桨paddle上面做了几个小项目 发现numpy和matplotlib里面有很多api&#xff0c;要全部记住是不可能的&#xff0c;也是不可能全部学完的&#xff0c;所以我们要知道并且熟悉一些常用的api&#xff0…