Linux 进程通信:匿名管道、实现进程池

目录

 一、进程间通信

1、 为什么需要进程通信

2、发展和分类

二、管道

1、概念

2、特点

2、复制并共享

3、用fork来共享管道原理

4、站在文件描述符角度-深度理解管道

5、站在内核角度-管道本质

三、匿名管道

1、概念 

2、创建

3、snprintf 

4、父子进程中进行单向通信 

四、实现简单的进程池模型

Task.hhp:任务函数

1、全局变量

2、任务函数

3、初始化函数

4、辅助函数

ProcessPool.cc:进程池

1、初始化任务和进程槽

2、创建子进程和管道

3、waitCommand函数 

4、进行通信&执行任务

5、sendAndWakeup() 函数

6、父进程派发任务

运行示例


 一、进程间通信

1、 为什么需要进程通信

进程运行的独立性意味着它们在默认情况下是隔离的,使得进程间通信成为一项挑战。进程间通信(IPC)的核心目的是允许不同的进程访问共享资源,例如内存空间,以便于交流、控制和协同工作。进行进程间通信的主要动机包括:

  • 数据传输:实现数据的有效传递,允许一个进程将其数据发送至另一进程,促进信息的共享与处理。
  • 资源共享:通过允许多个进程访问相同的系统资源,优化资源的使用效率和系统性能。
  • 通知事件:使进程能够发送消息通知其他进程某些事件的发生,例如,一个进程在终止时需要告知其父进程。
  • 进程控制:支持特定进程(如调试进程)对其他进程进行控制,包括拦截异常、监视状态变化等,以实现更细致的系统管理和错误调试。

进程间通信是现代操作系统中不可或缺的一部分,它不仅增强了进程之间的协作能力,也提升了系统的整体效率和灵活性。 

2、发展和分类

管道、System V进程间通信(IPC)和POSIX进程间通信是操作系统中实现不同进程之间进行数据交换与同步的三种主要机制。

管道(Pipes)

  • 管道是一种最早的进程间通信机制,最初出现在Unix系统中。它允许两个相关进程之间通过一个单向或双向的数据通道传递字节流。在Linux环境下,有两种类型的管道:
  • 匿名管道(Anonymous Pipes):通常用于父子进程之间的通信,由pipe()系统调用创建,不具有文件系统的接口,生命周期依赖于创建它的进程。
  • 命名管道(FIFOs, Named Pipes):也称为命名队列,它是一个存在于文件系统中的特殊文件,任何知道其路径名的进程都可以打开并使用它进行通信,由mkfifo()系统调用创建。

System V 进程间通信 (IPC)

  • System V IPC 是一套较为复杂的进程间通信方法,主要用于多进程间的协作。它包括以下几种形式:
  • 消息队列(Message Queues):提供了一种异步通信方式,进程可以通过发送和接收消息来交换数据,消息队列可以保证消息的顺序和可靠传输。
  • 共享内存(Shared Memory Segments):允许多个进程直接访问同一块物理内存区域,从而实现高速的数据共享。
  • 信号量(Semaphores):提供了一种进程间同步手段,用于控制对共享资源的并发访问,避免竞态条件。

POSIX 进程间通信

  • POSIX(Portable Operating System Interface for UNIX)定义了一系列跨平台的标准API,为进程间通信提供了更为一致且易于移植的解决方案,主要包括:
  • 消息队列(POSIX Message Queues):类似于System V的消息队列,但接口更符合POSIX标准,旨在提高可移植性。
  • 共享内存(POSIX Shared Memory Objects):同样基于内存共享原理,但使用了不同的API如shm_open()mmap()等,提供了更多的灵活性。
  • 信号量(POSIX Semaphores):不同于System V的信号量,POSIX信号量提供了更统一的操作接口,可通过sem_open()sem_post()sem_wait()等函数进行操作。
  • 此外,POSIX还引入了其他同步机制,如互斥量(Mutexes)、条件变量(Condition Variables)、读写锁(Read-Write Locks),这些不仅适用于进程间同步,也是线程同步的重要工具。

二、管道

1、概念

管道是Unix系统中最早的进程间通信机制,其概念可以被形象地描述为连接两个进程之间的一个数据流通道。在操作系统内部实现上,管道本质上是一种特殊的内核管理的临时存储区,具有类似文件的行为特征,但与普通文件的关键区别在于,管道不需要将数据持久化到磁盘,而是在内存中缓冲和传输。

具体实现时,一个管道拥有两端:读端(入口)和写端(出口)。为了创建并使用管道进行通信,通常采取以下步骤:

  1. 父进程通过调用特定的系统调用如pipe()来创建一个管道,这会返回一对文件描述符,分别对应管道的读端和写端。
  2. 父进程接着执行fork()系统调用以创建子进程,此时,由于父子进程共享同一文件描述符表,因此双方都能访问到这个刚创建的管道资源。
  3. 为了确保正确的数据流向,父进程关闭它不需要的管道读端(如果它只负责写入),子进程则关闭其不需要的管道写端(如果它只负责读取)。

