目录
一、管道
1.1 匿名管道
1.2 命名管道
二、XSI IPC
2.1 概述
2.2 消息队列
2.2.1 msgget
2.2.2 msgsnd
2.2.3 msgrcv
2.2.4 msgctl
2.2.5 代码示例
2.3 信号量数组
2.3.1 semget
2.3.2 semop
2.3.3 semctl
2.3.4 代码示例
2.3 共享内存
2.3.1 shmget
2.3.2 shmat
2.3.3 shmdt
2.3.4 shmctl
2.3.5 代码示例
一、管道
最古老的通信机制
之前我们手写了一个管道,实现了线程之间的数据交互。其实内核也给我们提供了一个管道,可用于进程间通信。内核提供的管道和我们自己写的管道具有比较类似的特点:
- 管道通信为单工通信,一端为读端,一端为写端
- 管道内部自带同步和互斥机制
1.1 匿名管道
所谓匿名管道,指的就是管道文件我们在磁盘上是看不到的。可以通过如下函数创建一个匿名管道:
man 2 pipe
#include <unistd.h>int pipe(int pipefd[2]);
功能:创建一个匿名管道
- pipefd — pipefd 是一个数组,用于返回管道两端的文件描述符。成功创建管道后,代表读端的文件描述符会被回填至 pipefd[0],代表写端的文件描述符会被回填至 pipefd[1]
- 创建成功返回 0;失败返回 -1 并设置 errno
匿名管道文件在磁盘上是看不到的,因此常用于有父子关系的进程间通信,因为父进程能够通过 fork 使子进程获得同一个匿名管道两端的文件描述符
代码示例:通过匿名管道实现父子进程通信,要求父进程负责往管道中写,子进程负责从管道中读。基本思路是先调用 pipe,后调用 fork
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>#define BUFSIZE 1024int main() {int pd[2]; // pd[0]用于存放读端,pd[1]用于存放写端if (pipe(pd) < 0){perror("pipe()");exit(1);}int pid = fork();if (pid < 0){perror("fork()");exit(1);}else if (pid == 0) // child read{close(pd[1]); // 子进程不需要写,关闭写端char buf[BUFSIZE];int len = read(pd[0], buf, BUFSIZE);write(1, buf, len); // 在终端显示读取到的字符close(pd[0]);exit(0);}else // parent write{close(pd[0]); // 父进程不需要读,关闭读端write(pd[1], "Hello!", 6);close(pd[1]);exit(0);}
}
可以看到父进程写入管道的内容被子进程读取到并成功打印了出来
1.2 命名管道
所谓命名管道,指的就是管道以文件形式存在于磁盘上。可以通过如下命令创建一个命名管道:
可以看到我们成功创建了一个命名管道,且其文件类型为 “p”。 当然,我们也可以利用如下函数创建命名管道:
man 3 mkfifo
#include <sys/types.h>
#include <sys/stat.h>int mkfifo(const char *pathname, mode_t mode);
功能:创建一个命名管道
- pathname — 指定管道文件名
- mode — 指定管道文件的权限(实际创建出管道文件的权限为 mode & ~umask)
- 创建成功返回 0,失败返回 -1 并设置 errno
因为管道需要凑齐读写双方才能正常运行,因此内核提供的命名管道有这样的一种约束:有读才能写,有写才能读
二、XSI IPC
2.1 概述
XSI IPC 可用于同一主机内没有亲缘关系的两个进程之间通信
先了解 XSI IPC 是什么意思
- XSI — X/Open System Interface,其中 X/Open 是一个制定标准的国际组织,这部分学习的内容都是这个组织制定的标准
- IPC — Inter-Process Communication,即进程间通信
可以通过如下命令查看 XSI IPC 提供的进程间通信机制
我们可以看到,有三种类型的 IPC 资源,分别是消息队列、共享内存、信号量数组。每种类型的 IPC 资源可能有多个 IPC 实例,而 XSI IPC 的核心思想就是让多个进程操作同一个 IPC 实例,通过这个 IPC 实例实现进程间通信
接下来,我们基于 XSI IPC 的核心思想,分析 XSI IPC 所提供机制背后的一些设计思路
刚刚说到 XSI IPC 的核心思想是让多个进程操作同一个 IPC 实例。因此,比较关键的点是:如何让多个进程能够正确定位到某一个特定的 IPC 实例?
首先要将不同的 IPC 实例区分开。类似身份证号一样,我们需要给每个 IPC 实例提供独一无二的标识(id),以定位到特定的 IPC 实例
通过标识区分出 IPC 实例之后,接下来的任务就是让多个进程定位到同一个 IPC 实例。换言之,就是让多个进程能够获得相同标识的同类资源(因为标识能够定位到实例)
XSI IPC 所提供的机制是:通过 key 来获取 IPC 资源的 id 值!
可以将 key 看成所有进程都能够访问的背包。每个不同的 key 值代表了不同的背包,单个背包中最多可以放一个 msqid、一个 shmid 和一个 semid
假设已经有三个背包了,每个背包中已经存放的 id 数量和值不同。通过 key 获取 IPC 资源标识的过程如下:
这样一来,我们可以像下图那样,让多个进程都能定位到相同的标识
剩下最后一个问题:某个 key 值代表所有进程都能访问的某个背包,不同 key 值代表不同背包。无论是将 id 放入背包或者是从背包中获取某资源的 id 值,都需要先获取某个背包(k 值)。那么应该如何获取呢?
XSI IPC 提供了如下调用来获取 key 值(即获取 key 为某个值的背包)
man 3 ftok
#include <sys/types.h>
#include <sys/ipc.h>key_t ftok(const char *pathname, int proj_id);
功能:获取某个 key 值(获取 key 为某个值的背包)
- pathname — 指定某个可访问文件
- proj_id — 一个整数
- 根据 pathname 所指定文件的 inode 号与 proj_id 的值,结合某种固定规则,获取并返回一个 key 值
- 失败返回 -1 并设置 errno
- The resulting value is the same for all pathnames that name the same file, when the same value of proj_id is used. The value returned should be different when the (simultaneously existing) files or the project IDs differ
2.2 消息队列
2.2.1 msgget
man msgget
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>int msgget(key_t key, int msgflg);
功能:获取消息队列实例的 id
- key — 代表某个背包,从该背包内获取消息队列实例的 id
- msgflg — 特殊要求,其含义及值如下
值 | 含义 |
---|---|
IPC_CREAT | mode | 如果 key 所指定的背包中还没存入任何消息队列实例的 id,则先创建一个权限为 mode 的消息队列,并将其 id 添加进 key 所指定的背包 |
IPC_CREAT | mode | IPC_EXCL | 强制创建一个权限为 mode 的消息队列,并将其 id 添加进 key 所指定的背包;如果 key 所指定的背包中已有某消息队列实例的 id 了,则调用失败 |
- 成功则返回获取到的消息队列实例 id(非负整数);失败返回 -1 并设置 errno
2.2.2 msgsnd
man msgop
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
功能:向某个消息队列发送数据
- msqid — 表示将数据发送到 msqid 所标识的消息队列
- msgp — 指向某个具有如下形式的结构体的实例。msgsnd 会将这个结构体实例发送到消息队列。注意结构体中 mtype 字段后的内容才是用户实际想发送的数据
struct msgbuf {long mtype; /* message type, must be > 0 */... /* message data */
};
- msgsz — msgp 所指结构体除 mtype 字段之外的大小,代表着拟发送数据的实际字节数
- msgflg — 特殊要求,详见 man,没有写 0
- 成功返回 0;失败返回 -1 并设置 errno
2.2.3 msgrcv
man msgop
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,int msgflg);
功能:从某个消息队列中接收数据
- msqid — 表示从 msqid 所标识的消息队列中接收数据
- msgp — msgrcv 会将从消息队列接收到的结构体实例填充至 msgp 所指的地址
- msgsz — msgp 所指结构体除 mtype 字段之外的大小,代表着拟接收数据的实际字节数
- msgtyp — 消息队列中可能已经收到了多个由 msgsnd 发送的结构体,而 msgtyp 可以指定从消息队列中选择接收哪一个结构体。其值及含义如下
值 | 含义 |
---|---|
0 | 返回队列中的第一个结构体 |
>0 | 返回队列中 mtype 字段值等于 msgtyp 的第一个结构体 |
<0 | 返回队列中 mtype 字段值小于等于 msgtyp 绝对值的结构体。如果这种结构体有若干个,则取 mtype 字段值最小的那一个 |
- msgflg — 特殊要求,详见 man,没有写 0
- 成功返回接收到数据的实际字节数;失败返回 -1 并设置 errno
2.2.4 msgctl
man msgctl
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>int msgctl(int msqid, int cmd, struct msqid_ds *buf);
功能:控制某个消息队列的实例
- msqid — 标识一个待控制的消息队列
- cmd — 控制选项,详见 man
- buf — 指向一个结构体,用于储存该消息队列实例的一些属性信息
2.2.5 代码示例
通过消息队列,实现两个进程之间的通信:发送端发送数据给接收端
proto.h,规定通信协议(通信规则)
#ifndef PROTO_H__
#define PROTO_H__#define KEYPATH "/etc/passwd"
#define KEYPROJ 'g' // 表示KEYPROJ是个整型数,转化成ASCALL码后 是一个整型数
#define NAMESIZE 32// 规定传输数据的格式
struct msg_st // 规定通信双方按照如下规则解析通信数据的内容
{long mtype; // 消息类型,msgrcv可通过该字段决定接收消息队列中 的哪一个结构体实例char name[NAMESIZE]; // 姓名int math; // 数学成绩int chinese; // 语文成绩
};#endif
rcver.c,接收端,应该优先运行,负责创建消息队列并持续运行,以监听发送端发送端数据
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/msg.h>
#include <signal.h>#include "proto.h"int msqid;void myexit() {msgctl(msqid, IPC_RMID, NULL); // 移除消息队列
}void handler(int s) { // 信号处理函数exit(0);
}int main() {key_t key = ftok(KEYPATH, KEYPROJ); // 获取值为key的背包if (key == -1){perror("ftok()");exit(1);}// 接收端,先运行,先创建消息队列msqid = msgget(key, IPC_CREAT | 0600 | IPC_EXCL); // 创建消息队列实例并放入背包// IPC_EXCL确保了该调用要么创建成功, 要么调用失败if (msqid < 0){perror("msgget()");exit(1);}atexit(myexit);signal(SIGINT, handler);struct msg_st rbuf; // 用于将从消息队列中接收到的结构体储存下来while (1) { // 不断从消息队列中接收数据if (msgrcv(msqid, &rbuf, sizeof(rbuf)-sizeof(long), 0, 0) < 0){perror("msgrcv()");exit(1);}printf("NAME = %s\n", rbuf.name);printf("MATH = %d\n", rbuf.math);printf("CHINESE = %d\n", rbuf.chinese);}exit(0);}
snder.c,发送端,负责向接收端创建的消息队列发送数据
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>#include "proto.h"int main() {key_t key = ftok(KEYPATH, KEYPROJ); // 获取值为key的背包if (key == -1){perror("ftok()");exit(1);}int msqid = msgget(key, 0); // 从值为key的背包中获取消息队列实例的idif (msqid < 0) {perror("msgget()");exit(1);}struct msg_st sbuf; // 待发送到消息队列的结构体sbuf.mtype = 1;strcpy(sbuf.name, "Yangjihua");sbuf.math = rand()%100;sbuf.chinese = rand()%100;if (msgsnd(msqid,&sbuf, sizeof(sbuf)-sizeof(long), 0) < 0){perror("msgsnd()");exit(1);}puts("ok!");exit(0);
}
首先 ipcs,可以看到目前没有任何可用的 IPC 资源
调用 ./rcver 后,再次查看
可以看到创建 IPC 资源成功,创建出来的 IPC 资源是一个消息队列,该消息队列的 id 存放在 key 为 0x671087fb 的背包中,其 msqid = 3,权限为 600
接下来尝试调用 ./snder 向消息队列发送数据
成功!
2.3 信号量数组
我们在线程章节曾经接触过“信号量”的概念
那个时候介绍的信号量其实就是一个有整数值的对象。通过合理利用整数的初始值,搭配操作该值的函数,可以实现条件变量或者互斥量的功能,能够用于线程之间的同步和互斥
而其实 XSI IPC 也给我们提供了用于进程间同步和互斥的信号量数组。之所以是“信号量数组”,是因为进程与进程之间可能需要利用多个信号量才能够完成任务(比如可能有多个共享资源的访问,每个共享资源都需要一个信号量来实现互斥)
注意,所提供信号量数组中的信号量和我们之前介绍的信号量挺类似的,这意味着我们这里所介绍的信号量也是一个“包含某个整数值的对象”,这个整数值我们称之为“信号量的值”
2.3.1 semget
man 2 semget
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>int semget(key_t key, int nsems, int semflg);
功能:获取信号量数组实例的 id
- key — 代表某个背包,从该背包内获取信号量数组实例的 id
- nsems — 若创建,则创建的信号量数组包含 nsems 个信号量
- msgflg — 特殊要求,其含义及值如下
值 | 含义 |
---|---|
IPC_CREAT | mode | 如果 key 所指定的背包中还没存入任何信号量数组实例的 id,则先创建一个权限为 mode 的信号量数组,并将其 id 添加进 key 所指定的背包 |
IPC_CREAT | mode | IPC_EXCL | 强制创建一个权限为 mode 的信号量数组,并将其 id 添加进 key 所指定的背包;如果 key 所指定的背包中已有某信号量数组实例的 id 了,则调用失败 |
- 成功则返回获取到的信号量数组实例 id(非负整数);失败返回 -1 并设置 errno
2.3.2 semop
man 2 semop
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>int semop(int semid, struct sembuf *sops, size_t nsops);
功能:操作信号量数组中的信号量
- semid — 表示操作 semid 所标识信号量数组中的信号量
- sops — 指向结构体数组(的首元素),数组中的每个结构体实例都是 struct sembuf 类型,代表着对某个信号量的特定操作
- nsops — sops 指向的是多大容量的结构体数组
- 成功返回 0;失败返回 -1 并设置 errno
其中,struct sembuf 代表了对某个信号量的特定操作,其成员包括如下:
unsigned short sem_num; /* semaphore number *//* 对信号量数组中哪个下标的信号量操作 */
short sem_op; /* semaphore operation *//* 对信号量的值增加sem_op的值 */
short sem_flg; /* operation flags *//* 特殊要求 */
sem_op 的正负和 sem_flg 的设置会影响 semop 的行为。当 sem_flg 设置为 0 时,默认 sem_op 的正负含义如下:
值 | 含义 |
---|---|
sem_op > 0 | 释放相应的资源数。将 sem_op 的值加到信号量的值上 |
sem_op = 0 | 进程阻塞直到信号量的相应值为 0。当信号量的值已经为 0,函数立即返回 |
sem_op < 0 | 请求 sem_op 的绝对值的资源。如果相应的资源数可以满足请求,则将该信号量的值减去 sem_op 的绝对值,然后立即返回;否则阻塞直到相应的资源数可以满足请求 |
2.3.3 semctl
man semctl
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>int semctl(int semid, int semnum, int cmd, ...);
功能:控制某个信号量数组的实例
- semqid — 标识一个待控制的信号量数组
- semnum — 指定控制信号量数组中的哪些信号量
- cmd — 控制选项,详见 man
2.3.4 代码示例
代码示例:多进程同时操作一个文件。每个进程都会:
- 从文件中获取值
- 将值加一
- 用加一后的值替换文件中原来的值
之前是通过文件锁实现,在这里我们用信号量数组实现
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <wait.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <errno.h>#define PROCNUM 2000 //创建2000个进程
#define LINESIZE 1024#define FILENAME "./tmp"static int semid;static void P() {struct sembuf op;op.sem_num = 0; // 对数组中下标为0的信号量操作op.sem_op = -1; // 请求资源op.sem_flg = 0; // 无特殊要求,默认行为while (semop(semid, &op, 1) < 0){ if (errno != EINTR || errno != EAGAIN){perror("semop()");exit(1);}}return;
}static void V() {struct sembuf op;op.sem_num = 0;op.sem_op = 1; // 释放资源op.sem_flg = 0;if (semop(semid, &op, 1) < 0){ perror("semop()");exit(1);}
}static void func_add(void) {FILE * fp = fopen(FILENAME, "r+");if (fp == NULL){perror("fopen()");exit(1);}char linebuf[LINESIZE];P(); // 请求资源,若信号量的值为0,则阻塞fgets(linebuf, LINESIZE, fp); // 获取fseek(fp, 0, SEEK_SET); // 将文件位置指针pos定位到文件首,这样才能实现覆盖原值fprintf(fp, "%d\n", atoi(linebuf)+1); // 加一后写回去fflush(fp);V(); // 释放资源fclose(fp);
}int main() {pid_t pid;semid = semget(IPC_PRIVATE, 1, IPC_CREAT | 0600); // 获取数组大小为1的信号量数组(我们仅需一个信号量)// IPC_PRIVATE适用于具有亲缘关系的IPCif (semid < 0) {perror("semget()");exit(1);}if (semctl(semid, 0, SETVAL, 1) < 0) // 设置信号量数组中下标为0的信号量的值为1{perror("semctl()");exit(1);}for (int i = 0; i < PROCNUM; ++i) { // 父进程不断创建子进程pid = fork();if (pid < 0) // error{perror("fork()");exit(1);}else if (pid == 0) // child do sth{func_add(); // do sthexit(0);}else // parent continue to fork{continue;}}for (int i = 0; i < PROCNUM; ++i) { // 对PROCNUM个进程收尸wait(NULL);}semctl(semid, 0, IPC_RMID);exit(0);}
注意,在我们调用 semget 时,并没有先利用 ftok 获取 key 值,然后传入 semget 的第一个形参,而是直接使用了 IPC_PRIVATE,为什么能这样?
我们之前说过,key 值代表着不同的背包,这个背包是用来装 IPC 资源实例的 id 的,采用这样的机制的原因是希望不同进程能够顺利获取 id 并定位到相同的 IPC 资源实例。但是,对于有亲缘关系的进程,通过 fork 能够很容易地使子进程获取到对应相同 IPC 资源实例的 id,因此不需要通过 key 这种比较麻烦的机制来获取 IPC 资源实例的 id 了。因此,对于有亲缘关系的进程,XSI IPC 为我们提供了一个偷懒的方式 —— 将 IPC_PRIVATE 传入获取资源的首个形参
2.3 共享内存
2.3.1 shmget
man shmget
#include <sys/ipc.h>
#include <sys/shm.h>int shmget(key_t key, size_t size, int shmflg);
功能:获取共享内存实例的 id
- key — 代表某个背包,从该背包内获取共享内存实例的 id
- size — 若创建,则创建共享内存的大小为 size 四舍五入至 PAGE_SIZE 的整倍数
- msgflg — 特殊要求,IPC_CREAT 表示允许创建,需要或运算一个 mode 表示创建出的共享内存的权限
2.3.2 shmat
man shmop
#include <sys/types.h>
#include <sys/shm.h>void *shmat(int shmid, const void *shmaddr, int shmflg);
功能:将共享内存映射至进程空间中的某个地址
- shmid — 标识一块共享内存实例,表示将哪个共享内存映射进来
- shmaddr — 指定将共享内存映射进当前进程空间的哪个地址,若为 NULL 则让系统自动决定映射到哪个地址
- shmflg — 特殊要求,详见 man,没有特殊要求则填 0
- 映射成功则返回实际映射进了当前进程空间的哪个地址;失败返回 (void*)-1 并设置 errno
补充一下:不要混淆了“获取共享内存实例”和“将共享内存映射至进程空间”
2.3.3 shmdt
man shmop
#include <sys/types.h>
#include <sys/shm.h>int shmdt(const void *shmaddr);
功能:解除已映射至某个地址的共享内存
- shmaddr — 某个进程空间中的地址,有共享内存被映射至该地址
- 成功返回 0;失败返回 -1 并设置 errno
2.3.4 shmctl
man 2 shmctl
#include <sys/ipc.h>
#include <sys/shm.h>int shmctl(int shmid, int cmd, struct shmid_ds *buf);
功能:控制某个共享内存的实例
- shmid — 表示控制 shmid 所标识的共享内存实例
- cmd — 控制选项,详见 man
- buf — 指向一个结构体,用于储存该共享内存实例的一些属性信息
2.3.5 代码示例
在高级 IO 中我们曾通过 mmap 实现过共享内存,在这里我们使用 XSI IPC 提供的共享内存再实现一遍
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/wait.h>
#include <unistd.h>
#include <string.h>#define MEMSIZE 1024int main() {int shmid = shmget(IPC_PRIVATE, MEMSIZE, IPC_CREAT | 0600); // 父子进程之间通信,用IPC_PRIVATE就行了if (shmid < 0){perror("shmget()");exit(1);}int pid = fork();if (pid < 0){perror("fork()");exit(1);}else if (pid == 0) { // child writechar * ptr = shmat(shmid, NULL, 0); // 将shmid所标识的共享内存映射进来// NULL表示让操作系统自己决定映射到哪里// 0表示无特殊需求// 返回值为实际将共享内存映射到了哪里if (ptr == (void *)-1){perror("shmat()");exit(1);}strcpy(ptr, "Hello!"); // 像操作字符串一样操作这片内存shmdt(ptr); // 解除映射exit(0);}else // parent read{wait(NULL);char * ptr = shmat(shmid, NULL, 0);if (ptr == (void *)-1){perror("shmat()");exit(1);}puts(ptr);shmdt(ptr);exit(0);}shmctl(shmid, IPC_RMID, NULL); // 销毁共享内存实例exit(0);
}
子进程写进共享内存的字符串成功在父进程中打印了出来,通信成功!
我们可以看到,本板块讲解的管道和 XSI IPC 允许在同一台计算机上运行的进程相互通信,其核心的思路都是通过不同进程都能够访问到的中介(管道和各种 XSI 资源),来实现进程间通信。那么,如果是不同计算机上的进程该如何通信呢?这将是我们接下来会讲解的部分:网络套接字