进程通信的目的
- 数据传输:一个进程需要将它的数据发送给另一个进程
- 资源共享:多个进程之间共享同样的资源。
- 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
- 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变
进程间通信的认识
进程间通信的本质是让相互通信的进程能够看到(共享)同一份资源。
首先我们要知道,我们在打开一个文件时会形成文件描述符,而0 1 2默认是标准输入,标准输出,标准错误。所以我们打开的文件就会从最小的下标开始进行文件描述符的分配。
那么如果一个进程以不同的方式打开同一个文件多次的话,那会有几个文件描述符呢???
int main()
{int fd = open("test.txt",O_CREAT|O_WRONLY,0666);int fdd = open("test.txt",O_RDONLY|O_CREAT);int fddd = open("test.txt",O_RDONLY|O_CREAT);cout<<fd<<" "<<fdd<<" "<<fddd<<endl;return 0;
}
在验证了之后不难看出,就这同一个文件打开三次就形成了三个文件描述符,只不过这三个文件描述符指向的是同一个文件,也就是资源共享一份,只不过区别是不同文件描述符指向同一个文件的读写位置不同。
进程间通信分类
匿名管道
#include <unistd.h>
功能:创建一无名管道
原型
int pipe(int fd[2]);
参数
fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
返回值:成功返回0,失败返回错误代码
匿名管道的原理
匿名管道其实就是父进程创建管道,也就是分别通过读写方式打开同一个文件,然后创建子进程,创建子进程的PCB,此时子进程会和父进程的资源共享,会将进程地址空间、页表都拷贝一份。而对应的文件描述符表也会进行拷贝一份(浅拷贝)。那么此时的父子进程的文件描述符表中指向的文件都是相同的。最后分别关闭对应的文件读写端就可以构成单向通信(文件描述符指向文件会采用引用计数的方式)。此时也就形成了管道。所以可以得出结论:匿名管道允许的是具有血缘关系的进程进行通信。
管道读写规则
我们通过代码的方式来理解管道读写的规则方式,采用父进程读数据,子进程写数据的方式:(红色字体是结论)
int main()
{//建立管道int tmp[2]={0};int n=pipe(tmp);if(n==-1){cout<<"管道建立失败"<<endl;exit(-1);}//创建子进程,父子进程指向同一个文件int id=fork();if(id<0){cout<<"进程创建失败"<<endl;exit(-1);}//child-wif(id==0){close(tmp[0]);//关闭读文件char str[1024];int cnt=10;while(1){ snprintf(str,sizeof(str)-1,"hello parent, I am chilld,mypid = %d,cnt = %-2d",getpid(),cnt--);//防止str被写穿,预留\0write(tmp[1],str,strlen(str));//写入到文件中(文件中是不存在\0的)if(cnt==0)break;}//exit(0);//结束if语句就算子进程退出}//parent-rif(id>0){close(tmp[1]);//关闭写文件char buffer[1024];while(1){int sz = read(tmp[0],buffer,sizeof(buffer)-1);//防止读入buffer的数据超出临时空间,预留\0//正常情况sz=0的时候,是因为管道没数据,写端口阻塞if(sz>0){buffer[sz]=0;//字符串末尾加0cout<<buffer<<" parent_id:"<<getpid()<<endl;}}pid_t rid = waitpid(id,nullptr,0);//等待子进程退出if(rid==id){cout<<"wait ok"<<endl;}}return 0;
}
分析代码内容:先创建管道,本质就是通过读写方式打开同一个文件两次,然后创建子进程,此时子进程就会拷贝父进程的文件描述符表,父子进程分别执行自己对应的代码。然后分别在父子进程中关闭关闭其不需要的操作文件方式,此时关闭并不会直接将文件给关闭,因为我们文件的指向是采用引用计数的方式的,所以关闭进程的文件,只会使引用计数--,只有减到0才会真正的关闭文件。
但是我们发现,我们的结果是按照顺序读写的,也就是子进程写完一份内容,父进程才会读取一份内容。所以就可以得出第一个结论:写端目前没有向管道中写入数据,此时读端就会等待,直到有数据。如果其次就是进程子进程cnt=0的时候,写完了以后会退出,此时父进程还在while(1)中死循环并不会退出。
此时我们更改一下代码的内容(让读端等待,写端一直写):
int main()
{//建立管道...//创建子进程,父子进程指向同一个文件...//child-wif(id==0){close(tmp[0]);//关闭读文件char str[1024];int cnt=10000;while(1){ //写入到文件中(文件中是不存在\0的)...cout<<"write------------------"<<cnt<<endl;}}//parent-rif(id>0){close(tmp[1]);//关闭写文件char buffer[1024];while(1){sleep(10);//向管道中读取数据...}//等待子进程退出...}return 0;
}
最终写到cnt=8704的时候就会停下来,此时我们知道父子进程时都没有退出的。我们其实知道,此时的管道应该是被写满了,所以就听下来了,而父进程在sleep并没有将管道的数据给读走,所以我们的进程就卡住了,如果改一下时间为sleep(1):
此时我们的父进程在等待的一秒时间内,子进程会一瞬间将缓冲区写满,而父进程 ,此时就会将数据读走(相当于清除缓冲区的数据,只不过是允许覆盖式的清除方式,并没有清除缓冲区数据),然后继续写,继续读...
所以可以得出结论:如果管道写满了,写端必须等待,直到管道有空间为止(读端读走数据)。
再更改一下代码的内容(写端写一条信息就终止,读端一直读):
int main()
{//建立管道...//创建子进程,父子进程指向同一个文件...//child-wif(id==0){close(tmp[0]);//关闭读文件char str[1024];int cnt=10000;while(1){ //写入到文件中(文件中是不存在\0的)...cout<<"write------------------"<<cnt<<endl;close(tmp[1]);//写一次就关闭break;}}//parent-rif(id>0){close(tmp[1]);//关闭写文件char buffer[1024];while(1){//向管道中读取数据...}//等待子进程退出...}return 0;
}
此时的运行结果就是:子进程写了一条消息就退出进程了,所以子进程的文件描述符表也就都释放了,而父进程读取一条消息以后就停住了,也就是一直在while死循环中。
我们此时想要的结果其实就是子进程退出,父进程回收子进程的资源,然后也退出。所以我们的父进程就需要知道子进程的退出标识。
其实我们可以通过read系统调用函数的返回值来判断子进程是否写完数据并退出了。父进程read读取管道中信息,当管道中没有数据了以后,read函数的返回值就是0,所以此时就可以设置父进程的等待了。
if(id>0){ while(1){int sz = read(tmp[0],buffer,sizeof(buffer)-1);//防止读入buffer的数据超出临时空间,预留\0//正常情况sz=0的时候,是因为管道没数据,写端口阻塞if(sz>0)...else if(sz==0){cout<<"写端关闭了文件描述符或者子进程退出"<<endl;break;}}//等待子进程...}
得出结论:写端关闭,读端一读取的话,此时读端会读到read的返回值为0,表示读到文件结尾。
再再更改一下代码的内容(读端关闭,写端一直写):
int main()
{//建立管道...//创建子进程,父子进程指向同一个文件...//child-wif(id==0){close(tmp[0]);//关闭读文件char str[1024];int cnt=10000;while(1){ //写入到文件中(文件中是不存在\0的)...cout<<"write------------------"<<cnt<<endl;}}//parent-rif(id>0){close(tmp[1]);//关闭写文件char buffer[1024];while(1){//向管道中读取数据...close(tmp[0]);cout<<"读端关闭"<<endl;break;}sleep(10);//等待子进程退出...}return 0;
}
现象:在父进程关闭读端,sleep十秒的时间内,不难发现子进程已经退出了,并且是僵尸,也就是说,父进程还没回收子进程的资源。而在等待完十秒后,发现父进程与子进程都退出,也回收资源了。
所以可以得出:读端关闭,写端一直写入的话,操作系统会直接发送信号杀掉写端进程。
模拟实现进程池
进程池:进程池(Process Pool)是一种用于并发执行多个任务的机制。它通过提前创建一组固定数量的子进程,并将任务分配给这些子进程来执行,从而实现并行处理。有了进程池,可以有效地利用计算机的多核资源,提高程序的运行效率。
我们其实想实现的功能就是父进程创建信道与子进程,并且记录好信道与父进程的写端口对应关系,然后将任务通过一个个的数字码传递给子进程,然后子进程通过任务码执行对应的任务。这样就可以并发的执行多个任务。
...task.hpp...
#pragma once#include <iostream>
#include <string>
#include <vector>
#include<ctime>
#include<cstdlib>
#include <functional>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <fcntl.h>
using namespace std;typedef function<void()> task; // using task = function<void()>;// 任务
void a()
{cout << "---我是任务a---" << endl;
}
void b()
{cout << "---我是任务b---" << endl;
}
void c()
{cout << "---我是任务c---" << endl;
}// 需要将父进程传的任务码与对应的任务对应上
vector<task> v_task{a,b,c};
int a_code = 0;
int b_code = 1;
int c_code = 2;
...test.cpp...
#include "task.hpp"#define N 5
int num = 1;
class tunnel // 将管道与对应的子进程关联
{
public:tunnel(int x, int y): child_id(x), write_fd(y){_name = "tunnel" + to_string(num++);}public:pid_t child_id;int write_fd;string _name;
};void Printf(vector<tunnel> tunnels)
{for (int i = 0; i < N; i++){cout << tunnels[i]._name << " " << tunnels[i].child_id << " " << tunnels[i].write_fd << endl;}
}
void work(int fd)
{while (1){int code;size_t n = read(fd, &code, sizeof(int));if (n == 0){cout<<"写端关闭"<<endl;break;}// 先检查code是否合法if (!(code >= 0 && code < 3)){cout << "没有该任务" << endl;}v_task[code](); // 执行任务}
}
int main()
{vector<tunnel> tunnels;// 创建管道for (int i = 0; i < N; i++){int pipe_fd[2] = {0};int n = pipe(pipe_fd);if (n == -1){cout << "管道建立失败" << endl;exit(-1);}// 创建子进程pid_t id = fork();if (id == 0) // child-r{close(pipe_fd[1]);// 子进程读取命令执行任务work(pipe_fd[0]); // 所有子进程都在持续等待父进程派发任务exit(0); //}else if (id > 0) // parent-w{close(pipe_fd[0]);// 将子进程与父进程关联tunnels.push_back(tunnel(id, pipe_fd[1]));}else // id=-1{cout << "创建进程失败" << endl;exit(-1);}}//父进程创建并管理好了子进程//按顺序的向每个子进程中发送任务srand((unsigned int)time(nullptr)*11);//生成随机任务码while (1){ for (int i = tunnels[0].write_fd; i <= tunnels[N - 1].write_fd; i++){int k = rand() % v_task.size();write(i, &k, sizeof(int));}break;}//进程等待,回收资源//for(int i=0;i<N;i++)//存在bugfor(int i=N-1;i>=0;i--) {close(tunnels[i].write_fd);waitpid(tunnels[i].child_id,nullptr,0);cout<<"等待成功,"<<"pid:"<<tunnels[i].child_id<<endl;}//Printf(tunnels);return 0;
}
但是该代码是存在隐患的:也就是在父进程回收子进程的资源的时候可能会形成等待:其实就是因为我们创建管道并且fork子进程的时候。我们首先创建管道(实质就是打开同一个文件两次,形成两个文件描述符),然后再fork创建子进程,那么子进程会拷贝文件描述符表中的内容,此时,父子进程文件描述符表中的文件指针所指向的就是同一个文件,继续fork继续拷贝...
第一次fork()创建子进程时的文件描述符关系:
第二次 fork()创建子进程时的文件描述符关系:
不知道你是否发现了问题所在,其实就是除了第一次的fork创建子进程之后,其他子进程的文件描述符表中的文件指针会指向上一次父进程中的写端文件(五角星位置标记)。所以当我们的父进程自上向下的关闭写端文件的话,是并不能关闭的。因为我们第二次foek创建子进程的时候将第一次文件描述符表中的内容都拷贝过来了。并且由于写时拷贝的原因,我们关闭文件之后只是会引用计数减减,但是并没有减到0,所以文件写端并没有关闭,因此子进程的read并不会读到0,所以子进程也没有退出,父进程就会一直等待...
所以我们就选择自下向上的关闭父进程的写端文件描述符,此时子进程的读端read函数就会返回0,从而子进程退出。所以此时该子进程的文件描述符表中指向的文件都会关闭,因此就会将上一次拷贝过来的文件描述符给清理,从而使得上一次的写端文件是一对一的方式。
命名管道
匿名管道的通信是有要求的:必须是具有亲缘关系的进程间才能进行通信,实质就是创建匿名管道,然后再fork子进程,所以就会将管道文件拷贝到子进程中。但是我们的命名管道就可以对任意进程进行通信,其本质就是创建一个有名的管道,然后任意两个进程通过打开各自读端或写端口进行数据的传输。
创建方法:
- 命名管道可以从命令行上创建,命令行方法是使用下面这个命令:
$ mkfifo filename
- 命名管道也可以从程序里创建,相关函数有:
int mkfifo(const char *filename,mode_t mode);
实现两个进程的通信:
...one.cpp...//写数据
int main()
{//打开文件int fd = open("pipe",O_WRONLY|O_TRUNC);//写信息string tmp;while(1){cout<<"请输入你要传送的信息:";getline(cin,tmp);write(fd,tmp.c_str(),tmp.size());}close(fd);return 0;
}
...two.cpp...//读取数据
int main()
{//创建命名管道int ret = mkfifo("pipe",0666);if(ret<0){cout<<"错误码:"<<errno<<" 错误信息:"<<strerror(errno)<<endl;}//读的方式打开管道int fd = open("pipe",O_RDONLY);if(fd<0){cout<<"错误码:"<<errno<<" 错误信息:"<<strerror(errno)<<endl;exit(-1);}//读取信息char buffer[1024]={0};while(1){int cnt = read(fd,buffer,sizeof(buffer)-1);//防止buffer被读穿,预留\0if(cnt>0){buffer[cnt]=0;cout<<"read接收到: "<<buffer<<endl;}else if(cnt==0){cout<<"信息传输完毕"<<endl;break;}}close(fd);return 0;
}
匿名管道与命名管道的区别
- 匿名管道由pipe函数创建并打开。
- 命名管道由mkfifo函数创建,打开用open
- FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一旦这些工作完成之后,它们具有相同的语义。
system V IPC
System V IPC(Inter-Process Communication)也是一组用于进程间通信的机制,通常在UNIX和Linux系统中使用。它包括三种主要机制:共享内存(shared memory)、消息队列(message queue)和信号量(semaphore)。
System V 共享内存
不论是管道还是共享内存,想要进行通信始终少不了:让不同的进程能够看到同一份资源。
而 System V 共享内存 的通信机制是先在物理空间中创建一份共享资源空间,然后对于想要通信的进程而言,只需要通过页表在该进程地址空间的共享区映射好这块共享内存即可。
而物理内存上可能不仅仅只创建了一个共享内存,会有多个共享内存的话就需要被操作系统进行管理,而管理就需要描述组织。所以我们对于每个共享内存都会有结构体struct shmid_ds来存放共享内存中的信息的。
指令认识:ipcs -m 查看系统共享内存信息
ipcrm -m shmid 移除用shmid标识的共享内存段
函数认识
shmget函数//创建共享内存
功能:用来创建共享内存原型:
int shmget(key_t key, size_t size, int shmflg);
参数:
key:这个共享内存段名字,一般通过ftok函数生成
size:共享内存大小
shmflg:由九个权限标志构成,它们的用法和创建文件时使用的mode模式标志是一样的
返回值:成功返回一个非负整数(shmid),即该共享内存段的标识码;失败返回-1
shmat函数//挂接共享内存
功能:将共享内存段连接到进程地址空间原型
void *shmat(int shmid, const void *shmaddr, int shmflg);
参数
shmid: 共享内存标识
shmaddr:指定连接的地址
shmflg:它的两个可能取值是SHM_RND和SHM_RDONLY
返回值:成功返回一个指针,指向共享内存第一个节(虚拟地址);失败返回-1
shmdt函数//程序结束清空页表时相当于此
功能:将共享内存段与当前进程脱离原型
int shmdt(const void *shmaddr);
参数
shmaddr: 由shmat所返回的指针
返回值:成功返回0;失败返回-1
注意:将共享内存段与当前进程脱离不等于删除共享内存段
shmctl函数
功能:用于控制共享内存原型
int shmctl(int shmid, int cmd, struct shmid_ds *buf);//shmid_ds是共享内存的标识
参数
shmid:由shmget返回的共享内存标识码
cmd:将要采取的动作(有三个可取值)
--IPC_RMID:删除共享内存
--IPC_STAY:将shmid_ds结构体当中的数据设为共享内存中的关联值
--IPC_SET:在进程权限足够的情况下,将共享内存中的关联值设为shmid_ds结构体中给的值
buf:指向一个保存着共享内存的模式状态和访问权限的数据结构(一般设为0)
返回值:成功返回0;失败返回-1
通信
...add.h...
#pragma once#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <string>
#include <cstring>
#include <cerrno>
#include <unistd.h>
#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
using namespace std;const string pathname = "/home/cr/git/linux/test_2_4";
const int proj_id = 20040712;
int size = 4096;
const string path = "pipe";key_t getkey()
{key_t key = ftok(pathname.c_str(), proj_id);if (key == -1){cout << "errno = " << errno << ",strerrno = " << strerror(errno) << endl;exit(1);}return key;
}int creatshm(key_t key)
{int shmid = shmget(key, size, IPC_CREAT | IPC_EXCL | 0664); // 选项相当于没有就创建,有就返回错误(可以保证创建的是全新的共享内存)必须带有权限,否则可能导致挂接不上if (shmid == -1) // 共享内存已经创建好了{cout << "errno = " << errno << ",strerrno = " << strerror(errno) << endl;exit(1);}return shmid;
}int getshm(key_t key)
{int shmid = shmget(key, size, IPC_CREAT | 0664); // 选项相当于没有就创建,有就返回shmidif (shmid == -1) // 共享内存已经创建好了{cout << "errno = " << errno << ",strerrno = " << strerror(errno) << endl;exit(1);}return shmid;
}void *shmatway(int shmid)
{char *start = (char *)shmat(shmid, nullptr, 0); // 第二个参数表示将共享内存挂载到地址空间的指定地址处,返回虚拟地址起始位置if (start == (char *)-1){cout << "errno = " << errno << ",strerrno = " << strerror(errno) << endl;exit(1);}
}// 创建命名管道
void creat_pipe(const string sp)
{int ret = mkfifo(sp.c_str(), 0666);if (ret < 0){cout << "错误码:" << errno << " 错误信息:" << strerror(errno) << endl;}
}
...shm_creat.cpp...
#include"add.h"int main()
{//创建共享内存key_t key = getkey();//生成key值int shmid = creatshm(key);//挂接共享内存cout<<"开始将共享内存映射到进程地址空间当中"<<endl;cout<<"shmid = "<<shmid<<endl;char* start = (char*)shmatway(shmid);//printf("%p\n",start);sleep(3);//进程结束以后,页表销毁,挂接关系消失//创建管道--提供同步机制creat_pipe(path);cout<<"创建管道"<<endl;int fd = open(path.c_str(),O_RDONLY);//同步机制会等待另一方也打开管道才会继续向下执行cout<<"打开管道"<<endl;//通信(创建共享内存--先运行--读数据)while(1){int tmp;ssize_t r = read(fd, &tmp, sizeof(tmp));if(r==0){break;//写端已经关闭}cout<<"共享内存的数据:"<<start<<endl;//共享内存不提供同步机制}//取消挂接和关闭管道删除管道shmdt(start);close(fd);unlink(path.c_str());//控制(清除)共享内存cout<<"开始将共享内存从操作系统中删除"<<endl;shmctl(shmid,IPC_RMID,nullptr);}
...shm_attach.cpp...
#include "add.h"int main()
{// 获取共享内存key_t key = getkey(); // 生成key值int shmid = getshm(key);// 挂接共享内存cout << "开始将共享内存映射到进程地址空间当中" << endl;cout << "shmid = " << shmid << endl;char *start = (char *)shmatway(shmid);// printf("%p\n",start);// 管道--提供同步机制int fd = open(path.c_str(), O_WRONLY | O_TRUNC);// 通信(得到共享内存--后运行运行--写数据)char c = 'C';while (c <= 'R'){start[c - 'a'] = c;c++;// 共享内存不提供同步机制cout << "向共享内存写数据中:" << endl;int tmp = 0;write(fd, &tmp, sizeof(tmp));sleep(1);}// 取消挂接关闭文件shmdt(start);close(fd);
}
共享内存相较于管道的优点
共享内存是所有进程通信中速度最快的。因为我们创建好共享内存之后的返回值是一个指针,而我们传输数据就可以直接通过指针的方式进行传递。而对于管道而言,我们必须得调用read函数和write函数将管道中的数据写入与写出(实质就是拷贝)而且在此还会多次经过缓冲区的拷贝。
System V 消息队列
消息队列其实就是提供一个进程给另一个进程发送数据块的能力。双方通过发送数据块和读取数据块来分别进行发送信息和接收信息。
其实消息队列也是需要进行管理,所以也会有描述消息队列信息快的结构体,就和共享内存其实是一样的:
代码使用其实与共享内存大致一样:
const string pathname = "/home/cr/git/linux/test_2_24";
const int proj_id = 20040712;
int size = 4096;std::string ToHex(int id)
{char buffer[1024];snprintf(buffer, sizeof(buffer), "0x%x", id);return buffer;
}
key_t getkey()
{key_t key = ftok(pathname.c_str(), proj_id);if (key == -1){cout << "errno = " << errno << ",strerrno = " << strerror(errno) << endl;exit(1);}return key;
}
#include"add.h"int main()
{key_t key=getkey();cout<<"生成的key = "<<key<<endl;int msqid = msgget(key,IPC_CREAT|IPC_EXCL);//创建消息队列(和共享内存一样)cout<<"msqid = "<<msqid<<endl;if (msqid == -1) {cout << "errno = " << errno << ",strerrno = " << strerror(errno) << endl;exit(1);}//获取消息队列里的内容struct msqid_ds ds;msgctl(msqid,IPC_STAT,&ds);cout<<"消息队列中的key = "<<ToHex(ds.msg_perm.__key)<<endl;sleep(10);//删除消息队列msgctl(msqid,IPC_RMID,nullptr);cout<<"删除消息队列..."<<endl;}
System V 信号量
信号量是一种用于进程间同步和互斥的机制。它可以用来协调多个进程对共享资源的访问,确保同一时间只有一个进程访问该资源。本质其实就是一把计数器。
互斥:任何时刻只允许一个执行流(进程)访问公共资源,加锁完成,如共享内存。
同步:多个执行流执行的时候,按照一定的顺序执行,如管道。
被保护起来的公共资源叫做临界资源,而访问该临街资源的代码称作临界区。
信号量通常是一个整数变量,可以采用计数器的形式来表示。如果需要申请资源的话就信号量--,直到信号量的值为0的时候就说明此时已经没有多余的资源可以提供给进程,所以该进程就必须等待。如果释放资源的话就信号量++,此时就可以将释放的空闲资源提供给等待的进程使用。它有两个基本操作:P (等待)和V (发出)。
通过合理地使用信号量,可以避免多个进程同时访问共享资源而引发的问题,实现进程间的同步和互斥。