2、特点

  1. 面向血缘进程通信:管道主要用于亲缘进程(如父子进程)之间的数据交换,通过调用pipe()系统调用创建,并在fork()后由父子进程共享同一管道资源。

  2. 访问控制与同步:管道的读写两端提供了自然的访问控制,确保了有序的数据流传递。当一个进程在管道的某一端进行读取或写入时,其他进程必须遵循适当的同步规则,以避免数据冲突或阻塞。

  3. 流式通信服务:管道提供的是面向字节流的通信服务,即数据是以连续的、无结构的字节序列进行传输的,协议相对简单且透明。

  4. 基于文件实现:尽管概念上管道类似于文件,但它是内核管理的临时内存区域,生命周期与关联进程紧密相关,随进程结束而自动清理。不同于磁盘上的普通文件,管道内容不会持久化存储。

  5. 半双工通信:管道通常只能单向传输数据,也就是说,它是一个半双工通信通道,同一时刻只允许单向的数据流动,要么从写端流向读端,要么反之。

    1. 需要双方通信时,需要建立起两个管道

  6. 管道容量与读写行为

    • 当写入速度超过读取速度时,管道内部的缓冲区会逐渐填满,直至达到上限,此时继续写入会导致写进程阻塞,直到有足够的空间。
    • 反之,如果读取速度快于写入速度,当管道中的数据被完全读取后,读进程将阻塞等待新的数据到来。
    • 如果写端关闭而读端仍在读取,当所有已写入的数据都被读出后,读取操作会返回0值,表示达到了逻辑上的“文件结尾”。
    • 若读端先关闭,则写进程在尝试写入管道时,由于没有读者,操作系统可能会发送SIGPIPE信号终止写进程。

2、复制并共享

在创建子进程时,操作系统执行了一个“复制并共享”的过程 

 在Linux操作系统,当通过fork()函数创建子进程时,子进程会继承父进程的文件描述符表。文件描述符(file descriptor)是一个指向内核中文件表项的索引,用于标识进程中打开的文件。每个进程都有自己的文件描述符表,但是这些文件描述符指向的是同一个内核文件表中的条目。

这里有几个关键点需要注意:

  1. 文件描述符的继承:当fork()被调用时,子进程获得父进程文件描述符表的副本。这意味着在fork()调用时,父进程中打开的文件在子进程中也会处于打开状态,且具有相同的文件描述符号。因此,父子进程可以共享打开的文件状态,如当前文件偏移量(file offset)和文件打开模式(例如,读、写)。

  2. 共享内核文件表:父子进程中的同一文件描述符虽然各自存在于各自的文件描述符表中,但它们实际上指向的是同一个内核级别的文件对象。这意味着对文件的操作会影响到所有引用该文件的进程。例如,如果父进程在一个文件描述符上进行了读取或写入操作,那么文件内部的偏移量将同时影响子进程在同一文件描述符上的读取结果。

  3. 文件的实际复制并不发生:重要的是要理解,子进程创建时,并没有对打开的文件数据进行物理复制。相反,复制的是文件描述符表的条目,这些条目指向内核中的文件表。这种机制是高效的,因为它避免了不必要的数据复制。

  4. 独立操作文件描述符:尽管父进程和子进程共享打开的文件,但他们可以独立地操作自己的文件描述符。例如,子进程可以关闭或改变它继承的文件描述符指向的文件的某些属性,而不会影响父进程。然而,对于共享的文件本身(如文件偏移量),更改会影响到所有拥有该文件描述符的进程。

3、用fork来共享管道原理

 

  • 当一个进程调用了 pipe 函数之后,它就会得到一对文件描述符(fd[0](读) 和 fd[1](写))用于访问管道。如果此时该进程又调用了 fork 函数创建了一个子进程,那么这个子进程也会继承这对文件描述符。
  • 在这个过程中,父子进程共享同一个管道,也就是说它们都可以通过这两个文件描述符来访问管道。但是,每个进程只能看到自己打开的那部分管道,l例如:父进程只能看到管道的写端,而子进程只能看到管道的读端。
  • 这样设计的原因是为了保证数据的安全性。因为管道是一个共享的数据结构,如果多个进程同时对它进行操作,就可能会出现数据冲突的问题。因此,操作系统规定,每个进程只能看到自己打开的那一部分管道,从而避免了这种问题的发生。
  • 另外,当一个进程不再需要使用某个文件描述符时,它可以将其关闭。这样做的好处是可以释放相应的系统资源,提高系统的性能。

4、站在文件描述符角度-深度理解管道

图中展示的是一个父进程通过 fork 创建子进程,并且使用管道进行通信的过程。

  • 首先,父进程创建了一个管道,这个管道有两个文件描述符:读端和写端。然后,父进程 fork 出了子进程。在 fork 的时候,子进程会继承父进程的所有资源,包括管道的两个文件描述符。
  • 接着,父进程关闭了管道的读端(fd[0]),而子进程则关闭了管道的写端(fd[1])。这样做的目的是为了确保只有父进程可以向管道写入数据,而只有子进程可以从管道读取数据。
  • 最后,父进程和子进程就可以通过管道进行通信了。父进程可以通过 write 函数将数据写入到管道的写端,而子进程则可以通过 read 函数从管道的读端读取数据。由于管道是全双工的,所以父进程也可以从管道读取数据,而子进程也可以向管道写入数据。

5、站在内核角度-管道本质

在图中,我们看到有两个进程分别对同一个文件进行了读写操作。在这个过程中,内核需要知道这个文件的相关信息,以便正确地处理这些操作。这就是inode的作用,它提供了所有必要的信息,让内核能够正确地处理文件的操作。

 

在图中,我们看到有两个进程通过管道进行通信。从 Linux 内核角度来看,管道是一种特殊的文件,它由内核维护,可以在不同的进程之间传递数据。管道的本质就是一个内存缓冲区,它被映射到了所有使用它的进程的地址空间中。它允许两个进程共享一个缓冲区,从而实现在不同进程中传递数据的功能。

  • 当一个进程想要向管道写入数据时,它会先检查管道的缓冲区是否已满。如果缓冲区未满,则可以直接将数据写入缓冲区;否则,就需要等待其他进程从管道中读取数据,直到缓冲区有空闲的空间为止。
  • 当另一个进程想要从管道中读取数据时,它会先检查管道的缓冲区是否有数据。如果有数据,则可以直接从缓冲区中读取;否则,就需要等待其他进程向管道中写入数据,直到缓冲区中有数据为止。
  • 管道的读写操作都是原子性的,这意味着一次读或写操作要么全部完成,要么不完成。这样可以防止数据的丢失或者损坏。
  • 总的来说,管道是一种非常有用的进程间通信机制,它可以帮助不同进程之间的数据交换变得更加简单和高效。

