Linux——进程池
- 池化技术
- 进程池
- 信道
- 模拟任务
- 进程退出
- 一个bug
今天我们来学习一下管道的应用——进程池。如果有没看过上一篇管道的小伙伴可以点击这里:
https://blog.csdn.net/qq_67693066/article/details/136371517
池化技术
我们首先要了解一下池化技术:
池化技术(Pooling)在计算机技术中是一种常见的设计模型,主要用于优化资源使用和提高性能。其核心理念是提前保存并维护大量资源在一个特定的“池子”中,以备不时之需以及重复使用。这样可以显著减少资源创建和销毁的开销,从而提高系统的响应速度和效率。
池化技术的主类型
线程池:线程池类似于操作系统中的缓冲区概念。它预先创建并管理一定数量的线程,这些线程在初始状态下都处于睡眠状态。当有新任务或请求到来时,线程池会唤醒一个睡眠线程来处理该任务,处理完成后线程再次进入睡眠状态。这样可以避免频繁地创建和销毁线程,从而提高性能。
内存池:内存池用于管理内存资源。由于分配和释放内存涉及到系统调用,这会导致程序从用户态切换到内核态,是一个相对耗时的操作。内存池通过预先分配一定大小的内存块并统一管理,可以显著减少内存分配和释放的开销。
数据库连接池:数据库连接池用于管理数据库连接。由于创建和关闭数据库连接是一个相对耗时的操作,数据库连接池通过预先创建并管理一定数量的数据库连接,可以显著提高数据库访问的性能。
对象池:对象池是一种常见的对象缓存手段。它预先创建并管理一定数量的对象,当需要使用对象时,直接从对象池中取出而不是重新创建。这样可以减少对象创建和销毁的开销,提高对象的访问性能。
池化技术的优点主要包括:
提高资源使用效率:通过复用已有的资源,减少了频繁创建和销毁资源的开销。
降低系统资源消耗:通过统一管理资源,可以更好地控制资源的使用,避免资源的浪费。
提高系统性能:通过减少资源创建和销毁的开销,以及优化资源的使用,可以提高系统的响应速度和性能。
然而,池化技术也需要注意一些问题,如资源的管理和维护、资源的复用策略、资源的生命周期管理等。此外,不同的池化技术需要根据具体的应用场景和需求来选择和使用。
简单一点来说,就是“未雨绸缪”,计算的池化技术就是当处理某些事务的时候,先把对应的资源先准备好,到时候可以直接上手处理事务,省下了开销资源的时间。
进程池
我们今天要做的,是写一个进程池,就是提前先创建好一批进程,等到有任务来的时候,直接可以处理任务:
我们首先把架子搭好:
#include<iostream>
#include<unistd.h>
const int num = 5;
using namespace std;
#include<cassert>int main()
{//创建多个子进程for(int i = 0; i < num; i++){//创建管道int pipefd[2];int n = pipe(pipefd);//检查是否创建管道成功assert(n == 0);//创建父子进程pid_t id = fork();if(id == 0) //子进程{//关闭写端close(pipefd[1])}//父进程//关闭读端close(pipefd[0]);}
}
信道
现在我们创建好了进程,但是有个问题,我们并不知道什么时候该往哪个进程发配任务,现在我们的主进程跟我们创建的进程没有任何的关系,这个时候,我们就要用信道:
通过信道(本质上也是一种管道),我们主进程就知道该往哪个进程发配任务了。我们可以创建一个类对它进行管理:
#include<iostream>
#include<unistd.h>
#include<cstring>
#include<vector>
const int num = 5;
static int channel_number = 1; //信道起始数量
using namespace std;
#include<cassert>class channel
{
public:channel(int fd,pid_t id):ctrlfd(fd),workid(id){name = "channel->" + to_string(channel_number++);}int ctrlfd; //读写端的fdpid_t workid; //子进程idstring name; //管道名字
};int main()
{vector<channel> channels; //信道//创建多个子进程for(int i = 0; i < num; i++){//创建管道int pipefd[2];int n = pipe(pipefd);//检查是否创建管道成功assert(n == 0);//创建父子进程pid_t id = fork();if(id == 0) //子进程{//关闭写端close(pipefd[1])exit(0);}//父进程//关闭读端close(pipefd[0]);channels.push_back(channel(pipefd[1],id)); //往信道写入}
}
然后我们把创建信道的过程抽象出来形成一个函数:
#include<iostream>
#include<unistd.h>
#include<cstring>
#include<vector>
const int num = 5;
static int channel_number = 1; //信道起始数量
using namespace std;
#include<cassert>class channel
{
public:channel(int fd,pid_t id):ctrlfd(fd),workid(id){name = "channel->" + to_string(channel_number++);}int ctrlfd; //读写端的fdpid_t workid; //子进程idstring name; //管道名字
};void CreateChannel( vector<channel> *channels)
{//创建多个子进程for(int i = 0; i < num; i++){//创建管道int pipefd[2];int n = pipe(pipefd);//检查是否创建管道成功assert(n == 0);//创建父子进程pid_t id = fork();if(id == 0) //子进程{//关闭写端close(pipefd[1])exit(0);}//父进程//关闭读端close(pipefd[0]);channels->push_back(channel(pipefd[1],id)); //往信道写入}
}int main()
{vector<channel> channels; //信道//创建信道CreateChannel(&channels);}
这里我们规范一下传参方式:
传参形式:
- 输入参数:const &
- 输出参数:*
- 输入输出参数:&
我们创建一个函数来表示子进程的工作:
void Work()
{while(true){cout<< "I am running "<< getpid() << endl;sleep(1);}
}
#include<iostream>
#include<unistd.h>
#include<cstring>
#include<vector>
const int num = 5;
static int channel_number = 1; //信道起始数量
using namespace std;
#include<cassert>class channel
{
public:channel(int fd,pid_t id):ctrlfd(fd),workid(id){name = "channel->" + to_string(channel_number++);}int ctrlfd; //读写端的fdpid_t workid; //子进程idstring name; //管道名字
};void Work()
{while(true){cout<< "I am running "<< getpid() << endl;sleep(1);}
}void CreateChannel( vector<channel> *channels) //创建信道
{//创建多个子进程for(int i = 0; i < num; i++){//创建管道int pipefd[2];int n = pipe(pipefd);//检查是否创建管道成功assert(n == 0);//创建父子进程pid_t id = fork();if(id == 0) //子进程{//关闭写端close(pipefd[1]);//子进程要完成的工作Work();exit(0);}//父进程//关闭读端close(pipefd[0]);channels->push_back(channel(pipefd[1],id)); //往信道写入}
}//测试
void PrintChannel(const vector<channel> &channels) //输入型参数
{for(auto e: channels){cout<<e.name<<", "<<e.ctrlfd<<", "<<e.workid<<endl;}
}int main()
{vector<channel> channels; //信道//创建信道CreateChannel(&channels);PrintChannel(channels);sleep(10);return 0;}
我们可以运行一下看看:
此时我们完成了第一步,建立信道。
模拟任务
现在我们建立好了信道,接下来就是接收主进程给我们的任务就可以了,可是子进程如何接收和识别任务呢?我们这里规定:传不同的数字,做不同的任务:
首先,我们这里先重定向,从标准输入读取(省略传参):
if(id == 0) //子进程{//关闭写端close(pipefd[1]);//子进程要完成的工作dup2(pipefd[0],0); //重定向,向标准输入读Work();exit(0);}
void Work()
{while(true){int code = 0; //任务代码int n = read(0,&code,sizeof(code));assert(n == sizeof(code)); //要做的任务}
}
我们可以开一个hpp文件,来模拟我们的任务:
#pragma once#include<iostream>
#include<functional>
#include<vector>
#include <ctime>
#include<unistd.h>typedef std::function<void()> task_t; //管理任务void Download()
{std::cout << "I am a Download"<< " deal with: " << getpid() << std::endl;
}void PrintLog()
{std::cout << "I am a log"<< " deal with: " << getpid() << std::endl;
}void PushVideoStream()
{std::cout << "I am a vdieo"<< " deal with: " << getpid() << std::endl;
}class Init
{public:// 任务码,领取相应的任务码,做相应的任务const static int g_download_code = 0;const static int g_printlog_code = 1;const static int g_push_videostream_code = 2;// 任务集合std::vector<task_t> tasks;
public:Init(){tasks.push_back(Download);tasks.push_back(PrintLog);tasks.push_back(PushVideoStream);srand(time(nullptr) ^ getpid());}bool CheckSafe(int code){if (code >= 0 && code < tasks.size())return true;elsereturn false;}void RunTask(int code) //运行任务{return tasks[code]();}int SelectTask() //选择任务{return rand() % tasks.size();}std::string ToDesc(int code){switch (code){case g_download_code:return "Download";case g_printlog_code:return "PrintLog";case g_push_videostream_code:return "PushVideoStream";default:return "Unknow";}}
};Init init; //创建对象
我们相应文件的变化:
#include<iostream>
#include<unistd.h>
#include<cstring>
#include<vector>
const int num = 5;
static int channel_number = 1; //信道起始数量
using namespace std;
#include<cassert>
#include"Task.hpp"class channel
{
public:channel(int fd,pid_t id):ctrlfd(fd),workid(id){name = "channel->" + to_string(channel_number++);}int ctrlfd; //读写端的fdpid_t workid; //子进程idstring name; //管道名字
};void Work()
{while(true){int code = 0; //任务代码int n = read(0,&code,sizeof(code));assert(n == sizeof(code)); //要做的任务if(!init.CheckSafe(code)) continue;init.RunTask(code);}
}void CreateChannel(vector<channel> *channels)
{//创建多个子进程for(int i = 0; i < num; i++){//创建管道int pipefd[2];int n = pipe(pipefd);//检查是否创建管道成功assert(n == 0);//创建父子进程pid_t id = fork();if(id == 0) //子进程{//关闭写端close(pipefd[1]);//子进程要完成的工作dup2(pipefd[0],0); //重定向,向标准输入读Work();exit(0);}//父进程//关闭读端close(pipefd[0]);channels->push_back(channel(pipefd[1],id)); //往信道写入}
}void PrintChannel(const vector<channel> &channels) //输入型参数
{for(auto e: channels){cout<<e.name<<", "<<e.ctrlfd<<", "<<e.workid<<endl;}
}void SendCommand(const std::vector<channel> &channels, bool flag, int num = -1)
{int pos = 0;while (true){// 1. 选择任务int command = init.SelectTask();// 2. 选择信道(进程)const auto &channel = channels[pos++];pos %= channels.size();// debugstd::cout << "send command " << init.ToDesc(command) << "[" << command << "]"<< " in "<< channel.name << " worker is : " << channel.workid << std::endl;// 3. 发送任务write(channel.ctrlfd, &command, sizeof(command));// 4. 判断是否要退出if (!flag){num--;if (num <= 0)break;}sleep(1);}std::cout << "SendCommand done..." << std::endl;
}int main()
{vector<channel> channels; //信道//创建信道CreateChannel(&channels);//PrintChannel(channels);//选择任务,选择信道const bool g_always_loop = true;SendCommand(channels, !g_always_loop, 10);//sleep(10);return 0;}
我们可以运行一下:
进程退出
其实,我们想让进程退出,就只需要关闭写端就可以了。(此时会读到0,表示已经读到了文件末尾)
所以,我们之前写的代码,要稍微修改一下:
int main()
{vector<channel> channels; //信道//创建信道CreateChannel(&channels);//PrintChannel(channels);//选择任务,选择信道const bool g_always_loop = true;SendCommand(channels, !g_always_loop, 10);//进程退出,关闭写端for(const auto &channel : channels) //关闭写端{close(channel.ctrlfd);}//sleep(10);return 0;
}
我们可以把这几行代码封装起来(顺便回收子进程):
void ReleaseChannels(vector<channel> channels)
{for (const auto &channel : channels){close(channel.ctrlfd);}//回收子进程for(const auto &channel : channels){pid_t rid = waitpid(channel.workid,nullptr,0);if(rid == channel.workid){cout<<"wait child: "<<channel.workid<<" success"<<endl;}}
}
int main()
{vector<channel> channels; //信道//创建信道CreateChannel(&channels);//PrintChannel(channels);//选择任务,选择信道const bool g_always_loop = true;SendCommand(channels, !g_always_loop, 10);//进程退出,关闭写端ReleaseChannels(channels);//sleep(10);return 0;
}
我们可以运行一下:
一个bug
其实我们之前写的创建管道的代码有一点bug:
void CreateChannel(vector<channel> *channels)
{//创建多个子进程for(int i = 0; i < num; i++){//创建管道int pipefd[2];int n = pipe(pipefd);//检查是否创建管道成功assert(n == 0);//创建父子进程pid_t id = fork();if(id == 0) //子进程{//关闭写端close(pipefd[1]);//子进程要完成的工作dup2(pipefd[0],0); //重定向,向标准输入读Work();exit(0);}//父进程//关闭读端close(pipefd[0]);channels->push_back(channel(pipefd[1],id)); //往信道写入}
}
现在我们是结束一个进程,回收一个进程,就会有问题:
void ReleaseChannels(vector<channel> channels)
{for (const auto &channel : channels){close(channel.ctrlfd);waitpid(channel.workid,nullptr,0); //关掉一个收一个}// //回收子进程// for(const auto &channel : channels)// {// pid_t rid = waitpid(channel.workid,nullptr,0);// if(rid == channel.workid)// {// cout<<"wait child: "<<channel.workid<<" success"<<endl;// }// }
}
这个时候,进程会卡死。这是为什么呢?
其实,第一次创建子进程时,是没有啥问题的:
从第二次开始,每次创建的子进程会继承上一个文件描述符表的写端:
这种情况会一直累积,只有最后一个文件只有一个写端。这样会导致我们的信道不会为空,子进程读不到0,不会退出,发生阻塞。
解决方法也很简单,第一种,我们倒着回收:
第二种,在新的子进程中关闭多余的文件描述符,我们要在创建信道那里做一点小改动:
void CreateChannel(vector<channel> *channels)
{vector<int> tmp; //临时记录,用来记录老的fd//创建多个子进程for(int i = 0; i < num; i++){//创建管道int pipefd[2];int n = pipe(pipefd);//检查是否创建管道成功assert(n == 0);//创建父子进程pid_t id = fork();if(id == 0) //子进程{if(!tmp.empty()){for(auto fd : tmp){close(fd);}PrintFd(tmp);}//关闭写端close(pipefd[1]);//子进程要完成的工作dup2(pipefd[0],0); //重定向,向标准输入读Work();exit(0);}//父进程//关闭读端close(pipefd[0]);channels->push_back(channel(pipefd[1],id)); //往信道写入tmp.push_back(pipefd[1]); //记录老的文件描述符}
}
void PrintFd(const std::vector<int> &fds) //用来打印看看关闭了哪些fd
{cout << getpid() << " close fds: ";for(auto fd : fds){cout << fd << " ";}cout << endl;
}