Linux使用匿名管道实现进程池得以高效通信

                                               🎬慕斯主页修仙—别有洞天

                                              ♈️今日夜电波:Nonsense—Sabrina Carpenter

                                                                0:50━━━━━━️💟──────── 2:43
                                                                    🔄   ◀️   ⏸   ▶️    ☰  

                                      💗关注👍点赞🙌收藏您的每一次鼓励都是对我莫大的支持😍


目录

思路梳理

匿名管道知识回忆

匿名管道实现进程池思路

池化技术怎么提高效率?

具体实现

进程以及管道的创建操作

分发任务操作

回收资源操作

解决上述所提到Bug

总体代码及代码效果

Makefile

Task.hpp

mulpipe.cpp

实现效果


思路梳理

匿名管道知识回忆

        上一篇的文章中,详细介绍了管道的知识点,下面还是复习一下对于匿名管道相关的知识点。如下是匿名管道的一个示例图,这表现了两个“有血缘关系”的两个进程之间通过匿名管道进行通信的过程,我们通过控制struct files *fd_array[]中读或者写的struct files的开关来实现两个进程间的通信:

        我们主要通过如下的接口创建匿名管道来实现以上的匿名管道通信:

#include <unistd.h>
功能:创建一无名管道
原型
int pipe(int fd[2]);
参数
fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
返回值:成功返回0,失败返回错误代码

匿名管道实现进程池思路

        说大白话(●—●):我们使用一个父进程创建很多的管道,再创建对应数量的进程,然后这些管道分别与其他的进程进行直接的连接。这样我们就提前创建和联系好了一定数量的进程。我们想让父进程向其中一个子进程发消息就可通过选择管道直接发消息,而如果不发消息其它进程只会等待。通过信息的发送,我们可以就通过选择管道从而让子进程分别执行对应的任务。大致的图解如下:

池化技术怎么提高效率?

        池化技术通过资源共享和优化来提高效率,具体表现在以下几个方面:

  • 资源复用:池化技术通过重用已创建的资源,减少了频繁创建和销毁资源的开销。例如,线程池中的线程可以被多个任务重复使用,这样可以避免每次任务执行时都创建新线程的开销。
  • 减少等待时间:池化技术可以减少请求的等待时间。因为资源是预先分配好的,当有新的请求到来时,可以立即使用池中的资源,而不需要等待资源的创建过程。
  • 提高响应速度:由于资源已经准备好,池化技术可以快速响应请求,提高了处理速度。这对于需要快速响应的系统来说尤其重要。
  • 统一管理:池化技术提供了对资源的集中管理,这有助于监控系统资源的使用情况,及时回收不再使用的资源,避免资源浪费。
  • 优化性能:在大数据处理等场景中,池化技术可以通过合并多个请求来减少数据处理的时间和空间复杂度,从而提高数据处理的性能。

具体实现

进程以及管道的创建操作

        创建一个channel类用来来存储管道的读文件描述符ctrlfd以及进程描述符workerid,完成初始化操作以及后续的销毁操作。

static int number = 1;//标识对应的进程和管道class channel
{
public:channel(int fd, pid_t id) : ctrlfd(fd), workerid(id){name = "channel-" + std::to_string(number++);}public:int ctrlfd;pid_t workerid;std::string name;
};

        再根据先描述在组织的原则,我们在主函数使用一个std::vector<channel> channels来管理上面的结构体,可以根据需求进行增删查改等等操作。在完成这些预备操作后,创建对应数量的进程以及管道。

        特别注意:如下函数中的std::vector<int> old;以及如下代码是为了解决父子进程继承而产生的一些bug,这个将在最后解释:

        	std::vector<int> old;if(!old.empty()){for(auto fd : old){close(fd);}PrintFd(old);}old.push_back(pipefd[1]);

        如下函数的操作为创建管道以及进程,可以先忽略上述所提到的解决bug的代码,通过循环创建对应的子进程、关闭父子进程对应的读写文件,并且存储到vector<channel> *c(也就是主函数的中channels),下面的work()函数为子进程要做的工作,可以理解了后面的分发任务操作再来理解。