三、匿名管道

1、概念 

匿名管道(Anonymous Pipe)是操作系统提供的一种简单的进程间通信机制,主要用于父子进程或者有直接亲缘关系的进程之间进行数据交换。它是一种半双工的通信方式,即数据只能单向流动,或从父进程流向子进程,或从子进程流向父进程。

特点:

  1. 内存中存在:匿名管道是在内存中开辟的一段缓冲区,而不是在文件系统中创建一个实际的文件对象。

  2. 无名称标识:与命名管道不同,匿名管道没有明确的名称标识,它由操作系统在创建时分配,并通过句柄(文件描述符)来引用和操作。

  3. 血缘关系限制:通常情况下,匿名管道只能在创建它的进程及其直接子进程中使用。也就是说,只有具有直接亲缘关系的进程才能共享同一匿名管道,其他无关进程无法访问。

  4. 读写模式:匿名管道中的数据传输遵循先进先出(FIFO)原则。一个进程负责写入数据到管道的一端,而另一个进程则从管道的另一端读取数据。

  5. 单向或双向通信:由于匿名管道的半双工特性,若要实现双向通信,需要创建两个管道,分别用于两个方向的数据传输。

  6. 阻塞行为

    • 当管道为空时,尝试从管道读取数据的操作会阻塞,直到有数据可读。
    • 当管道已满时,尝试向管道写入数据的操作也会阻塞,直到有足够的空间可供写入。

2、创建

pipe() 函数是Unix和类Unix系统(包括Linux)中的一个用于创建匿名(无名)管道的系统调用。匿名管道是一种简单的进程间通信机制,允许父子进程或相关联的进程之间进行单向或双向数据传输。

#include <unistd.h>int pipe(int fd[2]);

参数说明:

  • fd: 一个大小为2的整数数组,类型为 int[2]。这个数组由 pipe() 函数填充,并返回两个文件描述符。
    • fd[0]:指向管道的读端(Read End)。从这个文件描述符可以读取通过管道传递过来的数据。
    • fd[1]:指向管道的写端(Write End)。数据可以通过这个文件描述符写入到管道中,进而被连接到读端的进程读取。

功能描述: 当调用 pipe() 函数时,操作系统会在内存中创建一段缓冲区,作为管道的内部实现。任何进程将数据写入到管道的写端时,这些数据会暂时存储在缓冲区中,然后可以从管道的读端读取出来。

返回值:

  • 成功时,pipe() 函数返回0,并且已成功分配了两个文件描述符给 fd 数组。
  • 失败时,返回负值表示错误发生,错误原因可以通过 errno 获取。

使用注意事项:

  1. 管道是半双工的,即一次只能在一个方向上传输数据。虽然理论上可以创建两个管道来模拟全双工通信,但每个管道只支持单向数据流。
  2. 管道具有一定的容量限制,当写入端连续写入数据而读取端没有及时读取时,如果管道满载,后续的写操作将会阻塞,直到有空间可写。
  3. 当读取端关闭后,写入端继续写入数据时,将会收到SIGPIPE信号,通常默认行为是导致进程终止;当然,也可以捕获该信号并采取其他行动。
  4. 管道可用于父子进程间的通信,或者不同进程中需要同步和协作的部分。在多进程编程中,常结合 fork() 和 exec() 家族函数使用,以实现在多个进程间传递信息的目的。

3、snprintf 

学习使用管道前,先拓展一下会用到的函数:

#include <stdio.h>int snprintf(char *str, size_t size, const char *format, ...);

snprintf 是C语言标准库中的一个函数,用于格式化输出到字符串,与 printf 类似,但它的输出受限于指定的缓冲区大小,能够防止缓冲区溢出的安全风险。 

参数

  • str:指向目标缓冲区的指针,用于存放格式化后的字符串。
  • size:指定缓冲区 str 的大小(以字节为单位),包括结束符 \0 所需的空间。如果生成的字符串长度小于或等于 size-1,则在字符串末尾添加 \0 结束符;若生成的字符串过长,则按 size 字节截断,并确保仍能正确终止(即至少包含一个结束符\0)。
  • format:是一个格式字符串,其中可能包含转换说明符(如 %d%s 等),它们将与可变参数列表中的相应数据匹配并进行格式化。
  • ...:是可变参数列表,包含了与 format 中转换说明符相匹配的数据项。

返回值

  • 如果成功且未发生截断,返回实际写入 str 缓冲区的字符数(不包括结束符 \0)。
  • 如果发生截断,返回需要写入的总字符数(即使它大于 size 参数),此时字符串仍然会被适当地截断并在缓冲区中填充了 \0 结束符。
char buffer[50];
int value = 12345;
snprintf(buffer, sizeof(buffer), "The value is %d", value);// 如果buffer足够大,例如大于"The value is 12345" + '\0'所需的长度,则结果将是:
// buffer == "The value is 12345"// 如果buffer太小,例如只有5个字符,则结果可能是:
// buffer == "The v"
// 返回值为10,表示如果没有截断的话,完整的字符串应该是10个字符(包括'\0')。

4、父子进程中进行单向通信 

