linux下I/O模型并发的epoll多进程池协程实现

方法1

主要思路:

  1. 定义了一个EventData结构体,用于存储事件相关的数据,如文件描述符、epoll 文件描述符、协程 ID 等。
  2. EchoDeal函数用于处理请求消息,并生成响应消息。
  3. handlerClient函数是协程的执行函数,用于处理客户端连接。它通过循环读取数据、解析请求、执行业务处理、发送响应等步骤,实现了对客户端请求的处理。
  4. handler函数是主函数,用于创建监听套接字、初始化 epoll、设置非阻塞模式、添加读事件等操作。然后进入一个循环,通过 epoll_wait 等待事件发生,并根据事件类型进行相应的处理,如接受新连接、处理客户端请求等。
  5. 在主函数中,通过 fork 创建多个子进程,每个子进程都执行handler函数,从而实现多进程并发处理。

示例代码: 

#include <arpa/inet.h>  // 包含网络地址转换相关的头文件
#include <assert.h>  // 包含断言相关的头文件
#include <fcntl.h>  // 包含文件控制相关的头文件
#include <netinet/in.h>  // 包含网络协议相关的头文件
#include <stdio.h>  // 包含标准输入输出相关的头文件
#include <stdlib.h>  // 包含标准库相关的头文件
#include <sys/epoll.h>  // 包含 epoll 相关的头文件
#include <sys/socket.h>  // 包含套接字相关的头文件
#include <unistd.h>  // 包含 Unix 标准相关的头文件#include <iostream>  // 包含 C++ 的输入输出流头文件#include "../coroutine.h"  // 包含自定义的协程相关头文件
#include "../epollctl.hpp"  // 包含自定义的 epoll 控制相关头文件// 定义 EventData 结构体,用于存储事件相关的数据
struct EventData {  EventData(int fd, int epoll_fd) : fd_(fd), epoll_fd_(epoll_fd){};  // 构造函数,初始化成员变量int fd_{0};  // 文件描述符int epoll_fd_{0};  // epoll 文件描述符int cid_{MyCoroutine::INVALID_ROUTINE_ID};  // 协程 IDMyCoroutine::Schedule *schedule_{nullptr};  // 协程调度器指针
};  // 结构体定义结束// EchoDeal 函数,用于处理请求消息,并生成响应消息
void EchoDeal(const std::string reqMessage, std::string &respMessage) { respMessage = reqMessage; }  // 函数定义结束// handlerClient 函数,协程的执行函数,用于处理客户端连接
void handlerClient(void *arg) {  EventData *eventData = (EventData *)arg;  // 将传入的 void* 指针转换为 EventData* 指针auto releaseConn = [&eventData]() {  // 定义一个匿名函数用于释放连接资源EchoServer::ClearEvent(eventData->epoll_fd_, eventData->fd_);  // 清除事件delete eventData;  // 释放内存};ssize_t ret = 0;  // 用于存储读取或写入的返回值EchoServer::Codec codec;  // 编解码器对象std::string reqMessage;  // 请求消息字符串std::string respMessage;  // 响应消息字符串while (true) {  // 读操作循环uint8_t data[100];  // 数据缓冲区ret = read(eventData->fd_, data, 100);  // 尝试从文件描述符读取数据if (ret == 0) {  // 对端关闭连接perror("peer close connection");  // 打印错误信息releaseConn();  // 释放连接资源return;  // 函数返回}if (ret < 0) {  // 读取错误if (EINTR == errno) continue;  // 被中断,继续尝试读取if (EAGAIN == errno or EWOULDBLOCK == errno) {  // 无数据可读MyCoroutine::CoroutineYield(*eventData->schedule_);  // 让出 CPUcontinue;  // 继续下一次循环}perror("read failed");  // 打印读取失败的错误信息releaseConn();  // 释放连接资源return;  // 函数返回}codec.DeCode(data, ret);  // 解码读取的数据if (codec.GetMessage(reqMessage)) {  // 从解码数据中获取完整的请求消息break;  // 跳出读循环}}// 执行到这里说明已经读取到一个完整的请求EchoDeal(reqMessage, respMessage);  // 调用处理函数生成响应EchoServer::Packet pkt;  // 数据包对象codec.EnCode(respMessage, pkt);  // 对响应进行编码EchoServer::ModToWriteEvent(eventData->epoll_fd_, eventData->fd_, eventData);  // 切换为监听可写事件ssize_t sendLen = 0;  // 已发送数据的长度while (sendLen!= pkt.Len()) {  // 写操作循环ret = write(eventData->fd_, pkt.Data() + sendLen, pkt.Len() - sendLen);  // 尝试写入数据if (ret < 0) {  // 写入错误if (EINTR == errno) continue;  // 被中断,继续尝试写入if (EAGAIN == errno or EWOULDBLOCK == errno) {  // 不可写MyCoroutine::CoroutineYield(*eventData->schedule_);  // 让出 CPUcontinue;  // 继续下一次循环}perror("write failed");  // 打印写入失败的错误信息releaseConn();  // 释放连接资源return;  // 函数返回}sendLen += ret;  // 更新已发送数据的长度}releaseConn();  // 释放连接资源
}// handler 函数,主函数,用于创建监听套接字、初始化 epoll、设置非阻塞模式、添加读事件等操作
void handler(char *argv[]) {  int sockFd = EchoServer::CreateListenSocket(argv[1], atoi(argv[2]), true);  // 创建监听套接字if (sockFd < 0) {  // 如果创建失败return;  // 函数返回}epoll_event events[2048];  // epoll 事件数组int epollFd = epoll_create(1024);  // 创建 epoll 实例if (epollFd < 0) {  // 如果创建失败perror("epoll_create failed");  // 打印错误信息return;  // 函数返回}EventData eventData(sockFd, epollFd);  // 创建事件数据对象EchoServer::SetNotBlock(sockFd);  // 设置套接字为非阻塞EchoServer::AddReadEvent(epollFd, sockFd, &eventData);  // 添加可读事件MyCoroutine::Schedule schedule;  // 协程调度器MyCoroutine::ScheduleInit(schedule, 10000);  // 初始化协程池int msec = -1;  // 超时时间while (true) {  // 无限循环int num = epoll_wait(epollFd, events, 2048, msec);  // 等待 epoll 事件if (num < 0) {  // 如果等待失败perror("epoll_wait failed");  // 打印错误信息continue;  // 继续下一次循环} else if (num == 0) {  // 超时无事件sleep(0);  // 让出 CPUmsec = -1;  // 下次超时时间设置为 -1continue;  // 继续下一次循环}msec = 0;  // 下次大概率还有事件,故 msec 设置为 0for (int i = 0; i < num; i++) {  // 处理事件EventData *eventData = (EventData *)events[i].data.ptr;  // 获取事件数据指针if (eventData->fd_ == sockFd) {  // 是监听套接字的事件EchoServer::LoopAccept(sockFd, 2048, [epollFd](int clientFd) {EventData *eventData = new EventData(clientFd, epollFd);  // 创建新的事件数据对象EchoServer::SetNotBlock(clientFd);  // 设置套接字为非阻塞EchoServer::AddReadEvent(epollFd, clientFd, eventData);  // 监听可读事件});continue;  // 继续下一次循环}if (eventData->cid_ == MyCoroutine::INVALID_ROUTINE_ID) {  // 第一次事件,则创建协程if (MyCoroutine::CoroutineCanCreate(schedule)) {  // 可以创建协程eventData->schedule_ = &schedule;  // 设置协程调度器eventData->cid_ = MyCoroutine::CoroutineCreate(schedule, handlerClient, eventData, 0);  // 创建协程MyCoroutine::CoroutineResumeById(schedule, eventData->cid_);  // 唤醒刚刚创建的协程处理客户端请求} else {std::cout << "MyCoroutine is full" << std::endl;  // 输出协程已满的信息}} else {MyCoroutine::CoroutineResumeById(schedule, eventData->cid_);  // 唤醒之前主动让出 cpu 的协程}}MyCoroutine::ScheduleTryReleaseMemory(schedule);  // 尝试释放协程内存}
}// main 函数
int main(int argc, char *argv[]) {  if (argc!= 3) {  // 如果命令行参数数量不正确std::cout << "invalid input" << std::endl;  // 输出错误信息std::cout << "example:./EpollReactorProcessPoolCoroutine 0.0.0.0 1688" << std::endl;  // 给出正确用法示例return -1;  // 程序返回 -1}for (int i = 0; i < EchoServer::GetNProcs(); i++) {  // 循环创建子进程pid_t pid = fork();  // 创建进程if (pid < 0) {  // 创建失败perror("fork failed");  // 打印错误信息continue;  // 继续下一次循环}if (0 == pid) {  // 子进程handler(argv);  // 处理客户端请求exit(0);  // 子进程退出}}while (true) sleep(1);  // 父进程进入死循环return 0;  // 程序正常结束返回 0
}

方法2:

思路:

  1. main 函数首先检查命令行参数数量是否正确。
  2. 然后通过循环创建子进程。
  3. 子进程调用 handler 函数。
  4. 在 handler 函数中,创建监听套接字和 epoll 实例,设置套接字为非阻塞并添加可读事件,初始化协程调度器。
  5. 进入一个无限循环,通过 epoll_wait 等待事件发生。
  6. 根据事件的类型进行处理:
    • 如果是监听套接字的事件,处理新的连接。
    • 对于客户端连接的事件,如果是第一次事件则创建协程并唤醒,否则唤醒已有的协程。
  7. 协程中的 handlerClient 函数处理客户端的读写操作。
#include <arpa/inet.h>
#include <assert.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>#include <iostream>#include "../coroutine.h"
#include "../epollctl.hpp"// 定义事件数据结构体
struct EventData {EventData(int fd, int epoll_fd) : fd_(fd), epoll_fd_(epoll_fd){};  // 构造函数int fd_{0};  // 文件描述符int epoll_fd_{0};  // epoll 文件描述符int cid_{MyCoroutine::INVALID_ROUTINE_ID};  // 协程 IDMyCoroutine::Schedule *schedule_{nullptr};  // 协程调度器指针
};// 处理请求并生成响应的函数
void EchoDeal(const std::string reqMessage, std::string &respMessage) { respMessage = reqMessage; }// 处理客户端的函数
void handlerClient(void *arg) {EventData *eventData = (EventData *)arg;  // 获取事件数据指针auto releaseConn = [&eventData]() {  // 定义释放连接的匿名函数EchoServer::ClearEvent(eventData->epoll_fd_, eventData->fd_);delete eventData;  // 释放内存};ssize_t ret = 0;  // 读取结果EchoServer::Codec codec;  // 编解码器对象std::string reqMessage;  // 请求消息std::string respMessage;  // 响应消息while (true) {  // 读操作循环uint8_t data[100];  // 数据缓冲区ret = read(eventData->fd_, data, 100);  // 尝试读取数据if (ret == 0) {  // 对端关闭连接perror("peer close connection");releaseConn();return;}if (ret < 0) {  // 读取错误if (EINTR == errno) continue;  // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) {  // 无数据可读MyCoroutine::CoroutineYield(*eventData->schedule_);  // 让出 CPUcontinue;}perror("read failed");releaseConn();return;}codec.DeCode(data, ret);  // 解码数据if (codec.GetMessage(reqMessage)) {  // 获取完整请求break;}}// 执行到这里说明已经读取到一个完整的请求EchoDeal(reqMessage, respMessage);  // 处理请求生成响应EchoServer::Packet pkt;codec.EnCode(respMessage, pkt);  // 编码响应EchoServer::ModToWriteEvent(eventData->epoll_fd_, eventData->fd_, eventData);  // 切换为监听可写事件ssize_t sendLen = 0;  // 已发送长度while (sendLen!= pkt.Len()) {  // 写操作循环ret = write(eventData->fd_, pkt.Data() + sendLen, pkt.Len() - sendLen);  // 尝试写入if (ret < 0) {  // 写入错误if (EINTR == errno) continue;  // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) {  // 不可写MyCoroutine::CoroutineYield(*eventData->schedule_);  // 让出 CPUcontinue;}perror("write failed");releaseConn();return;}sendLen += ret;  // 更新已发送长度}releaseConn();  // 释放连接资源
}// 主处理函数
void handler(char *argv[]) {int sockFd = EchoServer::CreateListenSocket(argv[1], atoi(argv[2]), true);  // 创建监听套接字if (sockFd < 0) {return;}epoll_event events[2048];  // epoll 事件数组int epollFd = epoll_create(1024);  // 创建 epoll 实例if (epollFd < 0) {perror("epoll_create failed");return;}EventData eventData(sockFd, epollFd);  // 创建事件数据对象EchoServer::SetNotBlock(sockFd);  // 设置套接字为非阻塞EchoServer::AddReadEvent(epollFd, sockFd, &eventData);  // 添加可读事件MyCoroutine::Schedule schedule;  // 协程调度器MyCoroutine::ScheduleInit(schedule, 10000);  // 初始化协程池int msec = -1;  // 超时时间while (true) {int num = epoll_wait(epollFd, events, 2048, msec);  // 等待 epoll 事件if (num < 0) {  // 等待失败perror("epoll_wait failed");continue;} else if (num == 0) {  // 超时无事件sleep(0);  // 让出 CPUmsec = -1;  // 下次超时时间设置为 -1continue;}msec = 0;  // 有事件,下次超时时间设置为 0for (int i = 0; i < num; i++) {  // 处理事件EventData *eventData = (EventData *)events[i].data.ptr;if (eventData->fd_ == sockFd) {  // 是监听套接字的事件EchoServer::LoopAccept(sockFd, 2048, [epollFd](int clientFd) {EventData *eventData = new EventData(clientFd, epollFd);EchoServer::SetNotBlock(clientFd);EchoServer::AddReadEvent(epollFd, clientFd, eventData);  // 处理新连接});continue;}if (eventData->cid_ == MyCoroutine::INVALID_ROUTINE_ID) {  // 第一次事件if (MyCoroutine::CoroutineCanCreate(schedule)) {  // 可以创建协程eventData->schedule_ = &schedule;eventData->cid_ = MyCoroutine::CoroutineCreate(schedule, handlerClient, eventData, 0);  // 创建协程MyCoroutine::CoroutineResumeById(schedule, eventData->cid_);  // 唤醒协程} else {std::cout << "MyCoroutine is full" << std::endl;}} else {MyCoroutine::CoroutineResumeById(schedule, eventData->cid_);  // 唤醒已有协程}}MyCoroutine::ScheduleTryReleaseMemory(schedule);  // 尝试释放协程内存}
}int main(int argc, char *argv[]) {if (argc!= 3) {  // 检查命令行参数数量std::cout << "invalid input" << std::endl;std::cout << "example:./EpollReactorProcessPoolCoroutine 0.0.0.0 1688" << std::endl;return -1;}for (int i = 0; i < EchoServer::GetNProcs(); i++) {  // 循环创建子进程pid_t pid = fork();  // 创建进程if (pid < 0) {  // 创建失败perror("fork failed");continue;}if (0 == pid) {  // 子进程handler(argv);  // 处理客户端请求exit(0);}}while (true) sleep(1);  // 父进程进入死循环return 0;
}

总的来说,程序通过多进程和协程的结合,实现了对客户端连接的处理和高效的 I/O 操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/861340.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

gc.log中 CMS-concurrent-abortable-preclean

问题 在gc日志中看到 2024-06-26T16:16:07.5040800: 64690272.666: [CMS-concurrent-abortable-preclean-start]CMS: abort preclean due to time 2024-06-26T16:16:12.5530800: 64690277.716: [CMS-concurrent-abortable-preclean: 1.052/5.049 secs] [Times: user1.33 sys0…

Ubuntu系统安装软件---以安装QQ为例

以安装QQ为例&#xff0c;首先你的Ubuntu系统需要连上网&#xff0c;连上网的网络状态如下图所示。 在ubuntu系统的网页中搜索QQ&#xff0c;如下图所示。 进入QQ官网&#xff0c;点击Linux&#xff0c;如下图所示。 随后会让你选择什么架构的版本&#xff0c;如何查看自己的是…

【Python机器学习】分类向量——One-Hot编码(虚拟变量)

为了学习分类特征&#xff0c;以某国成年人收入数据集&#xff08;adult&#xff09;为例&#xff0c;adult数据集的任务是预测一名工人的收入是高于50k还是低于50k&#xff0c;这个数据集的特征包括工人的年龄、雇佣方式、教育水平、性别、每周工作时长、职业等。 这个任务属于…

【LinuxC语言】pthread_join与pthread_detach函数

文章目录 前言线程分离pthread_join函数作用函数原型参数含义返回值示例代码易混pthread_detach函数详解函数作用函数原型参数含义返回值示例代码总结前言 在并发编程中,线程的管理是一个重要的主题。特别是当我们需要处理线程的生命周期和资源管理时,这就变得尤为重要。在L…

代码随想录算法训练营第50天(py)| 动态规划 | 1143.最长公共子序列、1035.不相交的线、53. 最大子序和、392.判断子序列

1143.最长公共子序列 力扣链接 给定两个字符串 text1 和 text2&#xff0c;返回这两个字符串的最长 公共子序列&#xff08;未必连续&#xff09; 的长度。如果不存在 公共子序列 &#xff0c;返回 0 。 思路 确定dp含义 dp[i][j]&#xff1a;长度为[0,i-1]和[0,j-1]的最长公…

ONLYOFFICE 桌面编辑器 8.1使用体验分享

目录 编辑器市场现状与用户选择 ONLYOFFICE桌面编辑器概览和功能 ONLYOFFICE桌面编辑器概览 功能丰富的PDF编辑器 演示文稿编辑器的创新 文档编辑的灵活性 电子表格的高级功能 语言和本地化 用户界面和体验 媒体播放 云服务和本地处理 跨平台支持 总结 在线亲身体…

mapstruct实现各个实体间的类型转换(DTO转BO、BO转Entity)的实践

一、引入 在没有遇见mapstruct的时候&#xff0c;实现各个实体之间的转换&#xff0c;都是手动转换实现的&#xff0c;属性少一带你还好&#xff0c;当属性一多&#xff0c;代码就会变得很冗余&#xff0c;没必要的非逻辑的代码就会加多。。。。 比如&#xff1a; public cl…

vue封装原生table表格方法

适用场景&#xff1a;有若干个表格&#xff0c;前面几列格式不一致&#xff0c;但是后面几列格式皆为占一个单元格&#xff0c;所以需要封装表格&#xff0c;表格元素自动根据数据结构生成即可&#xff1b;并且用户可新增列数据。 分类&#xff1a; 固定数据部分 就是根据数据…

React--两种常见的组件嵌套方式

组件嵌套 简介在父组件外部直接使用子组在父组件的实现内部引入并使用子组件区别总结灵活性&#xff1a;可配置性&#xff1a;使用场景&#xff1a; 选择 简介 在 React 中&#xff0c;有两种常见的组件嵌套方式&#xff1a; 在父组件中直接使用子组件。在父组件的实现内部引…

openlayers性能优化——开启图层预加载、减少空白等待时间

使用切片图层时、地图拖拽会有空白图片&#xff0c;为了减少空白等待时间&#xff0c;我们可以开始图层预加载。 const map_top new Map({layers: [new TileLayer({preload:Infinity, //预加载source: new StadiaMaps({layer: "outdoors",}),}),],target: "ma…

LINKAI工作流的建立与调试,用到COW项目的微信机器人上

连接时需要把右边的号连到下一个框的输入&#xff0c;开始与结束是默认的。 可以单独调试模块 可以对模块进行个性化定义 最后进行总流程调试 将这里的code放到config.json文件中 接着又做了一个较复杂的工作流DgPz9wJaoh   QlCc34a8bP 原项目网址&#xff1a; https:/…

小程序中UnionID,AppID,AppSecret,OpenID怎么理解?

小程序中UnionID&#xff0c;AppID&#xff0c;AppSecret&#xff0c;OpenID怎么理解&#xff1f; 个人理解 UnionID:同一用户&#xff0c;对同一个微信开放平台下的不同应用&#xff0c;UnionID 是相同的。 AppID:小程序的身份证号码&#xff0c;是微信公众平台上的小程序 I…

【学习笔记-机器学习】感知机模型

Author&#xff1a;赵志乾 Date&#xff1a;2024-06-26 Declaration&#xff1a;All Right Reserved&#xff01;&#xff01;&#xff01; 1. 基本概念 数据集的线性可分性&#xff1a;给定一个数据集 其中&#xff0c;&#xff0c;&#xff0c;,如果存在某个超平面S 能够将数…

Python的100道练习题目,每日一练,必成大神!!!

整理了100道Python的题目&#xff0c;如果你是一位初学者&#xff0c;这一百多道题可以 帮助你轻松的使用Python。初学 者每天可以尝试3-5个问题&#xff0c;经过这一百道题的练习&#xff0c;要把练习昨晚并且完全懂了&#xff0c;基本上Python就已 经入门了。如果你不是初学者…

Day 34:2368. 受限条件下可到达节点的数目

Leetcode 2368. 受限条件下可到达节点的数目 现有一棵由 n 个节点组成的无向树&#xff0c;节点编号从 0 到 n - 1 &#xff0c;共有 n - 1 条边。 给你一个二维整数数组 edges &#xff0c;长度为 n - 1 &#xff0c;其中 edges[i] [ai, bi] 表示树中节点 ai 和 bi 之间存在一…

OpenCV 车道检测

OpenCV 车道检测 前言模型分析车道检测相关链接 前言 如果要检测道路图像中的车道&#xff0c;方法之一是利用深度学习的语义分割技术。而在 OpenCV 中解决此问题可以使用边缘检测器。在本节中&#xff0c;我们将了解如何使用边缘检测和直线检测识别道路图像中的车道。 模型分…

测试用例的基本要素与设计方法

测试用例的基本要素 测试用例&#xff08;Test Case&#xff09;是为了实施测试而向被测试的系统提供的一组集合&#xff0c;这组集合包含&#xff1a;测试环境、操作步骤、测试数据、预期结果等要素。 好的测试用例是一个不熟悉业务的人也能依据用例来很快的进行测试评价测试用…

RT-Thread使用HAL库实现双线程控制LED交替闪烁

如何创建工程我的其他文中你面有可以进去查看 1创建线程&#xff08;以动态方式实现&#xff09; 1-2创建函数入口 1-2启动函数 main.c文件源码 /** Copyright (c) 2006-2024, RT-Thread Development Team** SPDX-License-Identifier: Apache-2.0** Change Logs:* Date …

【课程总结】Day11(下):YOLO的入门使用

前言 YOLO的简介 YOLO&#xff08;You Only Look Once&#xff09;是一种流行的目标检测算法&#xff0c;由Joseph Redmon等人于2015年提出。YOLO的设计思想是将目标检测任务转化为单个神经网络的回归问题&#xff0c;通过在图像上划分网格并对每个网格预测边界框和类别置信度…

【鸿蒙】鸿蒙的Stage和 FA 有什么区别

鸿蒙的Stage模型和FA&#xff08;Feature Ability&#xff09;模型在多个方面存在显著的区别。以下是它们之间的主要差异点&#xff1a; 设计思想和出发点&#xff1a; Stage模型&#xff1a;设计基于为复杂应用而开发的出发点&#xff0c;旨在提供一个更好的开发方式&#xff…