linux C++监听管道文件方式

方式一(传统读取文件,一直监听循环读取文件)

非阻塞打开文件,用read循环定时读取,性能不好

代码如下:

#include <iostream>
#include <fstream>
#include <functional>
#include <thread>
#include <chrono>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <cstring>constexpr int kBufferSize = 256;class FileDescriptor {
public:explicit FileDescriptor(int fd) : fd_(fd) {}~FileDescriptor() {if (fd_ >= 0) {::close(fd_);}}int get() const {return fd_;}private:int fd_;
};class PipeListener {
public:using DataCallback = std::function<void(std::string&&)>;PipeListener(const std::string& pipePath) : pipePath_(pipePath), stopListening_(false) {if (access(pipePath.c_str(), F_OK) == -1) {if (mkfifo(pipePath.c_str(), 0666) == -1) {std::cerr << "Failed to create the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;}}}~PipeListener() {stopListening();}void setDataCallback(DataCallback callback) {dataCallback_ = std::move(callback);}void startListening() {stopListening_ = false;listeningThread_ = std::thread(&PipeListener::listenThread, this);std::cout << "Listening for data on pipe: " << pipePath_ << std::endl;}void stopListening() {stopListening_ = true;if (listeningThread_.joinable()) {listeningThread_.join();std::cout << "Stopped listening on pipe: " << pipePath_ << std::endl;}}private:void listenThread() {auto fd = std::make_unique<FileDescriptor>(::open(pipePath_.c_str(), O_RDONLY | O_NONBLOCK));if (fd->get() < 0) {std::cerr << "Failed to open the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;return;}char buffer[kBufferSize];while (!stopListening_) {ssize_t bytesRead = ::read(fd->get(), buffer, sizeof(buffer));if (bytesRead > 0) {std::string data(buffer, bytesRead);if (!data.empty() && dataCallback_) {dataCallback_(std::move(data));}}std::this_thread::sleep_for(std::chrono::milliseconds(100));}}private:std::string pipePath_;DataCallback dataCallback_;std::thread listeningThread_;bool stopListening_;
};int main() {PipeListener pipeListener("/home/hello/tmp/test_pipe"); // 测试方式,启动程序之后,在终端用 echo "bt_upgrade" > /home/hello/tmp/test_pipe 命令就能触发pipeListener.setDataCallback([](std::string&& data) {std::cout << "Received size: " << data.size() << std::endl;std::cout << "Received data: " << data << std::endl;std::cout << "Received data (hex): ";for (char c : data) {std::cout << std::hex << (int)(unsigned char)c << " ";}std::cout << std::dec << std::endl;});pipeListener.startListening();std::cout << "main" << std::endl;std::this_thread::sleep_for(std::chrono::seconds(100));pipeListener.stopListening();return 0;
}

方式二(用epoll监听管道文件内容)

但是要停止程序时会阻塞在epoll_wait,等待管道消息之后才能正常退出

代码如下:

#include <iostream>
#include <fstream>
#include <functional>
#include <thread>
#include <chrono>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <cstring> constexpr int kBufferSize = 256;class FileDescriptor {
public:explicit FileDescriptor(int fd) : fd_(fd) {}~FileDescriptor() {if (fd_ >= 0) {std::cout << "close fd_ " << std::endl;::close(fd_);}}int get() const {return fd_;}private:int fd_;
};class PipeListener {
public:using DataCallback = std::function<void(std::string&&)>;PipeListener(const std::string& pipePath) : pipePath_(pipePath), stopListening_(false) {if (access(pipePath.c_str(), F_OK) == -1) {if (mkfifo(pipePath.c_str(), 0666) == -1) {std::cerr << "Failed to create the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;}}}~PipeListener() {stopListening();std::cout << "close epollfd_ " << std::endl;close(epollfd_);}void setDataCallback(DataCallback callback) {dataCallback_ = std::move(callback);}void startListening() {stopListening_ = false;listeningThread_ = std::thread(&PipeListener::listenThread, this);std::cout << "Listening for data on pipe: " << pipePath_ << std::endl;}void stopListening() {stopListening_ = true;if (listeningThread_.joinable()) {std::cout << " wait Stopped listening on pipe: " << pipePath_ << std::endl;listeningThread_.join();std::cout << "Stopped listening on pipe: " << pipePath_ << std::endl;}}private:void listenThread() {auto fd = std::make_unique<FileDescriptor>(::open(pipePath_.c_str(), O_RDONLY | O_NONBLOCK));if (fd->get() < 0) {std::cerr << "Failed to open the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;return;}int epollfd_ = epoll_create(1);if (epollfd_ == -1) {std::cerr << "Failed to create epoll instance. Error: " << strerror(errno) << std::endl;return;}struct epoll_event event;event.events = EPOLLIN | EPOLLET;  // Enable edge-triggered modeevent.data.fd = fd->get();if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, fd->get(), &event) == -1) {std::cerr << "Failed to add file descriptor to epoll. Error: " << strerror(errno) << std::endl;return;}char buffer[kBufferSize];while (!stopListening_) {struct epoll_event events[1];int nfds = epoll_wait(epollfd_, events, 1, -1);if (nfds == -1) {std::cerr << "epoll_wait failed. Error: " << strerror(errno) << std::endl;break;}for (int i = 0; i < nfds; ++i) {if (events[i].events & EPOLLIN) {ssize_t bytesRead = ::read(fd->get(), buffer, sizeof(buffer));if (bytesRead > 0) {std::string data(buffer, bytesRead);if (!data.empty() && dataCallback_) {dataCallback_(std::move(data));}}}}}}private:std::string pipePath_;DataCallback dataCallback_;std::thread listeningThread_;bool stopListening_;int epollfd_;  // 新增的成员变量用于保存 epollfd_
};int main() {PipeListener pipeListener("/home/hello/tmp/test_pipe");// 测试方式,启动程序之后,在终端用 echo "bt_upgrade" > /home/hello/tmp/test_pipe 命令就能触发pipeListener.setDataCallback([](std::string&& data) {std::cout << "Received size: " << data.size() << std::endl;std::cout << "Received data: " << data << std::endl;std::cout << "Received data (hex): ";for (char c : data) {std::cout << std::hex << (int)(unsigned char)c << " ";}std::cout << std::dec << std::endl;});pipeListener.startListening();std::cout << "main" << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));// pipeListener.stopListening();std::cout << "main return" << std::endl;return 0;
}

方式三(用epoll监听管道文件内容)

用epoll监听管道文件内容,对象析构或者退出时会通过epoll唤醒epoll_wait,释放资源正常退出

代码如下:

#include <iostream>
#include <fstream>
#include <functional>
#include <thread>
#include <chrono>
#include <csignal>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <condition_variable>
#include <sys/eventfd.h>
#include <cstring>  constexpr int kBufferSize = 256;class PipeListener {
public:using DataCallback = std::function<void(std::string&&)>;PipeListener(const std::string& pipePath) : pipePath_(pipePath), stopListening_(true), epollfd_(0) {if (access(pipePath.c_str(), F_OK) == -1) {if (mkfifo(pipePath.c_str(), 0666) == -1) {std::cerr << "Failed to create the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;}}}~PipeListener() {stopListening();}void setDataCallback(DataCallback callback) {dataCallback_ = std::move(callback);}void startListening() {if (stopListening_ == false) {std::cout << "already start Listening " << std::endl;return;}stopListening_ = false;pipe_fd_ = open(pipePath_.c_str(), O_RDONLY | O_NONBLOCK);std::cout << "startListening pipe_fd_ ["<< pipe_fd_ <<"] " << std::endl;if (!pipe_fd_) {std::cerr << "Failed to open the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;return;}epollfd_ = epoll_create(1);std::cout << "startListening epollfd_ ["<< epollfd_ <<"] " << std::endl;if (epollfd_ == -1) {std::cerr << "Failed to create epoll instance. Error: " << strerror(errno) << std::endl;return;}struct epoll_event event;event.events = EPOLLIN | EPOLLET;  // Enable edge-triggered modeevent.data.fd = pipe_fd_;if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, pipe_fd_, &event) == -1) {std::cerr << "Failed to add file descriptor to epoll. Error: " << strerror(errno) << std::endl;return;}// 创建用于通知的 eventfd,监听要求停止监听管道文件事件,方便安全释放资源退出程序eventfd_ = eventfd(0, EFD_NONBLOCK);std::cout << "startListening eventfd_ ["<< eventfd_ <<"] " << std::endl;if (eventfd_ == -1) {std::cerr << "Failed to create eventfd. Error: " << strerror(errno) << std::endl;return;}event.events = EPOLLIN | EPOLLET;event.data.fd = eventfd_;if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, eventfd_, &event) == -1) {std::cerr << "Failed to add eventfd to epoll. Error: " << strerror(errno) << std::endl;return;}listeningThread_ = std::thread(&PipeListener::listenThread, this);std::cout << "Listening for data on pipe: " << pipePath_ << std::endl;}void stopListening() {if (stopListening_) {std::cout << "already stop Listening " << std::endl;return;}stopListening_ = true;// 写入一个字节到 eventfd 唤醒 epoll_wait,等待安全退出uint64_t value = 1;write(eventfd_, &value, sizeof(value));if (listeningThread_.joinable()) {listeningThread_.join();std::cout << "Stopped listening on pipe: " << pipePath_ << std::endl;}// 从 epoll 实例中删除文件描述符epoll_event event;event.data.fd = pipe_fd_;event.events = EPOLLIN | EPOLLET;epoll_ctl(epollfd_, EPOLL_CTL_DEL, pipe_fd_, &event);event.data.fd = eventfd_;event.events = EPOLLIN | EPOLLET;epoll_ctl(epollfd_, EPOLL_CTL_DEL, pipe_fd_, &event);close(pipe_fd_);close(eventfd_);close(epollfd_);std::cout << "pipe_fd_ ["<< pipe_fd_ <<"] close " << std::endl;std::cout << "epollfd_ ["<< epollfd_ <<"] close " << std::endl;std::cout << "eventfd_ ["<< eventfd_ <<"] close " << std::endl;}private:void listenThread() {std::cout << "listenThread start " << pipePath_ << std::endl;char buffer[kBufferSize];while (true) {struct epoll_event events[2];int nfds = epoll_wait(epollfd_, events, 2, -1);if (stopListening_) {break;}if (nfds == -1) {std::cerr << "epoll_wait failed. Error: " << strerror(errno) << std::endl;}for (int i = 0; i < nfds; ++i) {if (events[i].data.fd == pipe_fd_ && (events[i].events & EPOLLIN)) {ssize_t bytesRead = ::read(pipe_fd_, buffer, sizeof(buffer));if (bytesRead > 0) {std::string data(buffer, bytesRead);if (!data.empty() && dataCallback_) {dataCallback_(std::move(data));}}} else if (events[i].data.fd == eventfd_ && (events[i].events & EPOLLIN)) {// 读取 eventfd,清空它uint64_t value;read(eventfd_, &value, sizeof(value));}}}std::cout << "listenThread exit " << pipePath_ << std::endl;}private:std::string pipePath_;DataCallback dataCallback_;std::thread listeningThread_;bool stopListening_ = true; // 默认状态停止的int epollfd_;int eventfd_;int pipe_fd_;
};int main() {PipeListener pipeListener("/home/woan/tmp/test_pipe");// 测试方式,启动程序之后,在终端用 echo "bt_upgrade" > /home/woan/tmp/test_pipe 命令就能触发pipeListener.setDataCallback([](std::string&& data) {std::cout << "Received size: " << data.size() << std::endl;std::cout << "Received data: " << data << std::endl;std::cout << "Received data (hex): ";for (char c : data) {std::cout << std::hex << (int)(unsigned char)c << " ";}std::cout << std::dec << std::endl;});pipeListener.startListening();std::cout << "main" << std::endl;std::this_thread::sleep_for(std::chrono::seconds(10));pipeListener.stopListening();std::this_thread::sleep_for(std::chrono::seconds(2));pipeListener.startListening();std::this_thread::sleep_for(std::chrono::seconds(10));pipeListener.stopListening();return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/207288.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

spring boot项目如何自定义参数校验规则

spring boot项目对参数进行校验时&#xff0c;比如非空校验&#xff0c;可以直接用validation包里面自带的注解。但是对于一些复杂的参数校验&#xff0c;自带的校验规则无法满足要求&#xff0c;此时需要我们自定义参数校验规则。自定义校验规则和自带的规则实现方式一样&…

时域频域(学习记录1)

1 小伙伴们&#xff0c;今天让我们一起来聊聊Something about DATA 系列。我们先回顾一下本系列对NVH测试中的数据采集做的整体介绍&#xff1a; A 数据采集过程&#xff1b; B 硬件设备&#xff1b; C 数采软件&#xff1b; D ATOM中的数据采集&#xff1b; 接下来的几篇文章…

java如何编写 Restful API

一、什么是RESTful API RESTful API是指符合REST&#xff08;Representational State Transfer&#xff09;架构风格的API。RESTful API是一种架构设计风格&#xff0c;它基于HTTP协议&#xff0c;使用常见的HTTP方法&#xff08;例如GET、POST、PUT、DELETE等&#xff09;对资…

cmake生成表达式

不积小流&#xff0c;无以成江海 <CONFIG:RELEASE> config这个关键字&#xff0c;主要是看CMAKE_BUILD_TYPE这个变量的值是不是和冒号后的一样&#xff0c;一样的话就返回true, 否则就是false. cmake_minimum_required(VERSION 3.10) project(Test) set(CMAKE_CXX_STA…

数据结构--二叉树

目录 1.二叉树链式结构的实现 1.1 前置说明 1.2 二叉树的遍历 1.2.1 前序、中序以及后序遍历 1.2.2 层序遍历及判断是否为完全二叉树 1.3 节点个数&#xff0c;叶子节点个数&#xff0c;第k层节点个数以及高度等 1.4 二叉树的创建和销毁 1.二叉树链式结构的实现 1.1 前置说…

Mysql启动占用内存过高解决

Hi, I’m Shendi Mysql启动占用内存过高解决 前言 最近服务器内存不够用了&#xff0c;甚至还出现了内存溢出问题&#xff0c;导致程序宕机。但请求与用户量并没有多少&#xff0c;所以从各种启动的程序中想方设法的尽可能的减少其占用的内存。 而在我的服务器中&#xff0c;…

几何尺寸智能测量仪为您带来经济效益提升

线材、棒材、管材、板材等产品的品质检测离不开一些基础几何尺寸的检测&#xff0c;随着产线自动化的提升&#xff0c;越来越多的产线开始使用智能测量仪&#xff0c;这不仅仅是因为其能带来品质的提升&#xff0c;更是因为其能带来的经济效益。 几何尺寸智能测量仪种类繁多&am…

JAVA网络编程——BIO、NIO、AIO深度解析

I/O 一直是很多Java同学难以理解的一个知识点&#xff0c;这篇帖子将会从底层原理上带你理解I/O&#xff0c;让你看清I/O相关问题的本质。 1、I/O的概念 I/O 的全称是Input/Output。虽常谈及I/O&#xff0c;但想必你也一时不能给出一个完整的定义。搜索了谷哥欠&#xff0c;发…

区块链创新应用场景不断拓展,实现去中心化

小编介绍&#xff1a;10年专注商业模式设计及软件开发&#xff0c;擅长企业生态商业模式&#xff0c;商业零售会员增长裂变模式策划、商业闭环模式设计及方案落地&#xff1b;扶持10余个电商平台做到营收过千万&#xff0c;数百个平台达到百万会员&#xff0c;欢迎咨询。 区块…

【Vulnhub 靶场】【BuffEMR: 1.0.1】【简单 - 中等】【20210831】

1、环境介绍 靶场介绍&#xff1a;https://www.vulnhub.com/entry/buffemr-101,717/ 靶场下载&#xff1a;https://download.vulnhub.com/buffemr/BuffEMR-v1.0.1.ova 靶场难度&#xff1a;简单 - 中等 发布日期&#xff1a;2021年08月31日 文件大小&#xff1a;4.6 GB 靶场作…

为什么每个 Java 开发者都需要了解 Scala

前面我们一起回顾了第九期 Scala & Java Meetup 中最受关注的话题 —— jdk 并发编程的终极解决方案&#xff1a;虚拟线程&#xff0c;探讨了这一新特性对包括 Scala 在内的响应式编程语言的影响。 本次 Meetup 的首位分享者 Chunsen&#xff0c;在加入 Tubi 成为 Scala 开…

【学习笔记】Burnside引理,Pólya定理及其应用

Burnside引理 书接上回&#xff0c;继续深入研究在群作用下集合的轨道与稳定子群的相关性质 现在我们想要研究这样一个问题&#xff1a; 有限群 G 在有限集合 S 上面有一个作用&#xff0c;那么 S 的 G − 轨道条数是多少 有限群G在有限集合S上面有一个作用&#xff0c;那么…

【投稿优惠|检索稳定】2024年信息系统、工程与数字化经济国际会议(ICISEDE 2024)

2024年信息系统、工程与数字化经济国际会议(ICISEDE 2024) 2024 International Conference on Information Systems and Engineering and the Digital Economy(ICISEDE 2024) [会议简介] 2024 年信息系统、工程与数字化经济国际会议(ICISEDE 2024)将于 2024 年 1 月 20 日在厦门…

Endnote在word中加入参考文献及自定义参考文献格式方式

第一部分&#xff1a;在word中增加引用步骤 1、先下载对应文献的endnote引用格式&#xff0c;如在谷歌学术中的下载格式如下&#xff1a; 2、在endnote中打开存储env的格式库&#xff0c;导入对应下载的文件格式&#xff1a;file>import>file>choose,import对应文件&a…

IT外包驻场加速企业IT创新

随着科技的快速发展&#xff0c;企业在追求创新和应用IT技术方面面临挑战。IT外包驻场服务成为许多企业的选择&#xff0c;助力企业实现快速、高效的IT项目实施和应用。 IT外包驻场服务的主要目标是帮助企业在IT创新方面取得突破。这种服务模式不仅仅是提供技术支持&#xff0c…

3D建模基础教程:模型UV讲解

本篇文章将带你探索3D建模中的模型UV。了解UV有助于你在3D建模中更好地进行纹理映射和材质应用&#xff0c;从而创建出更加逼真的3D场景。 uv坐标&#xff1a; UV坐标是用于映射纹理到3D模型表面的2D坐标。它们将2D纹理图像映射到3D模型的3D空间上&#xff0c;使模型表面在渲…

配电室无人值守改造

配电室无人值守改造是通过运用先进的技术和设备&#xff0c;将传统的需要人工值守的配电室改造成可以远程监控和管理的智能化配电室&#xff0c;从而实现无人值守。这种改造可以提高配电室的安全性、可靠性和效率&#xff0c;降低运维成本。 建立智能监控系统&#xff1a;通过安…

Vue3选项式-基础部分篇

Vue3选项式风格-基础部分篇 简介模板语法文本插值原始HTMLAttribute 绑定使用 JavaScript 表达式调用函数全局组件调用内置指令动态参数注意事项 data()data()深度响应 methods有状态的methods(防抖) DOM更新时机计算属性class和style绑定条件渲染列表渲染数组变换侦听事件处理…

Linux 系统设置cpu频率

source_code: https://github.com/emagii/cpufrequtils cpufreq-set - A small tool which allows to modify cpufreq settings.&#xff08;修改内存频率的工具&#xff09; cpufreq-set allows you to modify cpufreq settings without having to type e.g. “/sys/devices…

echart中定义brush,默认状态,触发状态

1.定义矩形选择笔刷&#xff1a;brush 2.设置brush的默认状态和选中逻辑