//Makefile
mypipe:mypipe.ccg++ -o $@ $^ #-DDEBUG
.PHONY:clean
clean:rm -f mypipe// 引入必要的头文件
#include <iostream>
#include <string>
#include <cstdio>
#include <cstring>
#include <cassert>
#include <unistd.h> // 提供fork、close、write、read等系统调用
#include <sys/types.h> // 定义pid_t等类型
#include <sys/wait.h> // 提供waitpid函数using namespace std;// 主要功能:创建管道并在父子进程中实现单向通信
int main()
{// 1. 创建管道,pipefd[0]是读端口,pipefd[1]是写端口int pipefd[2];int n = pipe(pipefd); // 创建无名管道assert(n != -1); // 断言检查管道创建是否成功(void)n; //消除编译器可能发出的未使用变量n的警告
#ifdef DEBUG// 输出调试信息,显示管道两端的文件描述符cout << "pipefd[0]: " << pipefd[0] << endl;cout << "pipefd[1]: " << pipefd[1] << endl;
#endif// 2. 创建子进程pid_t child_pid = fork(); // 调用fork创建子进程assert(child_pid != -1); // 断言检查fork是否成功if (child_pid == 0){// 子进程部分(读端)// 3.1 子进程关闭不需要的管道写端close(pipefd[1]);// 缓冲区,用于接收父进程发送的消息char buffer[8 * 1024];// 循环读取管道中的数据,直到读到文件结束符(表示父进程已关闭写端口)while (true){// 从管道读取数据,返回读取的字节数,如果没有数据则阻塞等待ssize_t bytes_read = read(pipefd[0], buffer, sizeof(buffer) - 1);if (bytes_read > 0){// 将读取的数据转为C风格字符串buffer[bytes_read] = '\0';// 输出接收到的消息cout << "Child [" << getpid() << "] received a message: " << buffer << endl;}else if (bytes_read == 0) // 读取到文件结束,父进程已关闭写端{cout << "Writer quit (Father), child quitting too!" << endl;break;}}// close(pipefd[0]); // 实际上在while循环条件中可以判断并在此处关闭读端// 子进程完成任务后退出exit(0);}else{// 父进程部分(写端)// 3.1 父进程关闭不需要的管道读端close(pipefd[0]);// 初始化一条要发送的消息string message = "我是父进程,我正在给你发消息";int count = 0;char send_buffer[8 * 1024];// 循环向管道写入消息,直到达到指定次数while (true){// 构造要发送的消息内容snprintf(send_buffer, sizeof(send_buffer), "%s[%d] : %d", message.c_str(), getpid(), count++);// 向管道写入数据write(pipefd[1], send_buffer, strlen(send_buffer));// 模拟延时,每次发送消息后等待1秒sleep(1);cout << count << endl;if (count == 5) // 发送指定数量的消息后退出循环{cout << "Writer quit (Father)" << endl;break;}}// 发送完毕所有消息后,关闭写端close(pipefd[1]);// 4. 父进程等待子进程结束,并获取其退出状态pid_t result = waitpid(child_pid, nullptr, 0); // 等待子进程结束cout << "Child PID: " << child_pid << ", Return Value from waitpid: " << result << endl;assert(result > 0); // 断言检查waitpid是否成功(void)result;//消除编译器可能发出的未使用变量的警告}return 0;
}
[hbr@VM-16-9-centos mypipe]$ ./mypipe 
child get a message[30197] Father# 我是父进程,我正在给你发消息[30196] : 0
1
child get a message[30197] Father# 我是父进程,我正在给你发消息[30196] : 1
2
child get a message[30197] Father# 我是父进程,我正在给你发消息[30196] : 2
3
child get a message[30197] Father# 我是父进程,我正在给你发消息[30196] : 3
4
child get a message[30197] Father# 我是父进程,我正在给你发消息[30196] : 4
5
writer quit(father)
writer quit(father), me quit!!!
id : 30197 ret: 30197

四、实现简单的进程池模型

Task.hhp:任务函数

Task.hpp 是一个头文件,定义了一系列任务(函数)和全局变量,用于定义和管理在进程池模型中执行的任务。通过全局的任务列表和描述,父进程可以根据索引分配任务给子进程执行。

下面我们来逐一分析每个部分:

#pragma once#include <iostream>
#include <string>
#include <vector>
#include <unordered_map>
#include <unistd.h>// 提供fork、close等系统调用
#include <functional>// 用于定义函数对象// 定义函数对象类型
typedef std::function<void()> func;std::vector<func> callbacks;// 存储回调函数的全局向量
std::unordered_map<int, std::string> desc;// 存储回调函数描述的哈希表// 示例回调函数
void readMySQL()
{std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n" << std::endl;
}void execuleUrl()
{std::cout << "sub process[" << getpid() << " ] 执行url解析\n" << std::endl;
}void cal()
{std::cout << "sub process[" << getpid() << " ] 执行加密任务\n" << std::endl;
}void save()
{std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n" << std::endl;
}void load()// 加载回调函数到全局向量和描述符
{desc.insert({callbacks.size(), "readMySQL: 读取数据库"});callbacks.push_back(readMySQL);desc.insert({callbacks.size(), "execuleUrl: 进行url解析"});callbacks.push_back(execuleUrl);desc.insert({callbacks.size(), "cal: 进行加密计算"});callbacks.push_back(cal);desc.insert({callbacks.size(), "save: 进行数据的文件保存"});callbacks.push_back(save);
}void showHandler()// 显示回调函数列表
{for(const auto &iter : desc ){std::cout << iter.first << "\t" << iter.second << std::endl;}
}int handlerSize()// 获取回调函数数量
{return callbacks.size();
}

1、全局变量

 std::function是一个模板类,用于封装几乎任何可调用的实体,包括普通函数、Lambda表达式、函数对象以及成员函数指针。std::function的一个重要特性是其类型安全,同时提供了足够的灵活性来存储不同类型的可调用实体。 

typedef std::function<void()> func;

typedef std::function<void()> func; 这行代码定义了一个类型别名func。这里,std::function<void()>是一个特化形式,表示它可以封装任何没有参数并且返回void的可调用实体。

