进程间通信
介绍
进程间通信的本质
- 进程间通信的前提,首先需要让不同的进程看到同一块“内存”
- 此“内存”一定不属于任何进程,而应该强调共享二字
进程间通信的目的
-
数据传输:一个进程需要将它的数据发送给另一个进程
-
资源共享:多个进程之间共享同样的资源。
-
通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
-
进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
进程间通信的简单理解(举例)
-
看代码:
#include <iostream> #include <unistd.h> using namespace std; int main() {pid_t id=fork();if(id==0){cout<<"hello i am father!"<<endl;}else{cout<<"hello i am child"<<endl;;}return 0; }
-
运行结果:
为什么父子进程会向同一个显示器文件打印?
- 创建子进程时对应的struct files_struct也会拷贝一份给子进程,因此里面的结构体数组:struct file*fd_array[]中的内容也是一样的,数组中存放的文件指针指向的文件也是一样的,因此通信的本质看到同一个文件也就随之实现了:先让父进程打开一个文件,这样就有了一个文件描述符存放再让父进程创建子进程,这时两个进程就都指向了同一个文件,这个文件就可以作为通信渠道使父子间通信
进程间通信的分类
-
管道
匿名管道pipe
命名管道
-
System V IPC
消息队列
共享内存
信号量
-
POSIX IPC
消息队列
共享内存
信号量
互斥量
条件变量
读写锁
管道
何为管道?
-
管道是Unix中最古老的进程间通信的形式。
-
我们把从一个进程连接到另一个进程的一个数据流称为一个“管道“
who进程的运行结果通过标准输出将数据流入管道,wc -l 通过标准输入从管道内读取数据,处理后得到的结果再打到标准输出上让用户看到。
who命令查看当前服务器登录用户,wc -l 统计行数
匿名管道
仅限于父子进程间通信的管道文件,本质是双方进程一方打开写端关闭读端,另一端打开读端关闭写端,刻意营造单向流动的局面的一种管道
- 图解:
- 在文件描述符的视角看:
- 在内核角度看:
简明阐述:
- 父进程通过系统提供的接口创建管道文件,此文件是由操作系统管理的,其中的数据并不会被刷到磁盘上,纯内存级的文件,这样使得效率提高了,创建好文件后该接口会默认以只读和只写的模式打开此管道文件,这样就有了两个文件描述符,一个文件描述符是用来读文件的,一个文件描述符是用来写入文件的,此时父进程的file_struct内的fd_array数组也就有了两个描述符,随后父进程调用fork函数创建子进程,子进程此时是与父进程共享一份数据的,但是由于父进程需要关闭两个描述符的其中一个,会发生写时拷贝(注意:由于管道文件是系统创建并维护的,与两个进程是没有直接关系的,因此拷贝时并不会出现管道文件也被拷贝的情况,而是文件描述符表会被拷贝),此时子进程也拥有了管道文件的两个文件描述符,只需要关闭父进程关闭的那一个所相对的文件描述符,比如父进程关闭写端,那么子进程就关闭读端,这样就成功营造了单向流通的管道通信的局面!并且也符合不同进程看到同一份资源的条件!进程间通信也就完成了!这种要么在读,要么在写的通信方式也称为半双工通信
创建管道文件函数:pipe()
-
pipefd[0]和pipefd[1] 是成功打开文件后返回的两个文件描述符,pipefd[0]对应的是读端,pipefd[1]对应的是写端
使用pipe()完成进程间通信:提供框架,具体自行测试
-
// 半双工,要么在读要么在写 int main() {int pipefd[2] = {0};int n = pipe(pipefd);assert(n != -1);(void)n;pid_t id = fork();if (id == 0){// child:readclose(pipefd[1]); while (true){//读操作}close(pipefd[0]);exit(1);}else{// father:writeclose(pipefd[0]);while (true){//写操作}close(pipefd[1]);pid_t ret = waitpid(id, nullptr, 0);assert(ret > 0);(void)ret;}return 0; }
-
写操作示例:
// father:writeclose(pipefd[0]);char send_buffer[1024 * 8]; // 缓冲区while (true){fgets(send_buffer, sizeof send_buffer - 1, stdin);ssize_t s = write(pipefd[1], send_buffer, strlen(send_buffer));}
-
读操作示例:
// child:readclose(pipefd[1]);char buffer[1024 * 8];while (true){// sleep(5);ssize_t s = read(pipefd[0], buffer, sizeof(buffer) - 1);if (s > 0){buffer[s] = 0;if (strcmp(buffer, "quit") == 0){cout << "ready to close child" << endl;break;}cout << "copy that:[" << getpid() << "] " << buffer;}else{// 读不到东西了,写端关闭会走到这里cout << "writing quit, reading quit!" << endl;break;}
-
运行结果:
注意
- 子进程一次读的内容,可能是父进程进行无数次写入的内容,这就叫流式服务,通俗点理解就是,有一端在写的时候,另一端读端会被挂起阻塞,没有在读,等待写端完毕后,读端才会被唤醒(这个概念又称为互斥)
- 一个写端可以有多个读端,也就是父进程只有一个,而子进程有多个,但此时可以让所有的管道文件的写端文件描述符由父进程控制,而读端就由不同的子进程进行,这就是进程池
管道读写规则
- 没有数据可读时:
- O_NONBLOCK disable:read调用阻塞,即进程暂停执行,一直等到有数据来到为止。
- O_NONBLOCK enable:read调用返回-1,errno值为EAGAIN。
- 当管道满时
- O_NONBLOCK disable: write调用阻塞,直到有进程读走数据
- O_NONBLOCK enable:调用返回-1,errno值为EAGAIN
- 如果所有管道写端对应的文件描述符被关闭,则read返回0,也就是直接标志着读到文件末尾了
- 如果所有管道读端对应的文件描述符被关闭,则write操作会产生信号SIGPIPE,进而可能导致write进程退出
- 当要写入的数据量不大于PIPE_BUF时,linux将保证写入的原子性。
- 当要写入的数据量大于PIPE_BUF时,linux将不再保证写入的原子性。
- PIPE_BUF:
何为原子性?
- 简单来说,诸如管道通信的特点中,半双工通信时,要么处于读的状态,要么处于写的状态,写端在写时,那么读端就不读,一直阻塞,写端就一直写,读端读时,写端就不写,一直阻塞,读端就一直读,这种要么做,要么就不做,不存在你边读我边写的中间状态就称为原子性。
管道的特点
- 只能用于具有共同祖先的进程(具有亲缘关系的进程)之间进行通信;通常,一个管道由一个进程创建,然后该进程调用fork,此后父、子进程之间就可应用该管道。
- 管道提供流式服务
- 一般而言,进程退出,管道释放,所以管道的生命周期随进程
- 一般而言,内核会对管道操作进行同步与互斥
- 管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道
何为互斥与同步?
-
首先我们需要知道什么是临界资源?临界资源是一次仅允许一个进程独占使用的不可剥夺的资源,相应的,临界区就是进程访问临界资源的那段程序代码。一次仅允许一个进程在临界区中执行
-
互斥:当一个进程正在临界区中访问临界资源时,其他进程不能进入临界区
-
同步:合作的并发进程需要按先后次序执行,例如:一个进程的执行依赖于合作进程的消息或者信号,当一个进程没有得到来自于合作进程的消息或者信号时需要阻塞等待,直到消息或者信号到达后才被唤醒
-
以前面所提到的进程池为例,多个管道,但写端都是父进程,而读端是由父进程所创建的多个子进程,那么父进程向管道写进资源时,此时多个读端都会处于堵塞状态,等待父进程写完毕,这就体现了同步过程,一旦写完毕,多个子进程便会争相去读取这份临界资源,但每次最多只能有一个进程读取此时的管道数据,这就体现了互斥,当然这只是冰山一角,更深层次的还有待探讨。
再次理解管道读写规则的四种特殊情况
- 写端速度快于读端,写端写满了不能再写了,于是写端进入阻塞,等待读端唤醒读走数据(基于同步机制)
- 写端速度慢于读端,那么当管道没有数据时,读端必须进入阻塞状态等待写端(基于同步机制)
- 写端关闭,则管道内永远不会出现数据,则标志着读端读到了文件末尾,即read的返回值=0
- 读端关闭,写端如果再继续往管道内写数据,不会再被读走了,因此系统会直接发送SIGPIPE信号终止掉进程,导致写端进程退出
进程池代码举例
- processpool.cc文件:
// 进程池:父进程派发任务让多个子进程执行
#include <iostream>
#include <cstdlib>
#include <vector>
#include <unistd.h>
#include <cassert>
#include <sys/wait.h>
#include <sys/types.h>
#include <ctime>
#include "Task.hpp"#define PROCESS_NUM 5int waitCommand(int waitFd, bool &quit)
{// waiting for father's writing, now is blockingint command = 0;ssize_t s = read(waitFd, &command, sizeof(command));if (s == 0) // writing's closing{quit = true;return -1;}// promise of correct commandassert(s == sizeof(uint32_t));return command;
}
void distriAndWakeUp(pid_t id, int fd, uint32_t command)
{write(fd, &command, sizeof(command));std::cout << "main process: call procesee:[" << id << "] execute-> " << desc[command] << " through " << fd << std::endl;
}
int main()
{load();std::vector<std::pair<pid_t, int>> slots;for (int i = 0; i < PROCESS_NUM; ++i){int pipefd[2] = {0};int n = pipe(pipefd);assert(n != -1);(void)n;pid_t id = fork();assert(id != -1);if (id == 0){// exit in the process, ineffect of father// child:read// turn down writeclose(pipefd[1]);while (true){// wait commandbool quit = false;int command = waitCommand(pipefd[0], quit);if (quit)break;// coduct commandif (command >= 0 && command < handlerSize()){callbacks[command]();}else{std::cout <<"error command"<< command << std::endl;}}exit(1);}// father:writeclose(pipefd[0]);slots.push_back(std::pair<pid_t, int>(id, pipefd[1]));}// dispatch order//more randomsrand((unsigned long)time(nullptr) ^ getpid());//srand((unsigned long)time(nullptr) ^ getpid() ^ 23323123123L); // 让数据源更随机while (true){// choose a taskint command = rand() % handlerSize();// choose a processint choice = rand() % slots.size();// distribute to a pointed processdistriAndWakeUp(slots[choice].first, slots[choice].second, command);sleep(1);}// close fdfor (const auto &slot : slots){close(slot.second);}// recycle informationfor (const auto &slot : slots){waitpid(slot.first, nullptr, 0);}return 0;
}
-
Task.hpp文件:
//.hpp include func implementation #pragma once#include <iostream> #include <unistd.h> #include <unordered_map> #include <string> #include <vector> #include <functional>typedef std::function<void()> func;std::unordered_map<int, std::string> desc; std::vector<func> callbacks;void readMySQL() {std::cout << "sub process[ " << getpid() << " ]Database Access task!\n"<< std::endl; }void AnalyseURL() {std::cout << "sub process[ " << getpid() << " ]URL Analysis task!\n"<< std::endl; }void cal() {std::cout << "sub process[ " << getpid() << " ]Encryption task!\n"<< std::endl; }void save() {std::cout << "sub process[ " << getpid() << " ]Data Persistence task!\n"<< std::endl; }void load() {// load taskdesc.insert(std::make_pair(callbacks.size(), "readMySQL:Database Access task\n"));callbacks.push_back(readMySQL);desc.insert(std::make_pair(callbacks.size(), "URL Analysis task!\n"));callbacks.push_back(AnalyseURL);desc.insert(std::make_pair(callbacks.size(), "URL Analysis task!\n"));callbacks.push_back(cal);desc.insert(std::make_pair(callbacks.size(), "Data Persistence task!\n"));callbacks.push_back(save); } // Preview task void showHandler() {for (const auto &dc : desc){std::cout << dc.first << "\t" << dc.second << std::endl;} } // task number int handlerSize() {return callbacks.size(); }
命名管道
匿名管道是仅限与父子进程通信的渠道,而让没有关系的两个之间通信,可以使用命名管道。
-
命名管道是一种特殊类型的文件,又叫FIFO文件
-
这种文件不具有文件内容,但具有文件属性,也就是是实实在在存在于磁盘上的文件,但又和匿名管道一样,是内存级的文件,并且不会将数据刷到磁盘上
创建命名管道
-
命名管道可以从命令行上创建,命令行方法是使用下面这个命令:
mkfifo filename
-
可以从程序创建,相关函数:
int mkfifo(const char* filename,mode_t mode);
mode为文件的默认权限,会受到umask掩码的影响,因此在一个进程中可以将默认掩码设置为0
命名管道的打开规则
- 如果当前打开操作是为读而打开FIFO时
- O_NONBLOCK disable:阻塞直到有相应进程为写而打开该FIFO
- O_NONBLOCK enable:立刻返回成功
- 如果当前打开操作是为写而打开FIFO时、
- O_NONBLOCK disable:阻塞直到有相应进程为读而打开该FIFO
- O_NONBLOCK enable:立刻返回失败,错误码为ENXIO
- 如果当前打开操作是为读而打开FIFO时
匿名与命名管道的区别
- 匿名管道由pipe函数创建并打开。
- 命名管道由mkfifo函数创建,打开用open
- FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一但这些工作完成之后,它们具有相同的语义。
用命名管道实现server/client间通信:
-
commu.hpp 文件代码
#pragma once#include <iostream> #include <unistd.h> #include <assert.h> #include <string> #include <string.h> #include <sys/wait.h> #include <sys/stat.h> #include <sys/types.h> #include <fcntl.h> #include "Log.hpp"#define SIZE 128 #define FIFO_MODE 0666 std::string ipcPath="./fifo.ipc";
-
Log.hpp文件代码
#pragma once#include <iostream> #include <time.h>#define DEBUG 0 #define NOTICE 1 #define WARNING 2 #define ERROR 3const std::string msg[] ={"DEBUG","NOTICE","WARNING","ERROR"};std::ostream &Log(const std::string message, int leval) {std::cout << " | " << (unsigned)time(nullptr) << " | " << msg[leval] << " | " << message << std::endl; }
-
client.cc文件代码
//open fifo -> write message to server #include "commu.hpp"int main() {//open fifoint fd=open(ipcPath.c_str(),O_WRONLY);assert(fd!=-1);//ipcstd::string buffer;while(true){std::cout<<"Please input the message :> ";std::getline(std::cin,buffer);write(fd,buffer.c_str(),buffer.size());}//close fifoclose(fd);return 0; }
-
server.cc文件代码
// make fifo -> open fifo -> read client #include "commu.hpp"void getMessage(int fd) {char buffer[SIZE];while (true){memset(buffer, 0, sizeof(buffer));ssize_t n = read(fd, buffer, sizeof(buffer) - 1);if (n > 0){if(strcmp(buffer,"quit")==0)break;std::cout << "[" << getpid() << "]"<< "client say: " << buffer << std::endl;}else if (n == 0){std::cout << "[" << getpid() << "]"<< "End of the File, client quit, server quit,too! " << std::endl;break;}else{perror("error");break;}} } int main() {// make fifoint n = mkfifo(ipcPath.c_str(), FIFO_MODE);assert(n != -1);Log("Creat fifo successfully!", DEBUG);// open fifoint fd = open(ipcPath.c_str(), O_RDONLY);assert(fd != -1);Log("Open fifo successfully!", DEBUG);// ipcint nums = 3;for (int i = 0; i < nums; ++i){pid_t id = fork();assert(id != -1);if (id == 0){// child:getMessage(fd);exit(1);}}for (int i = 0; i < nums; ++i){waitpid(-1, nullptr, 0);}// close fifoclose(fd);Log("close fifo successfully!", DEBUG);// delete fifounlink(ipcPath.c_str());Log("delete fifo successfully!", DEBUG);return 0; }
-
运行展示:
由于我设置了三个子进程同时接收,因此收到quit命令时,由于管道是临界资源,只有其中一个进程收到退出命令,其他进程依旧存在,所以需要quit三次才能将服务端退出。也作为一个验证的调试程序,可以自行根据要求修改代码。
用命名管道实现文件拷贝
-
整体代码只需要对ipc过程进行修改,因此只展示ipc部分代码:
-
server.cpp:
// ipcint fd_copy=open("test_copy.txt",O_WRONLY | O_CREAT,0666);assert(fd_copy);char msg[SIZE];ssize_t s=read(fd,msg,sizeof(msg)-1);if(s>0){write(fd_copy,msg,s);}
-
client.cpp:
//ipcchar buffer[SIZE];int fd_sorce=open("test.txt",O_RDONLY);assert(fd_sorce);while(true){ssize_t s =read(fd_sorce,buffer,sizeof(buffer)-1);if(s>0){write(fd,buffer,s);}else{//DEBUGbreak;}}
客户端运行后,服务端执行完后就立马退出了,而此时对应文件就已经拷贝完成
SystemV共享内存
除了使用管道文件让不同进程间看到同一份资源外,操作系统还专门设计有一种通信方式:System V IPC,其中System V共享内存就是我们要学习的一种临界资源。
共享内存区是最快的IPC形式。一旦这样的内存映射到共享它的进程的地址空间,这些进程间数据传递不再涉及到内核,换句话说是进程不再通过执行进入内核的系统调用来传递彼此的数据。通俗点理解,使用管道文件时,我们还需要用的系统调用接口来建立管道与使用管道,但共享内存是操作系统已经设计好的一种具有内存块和数据结构的资源,不再需要使用系统调用接口。
-
共享内存数据结构:
struct shmid_ds {struct ipc_perm shm_perm; /* operation perms */int shm_segsz; /* size of segment (bytes) */__kernel_time_t shm_atime; /* last attach time */__kernel_time_t shm_dtime; /* last detach time */__kernel_time_t shm_ctime; /* last change time */__kernel_ipc_pid_t shm_cpid; /* pid of creator */__kernel_ipc_pid_t shm_lpid; /* pid of last operator */unsigned short shm_nattch; /* no. of current attaches */unsigned short shm_unused; /* compatibility */void *shm_unused2; /* ditto - used by DIPC */void *shm_unused3; /* unused */ };
可以理解成临界资源从文件转到了内存里。
想要使用共享内存,我们需要经过以下步骤:
- 创建共享内存
- 将共享内存段链接到进程地址空间,通信的双方进程都要链接
- 通信过程
- 通信结束后,想要回收资源,首先要将共享内存段与当前进程脱离
- 脱离后回收共享内存段资源
共享内存函数
shmget函数
- 功能:创建共享内存
int shmget(key_t key,size_t size, int shmflg);
-
参数:key
key:不同进程找到相同共享内存段的键值,也就是标识共享内存段的特殊值
相当于有一扇门,叫做共享内存,而不同进程想要实现通信,就得打开这扇门,而打开这扇门的唯一密码就是key值,其中一个进程设定好key值后,并申请好共享内存空间,另一个进程想要通信,就得拥有相同的键值。键值一般通过算法来转化,我们使用ftok函数来转化获取key
-
key_t ftok(const char* pathname,int proj_id);
功能:用一个已存在的可访问的文件的路径名和一个非0的八比特位的整型通过特殊算法转化成IPC键值key
-
-
参数:size
size:共享内存大小,且大小最好为页的整数倍!页的大小:4096字节
-
参数:shmflg
shmflg:由九个权限标志构成,它们的用法和创建文件时使用的mode模式标志是一样的
- IPC_CREAT单独使用时:创建共享内存时,如果底层已经存在,则获取它,若不存在,则创建它
- IPC_EXCL单独使用时没用意义
- IPC_CREAT | IPC_EXCL:一起使用时,如果底层不存在,则创建它,若存在,则出错返回,因此一起使用时并且成功返回时,必定是全新的共享内存
-
返回值:成功返回一个非负整数,即该共享内存段的标识码,失败返回-1。类比文件成功打开时的文件描述符fd!
shmat函数
- 功能:将共享内存段连接到进程地址空间
void *shmat(int shmid,const void* shmaddr,int shmflg);
-
参数:shmid
shmid:共享内存标识,即shmget函数的返回值,旨在告诉编译器想要链接哪一块被申请的共享内存
-
参数:shmaddr
shmaddr:指定连接的地址
- shmaddr为NULL,核心自动选择一个地址
- shmaddr不为NULL且shmflg无SHM_RND标记,则以shmaddr为连接地址。
- shmaddr不为NULL且shmflg设置了SHM_RND标记,则连接的地址会自动向下调整为SHMLBA的整数倍。公式:shmaddr - (shmaddr % SHMLBA)
- shmflg=SHM_RDONLY,表示连接操作用来只读共享内存
说明:一般都为NULL,让系统自由挂接合适的位置
-
参数:shmflg
shmflg:它的两个可能取值是SHM_RND和SHM_RONLY
- SHM_RONLY:以只读方式挂接
- SHM_RND: 若设置了此选项且shmaddr不为NULL,则连接的地址会自动向下调整为SHMLBA的整数倍。公式:shmaddr - (shmaddr % SHMLBA)
- 0:默认以读写方式挂接
-
返回值:成功返回一个指针,指向共享内存的第一个节,失败返回-1
shmdt函数
-
功能:将共享内存段与当前进程脱离,又叫去关联
int shmdt(const void* shmaddr);
-
参数:shmaddr
shmaddr:由shmat所返回的指针
-
返回值:成功返回0;失败返回-1
-
注意:将共享内存段与当前进程脱离不等于删除共享内存段
shmctl函数
- 功能:用于控制共享内存
int shmctl(int shmid,int cmd.struct shmid_ds *buf);
-
参数:shmid
shmid:由shmget返回的共享内存标识码
-
参数:cmd
cmd:将要采取的动作->三个可取值
-
参数:buf
buf:指向一个保存着共享内存模式状态和访问权限的数据结构,若cmd设置为IPC_RMID即删除共享内存段时,buf设为nullptr
-
返回值:成功返回0;失败返回-1
共享内存的使用
使用共享内存进行server/client 进行ipc的大致框架
- server.cpp:
// creat shared memory -> link to shared memory -> ipc -> unlink -> delete
int main()
{// creat key for shmkey_t key = ftok(PATH_NAME, PROJ_ID);if (key == -1){exit(1);}// creat shared memoryint shmid = shmget(key, SHM_SIZE, IPC_CREAT | IPC_EXCL | MODE);if (shmid == -1){exit(1);}// link to shared memorychar *shmadd = (char *)shmat(shmid, nullptr, 0);if (shmadd == (void *)-1){exit(1);}// ipcwhile (true){//举例//Wait(fd);// printf("%s\n", shmadd);//sleep(1);//if (strcmp(shmadd, "quit") == 0)// break;}// unlinkint n = shmdt(shmadd);if (n == -1){exit(1);}// removen = shmctl(shmid, IPC_RMID, nullptr);if (n == -1){exit(1);}Closefifo(fd);return 0;
}
- client.cpp:
// get shared memory -> link to -> unlink
int main()
{// get shared memorykey_t key = ftok(PATH_NAME, PROJ_ID);if (key == -1){exit(1);}int shmid = shmget(key, SHM_SIZE, 0);if (shmid == -1){exit(1);}// link tochar *shmadd = (char *)shmat(shmid, nullptr, 0);if (shmadd == (void *)-1){exit(1);}// ipcwhile (true){//举例// ssize_t s = read(0, shmadd, SHM_SIZE - 1);// if (s > 0)// {// shmadd[s - 1] = 0;// Signal(fd);// if (strcmp(shmadd, "quit") == 0)// break;// }}Closefifo(fd);// unlinkint n = shmdt(shmadd);if (n == -1){exit(1);}return 0;
}
整体框架就是如此,具体ipc过程可根据需求测试。
命令ipcs -m 可以用来查看此时系统内被申请的共享内存的属性状态
命令ipcrm +shmid也可以用来删除共享内存,但此操作并不会去关联
共享内存解释几个结论
结论1:
-
只要是通信双方使用shm,一方直接向共享内存中写入数据,另一方,就可以立马看到,因此共享内存是所有进程间通信速度最快的!
原因:
-
共享内存进行通信时,中间的拷贝是最少的:下面为管道的拷贝次数
在此简单io中,相较于管道,若使用共享内存能减少2次拷贝
-
结论2:
- 共享内存缺乏访问控制,会带来并发问题
相比于管道文件通信方式,管道文件自带同步与互斥机制,因此能够有条不紊的进行,但由于共享内存专注于速度,少了访问控制,因此当多个进程一起看到同一份临界资源时,一旦有数据在临界资源里,这份数据将遭到哄抢,有可能会造成数据丢失或数据不一。