文章目录
- 一、引言
- 二、管道的基本原理
- 1、管道的定义与结构
- 2、管道的工作原理
- 三、匿名管道(Anonymous Pipe)
- 1、匿名管道的概念
- 2、匿名管道的创建与使用
- 3、匿名管道的读写规则
- 4、匿名管道的特点
- 5、使用匿名管道实现进程池
- 四、命名管道(Named Pipe/FIFO)
- 1、命名管道的概念
- 2、命名管道的创建与使用
- 3、命名管道的打开特点
- 4、命名管道的案例
- 五、管道文件和FIFO文件的对比
- 六、管道与shell命令组合
一、引言
进程间通信(IPC,Inter-Process Communication)是操作系统中不同进程之间共享信息和数据的机制。由于进程具有独立性,每个进程在其自己的地址空间中运行,它们不能直接访问彼此的内存空间,因此需要使用特定的通信机制来交换数据。
简单来说,进程间通信就是为了让不同的进程,看到同一份资源。
理解通信方式:单工、半双工和双工通信是三种不同的通信方式,它们的主要区别在于数据传输的方向和方式。
-
单工通信:单工通信是一种单向通信方式,即数据只能沿一个方向传输。在单工通信中,发送方只能发送数据,而接收方只能接收数据,两者不能互换角色。
-
半双工通信:半双工通信允许数据在两个方向上传输,但在同一时间只能在一个方向上传输。也就是说,两个设备可以交替地发送和接收数据,但不能同时发送和接收。
-
全双工通信:双工通信允许数据在同一时刻进行双向传输,即两个设备可以同时发送和接收数据。
二、管道的基本原理
1、管道的定义与结构
Linux中的“万物皆文件”原则使得管道也被抽象为一个文件对象。这意味着进程可以通过标准的文件I/O操作(如read、write等)来访问管道,从而实现进程间的通信。因此我们可以说管道是基于文件的,让不同进程看到同一份资源的通信方式。
对于管道而言,它允许一个进程的输出直接作为另一个进程的输入,而无需使用临时文件或其他中间存储机制。具体来说,管道在内存中创建了一个缓冲区,用于存储从写入端流向读取端的数据。这个缓冲区对于进程来说是透明的,进程只需要通过标准的文件I/O操作(如read
、write
等)来访问它。
我们之前的文章中常用到的查看进程的状态的命令:ps ajx | grep myprogram
也是使用了管道的方式:
其中,ps ajx
命令和 grep myprogram
命令都是两个可执行程序,当它们运行起来后就变成了两个进程。那么这条命令是如何运行的呢?
首先,ps ajx
命令会列出系统上所有进程的详细信息。然后,这些输出会通过管道 |
被传递给 grep myprogram
命令。grep myprogram
命令会搜索这些输出中是否包含 myprogram
字符串,并只显示包含该字符串的行。
管道分为匿名管道(Anonymous Pipe)和命名管道(Named Pipe)。
-
匿名管道:这是由内核管理的一块缓冲区,是一种半双工的通信手段,通过让不同进程都能访问同一块缓冲区,来实现进程间通讯。它仅限于本地父子进程之间通信,结构简单,相对于命名管道,其占用小,实现简单。
-
命名管道:也称为FIFO(First In First Out),它在文件系统中有一个具体的实体,有名字标识,使得不同进程之间可以通过这个名字来访问和使用这个管道。它也是由内核管理的一块缓冲区,是一种全双工的通信手段。支持无亲缘关系进程间的通信。
结构:管道在内存中开辟一个缓冲区,用于存储进程间传输的数据。这个缓冲区是一个循环队列(FIFO),遵循先进先出(FIFO)的原则。
在Linux系统中,管道(Pipe)被视为一种特殊的文件,用于进程间的通信。它不属于任何具体的文件系统(如ext3、ext4等),而是自立门户,单独构成一种文件系统,称为“管道文件系统”。这种文件系统只存在于内存中,当进程结束或管道不再需要时,它会自动消失。
那我们为什么不直接创建一个文件,然后让需要通信的进程访问文件呢?
直接访问文件的限制:
- 磁盘I/O:每次文件被写入或读取时,都需要进行磁盘I/O操作。磁盘I/O操作通常比内存操作慢得多,这会导致通信延迟和性能下降。
- 数据同步:当数据被写入文件时,操作系统通常需要确保数据已经安全地写入磁盘。这涉及到将数据从内存缓冲区刷新(或称为“刷盘”)到磁盘的过程,这也会增加通信的延迟。
- 并发访问:当多个进程同时访问同一个文件时,需要额外的同步机制来确保数据的一致性和完整性。这可能会使通信变得更加复杂和缓慢。
管道的优势:
- 内存存储:管道使用内存中的缓冲区来存储数据,而不是磁盘。这意味着数据的读写操作可以在没有磁盘I/O延迟的情况下进行,从而大大提高了通信效率。
- 异步通信:命名管道允许进程异步地交换数据,这意味着一个进程可以在写入数据后立即继续执行,而不需要等待数据被写入磁盘。这进一步提高了系统的并发性和响应能力。
- 安全性与访问控制:命名管道具有明确的安全描述符和访问权限控制机制,可以限制哪些进程可以访问管道。这有助于确保通信的安全性和数据的完整性。
因此,在需要高效进程间通信的场景中,命名管道通常是一个更好的选择。它们避免了磁盘I/O延迟和数据同步问题,同时提供了简洁的接口和强大的安全性与访问控制功能。
2、管道的工作原理
管道(Pipe)是一种特殊的文件,它提供了一种单向的数据流通道。管道有两个端口,分别称为读端和写端。一个进程可以将数据写入管道的写端,而另一个进程则可以从管道的读端读取这些数据。
管道的工作原理涉及到了操作系统中的两种执行状态以及它们之间的交互。
- 用户态:当进程执行用户空间的代码时,它处于用户态。在用户态下,进程只能访问受限的资源和内存区域,不能直接执行特权操作或访问系统级资源。
- 内核态:当进程执行操作系统内核的代码时,它处于内核态。在内核态下,进程拥有更高的权限,可以执行特权指令、访问系统级资源,并处理系统调用请求。
管道的工作原理主要基于操作系统中的进程间通信(IPC)机制。以下是管道工作原理的概述:
- 创建管道
- 当一个进程想要与另一个进程进行通信时,它可以创建一个管道。
- 创建管道的函数是系统调用,系统调用导致进程从用户态切换到内核态。(使用
mkfifo
或pipe
函数时,内核在内核空间创建管道) - 进程从内核态切换回用户态,并继续执行用户空间的代码。
- 数据传输
- 一旦管道被创建,相关的进程就可以使用文件描述符来读写数据。
- 写入进程将数据写入管道的写端,这些数据会被存储在内核的缓冲区中。
- 写入进程使用写文件描述符将数据写入管道。
- 如果管道未满,写入操作会立即完成,数据被存储在内核空间的管道缓冲区中。
- 管道有一个固定大小的缓冲区(在Linux中通常是64KB)。如果缓冲区已满,且写端尝试写入更多数据,那么写操作会阻塞,直到有足够的空间写入数据(有数据被读取进程读取)或读端被关闭。
- 当读端关闭其文件描述符,并且写端尝试写入数据时,写操作将收到一个
SIGPIPE
信号。默认情况下,该信号会终止进程。
- 读取进程从管道的读端读取数据,这些数据从内核缓冲区被复制到读取进程的地址空间中。
- 读取进程使用读文件描述符从管道中读取数据。
- 如果管道中有数据,读取操作会立即完成,数据从内核空间的管道缓冲区被复制到读取进程的地址空间中。
- 如果管道为空,且读端尝试从管道中读取数据,那么读取进程会阻塞,直到有数据写入管道或写端被关闭。这确保了读取进程不会读取到旧的数据或在没有数据的情况下继续执行。
- 当写端关闭其文件描述符时,读端可以继续读取管道中剩余的数据,直到数据被全部读取。一旦所有数据都被读取,读操作将返回0,表示文件末尾(EOF)。
因此可知,管道有数据父进程就读,没有就不读, 管道没写满,子进程就写,写满就停。即:管道自带同步机制。
- 用户态与内核态的切换:
在整个管道通信过程中,每当进程执行与管道相关的操作(如写入、读取、阻塞等)时,都会触发系统调用,导致进程从用户态切换到内核态。
内核负责处理与管道相关的所有底层细节(如数据复制、缓冲区管理、进程同步等),并确保数据按照正确的顺序在进程间传输。
由于管道是通过内核进行管理的,每次数据的写入和读取都需要经过内核的缓冲区。数据需要在内核缓冲区和用户态进程之间进行拷贝。内核态与用户态切换的次数较多。
一旦内核完成了与管道相关的操作,它会将结果返回给进程,并将进程切换回用户态,让进程继续执行用户空间的代码。
三、匿名管道(Anonymous Pipe)
1、匿名管道的概念
匿名管道用于同一台计算机上不同进程之间的通信。由于匿名管道没有名字,因此只存在于创建它的进程及其子进程之间。匿名管道提供了一个单向的通信通道,在一个进程写入数据时,另一个进程可以从管道中读取这些数据。这种通信方式通过内存缓冲区来传输数据,遵循先进先出的原则,确保数据的顺序性。
2、匿名管道的创建与使用
我们使用man 2 pipe
来查看pipe
函数系统调用相关的手册页:
pipefd[2]
:一个包含两个整数类型元素的数组,用于接收两个文件描述符。pipefd[0]
是读端,pipefd[1]
是写端。
这个函数会返回一个文件描述符数组,数组中的两个文件描述符分别用于管道的读取和写入。一旦创建了管道,就可以通过这两个文件描述符在进程之间进行通信。如果成功,pipe()
返回0。如果失败,返回-1并设置errno
以指示错误。
从上图展示了在Linux系统中,用户空间与内核空间通过pipe()
系统调用来创建管道并进行交互的详细步骤。以下是详细解释:
- 用户空间:
- 用户在用户空间(通常是应用程序代码)中发起一个请求,要求创建一个新的管道。这个请求通常是通过调用
pipe()
函数来触发的。
- 用户在用户空间(通常是应用程序代码)中发起一个请求,要求创建一个新的管道。这个请求通常是通过调用
- 调用pipe():
- 应用程序中的
pipe()
函数调用将参数(一个包含两个整数元素的数组,如int pipefd[2]
)传递给内核。这个数组用于接收创建管道后返回的两个文件描述符。
- 应用程序中的
- 内核空间:
- 当
pipe()
函数调用发生时,控制从用户空间切换到内核空间。内核接收到pipe()
函数的调用请求,并开始处理。 - 内核在内存中分配一个缓冲区,用于在管道的读端和写端之间传递数据。
- 内核为读端和写端分配两个文件描述符(在图中是
pipefd[0]
和pipefd[1]
),并将这些文件描述符的值写入用户空间提供的数组中。
- 当
- 返回文件描述符:
- 内核处理完
pipe()
函数的请求后,将控制权返回给用户空间,并同时将创建好的两个文件描述符(pipefd[0]
和pipefd[1]
)返回给应用程序。
- 内核处理完
- 用户空间继续执行:
- 应用程序现在有了两个文件描述符,一个用于写入数据到管道(
pipefd[1]
),另一个用于从管道读取数据(pipefd[0]
)。 - 应用程序可以使用标准的文件I/O函数(如
write()
和read()
)来通过这两个文件描述符与管道进行交互。
- 应用程序现在有了两个文件描述符,一个用于写入数据到管道(
- 数据传递:
- 当一个进程(通常是父进程)通过
write()
系统调用向pipefd[1]
写入数据时,数据被复制到内核空间中的管道缓冲区。 - 当另一个进程(通常是子进程)通过
read()
系统调用从pipefd[0]
读取数据时,数据从内核空间中的管道缓冲区被复制到用户空间。
- 当一个进程(通常是父进程)通过
在父进程创建匿名管道时,会在父进程的文件描述符表中,会为匿名管道的读端和写段各自分配一个文件描述符。
接下来,在父进程调用fork()
创建子进程时,子进程并不是简单地“继承”父进程的文件描述符表,而是复制了父进程的文件描述符表的一个副本。这意味着父进程和子进程各自拥有独立的文件描述符表,但它们在fork()
调用时刻的内容是相同的(即它们指向相同的被打开文件和管道等)。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>int main()
{int pipefd[2];pid_t cpid;char buf[256];if (pipe(pipefd) == -1){perror("pipe");exit(EXIT_FAILURE);}cpid = fork();if (cpid == -1){perror("fork");exit(EXIT_FAILURE);}if (cpid == 0){ // child processclose(pipefd[1]); // close unused write endread(pipefd[0], buf, sizeof(buf));write(STDOUT_FILENO, buf, strlen(buf));close(pipefd[0]);_exit(EXIT_SUCCESS);}else{ // parent processclose(pipefd[0]); // close unused read endwrite(pipefd[1], "Hello, pipe!\n", 13);close(pipefd[1]); // reader will see EOFwait(NULL); // wait for child}return 0;
}
我们在使用时,一个进程只需要用到其中的一个文件描述符。因此,通常的做法是关闭当前进程不需要的文件描述符。
3、匿名管道的读写规则
我们在使用管道时,由于管道的同步机制(写入进程写入后,读取进程才能读取),我们来思考下面问题:
问题一:如果管道中没有数据,且读端尝试从管道中读取数据,它会出现什么情况呢?
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <iostream>
void writer(int wfd)
{const char *str = " this is child , this is child! ";char buffer[128];int cnt = 0;pid_t pid = getpid();while (1){}
}void reader(int rfd)
{char buffer[1024];while (1){ssize_t n = read(rfd, buffer, sizeof(buffer) - 1);buffer[n] = '\0';if (n > 0){buffer[n] = '\0'; // 添加null字节printf("parent get a message: %s", buffer);}else if (n == 0){std::cout << "read() return 0\n";return;}}
}
int main()
{int pipefd[2];int n = pipe(pipefd);if (n < 0)return 1;pid_t id = fork();if (id == 0){close(pipefd[0]); // child: wwriter(pipefd[1]);exit(0);}close(pipefd[1]); // parent: rstd::cout<<"start:\n";reader(pipefd[0]);std::cout<<"end....\n";int status = 0;pid_t ret = waitpid(id, &status, 0);if (ret > 0){printf("exitcode = %d, sig = %d\n", WEXITSTATUS(status), (status & 0x7F));}
}
我们运行如上代码会发现,父进程输出了个 start:
后,一直被阻塞,因此如果管道中没有数据,且读端尝试从管道中读取数据, 读端(父)就要阻寒等待,直到pipe有数据。
我们得出结论:管道内部没有数据,而且写进程不关闭自己的写端文件fd, 读进程就要阻寒等待,直到pipe中有数据。
问题二:如果管道内被写满了,但读进程不读,会出现什么结果?
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/wait.h>#define PIPE_SIZE 4096 // 假设管道大小为4096字节(这通常不是真实的管道大小,而是用于示例)
#define BUF_SIZE 1024 // 缓冲区大小为1024字节void writer(int wfd)
{char buffer[BUF_SIZE];memset(buffer, 'a', sizeof(buffer)); // 填充缓冲区int cnt = 0;while (1){ssize_t n = write(wfd, buffer, sizeof(buffer));if (n < 0){perror("write");exit(EXIT_FAILURE);}printf("Writer wrote %zd bytes , cnt : %d \n", n, cnt++);// 为了演示,这里故意休眠一段时间,以便在管道满时阻塞// sleep(1);}
}void reader(int rfd)
{// 假设读进程不读,所以这个函数实际上什么也不做// 在真实情况下,读进程应该执行read系统调用来从管道中读取数据// 但为了演示阻塞,我们让读进程休眠或什么都不做while (1){sleep(10); // 休眠很长时间,以便写进程阻塞}
}int main()
{int pipefd[2];if (pipe(pipefd) == -1){perror("pipe");exit(EXIT_FAILURE);}pid_t pid = fork();if (pid == -1){perror("fork");exit(EXIT_FAILURE);}if (pid == 0){ // 子进程close(pipefd[0]); // 关闭读端writer(pipefd[1]); // 写入数据到管道exit(EXIT_SUCCESS);}else{ // 父进程close(pipefd[1]); // 关闭写端reader(pipefd[0]); // 读取数据(但在这个示例中实际上不读)// 父进程通常不会退出,因为它负责等待子进程结束// 但为了示例,我们在这里休眠一段时间然后退出sleep(5);printf("Parent exiting before reader can read\n");exit(EXIT_SUCCESS);}int status = 0;pid_t ret = waitpid(pid, &status, 0);if (ret > 0){printf("exitcode = %d, sig = %d\n", WEXITSTATUS(status), (status & 0x7F));}return 0; // 这行代码实际上不会被执行,因为父进程已经调用了exit
}
观察实验结果:
我们观察到,当cnt = 63
时,不再向管道内写入数据。
因此我们观察出:如果管道内被写满了,读写端都不关闭,但读进程不读,那么写进程将阻塞在write
调用上,等待读进程从管道中读取数据以释放空间。
我们得出结论:如果管道内部写满,且读端不关闭自己的fd,管道被写满后,写端进程就要阻塞等待。
问题三: 如果管道的写端写了一会不写了,且关闭了写进程的写文件描述符,读端会怎么办?
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <iostream>
void writer(int wfd)
{const char *str = " this is child , this is child! ";char buffer[128];int cnt = 0;pid_t pid = getpid();while (1){sleep(1);snprintf(buffer, sizeof(buffer), "message: %s,pid: %d, count: %d\n", str, pid, cnt);write(wfd, buffer, strlen(buffer));cnt++;if (cnt > 5)break;}
}void reader(int rfd)
{char buffer[1024];while (1){ssize_t n = read(rfd, buffer, sizeof(buffer) - 1);buffer[n] = '\0';if (n != 0){buffer[n] = '\0'; // 添加null字节printf("parent get a message: %s", buffer);}else if (n == 0){std::cout << "read() return 0\n";return;}}
}
int main()
{int pipefd[2];int n = pipe(pipefd);if (n < 0)return 1;pid_t id = fork();if (id == 0){close(pipefd[0]); // child: wwriter(pipefd[1]);exit(0);}close(pipefd[1]); // parent: rreader(pipefd[0]);wait(NULL);return 0;
}
我们会在控制台上看到一系列由父进程打印出来的消息,这些消息是子进程通过管道发送的。由于子进程是每隔1秒发送一次消息,所以我们会看到每隔1秒控制台上就会出现一条新的消息,但父进程没被限制,一直在读。那么我们再去观察运行结果,父进程没有一直输出,而是每1秒输出一次。
我们观察出:我们的父进程在遇到空管道时,被阻塞。并且,子进程在写入时,父进程并没有立即读取,而是等子进程写完才读取(如果子进程在写的时候,父进程就开始读,就会出现读取的数据不完整的情况)。
那么我们得出结论:由于管道具有同步机制,即只有写端写入数据后,读端才能读取,读写存在先后顺序。当写端文件描述符关闭时,读端使用
read()
返回结果为0,表示读到文件末尾。
问题四:如果管道的读端读了一会不读了,且关闭了读进程的读文件描述符,写端会怎么样?
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <iostream>
void writer(int wfd)
{const char *str = "hello father, I am child";char buffer[128];int cnt = 0;pid_t pid = getpid();while (1){sleep(1);char c = 'A';write(wfd, &c, 1);cnt++;std::cout << "cnt :" << cnt << std::endl;}close(wfd);
}void reader(int rfd)
{char buffer[1024];int cnt = 10;while (1){ssize_t n = read(rfd, buffer, sizeof(buffer) - 1);if (n > 0)printf("father get a message: %s, n: %ld\n", buffer, n);else if (n == 0){std::cout << "read pipe done, read file done!" << std::endl;break;}elsebreak;if (cnt-- == 0)break;}close(rfd);std::cout << "read endpoint close!" << std::endl;
}
int main()
{int pipefd[2];int n = pipe(pipefd);if (n < 0)return 1;printf("pipefd[0]: %d, pipefd[1]: %d\n", pipefd[0] /*read*/, pipefd[1] /*write*/); // 3, 4pid_t id = fork();if (id == 0){close(pipefd[0]); // child: wwriter(pipefd[1]);exit(0);}close(pipefd[1]); // father: rreader(pipefd[0]);int status;pid_t rid = waitpid(id, &status, 0);if (rid == id){std::cout << "exit code :" << WEXITSTATUS(status) << " exit singal :" << (status & 0x7f) << std::endl;}return 0;
}
上述程序中,父进程读取了10次管道文件,读端文件描述符关闭。此时子进程收到了13号信号,即SIGPIPE信号。
当管道的读端关闭时(即没有进程再打开该管道的读文件描述符),任何尝试向该管道写入的进程都会收到一个SIGPIPE
信号。默认情况下,进程收到SIGPIPE
信号会终止,但可以通过信号处理函数来修改这一行为。
在上述writer
函数中,并没有设置信号处理函数来处理SIGPIPE
信号,因此默认情况下,当子进程尝试向已经关闭的读端写入时,子进程会收到SIGPIPE
信号并终止。但是,由于writer
函数是一个无限循环,并且没有检查任何错误条件(如write
函数的返回值),所以即使子进程收到SIGPIPE
信号,它也可能不会立即退出,直到其他因素(如操作系统强制终止进程)发生。
那么我们得出结论:当读端文件描述符关闭时,写端收到一个SIGPIPE信号,该信号会终止进程。
小结:
管道有一个固定大小的缓冲区(在Linux中通常是64KB,但可能因系统而异)。
- 如果管道中没有数据,且读端尝试从管道中读取数据,那么读操作会阻塞,直到有数据写入管道或写端被关闭。
- 如果管道中数据已满,且写端尝试向管道中写入数据,那么写操作会阻塞,直到有足够的空间写入数据(有数据从读端被读取)或读端被关闭。
- 当写端关闭其文件描述符时,读端可以继续读取管道中剩余的数据,直到数据被全部读取。一旦所有数据都被读取,读操作将返回0,表示文件末尾(EOF)。
- 当读端关闭其文件描述符,并且写端尝试写入数据时,写操作将收到一个
SIGPIPE
信号。默认情况下,该信号会终止进程。但是,进程可以选择忽略该信号或为其设置处理程序。
4、匿名管道的特点
-
父子进程会进行进程协同的,匿名管道会提供同步与互斥机制:
管道提供了一种同步机制,因为写入管道的数据在读取之前会保留在管道中。当没有数据可读时,读操作会被阻塞,直到有数据可用。同样,当管道满时(达到管道缓冲区的大小限制),写操作也会被阻塞,直到有空间可用。这种机制确保了数据在管道中的安全传输,避免了数据丢失或覆盖。 -
具有血缘关系的进程才可以进行通信(常用于父子通信):
管道通常是在父子进程之间创建的,用于它们之间的通信,也可以用于兄弟进程之间。这是因为管道是通过调用pipe()
函数在进程内创建的,并且返回的文件描述符是在该进程和其子进程之间共享的。其他没有血缘关系的进程(即不是通过fork()
创建的进程)通常不能直接访问这些文件描述符。管道是基于内存的,其他进程没有办法找到这块内存,也就没有办法通信了。
-
管道是半双工的,数据只能单向流动:
半双工通信是指在一个通信链路上,通信双方可以交替发送和接收信息,但不能同时进行发送和接收。管道具有两个端点:一个读端点和一个写端点。但管道的数据只能从写端点流向读端点,不能反向流动。如果需要双向通信,通常需要创建两个管道,或者使用其他通信机制。 -
管道通信是面向字节流的:
管道提供了一种基于字节流的通信方式。它将数据视为字节序列进行传输。无论是文本数据还是二进制数据,都可以通过管道进行传输。这意味着管道不关心数据的具体格式或结构,只关注字节的传输。 -
管道是基于文件的,而文件的生命周期是随进程的:
虽然管道在概念上类似于文件(具有文件描述符和类似的I/O操作),但它们并不是真正的文件系统对象。然而,从某些方面来看,管道可以视为一种特殊的文件。管道的生命周期虽然与创建它的进程相关,但它们的生命周期并不是完全相同的。当进程结束时,该进程打开的所有文件描述符(包括管道的读/写端点)都会被内核自动关闭。但是,如果有一个或多个进程仍然持有对管道文件描述符的引用(即它们尚未关闭这些描述符),那么管道本身仍然会存在,直到所有引用它的文件描述符都被关闭。此时,内核会释放与管道相关的资源。实际上,管道的生命周期取决于是否还有进程持有对它的文件描述符的引用。
管道的大小
当要写入的数据量不大于
PIPE_BUF
时,linux将保证写入的原子性。当要写入的数据量大于
PIPE_BUF
时,linux将不再保证写入的原子性。
5、使用匿名管道实现进程池
procespool.cc
#include <sys/types.h>
#include <sys/wait.h>
#include <iostream>
#include <vector>
#include <unistd.h>
#include <cstdlib>
#include "task.hpp"
using namespace std;class Channel
{
public:Channel(int wfd, pid_t sub_id, const std::string &name): _wfd(wfd), _sub_process_id(sub_id), _name(name){}void PrintDebug(){cout << "_wfd: " << _wfd;cout << ",_sub_process_id: " << _sub_process_id;cout << ", _name: " << _name << endl;}string name() { return _name; }int wfd() { return _wfd; }pid_t pid() { return _sub_process_id; }void Close() { close(_wfd); }~Channel() {}private:int _wfd;pid_t _sub_process_id;string _name;
};class ProcessPool
{
public:ProcessPool(int sub_process_num) : _sub_process_num(sub_process_num) {}int CreateProcess(std::function<void(int)> work){std::vector<int> fds;for (int number = 0; number < _sub_process_num; number++){int pipefd[2]{0};int n = pipe(pipefd);if (n < 0)return PipeError;pid_t id = fork();if (id == 0) // child : r{if (!fds.empty()){std::cout << "close w fd:";for (auto fd : fds){close(fd);std::cout << fd << " ";}std::cout << std::endl;}close(pipefd[1]);dup2(pipefd[0], 0);work(pipefd[0]);exit(0);}// parent wstring cname = "channel-" + to_string(number);close(pipefd[0]);channels.push_back(Channel(pipefd[1], id, cname));fds.push_back(pipefd[1]);}return 0;}int NextChannel(){static int next = 0;int c = next;next++;next %= channels.size();return c;}void SendTaskCode(int index, uint32_t code){cout << "send code: " << code << " to " << channels[index].name() << " sub prorcess id: " << channels[index].pid() << endl;write(channels[index].wfd(), &code, sizeof(code));}void Debug(){for (auto &channel : channels){channel.PrintDebug();}}~ProcessPool(){for (auto &channel : channels){channel.Close();pid_t pid = channel.pid();pid_t rid = waitpid(pid, nullptr, 0);if (rid == pid){std::cout << "wait sub process: " << pid << " success..." << std::endl;}std::cout << channel.name() << " close done" << " sub process quit now : " << channel.pid() << std::endl;}}private:vector<Channel> channels;int _sub_process_num;
};
void CtrlProcessPool(ProcessPool *ptr, int cnt)
{while (--cnt){int channel = ptr->NextChannel();uint32_t code = NextTask();ptr->SendTaskCode(channel, code);sleep(1);}
}
int main(int argc, char *argv[])
{if (argc != 2){Usage(argv[0]);return UsageError;}int sub_num_process = stoi(argv[1]);if (sub_num_process < 0)return ArgError;srand((uint64_t)time(nullptr));ProcessPool *ptr = new ProcessPool(sub_num_process);ptr->CreateProcess(worker);CtrlProcessPool(ptr, 10);std::cout << "task run done" << std::endl;delete ptr;return 0;
}
task.hpp
#pragma once
#include <iostream>
#include <unistd.h>
#include <functional>
using namespace std;
enum
{UsageError = 1,ArgError,PipeError
};void Usage(const std::string &proc)
{cout << "Usgae: " << proc << " subprocess-num" << endl;
}void PrintLog(int fd, pid_t pid)
{cout << "sub process: " << pid << ", fd: " << fd << ", task is : printf log task\n"<< endl;
}void ReloadConf(int fd, pid_t pid)
{cout << "sub process: " << pid << ", fd: " << fd << ", task is : reload conf task\n"<< endl;
}void ConnectMysql(int fd, pid_t pid)
{cout << "sub process: " << pid << ", fd: " << fd << ", task is : connect mysql task\n"<< endl;
}
uint32_t NextTask() { return rand() % 3; }
void worker(int fd)
{std::function<void(int, pid_t)> tasks[3] = {PrintLog, ReloadConf, ConnectMysql};while (true){uint32_t command_code = 0;ssize_t n = read(0, &command_code, sizeof(command_code));// ssize_t n = read(fd, &command_code, sizeof(command_code));if (n == sizeof(command_code)){if (command_code >= 3)continue;tasks[command_code](fd, getpid());}else if (n == 0){std::cout << "sub process: " << getpid() << " quit now..." << std::endl;break;}}
}
四、命名管道(Named Pipe/FIFO)
1、命名管道的概念
命名管道是一种特殊的文件类型,它在文件系统中具有一个唯一的名称,并提供了一个双向的、有名字的通信通道,允许不同进程之间进行数据交换。命名管道的主要优势在于它允许两个毫无关系的进程进行通信,而不仅仅是具有亲缘关系的父子进程。
命名管道在文件系统中是持久的,即使创建它的进程已经终止,其他正在使用它的进程仍然可以访问和使用它。
2、命名管道的创建与使用
在Linux系统中,可以使用mkfifo
命令创建命名管道。这些命令在文件系统中创建一个具有指定名称和权限的特殊文件。
它的使用与匿名管道类似,进程可以使用标准的文件I/O函数(如open
)来打开命名管道。通常,一个进程以只写的方式打开命名管道用于发送数据,而另一个进程以只读的方式打开命名管道用于接收数据。
我们观察到创建的文件类似是p
,代表当前文件是命名管道文件。
命名管道的读写规则与匿名管道类似,我们不再赘述。
下面我们使用C语言库函数来创建命名管道man 3 mkfifo
:
#include <sys/types.h>
#include <sys/stat.h>int mkfifo(const char *pathname, mode_t mode);
pathname
:指向要创建的命名管道文件名的指针。mode
:指定了新创建文件的权限。这通常是八进制数(如0666
),与chmod
和open
函数的mode
参数类似。但是,新文件的实际权限将受到进程umask
的影响。
如果成功,mkfifo
返回 0。如果失败,返回 -1 并设置 errno
以指示错误。
下面是一个简单的例子,展示了如何使用 mkfifo
创建一个命名管道:
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h> int main() { const char *myfifo = "/tmp/myfifo"; // 尝试创建命名管道 if (mkfifo(myfifo, 0666) == -1) { perror("mkfifo"); exit(EXIT_FAILURE); } // 在此处可以添加代码来打开命名管道,进行读写等操作... // 清理:删除命名管道 unlink(myfifo); return 0;
}
请注意,unlink
函数用于删除命名管道文件。尽管这会导致文件系统中的条目消失,但任何仍然打开该文件的进程都可以继续读写它,直到所有引用该文件的描述符都被关闭。
3、命名管道的打开特点
命名管道的打开规则具体为:
- 如果当前打开操作是为读而打开FIFO时,读操作会阻塞,直到有相应的进程为写而打开该FIFO。这意味着,如果当前没有进程为写而打开FIFO,那么尝试读取该FIFO的进程将被挂起,直到有写进程打开FIFO。
- 如果当前打开操作是为写而打开FIFO时,写操作会阻塞,直到有相应的进程为读而打开该FIFO。这意味着,如果当前没有进程为读而打开FIFO,那么尝试写入该FIFO的进程将被挂起,直到有读进程打开FIFO。
这些规则确保了命名管道在进程间通信时的正确性和有效性,避免了数据丢失或混乱的情况。
4、命名管道的案例
如果我们希望两个没有亲缘关系的进程进行通信,我们就可以使用命名管道,那么他们首先需要一个进程创建命名管道,我们将命名管道封装成一个 Comm.hpp
,让命名管道的创建和销毁包含在其中:
#ifndef __COMM_HPP__
#define __COMM_HPP__#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <string>
#include <string.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/wait.h>using namespace std;#define Mode 0666
#define Path "./fifo"class Fifo
{
public:Fifo(const string &path) : _path(path){umask(0);int n = mkfifo(_path.c_str(), Mode);if (n == 0){cout << "mkfifo success" << endl;}else{cerr << "mkfifo failed, errno: " << errno << ", errstring: " << strerror(errno) << endl;}}~Fifo(){int n = unlink(_path.c_str()); // 删除一个文件if (n == 0){cout << "remove fifo file " << _path << " success" << endl;}else{cerr << "remove failed, errno: " << errno << ", errstring: " << strerror(errno) << endl;}}private:string _path;
};#endif
那么现在我们只需要一个进程来创建管道并对管道进行读,一个进程使用管道来对管道进行写。
读端:
#include "Comm.hpp"
int main()
{Fifo fifo(Path);int rfd = open(Path, O_RDONLY);if (rfd < 0){cerr << "open failed, errno: " << errno << ", errstring: " << strerror(errno) << endl;return 1;}// 如果我们的写端没打开,先读打开,open的时候就会阻塞,直到把写端打开,读open才会返回cout << "reader open success" << endl;char buffer[1024];while (true){ssize_t n = read(rfd, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;cout << "client say : " << buffer << endl;}else if (n == 0){cout << "client quit, me too!!" << endl;break;}else{cerr << "read failed, errno: " << errno << ", errstring: " << strerror(errno) << endl;break;}}close(rfd);return 0;
}
写端:
#include "Comm.hpp"
int main()
{int wfd = open(Path, O_WRONLY);if (wfd < 0){cerr << "open failed, errno: " << errno << ", errstring: " << strerror(errno) << endl;return 1;}cout << "writer open success" << endl;string inbuffer;while (true){cout << "Please Enter Your Message# ";std::getline(cin, inbuffer);if (inbuffer == "quit")break;ssize_t n = write(wfd, inbuffer.c_str(), inbuffer.size());if (n < 0){cerr << "write failed, errno: " << errno << ", errstring: " << strerror(errno) << endl;break;}}close(wfd);return 0;
}
此时,运行两程序,我们可以发现,只有将读写端都启动,我们的对管道的读写才能开始。
因此,对于命名管道来说,通信的开始需要至少一个进程作为服务器(pipe server)来创建并等待连接,而另一个进程作为客户端(pipe client)来连接这个命名管道。在连接建立之后,服务器和客户端就可以开始读写数据了。
五、管道文件和FIFO文件的对比
管道文件和FIFO(First In First Out)文件在Linux系统中都是用于进程间通信(IPC)的机制,但它们之间存在一些关键的区别:
- 命名与匿名:
- 管道文件:通常指的是无名管道,它们没有名字,只能用于具有共同祖先(父子进程,兄弟进程)的进程之间的通信。因为它们是匿名的,所以只能通过文件描述符在进程间传递。
- FIFO文件:也被称为命名管道,它们有一个在文件系统中的路径名,因此任何进程都可以通过该路径名来访问FIFO。这使得FIFO可以用于无亲缘关系进程之间的通信。
- 生命周期:
- 管道文件:随着创建它的进程的结束而结束。当最后一个引用它的进程结束时,管道文件会自动关闭和删除。
- FIFO文件:作为文件系统中的实体存在,因此它们的生命周期与文件系统中的其他文件相同。它们不会随着创建它们的进程的结束而自动删除,需要显式地删除它们。
- 访问方式:
- 管道文件:只能通过文件描述符在进程间传递来访问。一旦文件描述符被关闭,就不能再访问该管道。
- FIFO文件:可以通过文件系统中的路径名来访问,就像访问其他文件一样。多个进程可以同时打开和访问同一个FIFO文件。
- 通信方式:
- 管道文件:它只提供单向通信,即一个进程向管道中写入数据,另一个进程从管道中读取数据。同时,它只能用于具有血缘关系的进程间通信,如父子进程、兄弟进程。如果需要双向通信,需要创建两个管道。
- FIFO文件:虽然也是基于先进先出(FIFO)的原则,但它们支持双向通信。两个进程可以同时打开同一个FIFO文件,一个用于写入,另一个用于读取。因此它属于全双工的。
总结来说,管道文件和FIFO文件都是Linux系统中用于进程间通信的机制,但它们在命名、生命周期、访问方式、通信方式和用途等方面存在明显的区别。
六、管道与shell命令组合
在shell中,管道符号(|
)用于将一个命令的输出作为另一个命令的输入。这种组合使用方式极大地提高了shell脚本的灵活性和效率。例如,我们可以使用cut
、grep
、awk
、sort
等命令,并通过管道将它们连接起来,以实现对文本数据的复杂处理。
例如,假设您有一个包含多行文本的文件file.txt
,您想要提取每行的第一个字段(假设字段之间由制表符分隔),并且只显示包含“error”的行。您可以使用以下命令组合来实现:
cat file.txt | cut -f1 -d$'\t' | grep 'error'
这个命令首先使用cat
命令读取文件内容,然后通过管道将输出传递给cut
命令。cut
命令使用制表符作为分隔符,提取每行的第一个字段。最后,grep
命令过滤出包含“error”的行。
while :; do ps ajx | head -1 && ps ajx | grep myprocess | grep -v grep ; sleep 1; done
我们之前会使用该命令,来查看进程状态,我们来逐步解释这个while
循环中的命令组合和它们是如何通过管道(|
)来工作的。
首先,这是整个循环的结构:
while :; do # 命令组合 sleep 1;
done
这个while
循环会无限次地执行其内部的命令组合,并在每次迭代之间暂停1秒(由sleep 1
控制)。
接下来,我们看命令组合部分:
ps ajx | head -1 && ps ajx | grep myprocess | grep -v grep
这里使用了两个主要的命令:ps
和grep
,以及一个文本处理命令head
。它们之间通过管道(|
)连接。
- 第一个管道
ps ajx | head -1
ps ajx
:这个命令会列出系统上所有的进程,并以详细的格式(包括命令行参数)显示它们。a
表示列出所有用户的进程,j
表示列出与作业控制相关的信息(在某些系统上可能没有作用),x
表示列出没有控制终端的进程。|
:这是管道符号,它将前一个命令的输出作为后一个命令的输入。head -1
:这个命令从输入中读取前一行并输出。在这里,它从ps ajx
的输出中读取并显示标题行(即列名)。
- 逻辑与操作符
&&
&&
是一个逻辑与操作符,它用于连接两个命令。如果左边的命令(ps ajx | head -1
)成功执行(即返回值为0),那么右边的命令才会被执行。但在这个特定的例子中,ps ajx | head -1
总是成功的(因为它只是显示标题行),所以右边的命令总是会执行。
- 第二个和第三个管道
ps ajx | grep myprocess | grep -v grep
ps ajx
:再次执行,以列出所有的进程。grep myprocess
:这个命令搜索包含myprocess
的行。这意味着它会过滤出包含“myprocess”文本的进程行。|
:再次使用管道符号,将grep myprocess
的输出作为下一个命令的输入。grep -v grep
:这个命令会过滤掉包含“grep”的行。这是因为在搜索包含“myprocess”的行时,grep myprocess
命令本身也会作为一个进程出现在ps
的输出中。使用grep -v grep
可以确保这个进程不会被包括在结果中。
所以,整个循环的作用是:每秒显示一次ps
命令的标题行,并紧接着显示所有包含“myprocess”的进程(但排除搜索命令本身)。由于sleep 1
的存在,这个操作会每秒重复一次。
注意:命令行里面的
|
是匿名管道。