std::vector<func> callbacks;
std::unordered_map<int, std::string> desc;
  • std::vector<func> callbacks;:一个函数指针的向量,用于存储可执行的任务。这些任务在运行时被添加到向量中,并且可以通过索引来调用。
  • std::unordered_map<int, std::string> desc;:一个哈希表,用于存储任务的描述。键是任务在callbacks向量中的索引,值是对任务的文字描述。

2、任务函数

void readMySQL()
{std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n" << std::endl;
}void execuleUrl()
{std::cout << "sub process[" << getpid() << " ] 执行url解析\n" << std::endl;
}void cal()
{std::cout << "sub process[" << getpid() << " ] 执行加密任务\n" << std::endl;
}void save()
{std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n" << std::endl;
}
  • 文件中定义了几个任务函数,例如readMySQLexeculeUrlcalsave。这些函数模拟了不同的任务,如访问数据库、解析URL、执行计算和数据持久化。每个函数都打印出它正在执行的任务和当前子进程的ID。

3、初始化函数

void load()
{desc.insert({callbacks.size(), "readMySQL: 读取数据库"});callbacks.push_back(readMySQL);desc.insert({callbacks.size(), "execuleUrl: 进行url解析"});callbacks.push_back(execuleUrl);desc.insert({callbacks.size(), "cal: 进行加密计算"});callbacks.push_back(cal);desc.insert({callbacks.size(), "save: 进行数据的文件保存"});callbacks.push_back(save);
}
  • void load():这个函数初始化任务列表和任务描述。它将每个任务函数添加到callbacks向量中,并且在desc哈希表中为每个任务添加一个描述。这样,每个任务都有一个唯一的索引和描述。

4、辅助函数

void showHandler()
{for(const auto &iter : desc ){std::cout << iter.first << "\t" << iter.second << std::endl;}
}int handlerSize()
{return callbacks.size();
}
  • void showHandler():遍历desc哈希表,并打印出所有任务的索引和描述。这个函数可以用来显示当前可用的任务列表。
  • int handlerSize():返回当前任务列表callbacks的大小,即可用任务的数量。

ProcessPool.cc:进程池

#include <iostream>
#include <vector>
#include <cstdlib>
#include <ctime>
#include <cassert>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
#include "Task.hpp"#define PROCESS_NUM 5using namespace std;int waitCommand(int waitFd, bool &quit) //如果对方不发,我们就阻塞
{uint32_t command = 0;ssize_t s = read(waitFd, &command, sizeof(command));if (s == 0){quit = true;return -1;}assert(s == sizeof(uint32_t));return command;
}void sendAndWakeup(pid_t who, int fd, uint32_t command)
{write(fd, &command, sizeof(command));cout << "main process: call process " << who << " execute " << desc[command] << " through " << fd << endl;
}int main()
{// 代码中关于fd的处理,有一个小问题,不影响我们使用,但是你能找到吗??load();// pid: pipefdvector<pair<pid_t, int>> slots;// 先创建多个进程for (int i = 0; i < PROCESS_NUM; i++){// 创建管道int pipefd[2] = {0};int n = pipe(pipefd);assert(n == 0);(void)n;pid_t id = fork();assert(id != -1);// 子进程我们让他进行读取if (id == 0){// 关闭写端close(pipefd[1]);// childwhile (true){// pipefd[0]// 等命令bool quit = false;int command = waitCommand(pipefd[0], quit); //如果对方不发,我们就阻塞if (quit)break;// 执行对应的命令if (command >= 0 && command < handlerSize()){callbacks[command]();}else{cout << "非法command: " << command << endl;}}exit(1);}// father,进行写入,关闭读端close(pipefd[0]); // pipefd[1]slots.push_back(pair<pid_t, int>(id, pipefd[1]));}// 父进程派发任务srand((unsigned long)time(nullptr) ^ getpid() ^ 23323123123L); // 让数据源更随机while (true){// 选择一个任务, 如果任务是从网络里面来的?int command = rand() %  handlerSize();// 选择一个进程 ,采用随机数的方式,选择进程来完成任务,随机数方式的负载均衡int choice = rand() % slots.size();// 把任务给指定的进程sendAndWakeup(slots[choice].first, slots[choice].second, command);sleep(1);}// 关闭fd, 所有的子进程都会退出for (const auto &slot : slots){close(slot.second);}// 回收所有的子进程信息for (const auto &slot : slots){waitpid(slot.first, nullptr, 0);}
}

1、初始化任务和进程槽

