文章目录
- 进程池
- 实现进程池
- 创建信道和进程
- 发送任务
- 释放资源
- 进程池代码
- 总结
本篇的主题是借助前面所学的基础管道实现一个进程池,那么在实现进程池前先了解进程池是什么,进程池有什么意义,进而对于进程池有一个基本的把握
进程池
给定一个进程,为这个进程创建多个管道,并且对于每一个管道都设置一个子进程,当父进程发送消息,子进程就能获取到消息的内容,而父进程的存在价值就是控制子进程,并且给子进程布置服务等等,而这个内容就叫做进程池
在谈到进程池前,要先提及的是池化技术,池化技术是很重要的技术,在STL容器中也存在诸如空间配置器这样的内容,这种内容诞生的原因就是因为,在调用系统中的资源是需要成本的,例如有空间资源,时间资源等等,而如果不断的申请这些资源就会耗费多余的资源,于是诞生了池化技术,提前申请一块大的空间,由申请人自己来管理,这个就是基本的池化技术
在STL的空间配置器中就有这样的内容,利用空间配置器可以提前申请一部分内容,并且基于这个空间进行自我管理,随时进行申请,就能减少从系统中申请资源带来的额外的成本,而进程池也是基于这样的原理,系统调用也是有成本的,就连最基本的函数跳转都有成本,所以才出现了内联函数来帮助提高效率,更别说对于这样大型的系统调用的接口,必然是有成本的,因此借助池化技术来完成工作时有很大的必要的
实现进程池
对于本篇实现的这个进程池来说,功能也是比较简单的:
- 创建信道和进程
- 发送任务
- 回收资源
创建信道和进程
要实现进程池,就要先清楚进程池整体的结构是什么,对于进程池来说,首先要有进程,有管道,这都进程池创建的必要组成部分,那么在创建进程池的阶段就要实现这些内容,所以要先创建并定义管道
// 定义要创建的子进程个数
const int num = 5;int main()
{for (int i = 0; i < num; i++){// 创建管道int pipefd[2];int n = pipe(pipefd);assert(n == 0);// 创建子进程pid_t id = fork();if (id == 0){// 子进程负责读close(pipefd[1]);// do something...cout << "this is child" << endl;exit(0);}// 父进程负责写close(pipefd[0]);cout << "this is father" << endl;}return 0;
}
但是这样写固然是有问题的,问题点在于,父进程创建的子进程的数据会丢失,不利于进行管理,那么在管理这样的数据之前,首先要对这些内容进行描述,所以要创建对应的结构体进行描述,要管理的这些数据包括有,父进程管理的子进程的pid是多少,父进程和这个子进程传输的文件描述符下标是多少,最好还能用名字来管理,基于这些信息就能创建对应的描述结构体
class channel
{
public:channel(int fd, pid_t id) : ctrlfd(fd), workerid(id){name = "channel-" + to_string(number++);}public:int ctrlfd;pid_t workerid;string name;
};
因此基于这个内容,就创建好了描述的内容,而可以创建一张表,用来描述存储信息的情况,这样父进程对于子进程的管理就转换成了对于顺序表的管理,这样就能对于数据做一个很好的管理工作,再对创建过程做出一个具体的封装,就能封装出下面的代码:
void Create(vector<channel> *c)
{for (int i = 0; i < num; i++){// 创建管道int pipefd[2];int n = pipe(pipefd);assert(n == 0);// 创建子进程pid_t id = fork();if (id == 0){// 子进程负责读close(pipefd[1]);// do something...cout << "this is child" << endl;exit(0);}// 父进程负责写close(pipefd[0]);c->push_back(channel(pipefd[1], id));}
}int main()
{vector<channel> channels;Create(&channels);return 0;
}
到此,封装的第一步就完成了,对象已经被创建完毕了
发送任务
对于任务这个版块,本篇的实现方式采用的是模拟调用的方式,给定三种调用的方式,然后利用随机调用的方式对这三种方式进行调用,用来模拟父进程给子进程派发任务这样的一个过程,具体的实现如下:
#pragma once
#include <iostream>
#include <functional>
#include <vector>
#include <ctime>
#include <unistd.h>
using namespace std;typedef function<void()> task_t;void Download()
{cout << "this is download" << endl;
}void PrintLog()
{cout << "this is PrintLog" << endl;
}void PushVideoStream()
{cout << "this is pushvideo" << endl;
}class Init
{
public:// 任务码const static int g_download_code = 0;const static int g_printlog_code = 1;const static int g_push_videostream_code = 2;// 任务集合vector<task_t> tasks;public:Init(){tasks.push_back(Download);tasks.push_back(PrintLog);tasks.push_back(PushVideoStream);srand(time(nullptr));}bool CheckSafe(int code){if (code >= 0 && code < tasks.size())return true;elsereturn false;}void RunTask(int code){return tasks[code]();}int SelectTask(){return rand() % tasks.size();}string ToDesc(int code){switch (code){case g_download_code:return "Download";case g_printlog_code:return "PrintLog";case g_push_videostream_code:return "PushVideoStream";default:return "Unknow";}}
};Init init;
将任务整合到函数体实现的对应位置,如下所示:
// 发送任务
void SendCommand(const vector<channel> &c, int count)
{int pos = 0;while (count--){// 1. 选择任务int command = init.SelectTask();// 2. 选择信道(进程)const auto &channel = c[pos++];pos %= c.size();// 3. 发送任务write(channel.ctrlfd, &command, sizeof(command));sleep(1);}cout << "SendCommand done" << endl;
}
释放资源
对于释放资源来讲,主体思路主要是将管道对应的读端和写端都关闭,这样就可以使得管道得以释放了,于是有初步的实现设计
void ReleaseChannels(vector<channel> c)
{for (const auto &channel : c){pid_t rid = waitpid(channel.workerid, nullptr, 0);if (rid == channel.workerid){std::cout << "wait child: " << channel.workerid << " success" << std::endl;}}
}
从表象上看,这样的释放过程是没有问题的,但是从深层次的角度来讲,代码中其实隐藏着一些问题,用下图来表示
下图所示的是在理想状态下,在操作系统内部应该形成的状态体系结构,但是实际上,这样的图是有问题的,在父进程创建子进程的时候,子进程会继承的还有父进程的文件描述符表,正是依据这个原理才能设计出管道这样的结构,但是在第二个子进程继承父进程时,也会继承一份指向,所以管道1的引用计数实际上是比想象的要多一个指针的
下图所示的是操作系统内部真正的示意图
基于这样的原因,在进行设计的时候,可以记录之前已经打开的文件描述符,在后续创建子进程的时候刻意的将这些文件描述符都关掉,这样就可以解决问题
进程池代码
// task.hpp
#pragma once
#include <iostream>
#include <functional>
#include <vector>
#include <ctime>
#include <unistd.h>
using namespace std;typedef function<void()> task_t;void Download()
{cout << "this is download" << endl;
}void PrintLog()
{cout << "this is PrintLog" << endl;
}void PushVideoStream()
{cout << "this is pushvideo" << endl;
}class Init
{
public:// 任务码const static int g_download_code = 0;const static int g_printlog_code = 1;const static int g_push_videostream_code = 2;// 任务集合vector<task_t> tasks;public:Init(){tasks.push_back(Download);tasks.push_back(PrintLog);tasks.push_back(PushVideoStream);srand(time(nullptr));}bool CheckSafe(int code){if (code >= 0 && code < tasks.size())return true;elsereturn false;}void RunTask(int code){return tasks[code]();}int SelectTask(){return rand() % tasks.size();}string ToDesc(int code){switch (code){case g_download_code:return "Download";case g_printlog_code:return "PrintLog";case g_push_videostream_code:return "PushVideoStream";default:return "Unknow";}}
};Init init;
// processpool.cc
#include <iostream>
#include <string>
#include <vector>
#include <cassert>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "task.hpp"
using namespace std;// 定义要创建的子进程个数
const int num = 5;
static int number = 1;class channel
{
public:channel(int fd, pid_t id) : ctrlfd(fd), workerid(id){name = "channel-" + to_string(number++);}public:int ctrlfd;pid_t workerid;string name;
};void Work()
{while (true){int code = 0;ssize_t n = read(0, &code, sizeof(code));if (n == sizeof(code)){if (!init.CheckSafe(code))continue;init.RunTask(code);}else if (n == 0){break;}else{// do nothing}}cout << "child quit" << endl;
}void Create(vector<channel> *c)
{vector<int> old;for (int i = 0; i < num; i++){// 1. 定义并创建管道int pipefd[2];int n = pipe(pipefd);assert(n == 0);(void)n;// 2. 创建进程pid_t id = fork();assert(id != -1);// 3. 构建单向通信信道if (id == 0) // child{if (!old.empty()){for (auto fd : old){close(fd);}}close(pipefd[1]);dup2(pipefd[0], 0);Work();exit(0); // 会自动关闭自己打开的所有的fd}// fatherclose(pipefd[0]);c->push_back(channel(pipefd[1], id));old.push_back(pipefd[1]);// childid, pipefd[1]}
}// 发送任务
void SendCommand(const vector<channel> &c, int count)
{int pos = 0;while (count--){// 1. 选择任务int command = init.SelectTask();// 2. 选择信道(进程)const auto &channel = c[pos++];pos %= c.size();// 3. 发送任务write(channel.ctrlfd, &command, sizeof(command));sleep(1);}cout << "SendCommand done" << endl;
}// 回收资源
void ReleaseChannels(std::vector<channel> c)
{int num = c.size() - 1;for (; num >= 0; num--){close(c[num].ctrlfd);waitpid(c[num].workerid, nullptr, 0);}
}int main()
{vector<channel> channels;// 1. 创建进程Create(&channels);// 2. 发送任务SendCommand(channels, 10);// 3. 回收资源ReleaseChannels(channels);return 0;
}
运行结果如下所示
总结
对于进程池来说,就是预先创建一批进程,然后让master进程直接向目标进程排放任务,这个进程被预先创建好之后,有任务就处理,没有任务就进行阻塞