目录
1.代码介绍
2.channel 类
3.进程池类编写
4.主函数及其他
5. 源码
1.代码介绍
本文代码采用一主多从式(一个主进程(master)多个子进程(worker))通过管道进行通信,实现主进程分发任务,子进程完成任务,当主进程关闭管道时,子进程执行完任务后退出。
2.channel 类
创建一个channel类用于描述子进程,方便主进程与其通信。
成员变量:
int _wfd; //记录管道写描述符用于主进程发送数据
pid_t _pid; //子进程pid
string _name;
代码实现:
class Channel
{
public:Channel(int wfd, pid_t pid, const string &name) : _wfd(wfd),_pid(pid), _name(name) {}~Channel() {}int wfd() { return _wfd; }string name() { return _name; }pid_t pid() { return _pid; }void Close() { close(_wfd); }//关闭写描述符,子进程读取完毕后会退出
private:int _wfd;pid_t _pid;string _name;
};
3.进程池类编写
该类用于管理子进程,具体功能:
1.创建子进程。
2.获取子进程(轮转方式保证负载均衡)。
3.主进程发送任务码给子进程。
4.进程等待。
5.控制进程退出。
代码实现:
class ProcessPool
{
public:ProcessPool(int num) : _process_num(num) {}void createProcess(work_t work){vector<int> fds;for (int i = 0; i < _process_num; ++i){int pipefd[2]{0};pipe(pipefd);pid_t pid = fork();if (pid == 0){if(!fds.empty()){for(auto& fd:fds){//关闭之前子进程管道的写端close(fd);}}close(pipefd[1]);dup2(pipefd[0], 0);work();exit(0);//子进程执行任务完毕会退出}close(pipefd[0]);string cname = "channel-" + to_string(i);_channels.push_back(Channel(pipefd[1], pid, cname));fds.push_back(pipefd[1]);}}int NextChannel(){static unsigned int index = 0;return (index++) % _process_num;}void SendTaskCode(int index, uint32_t code){cout << "send code: " << code << " to " << _channels[index].name() << " sub prorcess id: " << _channels[index].pid() << endl;write(_channels[index].wfd(), &code, sizeof(code));}void Wait(){waitpid(-1, nullptr, 0);}void KillAll(){for (auto &channel : _channels){channel.Close();}}~ProcessPool() {}private:int _process_num;vector<Channel> _channels;
};
4.主函数及其他
主进程发送任务码,通过管道发送给子进程执行对应任务。
void CtrlProcessPool(const shared_ptr<ProcessPool> &processpool_ptr, int cnt)
{while (cnt){// a. 选择一个进程和通道int channel = processpool_ptr->NextChannel();// cout << channel.name() << endl;// b. 你要选择一个任务uint32_t code = NextTask();// c. 发送任务processpool_ptr->SendTaskCode(channel, code);sleep(1);cnt--;}
}
int main(int argc, char *argv[])
{if (argc != 2){printf("\n\t usage ./processPool num\n");return 1;}int process_num = stoi(argv[1]);shared_ptr<ProcessPool> process_ptr = make_shared<ProcessPool>(process_num);process_ptr->createProcess(worker);CtrlProcessPool(process_ptr, 5);process_ptr->KillAll();process_ptr->Wait();return 0;
}
模拟任务:
#pragma once#include <iostream>
#include <unistd.h>
#include <functional>
using namespace std;using work_t = function<void()>;
using task_t = function<void()>;void PrintLog()
{cout << "printf log task" << endl;
}void ReloadConf()
{cout << "reload conf task" << endl;
}void ConnectMysql()
{cout << "connect mysql task" << endl;
}task_t tasks[3] = {PrintLog, ReloadConf, ConnectMysql};uint32_t NextTask()
{return rand() % 3;
}void worker()
{// 从0中读取任务即可!while(true){uint32_t command_code = 0;ssize_t n = read(0, &command_code, sizeof(command_code));if(n == sizeof(command_code)){if(command_code >= 3) continue;tasks[command_code]();}else if(n == 0) //管道写端关闭读端返回值位0{cout << "sub process: " << getpid() << " quit now..." << endl;break;}}
}
5. 源码
processPool.cc
#include <iostream>
#include <string>
#include <cstdlib>
#include <vector>
#include <unistd.h>
#include <ctime>
#include <sys/wait.h>
#include <sys/types.h>
#include <memory>
#include "task.hpp"using namespace std;
class Channel
{
public:Channel(int wfd, pid_t pid, const string &name) : _wfd(wfd),_pid(pid), _name(name) {}~Channel() {}int wfd() { return _wfd; }string name() { return _name; }pid_t pid() { return _pid; }void Close() { close(_wfd); }//关闭写描述符,子进程读取完毕后会退出
private:int _wfd;pid_t _pid;string _name;
};
class ProcessPool
{
public:ProcessPool(int num) : _process_num(num) {}void createProcess(work_t work){vector<int> fds;for (int i = 0; i < _process_num; ++i){int pipefd[2]{0};pipe(pipefd);pid_t pid = fork();if (pid == 0){if(!fds.empty()){for(auto& fd:fds){close(fd);}}close(pipefd[1]);dup2(pipefd[0], 0);work();exit(0);//子进程执行任务完毕会退出}close(pipefd[0]);string cname = "channel-" + to_string(i);_channels.push_back(Channel(pipefd[1], pid, cname));fds.push_back(pipefd[1]);}}int NextChannel(){static unsigned int index = 0;return (index++) % _process_num;}void SendTaskCode(int index, uint32_t code){cout << "send code: " << code << " to " << _channels[index].name() << " sub prorcess id: " << _channels[index].pid() << endl;write(_channels[index].wfd(), &code, sizeof(code));}void Wait(){waitpid(-1, nullptr, 0);}void KillAll(){for (auto &channel : _channels){channel.Close();}}~ProcessPool() {}private:int _process_num;vector<Channel> _channels;
};void CtrlProcessPool(const shared_ptr<ProcessPool> &processpool_ptr, int cnt)
{while (cnt){// a. 选择一个进程和通道int channel = processpool_ptr->NextChannel();// cout << channel.name() << endl;// b. 你要选择一个任务uint32_t code = NextTask();// c. 发送任务processpool_ptr->SendTaskCode(channel, code);sleep(1);cnt--;}
}
int main(int argc, char *argv[])
{if (argc != 2){printf("\n\t usage ./processPool num\n");return 1;}int process_num = stoi(argv[1]);shared_ptr<ProcessPool> process_ptr = make_shared<ProcessPool>(process_num);process_ptr->createProcess(worker);CtrlProcessPool(process_ptr, 5);process_ptr->KillAll();process_ptr->Wait();return 0;
}
task.hpp:
#pragma once#include <iostream>
#include <unistd.h>
#include <functional>
using namespace std;using work_t = function<void()>;
using task_t = function<void()>;void PrintLog()
{cout << "printf log task" << endl;
}void ReloadConf()
{cout << "reload conf task" << endl;
}void ConnectMysql()
{cout << "connect mysql task" << endl;
}task_t tasks[3] = {PrintLog, ReloadConf, ConnectMysql};uint32_t NextTask()
{return rand() % 3;
}void worker()
{// 从0中读取任务即可!while(true){uint32_t command_code = 0;ssize_t n = read(0, &command_code, sizeof(command_code));if(n == sizeof(command_code)){if(command_code >= 3) continue;tasks[command_code]();}else if(n == 0) //管道写端关闭读端返回值位0{cout << "sub process: " << getpid() << " quit now..." << endl;break;}}
}