int main()
{load();vector<pair<pid_t, int>> slots;
  • 加载任务:通过调用load()函数,初始化全局的任务列表callbacks和任务描述desc
  • 定义进程槽:使用vector<pair<pid_t, int>> slots;定义一个容器来存储子进程ID和对应的管道写端文件描述符。

2、创建子进程和管道

for (int i = 0; i < PROCESS_NUM; i++)
{// 创建管道int pipefd[2] = {0};int n = pipe(pipefd);assert(n == 0);(void)n;pid_t id = fork();assert(id != -1);
  • 循环创建子进程:通过for循环,创建PROCESS_NUM个子进程。每次循环中,都会创建一个管道(pipe(pipefd)),用于父子进程间的通信。
    • 使用 assert(n == 0) 来确保管道创建成功(在Linux系统中,成功时返回0)
    • (void)n; 是为了Release模式下消除编译器可能产生的未使用变量警告。
  • 管道文件描述符pipefd[0]是管道的读端,pipefd[1]是管道的写端。
  • 使用 fork() 系统调用来创建一个新的进程。fork() 调用后,会生成一个与父进程几乎完全相同的子进程。
    • assert(id != -1); 来检查 fork() 是否成功执行,如果失败则 fork() 会返回-1,程序将终止执行并输出错误信息。

3、waitCommand函数 

waitCommand 函数是这段代码中自定义的一个用于从管道读取命令的函数,其作用是阻塞等待父进程通过管道发送过来的命令,并在接收到特定信号时决定是否退出循环。

int waitCommand(int waitFd, bool &quit) //如果对方不发,我们就阻塞
{uint32_t command = 0;ssize_t s = read(waitFd, &command, sizeof(command));if (s == 0){quit = true;return -1;}assert(s == sizeof(uint32_t));return command;
}

 参数:

  • int waitFd: 这是管道的读端文件描述符(如上文中的 pipefd[0]),用来从管道中读取数据。
  • bool &quit: 这是一个引用类型的布尔变量,用来标记子进程是否需要退出。

函数逻辑:

  • 首先定义一个整型变量 command 用于存储从管道读取到的命令。
  • 使用 read() 系统调用尝试从给定的管道读取指定大小的数据(这里是4字节,假设命令是一个无符号32位整数)。
  • 如果 read() 返回0,这意味着管道另一端关闭了连接,通常这表示父进程打算结束与子进程的通信,因此将 quit 设置为 true 并返回一个非正常值(这里为 -1),以指示子进程应该退出其任务执行循环。
  • 如果 read() 成功读取到了4个字节的数据(等于 sizeof(uint32_t)),则断言成功,并将读取到的命令作为整数值返回。

4、进行通信&执行任务

这段代码描述的是子进程和父进程各自执行的任务,基于之前创建的管道进行通信: 

if (id == 0)
{// 关闭写端close(pipefd[1]);// childwhile (true){   // 等命令bool quit = false;int command = waitCommand(pipefd[0], quit); //如果对方不发,我们就阻塞if (quit)break;// 执行对应的命令if (command >= 0 && command < handlerSize()){callbacks[command]();}else{cout << "非法command: " << command << endl;}}exit(1);
}
// father,进行写入,关闭读端
close(pipefd[0]); // pipefd[1]
slots.push_back(pair<pid_t, int>(id, pipefd[1]));

子进程部分if (id == 0)):

  • 关闭写端: 子进程中不需要向管道写入数据,所以它会关闭管道的写端 close(pipefd[1])
  • 循环等待命令: 子进程进入无限循环,不断从管道读端口 pipefd[0] 等待接收来自父进程的命令。

接收并执行命令

  • waitCommand()用于从管道读取并解析命令,并在接收到特定命令表示退出时设置 quit 为 true

  • 处理命令: 当接收到父进程发来的命令值 command 之后,执行 callbacks[command](); 就是调用预先注册到向量中的对应任务函数,完成实际的工作内容。如果命令非法,则输出错误信息。

  • 退出循环: 当检测到 quit 为 true,即接收到父进程发送的退出命令时,子进程跳出循环并调用 exit(1) 结束自身。

父进程部分

  • 关闭读端: 父进程不需要从管道读取数据,因此关闭管道的读端 close(pipefd[0])

  • 存储子进程信息: 将子进程的ID (id) 和该子进程对应的管道写端口 (pipefd[1]) 保存在一个结构体(这里是一个 pair<pid_t, int> 类型的对象)中,并将其添加到名为 slots 的容器(如向量或列表)中。这样父进程可以管理多个子进程及其对应的管道写端口,以便将来向每个子进程发送不同的命令。

5、sendAndWakeup() 函数

sendAndWakeup()函数用于向指定进程通过管道发送一个命令,并在控制台上打印相关信息。具体说明如下:

void sendAndWakeup(pid_t who, int fd, uint32_t command)
{write(fd, &command, sizeof(command));cout << "main process: call process " << who << " execute " << desc[command] << " through " << fd << endl;
}

参数:

  • pid_t who:表示接收命令的目标子进程的进程ID。
  • int fd:这是管道的写端文件描述符,父进程通过这个描述符将命令写入管道,以通知目标子进程执行任务。
  • uint32_t command:要发送的命令编号,对应于之前定义的任务函数。

函数实现:

  • 使用 write(fd, &command, sizeof(command)) 将命令写入到管道中。这里的 command 是一个整数索引,指向存储在全局变量 callbacks 中的任务函数列表。

  • 在控制台输出一条消息,显示主进程正在通过管道 fd 呼叫进程 who 执行任务 desc[command]。这里的 desc 是一个无序_map(std::unordered_map<int, std::string> desc; ),键是命令索引,值是对应的描述信息。

6、父进程派发任务

父进程持续地以随机方式向各个子进程派发任务,并在完成任务调度后有序地回收子进程资源的功能。

    srand((unsigned long)time(nullptr) ^ getpid() ^ 23323123123L); // 让数据源更随机while (true){// 选择一个任务int command = rand() %  handlerSize();// 选择一个进程 ,采用随机数的方式,选择进程来完成任务,随机数方式的负载均衡int choice = rand() % slots.size();// 把任务给指定的进程sendAndWakeup(slots[choice].first, slots[choice].second, command);sleep(1);}// 关闭fd, 所有的子进程都会退出for (const auto &slot : slots){close(slot.second);}// 回收所有的子进程信息for (const auto &slot : slots){waitpid(slot.first, nullptr, 0);}
}

初始化随机数种子

  • 这行代码使用当前时间戳、当前进程ID以及一个常数值来初始化随机数生成器(srand() 函数)。
  • 异或操作符 (^) 将多个不同的随机源混合起来,以提高生成种子的随机性。这样可以确保不同时间启动的进程或同一进程中多次调用 rand() 都能得到不同的随机数。

