文章目录
- 理解进程通信
- 管道理解
- 使用管道
- 1.创建管道
- 2.fork创建子进程
- 3.构建单向通信信道
- 子进程构建通信
- 父进程构建通信
- 使用管道的完整版代码
- 扩展
- Task.hpp
- ProcessPool.cc
理解进程通信
进程运行具有独立性—>进程想通信,难度其实是比较大的---->进程通信的本质:先让不同的进程看到同一份资源(内存空间)----->而这个内存空间不能隶属与任何一个进程,更改强调共享
为什么要进程间通信:
为了达到交互数据、控制、通知等目标
进程通信不是目的,而是手段
进程间通信目的
数据传输
:一个进程需要将它的数据发送给另一个进程资源共享
:多个进程之间共享同样的资源。通知事件
:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。进程控制
:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
进程间通信发展
- 管道---->Linux原生提供的通信方式
- System V进程间通信-----多进程------单机通信
- POSIX进程间通信----->多线程----->网络通信
他们都有一定的标准,标准在使用者看来,更多是接口上具有一定规律性
管道理解
什么是管道?
感性理解:
管道有出口,有入口
天然气管道,石油管道,自来水管道…
只能单向通信
传输的都是资源
计算机管道也是传输的资源是数据!
什么是管道?
管道是Unix中最古老的进程间通信的形式。
我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”
计算机通信领域的设计者,设计了一种单向通信的方式—管道
管道通信背后是进程之间通过管道进行通信。
1.分别一读写方式打开一个文件
2.fork创建子进程
3.双方各自关闭自己不需要的文件描述符
假设我们只要父进程写入,子进程读取
管道通信是纯内存形的通信方式,不要把数据写到磁盘
以前的指令
who | wc -l
其中|就是管道
使用管道
如何做到让不同的进程,看到同一份资源呢?-----fork让子进程继承的----能够让具有血缘关系的进程进行进程间通信—常用于父子进程
管道通信的步骤
1.创建管道
接口
其中参数pipefd是什么?
pipefd是输出型参数,我们调用pipe把管道的读写两端fd放进pipefd数组中。
开始创建pipe管道。
那么pipe的返回值,是什么呢?
因为错误会放回-1,所以我们直接进行断言,管道都没创建成功我们也没有必要通信。
2.fork创建子进程
3.构建单向通信信道
子进程读取,父进程写入
子进程保留读端关闭写端
父进程保留写端关闭读端
子进程构建通信
父进程构建通信
使用管道的完整版代码
#include <iostream>
#include <string>
#include <vector>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <assert.h>int main()
{//1.创建管道int pipefd[2] = {0};//pipefd[0]是读端,pipefd[1]是写端int n = pipe(pipefd);assert(n != -1);(void)n;//因为后面不使用,所以直接转成(void)以免报错//2.创建子进程int id = fork();//3.构建单向通信信道,父进程写入子进程读取if(id == -1){//出错perror("create child failed:");}else if(id == 0){//子进程 读取//关闭写端close(pipefd[1]);ssize_t n = 0;//定义数据换出去char buffer[1024];//读取数据while(true){//读取数据n = read(pipefd[0],buffer,sizeof(buffer));//read返回值差错初六if(n > 0){//第n为置为'\0'因为系统不带'\0'buffer[n] = 0;//输出获取到的数据std::cout << "child get a message["<<getpid()<<"] Father# " << buffer << std::endl;}else if(n == 0){//Father写端进程退出不再写数据,child读端进程也退出不再读取数据std::cout << "Father quit,I quit too!" << std::endl;break;}}close(pipefd[0]);exit(0);}else{//父进程 写入//关闭读端close(pipefd[0]);//要发送的数据std::string message = "I am father,I am writting message from you";int count = 0;char buffer[1024];while(true){//构建一个能发生变化的字符串snprintf(buffer,sizeof(buffer),"%s[%d] : %d",message.c_str(),getpid(),count++);//发送数据write(pipefd[1],buffer,strlen(buffer));//故意发送数据慢点以便观察sleep(2);}close(pipefd[1]);//进程等待pid_t ret = waitpid(id,nullptr,0);std::cout << "id: " <<id << "ret: "<<ret << std::endl;assert(ret > 0);(void)ret;}return 0;
}
为什么前面我们不定义全局的buffer来进行通信呢?
因为有写时拷贝的存在,无法更改通信。
扩展
1.管道是用来进行具有血缘关系的进程之间进行通信(常用于父子进程),管道子进程读取数据会等父进程写入后再读,否则就不会读。
显示器是一个文件,父子进程同时往显示器写入的时候,有没有说一个进程会等待另一个进程的情况?没有!----缺乏访问控制。
管道文件—读取具有访问控制。
2.管道具有通过让进程间协同的能力,因为提供了访问控制。
3.管道提供面向流式服务----面向字节流
管道写的一方fd关闭,读取的一方read会返回0表示读到了文件结尾。
4.管道是基于文件的,文件的什么周期是随进程的,管道的什么周期也随进程。
管道特点
只能用于具有共同祖先的进程(具有亲缘关系的进程)之间进行通信;
通常,一个管道由一个进程创建,然后该进程调用fork,此后父、子进程之间就可应用该管道。- 管道提供流式服务
一般而言,进程退出,管道释放,所以管道的生命周期随进程
一般而言,内核会对管道操作进行同步与互斥
管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道
管道是单向通信就是半双工通信的一种特殊情况,既可以读内容又可以写内容,但是写内容和读内容不能同时进行。
全双工:既可以读内容又可以写内容,读和写可以同时进行。
特殊情况:
1.读快写慢,管道没有数据的时候,读的一端必须等待写的一端。
2.写块读慢,管道写满了就不能再写了。
3.写端关闭,读端读取到0,表示读到文件结尾
4.读端关闭,写端继续写OS会自动终止进程
管道的作用?
父进程可以给子进程派发任务,也称进程池
Task.hpp
任务代码
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <unordered_map>
#include <functional>
#include <sys/types.h>
#include <unistd.h>typedef std::function<void()> func;std::vector<func> callbacks;
std::unordered_map<int,std::string> desc;void readMySQL()
{std::cout << "sub process["<<getpid()<<"] execute read MySQL task\n" << std::endl;
}void execuleUrl()
{std::cout << "sub process["<<getpid()<<"] execute execuleUrl task\n" << std::endl;
}void encryption()
{std::cout << "sub process["<<getpid()<<"] execute encryption task\n" << std::endl;
}void persistence()
{std::cout << "sub process["<<getpid()<<"] execute persistence task\n" << std::endl;
}void Load()
{desc.insert({callbacks.size(),"readMySQL"});callbacks.push_back(readMySQL);desc.insert({callbacks.size(),"execuleUrl"});callbacks.push_back(execuleUrl);desc.insert({callbacks.size(),"encryption"});callbacks.push_back(encryption);desc.insert({callbacks.size(),"persistence"});callbacks.push_back(persistence);
}void showHandler()
{for(const auto &iter : desc){std::cout << iter.first << "\t" << iter.second << std::endl;}
}int handlerSize()
{return callbacks.size();
}
ProcessPool.cc
#include <iostream>
#include <vector>
#include <unordered_map>
#include <cstdio>
#include <cstring>
#include <cassert>
#include <ctime>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <fcntl.h>
#include "Task.hpp"int waitcommand(int waitfd,bool &quit)//阻塞式等待
{uint32_t command = 0;//读取指令ssize_t n = read(waitfd,&command,sizeof(command));if(n == 0){quit = true;return -1;}assert(n == sizeof(uint32_t));return command;
}
void sendAndWakeup(pid_t who,int fd,uint32_t command)
{//发送指令write(fd,&command,sizeof(command));std::cout << "main process: call process " << who << " execute " << desc[command] << " through " << fd << std::endl;}#define PROCESS_NUM 5
int main()
{//加载任务Load();// 存储pid : pipefd的映射std::vector<std::pair<pid_t, int>> slots;// 创建子进程for (int i = 0; i < PROCESS_NUM; i++){// 创建管道int pipefd[2] = {0};int n = pipe(pipefd);assert(n == 0);//创建子进程int id = fork();if (id < 0){perror("create chlid failed");}else if (id == 0){// 子进程close(pipefd[1]);while (true){bool quit = false;//读取任务指令int command = waitcommand(pipefd[0], quit);if (quit)break;//执行任务if (command >= 0 && command < callbacks.size()){callbacks[command]();}else{std::cout << "the command unlawful" << std::endl;}}exit(1);}else{// 父进程,关闭读端close(pipefd[0]);//将每个进程的pid和pipefd存入pair<pid_t,int>数组中slots.push_back(std::pair<pid_t, int>(id, pipefd[1]));}}// 父进程派发任务srand((unsigned long)time(nullptr) ^ getpid() ^ 23323123123L); // 让数据源更随机while (true){int select = 0;int command = 0;printf("*****************************************\n");printf("**1.show handlers 2.send command**\n");printf("*****************************************\n");std::cout << "Please Enter Select:";std::cin >> select;if (select == 1){showHandler();}else if (select == 2){std::cout << "Please Enter command:";//选择任务std::cin >> command;//随机选择进程int choice = rand() % slots.size();//发送任务函数sendAndWakeup(slots[choice].first,slots[choice].second,command);}else{std::cout << "the select unlawful" << std::endl;}}//关闭所有子进程的fdfor(const auto& slot : slots){close(slot.second);}//等待子进程退出,回收子进程所有信息for(const auto& slot : slots){waitpid(slot.first,nullptr,0);}return 0;
}