Linux笔记---进程间通信:匿名管道

1. 管道通信

1.1 管道的概念与分类

管道(Pipe) 是进程间通信(IPC)的一种基础机制,主要用于在具有亲缘关系的进程(如父子进程、兄弟进程)之间传递数据,其核心特性是通过内核缓冲区实现单向或半双工的数据传输。

  • 匿名管道:通常用于具有亲缘关系的进程之间通信,如父子进程或兄弟进程。它是半双工的,数据只能在一个方向上流动,有固定的读端和写端,且只存在于内存中,不属于任何文件系统,但可以使用普通的read、write等函数进行读写。
  • 命名管道(FIFO):可以在无关的进程之间进行通信,有路径名与之相关联,以一种特殊设备文件形式存在于文件系统中。创建后,无关进程可以通过该文件进行通信,通信方式类似于使用文件传输数据,遵循先进先出原则。

管道是轻量级且高效的进程间通信方式,适用于简单的数据流场景,但其单向性和容量限制使其不适合复杂需求。命名管道扩展了应用范围,但需注意文件系统的依赖。

1.2 管道的原理

在操作系统还不支持进程间通信的时候,人们尝试使用操作系统已有的功能来实现进程间通信。

要实现进程间通信,就需要两个进程访问共享的资源,什么资源是各个进程都可以共享访问的呢?

答案显而易见:文件。

父进程打开一个文件并创建子进程,子进程就会继承父进程的文件描述符表,这样父子进程就可以访问同一个文件,通过向文件当中进行读写就可以实现进程间通信。

当然,对文件的访问是需要同步与互斥机制的,这一点由操作系统来实现,我们并不关心。

两个进程之间的通信一般都是些临时的小体量的消息,无需将其正真存入到文件当中(而且存入文件当中会造成较大的访存消耗)。实际上,我们只需要在struct file维护的文件缓冲区当中进行信息交换即可。

于是,在操作系统在这个思路的基础之上,实现了管道机制。

所谓管道,就是一种特殊的管道文件,其本质上是内核管理的一段环形内存缓冲区,通过文件描述符提供单向或半双工的数据流传输。

2. 匿名管道的使用

2.1 pipe函数

我们说,管道文件是一种特殊的文件,那么其打开的方式(或者说创建的方式)自然也要与一般的文件进行区别。

在Linux当中,我们使用pipe函数来创建一个匿名管道:

#include <unistd.h>
int pipe(int pipefd[2]);

返回值:成功返回 0,失败返回 -1 并设置 errno。

参数:pipefd 是长度为 2 的整型数组,用于返回两个文件描述符:

  • pipefd[0]:管道的读端,只能用于读取数据。
  • pipefd[1]:管道的写端,只能用于写入数据。

注意,管道只能进行单向数据传输,这意味着共享管道的父子进程一个只能读,一个只能写。

在实践当中,我们应当关闭当前进程未使用的端口:

#include <unistd.h>int main()
{int pipefd[2] = {0};int n = pipe(pipefd);if(n == -1){perror("pipe:");return 1;}int id = fork();if(id == 0){// 子进程写close(fd[0]);// ...}else{// 父进程读close(fd[1]);// ...}
}

2.2 管道读写规则

当没有数据可读时:

  • O_NONBLOCK disable:read调用阻塞,即进程暂停执行,一直等到有数据来到为止。
  • O_NONBLOCK enable:read调用返回 -1,errno值为EAGAIN。

当管道满的时:

  • O_NONBLOCK disable: write调用阻塞,直到有进程读走数据
  • O_NONBLOCK enable:调用返回-1,errno值为EAGAIN

文件描述符关闭:

  • 如果所有管道写端对应的文件描述符被关闭:read不再阻塞而是返回0。
  • 如果所有管道读端对应的文件描述符被关闭:write操作会产生信号SIGPIPE,进而可能导致write进程退出。

原子性规则: 

  • 小数据写入(≤ PIPE_BUF,通常 4KB): 内核保证写入的原子性,即数据要么完整写入,要么完全不写入。
  • 大数据写入(PIPE_BUF): 不保证原子性,数据可能被其他进程的写入操作穿插,且可能部分写入。

注:O_NONBLOCK为pipe2的选项(比pipe多一个选项参数)。 

 3. 进程池

学习完匿名管道的基本使用,我们可以动手尝试编写一个基于匿名管道的进程池。

平时,各个子进程就阻塞在read处等待,当父进程通过管道对其下达任务时就会将其唤醒。

.hpp后缀的文件其实就是.cpp和.h文件的结合体,类似于java的包。

3.1 Channel.hpp

首先,我们定义一个Channel类用于管理父子进程之间的通信管道(信道):

#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include <cassert>
#include <iostream>class Channel
{
public:Channel(int wfd, int pid):_wfd(wfd), _process(pid){}~Channel(){}void CloseAndWait(){close(_wfd);std::cout << _process << "的信道关闭成功" << std::endl;waitpid(_process, nullptr, 0);std::cout << "进程" << _process << "已被成功回收" << std::endl;}// 通过信道将任务提交给子进程执行void ExecuteTask(int code){std::cout << "将任务" << code << "派遣给" << _process << std::endl;write(_wfd, &code, sizeof(code));}int GetPid(){return _process;}
private:int _wfd;pid_t _process;
};

由于进程池中进程的数量可能很多,信道也相对变多,我们应当定义一个类来管理这些信道:

class ChannelManager
{
public:void Insert(Channel&& channel){_Channels.push_back(channel);}int Size(){return _Channels.size();}// 选择进程并将任务分派出去void GiveTask(int code){int channel = SelectChannel();std::cout << "选择进程: " << _Channels[channel].GetPid() << std::endl;_Channels[channel].ExecuteTask(code);}void CloseAndWait(){for(auto& channel : _Channels){channel.CloseAndWait();}}private:// 选择进程int SelectChannel(){// 轮询分派任务static int next = 0;assert(_Channels.size());int tmp = next;next = (next + 1) % _Channels.size();return tmp;}std::vector<Channel> _Channels;
};

3.2 Task.hpp

任务实际上就是一个个的函数,同样地,由于任务可能有很多,我们也使用一个类来进行管理:

#include <functional>
#include <vector>
#include <iostream>
#include <unistd.h>
#include <cassert>
using Task = std::function<void()>;class TaskManager
{
public:// 注册,即将任务插入数组并管理起来void RegisterTask(Task&& task){_Tasks.push_back(task);}int Size(){return _Tasks.size();}// 根据任务码(数组下标)返回相应的任务对象Task& GetTask(int code){assert(code >= 0 && code < _Tasks.size());return _Tasks[code];}
private:std::vector<Task> _Tasks;
};

3.3 ProcessPool.hpp

完成上面的准备工作,我们就可以开始着手构建我们ProcessPool类了,TODO:

  • 对ChannelManager和TaskManager进行封装。
  • 提供给用户插入任务,发布任务等的接口。
  • 开启(Start):创建子进程并使其开始等待任务到达、创建信道并插入ChannelManager。
  • 终止(Stop):销毁信道并回收子进程。
#include "Channel.hpp"
#include "Task.hpp"class ProcessPool
{
public:ProcessPool(int size = 5):_size(size){std::cout << "ProcessPool已创建" << std::endl;}~ProcessPool(){// 假如用户忘记终止并回收进程if(_activate){_CM.CloseAndWait();}}// 子进程转入此函数并循环等待任务到达后执行void Work(int rfd){int code = 0;std::cout << "子进程" << getpid() << "开始工作" << std::endl;while(true){ssize_t n = read(rfd, &code, sizeof(code));if(n == 0) {std::cout << "进程" << getpid() << "退出" << std::endl;break;}else if(n < 0){std::cout << "进程" << getpid() << "获取任务时发生错误" << std::endl;break;}else _TM.GetTask(code)();}}void Start(){for(int i = 0; i < _size; i++){int fds[2] = {0};int n = pipe(fds);if(n == -1){perror("pipe:");}int id = fork();if(id < 0){perror("fork:");exit(1);}else if(id == 0){// 子进程close(fds[1]);Work(fds[0]);close(fds[0]);exit(0);}// 父进程close(fds[0]);_CM.Insert(Channel(fds[1], id));}_activate = true;}// 用户发布任务的接口,交由ChannelManager处理void LaunchTask(int code){assert(code >= 0 && code <= _TM.Size());std::cout << "发布任务: " << code << std::endl;_CM.GiveTask(code);}void Stop(){_CM.CloseAndWait();_activate = false;}// 封装TaskManager的接口,使用户自定义任务void RegisterTask(Task&& task){_TM.RegisterTask(std::forward<Task>(task));}
private:int _size;bool _activate = false;ChannelManager _CM;TaskManager _TM;
};

3.4 Main.cpp

#include "ProcessPool.hpp"
#include <ctime>int main()
{std::cout << "程序启动" << std::endl;srand((unsigned int)time(nullptr));ProcessPool processpool;// 生成n个测试任务int n = 10;for(int i = 0; i < 10; i++){processpool.RegisterTask(([i](){std::cout << "进程" << getpid() << "正在执行任务" << i << std::endl;}));}processpool.Start();// 随机发布10个任务while(n--){int code = rand() % 10;processpool.LaunchTask(code);sleep(2);}processpool.Stop();return 0;
}

 3.5 匿名管道的死锁问题

上面的代码实际上存在一个严重的问题,那就是在10个任务执行结束之后进行信道的销毁时:

在第一个信道提示关闭之后,并没有显式子进程退出的消息,而是直接卡住不动了。查看源代码会发现问题就是出在这一行,说明在信道被关闭之后,子进程并没有退出。

这是由于后创建的子进程继承了父进程对之前创建的子进程的写端口: 

所以,在父进程的视角上关闭信道之后,管道1的写端依然没有完全关闭,子进程就会继续在read处阻塞等待。子进程因等待父进程下达指令或关闭信道而阻塞;父进程因等待子进程退出而阻塞。 此时就形成了死锁,导致程序卡住。

解决方案
  • 方案1:先关闭所有的信道再等待子进程退出。
  • 方案2:逆向关闭信道并退出。
  • 方案3:关闭子进程从父进程那里继承下来的写入端。

4. 最终代码

代码最终采用的是第三种方案,因为该方案的安全性更高,当然前两种方案被部分注释了,读者可以自己尝试修改死锁的解决方案。

4.1 Channel.hpp

#include <vector>
#include <string>
#include <sys/types.h>
#include <sys/wait.h>
#include <cassert>
#include <iostream>class Channel
{
public:Channel(int wfd, int pid):_wfd(wfd), _process(pid){}~Channel(){}void SubProcessCloseBrother(){close(_wfd);}void Close(){std::cout << "关闭" << _process << "的信道" << std::endl;close(_wfd);std::cout << _process << "的信道关闭成功" << std::endl;}void Wait(){waitpid(_process, nullptr, 0);std::cout << "进程" << _process << "已被成功回收" << std::endl;}// 确保调用该函数的信道为当前最后启动的 或者 事先关闭所有子进程的写入端,否则会造成死锁void CloseAndWait(){close(_wfd);std::cout << _process << "的信道关闭成功" << std::endl;waitpid(_process, nullptr, 0);std::cout << "进程" << _process << "已被成功回收" << std::endl;}void ExecuteTask(int code){std::cout << "将任务" << code << "派遣给" << _process << std::endl;write(_wfd, &code, sizeof(code));}int GetPid(){return _process;}
private:int _wfd;pid_t _process;
};class ChannelManager
{
public:void Insert(Channel&& channel){_Channels.push_back(channel);}int Size(){return _Channels.size();}void GiveTask(int code){int channel = SelectChannel();std::cout << "选择进程: " << _Channels[channel].GetPid() << std::endl;_Channels[channel].ExecuteTask(code);}// 方案1:先关闭后回收void CloseChannels(){for(auto& channel : _Channels){channel.Close();}}void WaitProcesses(){for(auto& channel : _Channels){std::cout << "回收进程" << channel.GetPid() << std::endl;channel.Wait();}}// 方案2:反向关闭回收// void CloseAndWait()// {//     for(int i = _Channels.size() - 1; i >= 0; i--)//     {//         _Channels[i].CloseAndWait();//     }// }// 方案3:关闭所有子进程的写入端,可以任意方式关闭回收void CloseAndWait(){for(auto& channel : _Channels){channel.CloseAndWait();}}void SubProcessCloseBrothers(){for(auto& channel : _Channels){channel.SubProcessCloseBrother();}}private:int SelectChannel(){// 轮询分派任务static int next = 0;assert(_Channels.size());int tmp = next;next = (next + 1) % _Channels.size();return tmp;}std::vector<Channel> _Channels;
};

4.2 Task.hpp

#include <functional>
#include <vector>
#include <iostream>
#include <unistd.h>
#include <cassert>
using Task = std::function<void()>;class TaskManager
{
public:void RegisterTask(Task&& task){_Tasks.push_back(task);}int Size(){return _Tasks.size();}Task& GetTask(int code){assert(code >= 0 && code < _Tasks.size());return _Tasks[code];}
private:std::vector<Task> _Tasks;
};

4.3 ProcessPool.hpp

#include "Channel.hpp"
#include "Task.hpp"class ProcessPool
{
public:ProcessPool(int size = 5):_size(size){std::cout << "ProcessPool已创建" << std::endl;}~ProcessPool(){if(_activate){_CM.CloseChannels();_CM.WaitProcesses();}}void Work(int rfd){int code = 0;std::cout << "子进程" << getpid() << "开始工作" << std::endl;while(true){ssize_t n = read(rfd, &code, sizeof(code));if(n == 0) {std::cout << "进程" << getpid() << "退出" << std::endl;break;}else if(n < 0){std::cout << "进程" << getpid() << "获取任务时发生错误" << std::endl;break;}else _TM.GetTask(code)();}}void Start(){for(int i = 0; i < _size; i++){int fds[2] = {0};int n = pipe(fds);if(n == -1){perror("pipe:");}int id = fork();if(id < 0){perror("fork:");exit(1);}else if(id == 0){// 子进程close(fds[1]);// 将子进程的写入端全部关闭_CM.SubProcessCloseBrothers();Work(fds[0]);close(fds[0]);exit(0);}// 父进程close(fds[0]);_CM.Insert(Channel(fds[1], id));}_activate = true;}void LaunchTask(int code){assert(code >= 0 && code <= _TM.Size());std::cout << "发布任务: " << code << std::endl;_CM.GiveTask(code);}void Stop(){// _CM.CloseChannels();// _CM.WaitProcesses();_CM.CloseAndWait();_activate = false;}void RegisterTask(Task&& task){_TM.RegisterTask(std::forward<Task>(task));}
private:int _size;bool _activate = false;ChannelManager _CM;TaskManager _TM;
};

4.4 Makefile

ProcessPool:Main.cppg++ -o $@ $^ -std=c++11.PHONY:clean
clean:rm ProcessPool

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/80210.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ollama API 应用指南

1. 基础信息 默认地址: http://localhost:11434/api数据格式: application/json支持方法: POST&#xff08;主要&#xff09;、GET&#xff08;部分接口&#xff09; 2. 模型管理 API (1) 列出本地模型 端点: GET /api/tags功能: 获取已下载的模型列表。示例:curl http://lo…

【OSCP-vulnhub】Raven-2

目录 端口扫描 本地/etc/hosts文件解析 目录扫描&#xff1a; 第一个flag 利用msf下载exp flag2 flag3 Mysql登录 查看mysql的运行权限 MySql提权&#xff1a;UDF 查看数据库写入条件 查看插件目录 查看是否可以远程登录 gcc编译.o文件 创建so文件 创建临时监听…

Podman Desktop:现代轻量容器管理利器(Podman与Docker)

前言 什么是 Podman Desktop&#xff1f; Podman Desktop 是基于 Podman CLI 的图形化开源容器管理工具&#xff0c;运行在 Windows&#xff08;或 macOS&#xff09;上&#xff0c;默认集成 Fedora Linux&#xff08;WSL 2 环境&#xff09;。它提供与 Docker 类似的使用体验…

极狐GitLab 权限和角色如何设置?

极狐GitLab 是 GitLab 在中国的发行版&#xff0c;关于中文参考文档和资料有&#xff1a; 极狐GitLab 中文文档极狐GitLab 中文论坛极狐GitLab 官网 权限和角色 (BASIC ALL) 将用户添加到项目或群组时&#xff0c;您可以为他们分配角色。该角色决定他们在极狐GitLab 中可以执…

解锁现代生活健康密码,开启养生新方式

在科技飞速发展的当下&#xff0c;我们享受着便捷生活&#xff0c;却也面临诸多健康隐患。想要维持良好状态&#xff0c;不妨从这些细节入手&#xff0c;解锁科学养生之道。​ 肠道是人体重要的消化器官&#xff0c;也是最大的免疫器官&#xff0c;养护肠道至关重要。日常可多…

Kafka 主题设计与数据接入机制

一、前言&#xff1a;万物皆流&#xff0c;Kafka 是入口 在构建实时数仓时&#xff0c;Kafka 既是 数据流动的起点&#xff0c;也是后续流处理系统&#xff08;如 Flink&#xff09;赖以为生的数据源。 但“消息进来了” ≠ “你就能处理好了”——不合理的 Topic 设计、接入方…

【绘制图像轮廓|凸包特征检测】图像处理(OpenCV) -part7

15 绘制图像轮廓 15.1 什么是轮廓 轮廓是一系列相连的点组成的曲线&#xff0c;代表了物体的基本外形。相对于边缘&#xff0c;轮廓是连续的&#xff0c;边缘不一定连续&#xff0c;如下图所示。轮廓是一个闭合的、封闭的形状。 轮廓的作用&#xff1a; 形状分析 目标识别 …

uniapp中使用<cover-view>标签

文章背景&#xff1a; uniapp中遇到了原生组件(canvas)优先级过高覆盖vant组件 解决办法&#xff1a; 使用<cover-view>标签 踩坑&#xff1a; 我想实现的是一个vant组件库中动作面板的效果&#xff0c;能够从底部弹出框&#xff0c;让用户进行选择&#xff0c;我直…

Kafka常见问题及解决方案

Kafka 是一个强大的分布式流处理平台&#xff0c;广泛用于高吞吐量的数据流处理&#xff0c;但在实际使用过程中&#xff0c;也会遇到一些常见问题。以下是一些常见的 Kafka 问题及其对应的解决办法的详细解答&#xff1a; 消息丢失 一、原因 1.生产端 网络故障、生产者超时…

leetcode 二分查找应用

34. Find First and Last Position of Element in Sorted Array 代码&#xff1a; class Solution { public:vector<int> searchRange(vector<int>& nums, int target) {int low lowwer_bound(nums,target);int high upper_bound(nums,target);if(low high…

【Docker】在容器中使用 NVIDIA GPU

解决容器 GPU 设备映射问题&#xff0c;实现 AI 应用加速 &#x1f517; 官方文档&#xff1a;NVIDIA Container Toolkit GitHub 常见错误排查 若在运行测试容器时遇到以下错误&#xff1a; docker: Error response from daemon: could not select device driver ""…

通过Quartus II实现Nios II编程

目录 一、认识Nios II二、使用Quartus II 18.0Lite搭建Nios II硬件部分三、软件部分四、运行项目 一、认识Nios II Nios II软核处理器简介 Nios II是Altera公司推出的一款32位RISC嵌入式处理器&#xff0c;专门设计用于在FPGA上运行。作为软核处理器&#xff0c;Nios II可以通…

JAVA设计模式——(三)桥接模式

JAVA设计模式——&#xff08;三&#xff09;桥接模式&#xff08;Bridge Pattern&#xff09; 介绍理解实现武器抽象类武器实现类涂装颜色的行为接口具体颜色的行为实现让行为影响武器修改武器抽象类修改实现类 测试 适用性 介绍 将抽象和实现解耦&#xff0c;使两者可以独立…

k8s 证书相关问题

1.重新生成新证书 kubeadm init phase certs apiserver-etcd-client --config ~/kubeadm.yaml这个命令表示生成 kube-apiserver 连接 etcd 使用的证书,生成后如下 -rw------- 1 root root 1.7K Apr 23 16:35 apiserver-etcd-client.key -rw-r--r-- 1 root root 1.2K Apr 23 …

比较:AWS VPC peering与 AWS Transit Gateway

简述: VPC 对等连接和 Transit Gateway 用于连接多个 VPC。VPC 对等连接提供全网状架构,而 Transit Gateway 提供中心辐射型架构。Transit Gateway 提供大规模 VPC 连接,并简化了 VPC 间通信管理,相比 VPC 对等连接,支持大量 VPC 的 VPC 间通信管理。 VPC 对等连接 AWS V…

制造企业PLM深度应用:2025年基于PDCA循环的7项持续改进指标

制造企业的产品生命周期管理&#xff08;PLM&#xff09;在数字化转型的浪潮中扮演着至关重要的角色。PLM深度应用不仅能够提升产品研发效率、保证产品质量&#xff0c;还能增强企业在市场中的竞争力。随着2025年智能制造目标的推进&#xff0c;基于PDCA循环的持续改进对于PLM的…

极狐GitLab 的压缩和合并是什么?

极狐GitLab 是 GitLab 在中国的发行版&#xff0c;关于中文参考文档和资料有&#xff1a; 极狐GitLab 中文文档极狐GitLab 中文论坛极狐GitLab 官网 压缩和合并 (BASIC ALL) 在你处理一个特性分支时&#xff0c;通常会创建一些小的、独立的提交。这些小提交帮助描述构建特性…

解耦旧系统的利器:Java 中的适配器模式(Adapter Pattern)实战解析

在现代软件开发中&#xff0c;我们经常需要与旧系统、第三方库或不一致接口打交道。这时候&#xff0c;如果能优雅地整合这些不兼容组件&#xff0c;又不破坏原有结构&#xff0c;就需要一位“翻译官” —— 适配器模式。本文将通过 Java 实例&#xff0c;详细讲解适配器模式的…

03-谷粒商城笔记

一个插件的install和生命周期的报错是不一样的 Maven找不到ojdbc6和sqljdbc4依赖包 这时候我找到了jar包&#xff0c;然后我就先找到一个jar安装到了本地仓库。 在终端上进行命令了&#xff1a; mvn install:install-file -DfileD:\ojdbc6-11.2.0.4.jar -DgroupIdcom.oracle …

黑马点评redis改 part 5

达人探店 发布探店笔记 那第一张表block表它里边的结构呢是这个 首先呢第一个字段是i d&#xff0c;就是主键&#xff0c;第二个呢是shop id&#xff0c;就是商户你发的这个比例啊&#xff0c;它是跟哪个商户有关系的。第三个呢用户id就是谁发的这篇笔记&#xff0c;第四个呢标…