无限循环派发任务

  • 使用 rand() % handlerSize() 从所有可用的任务中随机选择一个任务索引。
  • 使用 rand() % slots.size() 从所有已创建的子进程中随机选择一个子进程。
  • 调用 sendAndWakeup() 函数,向选定的子进程发送选中的任务命令。这里通过管道写端口将命令传递给子进程,并唤醒其执行相应任务。
    sendAndWakeup(slots[choice].first, slots[choice].second, command);
    这一行代码是调用 sendAndWakeup() 函数并传入三个参数,来向一个子进程发送命令。详细讲解如下:
    • slots[choice].firstslots 是一个存储了子进程信息的 vector<pair<pid_t, int>> 类型容器,在循环中每个元素代表一个子进程及其管道写端口的文件描述符。choice 是通过随机数生成器确定的一个随机索引,用于从 slots 容器中选择一个子进程。

    • slots[choice].first 就是根据这个随机索引获取到的子进程ID(pid_t 类型),它将作为 sendAndWakeup() 函数的第一个参数传递给函数,用来标识要唤醒执行任务的具体子进程。

    • 同样地,slots[choice].second 代表与所选子进程对应的管道写端口的文件描述符(int 类型)。这是第二个参数,用于在函数内部调用 write() 系统调用,将命令通过管道写入到选定子进程,从而通知该子进程开始执行相应的任务。

    • commandcommand 是之前通过 rand() % handlerSize(); 随机生成的任务编号,它是一个整数值(uint32_t 类型)。此值作为第三个参数传递给 sendAndWakeup() 函数,表示要派发给子进程执行的具体任务。

  • 每次派发完任务后,让父进程休眠1秒(sleep(1)),模拟任务之间的间隔。

关闭管道写端口

    for (const auto &slot : slots){close(slot.second);}
  • 当不再需要向子进程发送任务时,父进程遍历 slots 容器,关闭与每个子进程关联的管道写端口。这会导致读取端(在子进程中)检测到 EOF 或异常,进而促使子进程退出其等待命令的循环。

回收子进程信息

    for (const auto &slot : slots){waitpid(slot.first, nullptr, 0);}
  • 父进程再次遍历 slots 容器,对每个子进程调用 waitpid() 函数,用于等待子进程结束并回收其资源。传入参数 nullptr 表示不关心子进程的退出状态码,0 表示阻塞直到子进程结束。通过这种方式,父进程能够确保所有的子进程都已正常结束,并正确释放系统资源。

运行示例

[hbr@VM-16-9-centos ProcessPoll]$ ./ProcessPool 
main process: call process 32283 execute readMySQL: 读取数据库 through 6
sub process[32283 ] 执行访问数据库的任务main process: call process 32281 execute execuleUrl: 进行url解析 through 4
sub process[32281 ] 执行url解析main process: call process 32285 execute readMySQL: 读取数据库 through 8
sub process[32285 ] 执行访问数据库的任务main process: call process 32282 execute cal: 进行加密计算 through 5
sub process[32282 ] 执行加密任务main process: call process 32283 execute execuleUrl: 进行url解析 through 6
sub process[32283 ] 执行url解析main process: call process 32283 execute execuleUrl: 进行url解析 through 6
sub process[32283 ] 执行url解析main process: call process 32283 execute save: 进行数据的文件保存 through 6
sub process[32283 ] 执行数据持久化任务main process: call process 32281 execute execuleUrl: 进行url解析 through 4
sub process[32281 ] 执行url解析main process: call process 32281 execute readMySQL: 读取数据库 through 4
sub process[32281 ] 执行访问数据库的任务main process: call process 32281 execute cal: 进行加密计算 through 4
sub process[32281 ] 执行加密任务main process: call process 32281 execute execuleUrl: 进行url解析 through 4
sub process[32281 ] 执行url解析^C

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/768752.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

小程序英文口语发音评测

一、英文口语评测需求 在全球化的今天&#xff0c;英语已经成为了世界上最重要的国际语言之一。无论是在国际商务、科技研究、教育还是日常生活中&#xff0c;英语都扮演着举足轻重的角色。因此&#xff0c;掌握英文口语的能力对于个人的职业发展、学术研究以及跨文化交流都具…

【Linux】vim配置及安装方法

注 安装方法在文章最后 配置文件的位置 在目录 /etc/ 下面&#xff0c;有个名为vimrc的文件&#xff0c;这是系统中公共的vim配置文件&#xff0c;对所有用户都有效。而在每个用户的主目录下&#xff0c;都可以自己建立私有的配置文件&#xff0c;命名为“.vimrc”。例如&…

atoi函数详解

atoi函数使用方法 在c官网中是这样介绍atoi函数的 通俗的讲就是把字符串中的字符数字转换为整形数字&#xff0c;遇到空格就跳过&#xff0c;如果在字符串开始遇到不是有效的整数比如说abc就直接返回0&#xff0c;如果遇到像这种情况123abc345这个就只返回123&#xff0c;这个…

C语言数据结构之计数排序

世中逢尔 雨中逢花 目录 计数排序的介绍 代码展示 时间复杂度和空间父复杂度 计数排序的用途 计数排序的局限性 计数排序的介绍 排序原理 计数排序又称为鸽巢原理&#xff0c;是对哈希直接定址法的变形应用。 是一个不比较排序算法&#xff0c;通过计数将时间复杂度降到了O…

大型网站集群管理负载均衡

课程介绍 结合企业大规模应用&#xff0c;解决应用高并发问题&#xff0c;解决单节点故障问题&#xff0c;缓存数据库的应用。学完掌握知识点&#xff1a;企业应用实现四七层负载均衡&#xff0c;以及Nginx等应用的高可用性&#xff0c;Redis缓存数据库的部署应用以及高可用方…

【Redis】优惠券秒杀

全局唯一ID 全局唯一ID生成策略&#xff1a; UUIDRedis自增snowflake算法数据库自增 Redis自增ID策略&#xff1a;每天一个key&#xff0c;方便统计订单量ID构造是 时间戳 计数器 Component public class RedisIdWorker {// 2024的第一时刻private static final long BEGIN…