const int num = 5;//全局定义创建管道以及进程数
void CreateChannels(std::vector<channel> *c)
{std::vector<int> old;for (int i = 0; i < num; i++){// 1. 定义并创建管道int pipefd[2];int n = pipe(pipefd);assert(n == 0);(void)n;// 2. 创建进程pid_t id = fork();assert(id != -1);// 3. 构建单向通信信道if (id == 0) // child{if(!old.empty()){for(auto fd : old){close(fd);}PrintFd(old);}close(pipefd[1]);//关闭写dup2(pipefd[0], 0);//重定向写入Work();//子进程工作exit(0); // 会自动关闭自己打开的所有的fd}// fatherclose(pipefd[0]);c->push_back(channel(pipefd[1], id));old.push_back(pipefd[1]);// childid, pipefd[1]}
}

        因为前面我们已经重定向了管道的写入作为子进程的写入,接下来通过read就会读取对应的操作,需要注意的是:我们是通过一个int类型的变量来控制要完成的任务,后续再task.hpp中会定义对应要完成的任务。通过read的返回值来判断是要执行任务还是退出,我们在管道的知识中知道,read会等待write,也就是等待数据的输入。当写进程退出,读进程会跟着退出,我们根据以上特性来判断是要执行任务还是等待任务还是退出进程:

void Work()
{while (true){int code = 0;ssize_t n = read(0, &code, sizeof(code));if (n == sizeof(code)){if (!init.CheckSafe(code))continue;init.RunTask(code);}else if (n == 0){break;}else{// do nothing}}std::cout << "child quit" << std::endl;
}

分发任务操作

        当我们创建好管道以及进程后,可以注意到接下来的都是没有经过 if (id == 0) 控制下的程序,也就是说接下来的程序都是父进程运行的程序,因此我们就可以让父进程来分配任务使得子进程完成任务。接下来,我们创建一个task.hpp来模拟创建的任务以及对应的封装、任务分发等等操作如下:

#pragma once#include <iostream>
#include <functional>
#include <vector>
#include <ctime>
#include <unistd.h>
#include <stdlib.h>// using task_t = std::function<void()>;
typedef std::function<void()> task_t;void Download()
{std::cout << "我是一个下载任务"<< " 处理者: " << getpid() << std::endl;
}void PrintLog()
{std::cout << "我是一个打印日志的任务"<< " 处理者: " << getpid() << std::endl;
}void PushVideoStream()
{std::cout << "这是一个推送视频流的任务"<< " 处理者: " << getpid() << std::endl;
}// void ProcessExit()
// {
//     exit(0);
// }class Init
{
public:// 任务码const static int g_download_code = 0;const static int g_printlog_code = 1;const static int g_push_videostream_code = 2;// 任务集合std::vector<task_t> tasks;public:Init(){tasks.push_back(Download);tasks.push_back(PrintLog);tasks.push_back(PushVideoStream);srand(time(nullptr) ^ getpid());}bool CheckSafe(int code){if (code >= 0 && code < tasks.size())return true;elsereturn false;}void RunTask(int code){return tasks[code]();}int SelectTask(){return rand() % tasks.size();}std::string ToDesc(int code){switch (code){case g_download_code:return "Download";case g_printlog_code:return "PrintLog";case g_push_videostream_code:return "PushVideoStream";default:return "Unknow";}}
};Init init; // 定义对象

        传入主函数用于管理的channels,flag用于控制进程执行完任务后是否需要退出(1表示要退出,0表示不退出),num为要执行任务的次数,需要注意的是num是要在flag为1的前提下才能有效的,如果不传则程序只会执行一次。在选择完任务以及进程后,通过write来向指定的管道写入。具体读写操作可看:Linux进程间通信(IPC)机制之一:管道(Pipes)详解:匿名管道的特性与情况

void SendCommand(const std::vector<channel> &c, bool flag, int num = -1)
{int pos = 0;while (true){// 1. 选择任务int command = init.SelectTask();// 2. 选择信道(进程)const auto &channel = c[pos++];pos %= c.size();// debugstd::cout << "send command " << init.ToDesc(command) << "[" << command << "]"<< " in "<< channel.name << " worker is : " << channel.workerid << std::endl;// 3. 发送任务write(channel.ctrlfd, &command, sizeof(command));// 4. 判断是否要退出if (!flag){num--;if (num <= 0)break;}sleep(1);}std::cout << "SendCommand done..." << std::endl;
}

回收资源操作

        通过close关闭对应的管道,waitpid等待子进程的结束。

const int num = 5;//全局定义创建管道以及进程数
void ReleaseChannels(std::vector<channel> c)
{// version 2// int num = c.size() - 1;// for (; num >= 0; num--)// {//     close(c[num].ctrlfd);//     waitpid(c[num].workerid, nullptr, 0);// }// version 1for (const auto &channel : c){close(channel.ctrlfd);waitpid(channel.workerid, nullptr, 0);}// for (const auto &channel : c)// {//     pid_t rid = waitpid(channel.workerid, nullptr, 0);//     if (rid == channel.workerid)//     {//         std::cout << "wait child: " << channel.workerid << " success" << std::endl;//     }// }
}

解决上述所提到Bug

        如下代码:

        	std::vector<int> old;if(!old.empty()){for(auto fd : old){close(fd);}PrintFd(old);}old.push_back(pipefd[1]);

        这是个什么Bug呢?如果不加上上这段代码,那么关闭进程及管道就必须从最后面生成的进程和管道向前关闭。为啥呢?这是因为当我们父进程依次创建子进程,其中的对于管道的读写操作struct file也被继承了下来,也就是说有着上一个生成的子进程会被下一个子进程的写操作struct file指向,而下下个生成的子进程会指向前面两个子进程,以此类推...因此,我们需要在创建的时候关闭新创建子进程对应的写操作。

        具体的子进程继承写操作的struct file例子如下:

总体代码及代码效果

Makefile

processpool:mulpipe.cppg++ -o $@ $^ -std=c++11
.PHONY:clean
clean:rm -f processpool

Task.hpp

#pragma once#include <iostream>
#include <functional>
#include <vector>
#include <ctime>
#include <unistd.h>
#include <stdlib.h>// using task_t = std::function<void()>;
typedef std::function<void()> task_t;void Download()
{std::cout << "我是一个下载任务"<< " 处理者: " << getpid() << std::endl;
}void PrintLog()
{std::cout << "我是一个打印日志的任务"<< " 处理者: " << getpid() << std::endl;
}void PushVideoStream()
{std::cout << "这是一个推送视频流的任务"<< " 处理者: " << getpid() << std::endl;
}// void ProcessExit()
// {
//     exit(0);
// }class Init
{
public:// 任务码const static int g_download_code = 0;const static int g_printlog_code = 1;const static int g_push_videostream_code = 2;// 任务集合std::vector<task_t> tasks;public:Init(){tasks.push_back(Download);tasks.push_back(PrintLog);tasks.push_back(PushVideoStream);srand(time(nullptr) ^ getpid());}bool CheckSafe(int code){if (code >= 0 && code < tasks.size())return true;elsereturn false;}void RunTask(int code){return tasks[code]();}int SelectTask(){return rand() % tasks.size();}std::string ToDesc(int code){switch (code){case g_download_code:return "Download";case g_printlog_code:return "PrintLog";case g_push_videostream_code:return "PushVideoStream";default:return "Unknow";}}
};Init init; // 定义对象

mulpipe.cpp

#include <iostream>
#include <string>
#include <vector>
#include <cassert>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"const int num = 5;
static int number = 1;class channel
{
public:channel(int fd, pid_t id) : ctrlfd(fd), workerid(id){name = "channel-" + std::to_string(number++);}public:int ctrlfd;pid_t workerid;std::string name;
};void Work()
{while (true){int code = 0;ssize_t n = read(0, &code, sizeof(code));if (n == sizeof(code)){if (!init.CheckSafe(code))continue;init.RunTask(code);}else if (n == 0){break;}else{// do nothing}}std::cout << "child quit" << std::endl;
}void PrintFd(const std::vector<int> &fds)
{std::cout << getpid() << " close fds: ";for(auto fd : fds){std::cout << fd << " ";}std::cout << std::endl;
}// 传参形式:
// 1. 输入参数:const &
// 2. 输出参数:*
// 3. 输入输出参数:&
void CreateChannels(std::vector<channel> *c)
{// bugstd::vector<int> old;for (int i = 0; i < num; i++){// 1. 定义并创建管道int pipefd[2];int n = pipe(pipefd);assert(n == 0);(void)n;// 2. 创建进程pid_t id = fork();assert(id != -1);// 3. 构建单向通信信道if (id == 0) // child{if(!old.empty()){for(auto fd : old){close(fd);}PrintFd(old);}close(pipefd[1]);//关闭写dup2(pipefd[0], 0);//重定向写入Work();exit(0); // 会自动关闭自己打开的所有的fd}// fatherclose(pipefd[0]);c->push_back(channel(pipefd[1], id));old.push_back(pipefd[1]);// childid, pipefd[1]}
}void PrintDebug(const std::vector<channel> &c)
{for (const auto &channel : c){std::cout << channel.name << ", " << channel.ctrlfd << ", " << channel.workerid << std::endl;}
}void SendCommand(const std::vector<channel> &c, bool flag, int num = -1)
{int pos = 0;while (true){// 1. 选择任务int command = init.SelectTask();// 2. 选择信道(进程)const auto &channel = c[pos++];pos %= c.size();// debugstd::cout << "send command " << init.ToDesc(command) << "[" << command << "]"<< " in "<< channel.name << " worker is : " << channel.workerid << std::endl;// 3. 发送任务write(channel.ctrlfd, &command, sizeof(command));// 4. 判断是否要退出if (!flag){num--;if (num <= 0)break;}sleep(1);}std::cout << "SendCommand done..." << std::endl;
}
void ReleaseChannels(std::vector<channel> c)
{// version 2// int num = c.size() - 1;// for (; num >= 0; num--)// {//     close(c[num].ctrlfd);//     waitpid(c[num].workerid, nullptr, 0);// }// version 1for (const auto &channel : c){close(channel.ctrlfd);waitpid(channel.workerid, nullptr, 0);}// for (const auto &channel : c)// {//     pid_t rid = waitpid(channel.workerid, nullptr, 0);//     if (rid == channel.workerid)//     {//         std::cout << "wait child: " << channel.workerid << " success" << std::endl;//     }// }
}
int main()
{std::vector<channel> channels;// 1. 创建信道,创建进程CreateChannels(&channels);// 2. 开始发送任务const bool g_always_loop = true;// SendCommand(channels, g_always_loop);SendCommand(channels, !g_always_loop, 10);// 3. 回收资源,想让子进程退出,并且释放管道,只要关闭写端ReleaseChannels(channels);return 0;
}

实现效果


                   感谢你耐心的看到这里ღ( ´・ᴗ・` )比心,如有哪里有错误请踢一脚作者o(╥﹏╥)o! 

                                       

                                                                        给个三连再走嘛~  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/654499.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python基础四------完结(概念在下面,代码看不懂了再看)

# a_list [1,2,3,4,5] # # print(a_list)# 根据下标来删除列表中的元素 # 爬取的数据中 有个别的数据 是我们不想要的 那么我们就可以通过下标的方式来删除 # del a_list[2] # print(a_list)# b_list [1,2,3,4,5] # print(b_list) # pop是删除列表中的最后一个元素 # b_list.…

Kali 基本命令大全

一、系统信息 arch 显示机器的处理器架构(1) uname -m 显示机器的处理器架构(2) uname -r 显示正在使用的内核版本 dmidecode -q 显示硬件系统部件- (SMBIOS / DMI) hdparm -i /dev/hda 罗列一个磁盘的架构特性 hdparm -tT /dev/sda 在磁盘上执行测试性读取操作 cat /proc/cpu…

AP5216 平均电流型LED降压恒流驱动IC 手电筒汽车摩托车灯芯片

产品描述 AP5216 是一款 PWM工作模式, 高效率、外围简单、内置功率管&#xff0c;适用于5V&#xff5e;100V输入的高精度降压 LED 恒流驱动芯片。输出最大功率可达9W&#xff0c;最大电流 1.0A。AP5216 可实现全亮/半亮功能切换&#xff0c;通过MODE 切换&#xff1a;全亮/半亮…

SAP HANA 报错信息,如何根据报错关键词去进行处理

HANA建模其实上手会比较快&#xff0c;基本会SQL就可以进行开发。 在实际开发中&#xff0c;难点一个是建模思路&#xff0c;另外一个则是建模中报错的处理。 现在将HANA中报错进行一个整理&#xff0c;这里的并不是完整的报错信息&#xff0c;大家可以根据关键词进行查看。 …

ChatGPT更新了Mention功能,集结若干GPTs作战,AI智能体的心智入口;向量数据库的挑战和未来

&#x1f989; AI新闻 &#x1f680; ChatGPT更新了Mention功能&#xff0c;集结若干GPTs作战&#xff0c;AI智能体的心智入口 摘要&#xff1a;OpenAI在ChatGPT中引入了一个新功能&#xff0c;允许用户在聊天时任意一个GPTs&#xff08;即ChatGPT最新推出的AI Agent 智能应用…

scrapy的入门使用

1 安装scrapy 命令: sudo apt-get install scrapy或者&#xff1a; pip/pip3 install scrapy2 scrapy项目开发流程 创建项目: scrapy startproject mySpider生成一个爬虫: scrapy genspider itcast itcast.cn提取数据:     根据网站结构在spider中实现数据采集相关内…

C/C++ - 函数进阶(C++)

目录 默认参数 函数重载 内联函数 函数模板 递归函数 回调函数 默认参数 定义 默认参数是在函数声明或定义中指定的具有默认值的函数参数。默认参数允许在调用函数时可以省略对应的参数&#xff0c;使用默认值进行替代。 使用 默认参数可以用于全局函数和成员函数。默认参…

C语言KR圣经笔记 5.12 复杂声明

5.12 复杂声明 C 语言有时会因为声明的语法而受到谴责&#xff0c;特别是涉及函数指针的声明语法。语法试图使声明和使用一致&#xff1b;在简单的情况下它的效果不错&#xff0c;但在更复杂的情况下会让人困惑&#xff0c;因为声明不能从左往右读&#xff0c;而且括号被过度使…

Linux文本三剑客---sed经典案例

Sed介绍&#xff1a; sed是一种流编辑器&#xff0c;它一次处理一行内容。处理时&#xff0c;把当前处理的行存储在临时缓冲区中&#xff0c;称为“模式空间”&#xff0c;接着用sed命令处理缓冲区中的内容&#xff0c;处理完成后&#xff0c;把缓冲区的内容送往屏幕。接着处理…

Mybatis-plus原生pages分页未生效的解决方案

文章目录 前言原因1、Mybatis Plus版本的问题2、Mapper.xml文件中SQL语句格式问题3、Mybatis Plus默认分页拦截器问题4、分页参数传参问题5、分页配置的问题 解决方案1、升级对应的Mybatis-plus版本分页插件配置问题3、自定义分页拦截器4、正确的参数5、不同版本的配置文件3.4.…

Hugging Face创始人分享:企业如何在ChatGPT浪潮下实现战略布局

Hugging Face创始人兼首席执行官 Clem Delangue在IBM一年一度的 THINK大会中研讨了当前人工智能发展趋势&#xff0c;特别是ChatGPT模型以及其对行业的影响。他的演讲还涉及到一个关键的议题&#xff0c;在ChatGPT这样的通用模型出现后&#xff0c;企业如何在人工智能领域找到自…

QR 分解cpu程序

1. 代码 Makefile EXE : hello_qrSRC_QR qr_main.c qr_func.c $(EXE): $(SRC_QR)gcc $^ -o $ -lm.PHONY: clean clean:-rm -rf $(EXE) qr_main.c #include "stdio.h"int maqr(double* a,int m,int n,double* q);int main() {int i,j;static double q[4][4],a[4]…

全能相似度计算与语义匹配搜索工具包,多维度实现多种算法,涵盖文本、图像等领域。支持文图搜索,满足您在不同场景下的搜索需求

全能相似度计算与语义匹配搜索工具包,多维度实现多种算法,涵盖文本、图像等领域。支持文图搜索,满足您在不同场景下的搜索需求。 Similarities:精准相似度计算与语义匹配搜索工具包,多维度实现多种算法,覆盖文本、图像等领域,支持文搜、图搜文、图搜图匹配搜索 Similar…

window下如何安装ffmpeg(跨平台多媒体处理工具)

ffmpeg是什么? FFmpeg是一个开源的跨平台多媒体处理工具&#xff0c;可以用于录制、转换和流媒体处理音视频。它包含了几个核心库和工具&#xff0c;可以在命令行下执行各种音视频处理操作&#xff0c;如剪辑、分割、合并、媒体格式转换、编解码、流媒体传输等。FFmpeg支持多…

C++类和对象引入以及类的介绍使用

文章目录 一、面向过程和面向对象的初步认识二、类的引入2.2 类的引入 三、类的访问限定符及封装3.3 访问限定符3.4 【面试题】C中struct和class的区别3.5 类的两种定义方式 四、封装【面试题】面向对象的三大特性 五、类的作用域六、类的实例化七、类对象模型7.1 类对象的存储…

postman之接口参数签名(js接口HMAC-SHA256签名)

文章目录 postman之接口参数签名&#xff08;js接口签名&#xff09;一、需求背景二、签名生成规则三、postman js接口签名步骤1. postman设置全局、或环境参数2. 配置Pre-request Scripts脚本 四、Pre-request Scripts脚本 常见工作整理1. js获取unix时间戳2. body json字符串…

day35WEB 攻防-通用漏洞XSS 跨站反射存储DOMBeef-XSS

目录 一&#xff0c;XSS 跨站-原理&分类&手法&探针 1、原理 2、分类 3、危害 二&#xff0c;反射型XSS 1&#xff0c;案例演示 三&#xff0c;存储型XSS 1&#xff0c;案例演示 四&#xff0c;DOM 型XSS 五&#xff0c;XSS 利用环境-XSS 平台&Beef-XS…

探讨UI自动化测试几步骤

随着软件开发的不断发展&#xff0c;UI自动化测试变得越来越重要&#xff0c;它能够提高测试效率、降低人为错误&#xff0c;并确保软件交付的质量。本文将介绍UI自动化测试的一般步骤和一些最佳实践&#xff0c;以帮助开发团队更好地实施自动化测试。 需求分析和选择测试工具&…

QT+VS实现Kmeans++

1、Kmeans的原理如下&#xff1a; &#xff08;1&#xff09;首先选取样本中任一数据点作为第一个聚类中心&#xff1b; &#xff08;2&#xff09;计算样本每一个数据点至现所有聚类中心的最近距离&#xff0c;并记录下来&#xff1b; &#xff08;3&#xff09;逐一挑选所…

电脑监控软件:提升工作效率与保障信息安全

随着信息技术的飞速发展&#xff0c;电脑已经成为了我们日常生活和工作中不可或缺的工具。然而&#xff0c;随着电脑使用频率的增加&#xff0c;工作效率低下、信息泄露等问题也日益凸显。为了解决这些问题&#xff0c;电脑监控软件应运而生&#xff0c;为我们的工作和生活提供…