什么是进程池?
进程池(Process Pool)是一种用于管理和复用多个进程的技术或设计模式。在进程池中,一定数量的进程会被预先创建并保持在内存中,以便在需要时立即使用,而不是每次需要进程时都重新创建新的进程,这样可以提高系统的性能和效率。
进程池通常用于需要频繁创建和销毁进程的场景,例如网络服务器等。通过预先创建一些进程并保持它们处于空闲状态,可以避免频繁创建和销毁进程所带来的开销,并且可以更好地控制同时进行的进程数量,以避免系统资源被耗尽。
一般来说,进程池包括以下几个基本组件:
1. 进程池管理器(Process Pool Manager):负责创建、管理和维护进程池中的进程,包括池中进程的初始化、分配和回收等操作。
2. 进程队列(Process Queue):用于存放空闲进程的队列,当有任务需要处理时,可以从队列中取出一个空闲进程进行任务处理。
3. 任务队列(Task Queue):用于存放需要处理的任务,当一个进程空闲时,可以从任务队列中取出一个任务进行处理。
4. 进程间通信机制:用于进程之间的通信,例如管道、共享内存、消息队列等。通过合理设计和使用进程池,可以提高系统的并发处理能力,降低系统资源消耗,同时也便于监控和管理进程。
以下是我们的简易进程池的框架。
因此在本次项目中,在面向对象思想的指导下,我们需要创建一个进程池,管理进程的相关操作。
思路
我们在写之前,首先需要明确项目的功能是什么?都需要实现哪些模块?
将大框架搭建好之后,逐步填充细节。
首先我们明确需要实现的功能是:一个父进程开辟进程池中的多个子进程,然后向子进程发送任务信息,由子进程执行任务。
实现的模块:进程池(包含子进程的创建,子进程的执行任务板块,子进程的销毁),任务模块,父进程控制块。
我们可以将进程封装为一个小类,再用进程池封装进程的类。利用匿名管道的特性实现父子进程间通信。
需要注意进程与任务间的负载均衡。
一个超级大Bug
我们知道,子进程是会继承父进程的文件信息列表的,因此当父进程以写端打开管道,其后创建的子进程将会继承当前父进程的所有wfd与rfd,但由于父进程rfd个数为0,但wfd会叠加,因此最后一个子进程将会继承前面父进程的所有wfd。也就是说,后面创建的进程,会保存前面创建的管道的写文件描述符。 因此倘若我们按从前往后的顺序关闭父进程写端同时进行wait等待,是没有结果的。
我们的解决方法是每创建一个子进程,都会关闭其从父进程那里继承来的所有写文件描述符。
当然也有别的办法,1.从后往前关闭管道,最后的管道只有父进程一个写端。
2. 将所有的写端全部结束之后再进行wait等待。
源码
task.hpp
任务模块,其内包含任务列表,与工作过程
#pragma once
#include<iostream>
#include<unistd.h>
using namespace std;typedef void (*work_t)(int);//函数指针类型
typedef void(*task_t)();void task1()
{cout<<"i'm task11111, hello people! from sub process: "<<getpid()<<endl;
}void task2()
{cout<<"i'm task22222, hello people! from sub process: "<<getpid()<<endl;
}void task3()
{cout<<"i'm task33333, hello people! from sub process: "<<getpid()<<endl;
}//任务的函数指针数组,存储任务列表
task_t taskarray[]={task1,task2,task3};//寻找下一个派发的任务
int NextTask()
{//随机抽取任务return rand()%3;
}//工作过程
void worker(int rfd)
{while(true){sleep(1);int taskcode=0;//接收任务码int n=read(rfd,&taskcode,sizeof(taskcode));//当可以读取到任务信息执行任务if(n==sizeof(taskcode)){//执行任务taskarray[taskcode]();cout<<"task success excute... processid: "<<getpid()<<endl<<endl;}else//读取不到任务信息即退出{cout<<"no task can excute,exit... processid: "<<getpid()<<endl<<endl;break;}}}
processpool.cc
完成进程池的创建,销毁与回收等待,同时保证任务的正确执行与退出。
#include<iostream>
#include<string>
#include<vector>
#include<sys/types.h>
#include<sys/wait.h>
#include<unistd.h>
#include"task.hpp"
using namespace std;//管理管道属性
class channel
{
public:channel(size_t wfd,size_t pid,string name):_wfd(wfd),_process_id(pid),_channel_name(name){}size_t wfd(){return _wfd;}size_t pid(){return _process_id;}string name(){return _channel_name;}void Close(){close(_wfd);}~channel(){}private:size_t _wfd;size_t _process_id;string _channel_name;
};//管理进程池
class processpool
{
public:processpool(int sub_process_num):_sub_process_num(sub_process_num){}//创建进程池void CreatProcessPool(work_t worker){for(int i=0;i<_sub_process_num;i++)//创建管道与进程{int pipefd[2];pipe(pipefd);pid_t pid=fork();vector<int> fds;//存放管道中除却父进程以外的写端,并在子进程中一一进行释放if(pid==0)//子进程读{close(pipefd[1]);for(int i=0;i<fds.size();i++){close(fds[i]);}//子进程接收父进程发送的任务并完成任务worker(pipefd[0]);exit(0);}//父进程为写端close(pipefd[0]);string name="channel-";name+=to_string(pid);_channels.push_back(channel(pipefd[1],pid,name));fds.push_back(pipefd[1]);//将父进程的wfd进行插入,当下一个子进程创建后会继承该文件信息//因此在子进程中需要关闭继承到的写端,以免管道出现多个写端的状况//多个写端造成后果,当父进程终止写入,管道仍旧有多个管道//父进程发送任务给子进程}}void PrintDebug()//打印进程池中进程相关信息{for(auto &e: _channels){cout<<"_wfd: "<<e.wfd()<<"\t";cout<<"_process_pid: "<<e.pid()<<"\t";cout<<"_channel_name: "<<e.name()<<"\t";cout<<endl;}}//寻找下一个分配任务的子进程int NextChannel(){static int cnt=0;int ret=cnt%_channels.size();cnt++;return ret;}//发送任务信息码给子进程void SendTaskMessage(int index,int taskcode){cout<<"taskcode:"<<taskcode<<" channel id: "<<_channels[index].pid()<<endl;int n=write(_channels[index].wfd(),&taskcode,sizeof(taskcode));}//杀死进程池中所有子进程void KillAll(){for(int i=0;i<_channels.size();i++){_channels[i].Close();}}//对所有子进程进行回收void Wait(){for(int i=0;i<_channels.size();i++){pid_t pid=_channels[i].pid();int ret=waitpid(pid,nullptr,0);if(ret=pid)cout<<"sub process already recyle success... processid: "<<pid<<endl;elsecout<<"sub process already recyle fail fail fail!!! processid: "<<pid<<endl;}}
private:int _sub_process_num;vector<channel> _channels;
};//控制进程池执行任务
void CtrlProcessPool(processpool Processpool,int cnt)
{while(cnt-->0){//挑选进程int index=Processpool.NextChannel();//挑选任务int taskcode=NextTask();//发送任务给进程sleep(1);cout<<"第"<<cnt<<"个任务"<<endl;Processpool.SendTaskMessage(index,taskcode);}
}
int main(int argc,char* argv[])
{if(argc!=2)//规范启动法则{cout<<"Please Re-Enter! Please enter subprocess numbers!"<<endl;return -1;}//启动成功int subprocess_num=stoi(argv[1]);//创建进程池processpool Processpool(subprocess_num);Processpool.CreatProcessPool(worker);//Processpool.PrintDebug();//控制子进程//挑选进程与任务,并将任务发送给对应进程CtrlProcessPool(Processpool,7);//结束后回收子进程//关闭写端,进而关闭子进程Processpool.KillAll();//父进程等待回收子进程Processpool.Wait();return 0;
}
代码中有详细注释。
运行结果
这里的任务与进程是整数倍的关系,因此显得比较规整。