Unity 背包系统中拖拽物体到指定位置或互换位置效果的实现

在Unity中&#xff0c;背包系统是一种常见的游戏系统&#xff0c;可以用于管理和展示玩家所持有的物品、道具或装备。 其中的拖拽功能非常有意思&#xff0c;具体功能就是玩家可以通过拖拽物品图标来移动物品在背包中的位置&#xff0c;或者将物品拖拽到其他位置或界面中&…

29双体系Java学习之编程的基本过程和类的通用格式

编程的基本过程 类的通用格式 ★小贴士 类的设计应遵循单一职责原则&#xff08;SRP&#xff09;&#xff0c;即只能让一个类有且仅有一个职责&#xff0c;以保证修改的可能性尽量少。

DEll R440 LC下的硬件日志收集步骤(注意:此方法收集的日志里无操作系统的日志);

一&#xff0c; LC下的硬件日志收集步骤&#xff08;注意&#xff1a;此方法收集的日志里无操作系统的日志&#xff09;&#xff1b; 开机看到屏幕左上角出现F10的提示的时候&#xff0c;敲击F10&#xff0c;进入LC的界面&#xff1a; 找到U盘的类似文件&#xff0c;就是最终生…

无人驾驶中的坐标转换

无人驾驶中的坐标转换 无人车上拥有各种各样的传感器&#xff0c;每个传感器的安装位置和角度又不尽相同。对于传感器的提供商&#xff0c;开始并不知道传感器会以什么角度&#xff0c;安装在什么位置&#xff0c;因此只能根据传感器自身建立坐标系。无人驾驶系统是一个多传感器…

Windows/Linux-openEuler系统使用路由侠内网穿透,部署项目详细教程

文章目录 Windows/Linux-openEuler系统使用路由侠内网穿透&#xff0c;部署项目详细教程一、在windows系统下载安装路由侠并实现项目部署1、下载路由侠并注册安装到Windows系统2、点击内网映射&#xff0c;添加映射&#xff0c;注册域名前缀3、选择网站应用4、配置你想要代理项…

ubuntu上一款好用的串口工具screen

看名字&#xff0c;你猜他是什么&#xff1f; 安装 sudo apt install screen 然后将USB串口接到虚拟机&#xff0c;执行dmesg命令查看串口设备名&#xff1a; 测试&#xff1a; sudo screen /dev/ttyUSB0 115200确实很简单。

数据结构(五)——树森林

5.4 树和森林 5.4.1 树的存储结构 树的存储1&#xff1a;双亲表示法 用数组顺序存储各结点&#xff0c;每个结点中保存数据元素、指向双亲结点(父结点)的“指针” #define MAX_TREE_SIZE 100// 树的结点 typedef struct{ElemType data;int parent; }PTNode;// 树的类型 type…

【Godot4.2】像素直线画法及点求取函数

概述 基于CanvasItem提供的绘图函数进行线段绘制只需要直接调用draw_line函数就可以了。 但是对于可以保存和赋值节点直接使用的纹理图片&#xff0c;却需要依靠Image类。而Image类没有直接提供基于像素的绘图函数。只能依靠set_pixel或set_pixelv进行逐个像素的填色。 所以…

C++项目——集群聊天服务器项目(三)muduo网络库

今天来介绍集群聊天器项目中网络模块代码的核心模块——muduo网络库&#xff0c;一起来看看吧~ 环境搭建C项目——集群聊天服务器项目(一)项目介绍、环境搭建、Boost库安装、Muduo库安装、Linux与vscode配置-CSDN博客 Json第三方库C项目——集群聊天服务器项目(二)Json第三方库…

Linux的介绍以及其发展历史

文章目录 前言一、技术是推动社会发展的基本动力1.人为什么能成为万物之长呢&#xff1f;2.人为什么要发明工具&#xff0c;进行进化呢&#xff1f;3.人是如何发明工具的&#xff1f;4.为什么要有不同的岗位和行业&#xff1f; 二、计算机(操作系统)发展的基本脉络1.第一台计算…

Xilinx高级调试方法--多卡调试

Xilinx高级调试方法--多卡调试 1 测试工程2 驱动修改3 工程测试 本文主要介绍基于XVC技术实现多卡调试的方法 1 测试工程 加速卡1 Verdor ID&#xff1a;1BD4Device ID&#xff1a;903E 加速卡2 Verdor ID&#xff1a;1BD4Device ID&#xff1a;903F 2 驱动修改 为了同时识…

大数据技术原理与应用 01.大数据概述

不可以垂头丧气&#xff0c;会显矮 —— 24.3.24 参考学习&#xff1a;厦门大学 林子雨老师 大数据技术原理与应用 一、大数据时代 大数据概念、影响、应用、关键技术 大数据与云计算、物联网的关系 ①三次信息化浪潮时代 ②第三次信息化浪潮的技术支撑 1>存储设备容量不断…

ARM:按键中断

key_inc.c #include"key_inc.h"void key1_it_config(){//使能GPIOF外设时钟RCC->MP_AHB4ENSETR | (0x1<<5);//将PF9设置为输入模式GPIOF->MODER & (~(0x3<<18));//设置由PF9管脚产生EXTI9事件EXTI->EXTICR3 & (~(0XFF<<8));EXTI…

msyq类型类转换造成索引失效

今天碰到一个慢sql的问题&#xff0c;sql明明按照最前缀的原则写的&#xff0c;但是索引就是不生效&#xff0c;最终排查发现是因为索引字段发生类型转换造成的。 一、表结构 1、表字段 2、表索引 二、问题sql EXPLAIN SELECT * FROM t_res WHERE open 1 AND res_date &…