一、进程池的概念
1.1、什么是进程池
进程池是一种并发编程模式,用于管理和重用多个处理任务的进程。它通常用于需要频繁创建和销毁进程的情况,以避免因此产生的开销。
进程池的优点包括:
- 减少进程创建销毁的开销:避免频繁创建和销毁进程所带来的系统资源开销。
- 提高系统响应速度:由于进程已经初始化并且一直保持在内存中,可以立即分配执行任务,减少了任务等待时间。
- 控制资源使用:通过限制进程池中的进程数量,可以控制系统资源的使用情况,避免资源过度消耗。
1.2、管理进程
预先创建一些空闲进程,管理进程会把工作分发到空闲进程来处理,空闲进程处理结束后,通知管理进程。
管理进程需要将任务发给空闲的工作进程,这里就涉及到进程之间的通信,进程间的通信有以下三种方式,我们之前都做了详细的解释
管道:https://blog.csdn.net/weixin_43903639/article/details/138155634?spm=1001.2014.3001.5501
消息队列:https://blog.csdn.net/weixin_43903639/article/details/138155723?spm=1001.2014.3001.5501
共享内存:https://blog.csdn.net/weixin_43903639/article/details/138189200?spm=1001.2014.3001.5501
二、进程池模型
对于一个进程池,我们需要维护一个进程队列,如果进程在忙就等待,如果进程空闲,那么就给空闲进程发任务让进程去处理。
三、进程通信
我们这里采用了有名管道的方式。实际上使用匿名管道是一样的。为了方便管理,我们这里创建了一个Fifo的类,通过类将管道视为一个个的对象。
// 权限
#define Mode 0666
// 文件地址
#define Path "./default"class Fifo
{
public:Fifo(string path = Path, int mode = Mode) : _path(path), _mode(mode){int return_mkfifo_val = mkfifo(_path.c_str(), _mode);if (return_mkfifo_val < 0){cout << "mkfifo error:" << errno << " reason :" << strerror(errno) << endl;exit(1);}cout << "mkfifo success" << endl;}~Fifo(){int return_unlink_val = unlink(_path.c_str());if (return_unlink_val < 0){cout << "unlink error:" << errno << " reason :" << strerror(errno) << endl;}cout << "unlink namepipe success" << endl;}private:string _path;int _mode;
};
管道的默认权限是 0666
,管道的默认文件地址是 ./default
,这样管理的优势是便于管道的创建与销毁。
三、进程对象
我们将一个进程也视为一个对象,那么一个进程就需要以下的元素
fd0
: 管道,通过这个管道接收主进程的数据fd1
:管道,通过这个管道给主进程发数据pid
:子进程的pidisbusy
:此子进程是否在忙
class Process
{
public:Process(int fd0, int fd1, pid_t process_pid) : _fd0(fd0), _fd1(fd1), _process_pid(process_pid), _isbusy(false) {}~Process() {}// 获取父进程要写入的管道int get_fd0() { return _fd0; }// 获取子进程要写入的管道int get_fd1() { return _fd1; }// 获取子进程pidpid_t get_process_pid() { return _process_pid; }// 获取是否忙碌标志位bool get_isbusy() { return _isbusy; }// 修改标志为void set_isbusy(bool flag) { _isbusy = flag; }private:int _fd0; // 父进程要写入的管道int _fd1; // 父进程要读入的管道pid_t _process_pid; // 子进程pidbool _isbusy; // 是否忙碌标志位
};
四、进程池
进程池就是同时管理管道和进程的。其中包含多个进程对象,每个进程对象又要包含两个管道。
vector<string> pipe0
主进程写入,子进程读的管道名vector<string> pipe1
子进程写入,主进程读的管道名vector<Fifo *> fifo0
主进程写入,子进程读的管道vector<Fifo *> fifo1
子进程写入,主进程读的管道vector<Process> _processpool
进程池管理的进程int _processnum
进程池管理的进程数量
// 进程池
class ProcessPool
{
public:ProcessPool(int processnum) : _processnum(processnum) {}~ProcessPool(){for (int i = 0; i < _processnum; i++) {delete fifo0[i];delete fifo1[i];}// 要释放所有的子进程for (int i = 0; i<_processnum; i++) {// 通知子进程结束,通知失败的话直接杀死子进程if (kill(_processpool[i].get_process_pid(), SIGTERM) != 0) {// 杀死子进程kill(_processpool[i].get_process_pid(), SIGUSR1);}}}// 生成管道的名字void makePipeName(){pipe0.clear();pipe1.clear();for (int i = 0; i < _processnum; i++){string s0;s0 += "pipe0_" + to_string(i + 1);pipe0.push_back(s0);string s1;s1 += "pipe1_" + to_string(i + 1);pipe1.push_back(s1);}}// 创建进程池,接收一个参数的函数指针void CreateProcessPool(work_t work = worker){for (int i = 0; i < _processnum; i++){// 创建命名管道fifo0.push_back(new Fifo(pipe0[i]));fifo1.push_back(new Fifo(pipe1[i]));int id = fork();if (id == 0){// 子进程int fd0 = open(pipe0[i].c_str(), O_RDONLY);int fd1 = open(pipe1[i].c_str(), O_WRONLY);work(fd0, fd1);exit(0);}// 父进程打开管道,未发送任务int fd0 = open(pipe0[i].c_str(), O_WRONLY);int fd1 = open(pipe1[i].c_str(), O_RDONLY | O_NONBLOCK);// 设置为非阻塞模式fcntl(fd1, F_SETFL, O_NONBLOCK);_processpool.push_back({fd0, fd1, id});}}// 找到空闲进程返回进程在_processpool中的序号,没找到返回-1int getAChannal(){for (int i = 0; i < _processnum; i++){if (_processpool[i].get_isbusy() == false) {_processpool[i].set_isbusy(true);return i;}}return -1;}// 发送任务,任务就是数据void SendTask(char *data, int datasize){// 随机选中管道int luck_process = getAChannal();while (luck_process == -1) {// 看一下哪些进程是空闲的。每个进程结束后都会发送自己的pid。for (int i=0; i<_processnum; i++) {pid_t pid = 0;ssize_t bytes_read = read(_processpool[i].get_fd1(), &pid, sizeof(pid_t));if (bytes_read == -1) continue;// 哪个进程返回了数据,就认为Ta结束了。else {_processpool[i].set_isbusy(false);}}luck_process = getAChannal();}// 父进程向管道发送任务,发送任务就是向相应的管道写入数据write(_processpool[luck_process].get_fd0(), data, datasize);}static void worker(int fd0, int fd1){char buf[BUFSIZ];while (1){int read_return_value = read(fd0, buf, BUFSIZ);// 处理读取到的数据if (read_return_value > 0){cout << "my data is : " << buf << " my pid is : " << getpid() << endl;// 模拟处理读取到的数据sleep(1);}// 向消息队列传递自己的pid,表示已经完成任务pid_t pid = getpid();write(fd1, &pid, sizeof(pid_t));}}private:int _processnum;vector<string> pipe0; // 父进程写入,子进程读vector<string> pipe1; // 子进程写入,父进程读vector<Fifo *> fifo0;vector<Fifo *> fifo1;vector<Process> _processpool;
};
五、仿真
这里我们的工作函数是默认的工作函数,也就是打印传入的数据。
#include "ProcessPool.h"
#include <iostream>int main(int argc, char* argv[])
{ProcessPool* processpool = new ProcessPool(5);processpool->makePipeName();processpool->CreateProcessPool();char buf[3];for (int i=0;i<20;i++) {sprintf(buf,"%d",i+100);processpool->SendTask(buf,3);}delete processpool;return 0;
}
看一下仿真结果