【Linux】 进程池 一主多从 管道通信

目录

1.代码介绍

2.channel 类

3.进程池类编写

4.主函数及其他

5. 源码


1.代码介绍

本文代码采用一主多从式(一个主进程(master)多个子进程(worker))通过管道进行通信,实现主进程分发任务,子进程完成任务,当主进程关闭管道时,子进程执行完任务后退出。

2.channel 类

创建一个channel类用于描述子进程,方便主进程与其通信。

成员变量:

int _wfd;  //记录管道写描述符用于主进程发送数据

pid_t _pid; //子进程pid

string _name; 

代码实现:

class Channel
{
public:Channel(int wfd, pid_t pid, const string &name) : _wfd(wfd),_pid(pid), _name(name) {}~Channel() {}int wfd() { return _wfd; }string name() { return _name; }pid_t pid() { return _pid; }void Close() { close(_wfd); }//关闭写描述符,子进程读取完毕后会退出
private:int _wfd;pid_t _pid;string _name;
};

3.进程池类编写

该类用于管理子进程,具体功能:

1.创建子进程。

2.获取子进程(轮转方式保证负载均衡)。

3.主进程发送任务码给子进程。

4.进程等待。

5.控制进程退出。

代码实现:

class ProcessPool
{
public:ProcessPool(int num) : _process_num(num) {}void createProcess(work_t work){vector<int> fds;for (int i = 0; i < _process_num; ++i){int pipefd[2]{0};pipe(pipefd);pid_t pid = fork();if (pid == 0){if(!fds.empty()){for(auto& fd:fds){//关闭之前子进程管道的写端close(fd);}}close(pipefd[1]);dup2(pipefd[0], 0);work();exit(0);//子进程执行任务完毕会退出}close(pipefd[0]);string cname = "channel-" + to_string(i);_channels.push_back(Channel(pipefd[1], pid, cname));fds.push_back(pipefd[1]);}}int NextChannel(){static unsigned int index = 0;return (index++) % _process_num;}void SendTaskCode(int index, uint32_t code){cout << "send code: " << code << " to " << _channels[index].name() << " sub prorcess id: " << _channels[index].pid() << endl;write(_channels[index].wfd(), &code, sizeof(code));}void Wait(){waitpid(-1, nullptr, 0);}void KillAll(){for (auto &channel : _channels){channel.Close();}}~ProcessPool() {}private:int _process_num;vector<Channel> _channels;
};

4.主函数及其他

主进程发送任务码,通过管道发送给子进程执行对应任务。

void CtrlProcessPool(const shared_ptr<ProcessPool> &processpool_ptr, int cnt)
{while (cnt){// a. 选择一个进程和通道int channel = processpool_ptr->NextChannel();// cout << channel.name() << endl;// b. 你要选择一个任务uint32_t code = NextTask();// c. 发送任务processpool_ptr->SendTaskCode(channel, code);sleep(1);cnt--;}
}
int main(int argc, char *argv[])
{if (argc != 2){printf("\n\t usage ./processPool num\n");return 1;}int process_num = stoi(argv[1]);shared_ptr<ProcessPool> process_ptr = make_shared<ProcessPool>(process_num);process_ptr->createProcess(worker);CtrlProcessPool(process_ptr, 5);process_ptr->KillAll();process_ptr->Wait();return 0;
}

模拟任务:

#pragma once#include <iostream>
#include <unistd.h>
#include <functional>
using namespace std;using work_t = function<void()>;
using task_t = function<void()>;void PrintLog()
{cout << "printf log task" << endl;
}void ReloadConf()
{cout << "reload conf task" << endl;
}void ConnectMysql()
{cout << "connect mysql task" << endl;
}task_t tasks[3] = {PrintLog, ReloadConf, ConnectMysql};uint32_t NextTask()
{return rand() % 3;
}void worker()
{// 从0中读取任务即可!while(true){uint32_t command_code = 0;ssize_t n = read(0, &command_code, sizeof(command_code));if(n == sizeof(command_code)){if(command_code >= 3) continue;tasks[command_code]();}else if(n == 0) //管道写端关闭读端返回值位0{cout << "sub process: " << getpid() << " quit now..." << endl;break;}}
}

5. 源码

processPool.cc

#include <iostream>
#include <string>
#include <cstdlib>
#include <vector>
#include <unistd.h>
#include <ctime>
#include <sys/wait.h>
#include <sys/types.h>
#include <memory>
#include "task.hpp"using namespace std;
class Channel
{
public:Channel(int wfd, pid_t pid, const string &name) : _wfd(wfd),_pid(pid), _name(name) {}~Channel() {}int wfd() { return _wfd; }string name() { return _name; }pid_t pid() { return _pid; }void Close() { close(_wfd); }//关闭写描述符,子进程读取完毕后会退出
private:int _wfd;pid_t _pid;string _name;
};
class ProcessPool
{
public:ProcessPool(int num) : _process_num(num) {}void createProcess(work_t work){vector<int> fds;for (int i = 0; i < _process_num; ++i){int pipefd[2]{0};pipe(pipefd);pid_t pid = fork();if (pid == 0){if(!fds.empty()){for(auto& fd:fds){close(fd);}}close(pipefd[1]);dup2(pipefd[0], 0);work();exit(0);//子进程执行任务完毕会退出}close(pipefd[0]);string cname = "channel-" + to_string(i);_channels.push_back(Channel(pipefd[1], pid, cname));fds.push_back(pipefd[1]);}}int NextChannel(){static unsigned int index = 0;return (index++) % _process_num;}void SendTaskCode(int index, uint32_t code){cout << "send code: " << code << " to " << _channels[index].name() << " sub prorcess id: " << _channels[index].pid() << endl;write(_channels[index].wfd(), &code, sizeof(code));}void Wait(){waitpid(-1, nullptr, 0);}void KillAll(){for (auto &channel : _channels){channel.Close();}}~ProcessPool() {}private:int _process_num;vector<Channel> _channels;
};void CtrlProcessPool(const shared_ptr<ProcessPool> &processpool_ptr, int cnt)
{while (cnt){// a. 选择一个进程和通道int channel = processpool_ptr->NextChannel();// cout << channel.name() << endl;// b. 你要选择一个任务uint32_t code = NextTask();// c. 发送任务processpool_ptr->SendTaskCode(channel, code);sleep(1);cnt--;}
}
int main(int argc, char *argv[])
{if (argc != 2){printf("\n\t usage ./processPool num\n");return 1;}int process_num = stoi(argv[1]);shared_ptr<ProcessPool> process_ptr = make_shared<ProcessPool>(process_num);process_ptr->createProcess(worker);CtrlProcessPool(process_ptr, 5);process_ptr->KillAll();process_ptr->Wait();return 0;
}

task.hpp:

#pragma once#include <iostream>
#include <unistd.h>
#include <functional>
using namespace std;using work_t = function<void()>;
using task_t = function<void()>;void PrintLog()
{cout << "printf log task" << endl;
}void ReloadConf()
{cout << "reload conf task" << endl;
}void ConnectMysql()
{cout << "connect mysql task" << endl;
}task_t tasks[3] = {PrintLog, ReloadConf, ConnectMysql};uint32_t NextTask()
{return rand() % 3;
}void worker()
{// 从0中读取任务即可!while(true){uint32_t command_code = 0;ssize_t n = read(0, &command_code, sizeof(command_code));if(n == sizeof(command_code)){if(command_code >= 3) continue;tasks[command_code]();}else if(n == 0) //管道写端关闭读端返回值位0{cout << "sub process: " << getpid() << " quit now..." << endl;break;}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/63207.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu环境安装RabbitMQ

1.安装Erlang RabbitMq需要Erlang语⾔的⽀持&#xff0c;在安装rabbitMq之前需要安装erlang # 更新软件包 sudo apt-get update # 安装 erlang sudo apt-get install erlang 查看erlang版本 : erl 退出命令:halt(). 2. 安装RabbitMQ # 更新软件包 sudo apt-get update # 安装 …

集合框架(3)Map

Map接口 现实生活与开发中&#xff0c;我们常会看到这样的一类集合&#xff1a;用户ID与账户信息、学生姓名与考试成绩、IP地址与主机名等&#xff0c;这种一一对应的关系&#xff0c;就称作映射。Java提供了专门的集合框架用来存储这种映射关系的对象&#xff0c;即java.util…

力扣--199.二叉树的右视图

题目 给定一个二叉树的 根节点 root&#xff0c;想象自己站在它的右侧&#xff0c;按照从顶部到底部的顺序&#xff0c;返回从右侧所能看到的节点值。 提示: 二叉树的节点个数的范围是 [0,100] -100 < Node.val < 100 代码 class Solution { public List rightSideV…

Ubuntu Server 22.04.5 LTS重启后IP被重置问题

Ubuntu Server 22.04.5 LTS重启后IP被重置问题 最近在使用Ubuntu Server 22.04做项目开发测试时发现每次重启和关机后&#xff0c;所设置的静态IP地址都会回复到安装系统时所设置的ip Ubuntu Server 22.04 官网下载地址&#xff1a;Ubuntu官方下载地址 对虚拟机下安装Ubuntu感…

Python+OpenCV系列:Python和OpenCV的结合和发展

PythonOpenCV系列&#xff1a;Python和OpenCV的结合和发展 **引言****Python语言的发展****1.1 Python的诞生与发展****1.2 Python的核心特性与优势****1.3 Python的应用领域** **OpenCV的发展****2.1 OpenCV的起源与发展****2.2 OpenCV的功能特性****2.3 OpenCV的应用场景** *…

kube-proxy的iptables工作模式分析

系列文章目录 iptables基础知识 文章目录 系列文章目录前言一、kube-proxy介绍1、kube-proxy三种工作模式2、iptables中k8s相关的链 二、kube-proxy的iptables模式剖析1.集群内部通过clusterIP访问到pod的流程1.1.流程分析 2.从外部访问内部service clusterIP后端pod的流程2.1…

CSMM 软件能力成熟度评估认证补贴政策汇总!

CSMM认证&#xff0c;全称为“软件能力成熟度评估”&#xff0c;也被称作“中国版CMMI认证”。这是中国自主制定的软件能力成熟度评估标准&#xff0c;于2021年6月8日发布。该标准由中国电子技术标准化研究院联合多家产学研用相关单位制定&#xff0c;旨在适合中国国情以及中国…

华为网络设备配置文件备份与恢复(上传、下载、导出,导入)

在日常运维工作中&#xff0c;会经常存在网络割接的情况&#xff0c;为了保证网络割接失败时能重新回退至原有配置&#xff0c;从而不影响原有的办公环境&#xff0c;在网络割接前的备份工作就非常有必要了。 备份方式&#xff1a;FTP 备份技术&#xff1a;PC客户端<---&g…

Linux HTTP代理Squid 基本变更配置及目标白名单方式限制转发

1、文件管理转发白名单 sudo touch /etc/squid/whitelistip sudo touch /etc/squid/whitelistdomain # 目的地ip地址 acl whitelistip dst "/etc/squid/whitelistip" http_access allow whitelistip# 目的地域名限制&#xff0c;可使用.xxx.com 放开整个子域名 acl…

清风数学建模学习笔记——Topsis法

数模评价类&#xff08;2&#xff09;——Topsis法 概述 Topsis:Technique for Order Preference by Similarity to Ideal Solution 也称优劣解距离法&#xff0c;该方法的基本思想是&#xff0c;通过计算每个备选方案与理想解和负理想解之间的距离&#xff0c;从而评估每个…

[软件工程]九.可依赖系统(Dependable Systems)

9.1什么是系统的可靠性&#xff08;reliability&#xff09; 系统的可靠性反映了用户对系统的信任程度。它反映了用户对其能够按照预期运行且正常使用中不会失效的信心程度。 9.2什么是可依赖性&#xff08;dependablity&#xff09;的目的 其目的是覆盖系统的可用性&#x…

减少30%人工处理时间,AI OCR与表格识别助力医疗化验单快速处理

在医疗行业&#xff0c;化验单作为重要的诊断依据和数据来源&#xff0c;涉及大量的文字和表格信息&#xff0c;传统的手工输入和数据处理方式不仅繁琐&#xff0c;而且容易出错&#xff0c;给医院的运营效率和数据准确性带来较大挑战。随着人工智能技术的快速发展&#xff0c;…

Jackson使用实例:将后端返回的 JSON 字段名转换为大写(多种方案详细实例实现)

目录 将返回 JSON 字段名转换为大写背景解决方案1. **局部字段名转换为大写** — 使用 JsonNaming 注解方案概述步骤 2. **全局字段名转换为大写** — 配置 ObjectMapper方案概述步骤 3. **手动指定字段名称** — 使用 JsonProperty 注解方案概述步骤 4. **总结**推荐方案 将返…

汽车一键启动开关 、一键启动按键 、一键启动按钮

‌汽车一键启动按钮是智能汽车的重要部分&#xff0c;通常用于启动和关闭引擎‌。 ‌具体功能‌&#xff1a; ‌启动引擎‌&#xff1a;在许多现代汽车中&#xff0c;一键启动按键取代了传统的钥匙启动方式。只需轻轻按下一键启动按钮&#xff0c;车辆电源即被接通&#xff0c…

.NET用C#导入Excel数据到数据库

将Excel文件中的数据导入到数据库中不仅能够提升数据处理的效率和准确性&#xff0c;还能极大地促进数据分析和决策制定的过程。尤其在企业级应用中&#xff0c;Excel作为数据输入和初步整理的工具非常普遍&#xff0c;但其功能对于复杂查询、大规模数据管理和跨部门的数据共享…

python中数组怎么转换为字符串

1、数组转字符串 #方法1 arr [a,b] str1 .join(arr)#方法2 arr [1,2,3] #str .join(str(i) for i in arr)#此处str命名与str函数冲突&#xff01; str2 .join(str(i) for i in arr) 2、字符串转数组 #方法一 str_x avfg st_list list(str_x) #使用list()#方法二 list_s…

国内管理咨询公司哪家落地辅导做的好?

在当今快速变化的市场环境中&#xff0c;企业面临着前所未有的竞争压力与转型挑战。为了在这场没有硝烟的战争中脱颖而出&#xff0c;许多企业开始寻求外部专业力量的帮助&#xff0c;以期通过科学的管理咨询实现战略升级和业绩突破。而在众多的管理咨询公司中&#xff0c;思博…

前端进阶指南:详解 Source Map 的作用与工作原理,解析.map文件

前言 在前端开发中&#xff0c;代码的压缩与混淆是提升网页性能的常见做法。然而&#xff0c;这种优化措施也带来了调试难度的增加&#xff0c;因为压缩后的代码往往难以阅读和理解。这时&#xff0c;Source Map 技术应运而生&#xff0c;作为连接源代码和构建后代码的桥梁&am…

Cursor vs VSCode:主要区别与优势分析

Cursor - The AI Code Editor 1. AI 集成能力 Cursor的优势 原生AI集成&#xff1a; # Cursor可以直接通过快捷键调用AI # 例如&#xff1a;按下 Ctrl K 可以直接获取代码建议 def complex_function():# 在这里&#xff0c;你可以直接询问AI如何实现功能# AI会直接在编辑器中…

python+selenium的八大定位方式

1.id定位 元素的id属 driver.find_element_by_id(By.ID,"username")2.name定位 driver.find_element_by_id(By.NAME,"username")#一个login_btn_list webdriver.find_elements(By.CLASS_NAME,)#多个元素组成的列表&#xff0c; login_btn_list[1].click…