基于多线程的Reactor模式的 回声服务器 EchoServer

记录下  

一个线程专门用来接受accept获取客户端的fd 

获取fd之后 从剩余的执行线程中 找到一个连接客户端数量最少的线程

然后将客户端的fd加入到这个线程中并通过EPOLL监听这个fd

线程之间通过eventfd来通信  将客户端的fd传到 对应的线程中  

参考了MediaServer   引入EventPollerPoll 和 EventPoller的 概念  

最少两个两个线程 设置为1的话 会改成2

cpp代码:

#include "durian.h"#include <sys/epoll.h>namespace DURIAN
{EventPoller::EventPoller(int id){m_id = id;}EventPoller::~EventPoller(){printf("~EventPoller signal m_id = %d m_run_flag = %d\n",m_id,m_run_flag);Wait();}bool EventPoller::Init(){m_poll_fd = epoll_create1(0);if(m_poll_fd == -1){return false;}m_event_fd = eventfd(0,0);if(m_event_fd == -1){printf("new fd failed\n");close(m_poll_fd);return false ;}return true;}void EventPoller::RunLoop(){static const int MAX_EVENTS = 1024;struct epoll_event events[MAX_EVENTS];while(m_run_flag){int ready_count = epoll_wait(m_poll_fd,events,MAX_EVENTS,2000);if(ready_count == -1){if(errno != EINTR){//exit(1);}//ready_count = 0;}else if(ready_count == 0){if(m_run_flag == false){//printf("time out and runflag = false exit thread\n");//break;}}for(int i = 0;i<ready_count;i++){const struct epoll_event &ev = events[i];int fd = events[i].data.fd;if(ev.events &(EPOLLIN | EPOLLERR |EPOLLHUP)){auto handler = m_accept_handlers[fd];handler(fd);}else if(ev.events & (EPOLLOUT | EPOLLERR | EPOLLHUP)){auto it = m_buffer_pool.find(fd);if(it!= m_buffer_pool.end()){auto &buf = it->second;if(buf.WriteData(fd) == false){Close(fd);}}}}}}int EventPoller::GetEventFD(){return m_event_fd;}int EventPoller::GetClients(){return m_accept_handlers.size();}void EventPoller::Stop(){m_run_flag = false;}void EventPoller::Start(){//printf("Enter EventPoller Start  m_id = %d pollfd = %d eventid = %d\n",m_id,m_poll_fd,m_event_fd);m_run_flag = true;m_thread_id = std::thread(&EventPoller::RunLoop,this);}void EventPoller::Wait(){if(m_thread_id.joinable()){m_thread_id.join();}}bool EventPoller::Add2Epoll(int fd){if(m_accept_handlers.count(fd) != 0){return false;}int flags = 1;if(ioctl(fd,FIONBIO,&flags) == -1){return false;}struct epoll_event ev;ev.events = EPOLLIN |EPOLLOUT |EPOLLET;ev.data.fd = fd;if(epoll_ctl(m_poll_fd,EPOLL_CTL_ADD,fd,&ev)==-1){return false;}return true;}void EventPoller::DeliverConn(int conn_fd){//printf("DeliverConn fd = %d\n",conn_fd);uint64_t count = conn_fd;if(write(m_event_fd,&count,sizeof(count)) == -1){printf("Deliverconn write failed\n");}}bool EventPoller::AddListener(int fd,ACCEPTER on_accept){if(Add2Epoll(fd) == false){return false;}std::cout<<"EventPoller AddListener fd = "<<fd<<std::endl;m_accept_handlers[fd] = [this,on_accept]( int server_fd){for(;;){int new_fd = accept(server_fd,nullptr,nullptr);std::cout<<"accept client fd = "<<new_fd<<std::endl;	if(new_fd == -1){if(errno!= EAGAIN){Close(server_fd);}return 0;}int enable = 1;setsockopt(new_fd,IPPROTO_TCP,TCP_NODELAY,&enable,sizeof(enable));on_accept(new_fd);}return 0;};return true;}bool EventPoller::AddEventer(int fd, EVENTER on_event){if(Add2Epoll(fd) == false){return false;}m_accept_handlers[fd] = [this,on_event](int cfd){for(;;){uint64_t count;if(read(cfd,&count,sizeof(count)) == -1){if(errno != EAGAIN){Close(cfd);}return 0;}on_event(count);}return 0;};return true;}bool EventPoller::AddReader(int fd, READER on_read){	if(Add2Epoll(fd) == false){return false;}m_accept_handlers[fd] = [this,on_read](int cfd){for(;;){char buf[4096] = {0};ssize_t ret = read(cfd,buf,sizeof(buf));if(ret == -1){if(errno != EAGAIN){Close(cfd);}return -1;}if(ret == 0){Close(cfd);printf("客户端关闭了连接 %d\n",cfd);return 0 ;}on_read(cfd,buf,ret);}};return true;}void EventPoller::Close(int fd){m_accept_handlers.erase(fd);m_buffer_pool.erase(fd);close(fd);}bool EventPoller::FlushData(int fd, const char * buf, size_t len){WriteBuffer *wb = nullptr;auto it = m_buffer_pool.find(fd);if(it == m_buffer_pool.end()){while(len >0){ssize_t ret = write(fd,buf,len);if(ret == -1){if(errno != EAGAIN){Close(fd);return false;}wb = &m_buffer_pool[fd];break;}buf+= ret;len-=ret;}if(len == 0){//Successreturn true;}}else{wb = &it->second;}wb->Add2Buffer(buf,len);return true;}static size_t g_pool_size = 0;
void EventPollerPool::SetPoolSize(size_t size)
{g_pool_size = size;
}
EventPollerPool & EventPollerPool::Instance()
{static std::shared_ptr<EventPollerPool> s_instance(new EventPollerPool()); static EventPollerPool &s_instance_ref = *s_instance; return s_instance_ref; 
}EventPollerPool::EventPollerPool()
{auto size = g_pool_size;auto cpus = std::thread::hardware_concurrency();size = size > 0 ? size : cpus;std::cout<<"Thread size:"<<size<<std::endl;if(size <2)size = 2;for (int i = 0; i < size; ++i) {std::shared_ptr<EventPoller> poller = std::make_shared<EventPoller>(i);m_pollers.emplace_back(poller);}
}std::shared_ptr<EventPoller> EventPollerPool::GetPoller()
{if(m_pollers.size()>1){int min_clients = 10000;int target_index = 0;for(int i = 1;i<m_pollers.size();i++){if(m_pollers[i]-> GetClients() < min_clients){min_clients = m_pollers[i]->GetClients();target_index = i;}}//printf("target index = %d min_clients = %d\n",target_index,min_clients);return m_pollers[target_index];}return m_pollers[0];}
std::shared_ptr<EventPoller> EventPollerPool::GetFirstPoller()
{return m_pollers[0];
}void EventPollerPool::StartPollers()
{for(int i = 1;i<m_pollers.size();i++){m_pollers[i]->Init();int event_fd = m_pollers[i]->GetEventFD();m_pollers[i]->AddEventer(event_fd,[&,i](uint64_t cfd){READER reader = [&,i](int fd,const char*data,size_t len){printf("Len[%s] content[%d] m_pollers[i] = %p i = %d\n",data,len,m_pollers[i],i);m_pollers[i]->FlushData(fd,data,len);return 0;};m_pollers[i]->AddReader(cfd,reader);return 0;});m_pollers[i]->Start();	}
}void EventPollerPool::Stop()
{for(int i = 0;i<m_pollers.size();i++){m_pollers[i]->Stop();}}}

头文件

#include <string.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <netinet/tcp.h>#include <sys/eventfd.h>
#include <signal.h>#include <iostream>
#include <memory>
#include <list>
#include <vector>
#include <functional>
#include <thread>
#include <mutex>#include <unordered_map>namespace DURIAN
{class WriteBuffer{private:std::list<std::string> buf_items;size_t offset = 0;public:bool IsEmpty() const{return buf_items.empty();}void Add2Buffer(const char* data,size_t len){if(buf_items.empty() || buf_items.back().size()+len >4096){buf_items.emplace_back(data,len);}else{buf_items.back().append(data,len);}}bool WriteData(int fd){while (IsEmpty() == false){auto const &item = buf_items.front();const char *p = item.data() + offset;size_t len = item.size() -offset;while(len >0){ssize_t ret = write(fd,p,len);if(ret == -1){if(errno == EAGAIN){return true;}return false;}offset += ret;p+=ret;len-= ret;}buf_items.pop_front();}return true;}};using ACCEPTER = std::function<int(int)>;using WRITER = std::function<int(int)>;using EVENTER = std::function<int(int)>;using READER = std::function<int(int,const char *data,size_t)>;//static thread_local std::unordered_map<int fd,READER>g_th_handlers;class EventPoller{private:int m_poll_fd = -1;int m_id;bool m_run_flag = false;std::unordered_map<int,ACCEPTER> m_accept_handlers;std::unordered_map<int,WriteBuffer> m_buffer_pool;std::mutex m_connction_lock;int m_event_fd;std::thread m_thread_id ;std::vector<int>m_connections;void RunLoop();public:EventPoller(int i);~EventPoller();int GetEventFD();int GetClients();std::vector<int> & GetConnections();bool Init();void Start();void Stop();void Wait();	void DeliverConn(int conn_fd);bool AddListener(int fd,ACCEPTER on_listen);bool AddEventer(int fd,EVENTER on_event);bool AddReader(int fd,READER on_read);void Close(int fd);bool Add2Epoll(int fd);bool FlushData(int fd,const char *buf,size_t len);};class EventPollerPool{public:static EventPollerPool &Instance();static void SetPoolSize(size_t size = 0);std::shared_ptr<EventPoller>GetPoller(); std::shared_ptr<EventPoller>GetFirstPoller(); 	void StartPollers();void Stop(); private:int m_size;std::vector<std::shared_ptr<EventPoller>> m_pollers;EventPollerPool();		  };}

main文件

#include "durian.h"static bool g_run_flag = true;
void sig_handler(int signo)
{signal(SIGINT, SIG_IGN);signal(SIGTERM, SIG_IGN);signal(SIGKILL, SIG_IGN);g_run_flag = false;printf("Get exit flag\n");if (SIGINT == signo || SIGTSTP == signo || SIGTERM == signo|| SIGKILL == signo){g_run_flag = false;printf("\033[0;31mprogram exit by kill cmd !\033[0;39m\n");}}bool StartServer()
{int listen_fd = socket(AF_INET,SOCK_STREAM,0);if(listen_fd == -1){printf("Create socket failed\n");return false;}else{printf("Server listen fd is:%d\n",listen_fd);}int reuseaddr = 1;if(setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,&reuseaddr ,sizeof(reuseaddr)) == -1){return false;}struct sockaddr_in listen_addr = {0};listen_addr.sin_family  = AF_INET;listen_addr.sin_addr.s_addr = INADDR_ANY;listen_addr.sin_port = htons(8888);if(bind(listen_fd,(struct sockaddr*)&listen_addr,sizeof(listen_addr)) == -1){printf("bind failed\n");return false;}if(listen(listen_fd,100) == -1){printf("listen failed\n");return false;}DURIAN::EventPollerPool::SetPoolSize(1);DURIAN::EventPollerPool& pool = DURIAN::EventPollerPool::Instance(); pool.StartPollers();auto poller = pool.GetFirstPoller(); if(poller->Init()){if(poller->AddListener(listen_fd,[&](int conn_fd){printf("将新的fd加到epoll监听 fd =%d\n",conn_fd);//Deliver client fd to other pollerspool.GetPoller()->DeliverConn(conn_fd);return 0;}) == false){return false;}poller->Start();}while(g_run_flag){sleep(2);}pool.Stop();}void StopServer()
{DURIAN::EventPollerPool& pool = DURIAN::EventPollerPool::Instance(); pool.Stop();
}int main(int argc,char *argv[])
{printf(" cpp version :%d\n",__cplusplus);int thread_size = 1;bool run_flag = true;signal(SIGPIPE,SIG_IGN);signal(SIGTERM, sig_handler);signal(SIGKILL, sig_handler);signal(SIGINT,sig_handler); StartServer();return 0;
}

性能测试

ulimit -HSn 102400

ab -n 100000 -c 20000 http://192.168.131.131:8888/index.html
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/101424.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Vue面试题十七】、你知道vue中key的原理吗?说说你对它的理解

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 面试官&#xff1a;你知道vue中key的原理吗…

Android Studio 配置Git SVN忽略文件

在使用Android Studio进行版本控制时&#xff0c;经常会遇到需要忽略某些文件的情况&#xff0c;比如临时文件、编译生成的文件等。这些文件虽然在项目中存在&#xff0c;但不希望被加入到版本控制中。 在Android Studio中设置忽略文件 在Android Studio中&#xff0c;我们可…

关于网络协议的若干问题(二)

1、网络号、IP 地址、子网掩码和广播地址的先后关系是什么&#xff1f; 答&#xff1a;当在一个数据中心或者一个办公室规划一个网络的时候&#xff0c;首先是网络管理员规划网段&#xff0c;一般是根据将来要容纳的机器数量来规划&#xff0c;一旦定了&#xff0c;以后就不好…

Soul CEO张璐团队以用户安全为核心,探索社交平台安全治理新路径

“认同感”,是现代年轻人当下的核心社交需求之一,作为年轻人喜爱的新型开放式社交平台,Soul APP为年轻人们提供了一个自在表达、轻松互动的平台,为用户带来了志趣相投、精神共鸣的高质量网络连接。在Soul日活近千万的用户中,超过七成为Z世代年轻群体,如何能够为Z世代提供更安全…

sql 注入 文件读写 木马植入 远程控制

sql 注入 文件读写 木马植入 远程控制 一, 检测读写权限 查看mysql全局变量 SHOW GLOBAL VARIABLES LIKE %secure%secure_file_priv 空, 则任意读写secure_file_priv 路径, 则只能读写该路径下的文件secure_file_priv NULL, 则禁止读写二, 读取文件, 使用 load_file() 函数…

ArcGIS/GeoScene脚本:基于粒子群优化的支持向量机回归模型

参数输入 1.样本数据必须包含需要回归的字段 2.回归字段是数值类型 3.影响因子是栅格数据&#xff0c;可添加多个 4.随机种子可以确保每次运行的训练集和测试集一致 5.训练集占比为0-1之间的小数 6.迭代次数&#xff1a;迭代次数越高精度越高&#xff0c;但是运行时间越长…

NodeJs内置模块child_process

内置模块child_process子进程 写在前面 子进程是Nodejs的核心Api&#xff0c;如果你会shell命令&#xff0c;它会有非常大的帮助&#xff0c;或者你喜欢编写前端工程化工具之类&#xff0c;它也有很大的用处&#xff0c;以及处理CPU密集型应用。 创建子进程 Nodejs创建子进…

黑群晖video station评级问题

黑群晖video station评级问题 环境 群晖Version: 6.2.3-25423video station 2.4.10方法1,py文件 登录ssh,获取sudo权限 cd /var/packages/VideoStation/target/plugins/syno_themoviedbsudo vim search.py替换movie_data[vote_average] 替换为 round(movie_data[vote_avera…

【开题报告】如何借助chatgpt完成毕业论文开题报告

步骤 1&#xff1a;确定论文主题和研究问题 首先&#xff0c;你需要确定你的论文主题和研究问题。这可以是与软件开发、算法、人工智能等相关的任何主题。确保主题具有一定的研究性和可行性。 步骤 2&#xff1a;收集相关文献和资料 在开始撰写开题报告之前&#xff0c;收集相…

MYSQL 敏感数据加密后进行模糊查询

问题&#xff1a; 对于一些敏感数据&#xff0c;比如用户的手机号、身份证号、银行卡号之类进行加密处理&#xff0c;是一些系统的常用处理方式。但是这保证了数据的安全之外又诞生了另外一个问题&#xff0c;就是搜索这些信息的时候&#xff0c;模糊查询变得困难。 解决方案…

ACL访问控制列表的解析和配置

ACL的解析 个人简介 ACL - Access Control List 访问控制列表 策略 ------行为 允许/拒绝 ACL --包含两种 标准ACL 扩展ACL 标准ACL&#xff1a;只能针对源IP地址做限制 针对路由条目的限制 -路由策略 思科编号&#xff1a;1-99之间或1300-1999 扩展ACL&#xff1a;针对…

Java Spring Boot中的爬虫防护机制

随着互联网的发展&#xff0c;爬虫技术也日益成熟和普及。然而&#xff0c;对于某些网站来说&#xff0c;爬虫可能会成为一个问题&#xff0c;导致资源浪费和安全隐患。本文将介绍如何使用Java Spring Boot框架来防止爬虫的入侵&#xff0c;并提供一些常用的防护机制。 引言&a…

python每日一练(5)

&#x1f308;write in front&#x1f308; &#x1f9f8;大家好&#xff0c;我是Aileen&#x1f9f8;.希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流. &#x1f194;本文由Aileen_0v0&#x1f9f8; 原创 CSDN首发&#x1f412; 如…

黑马点评-06缓存雪崩问题(大量key失效)及其解决方案

缓存雪崩问题(大量key失效) 解决方案 缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库给服务器带来巨大压力 当我们批量导入缓存数据的时候可以给不同的Key的TTL添加随机值,让其在不同时间段分批失效利用Redis集群提高服务的可用性 使…

点云模板匹配

点云模板匹配是一种在点云数据中寻找特定形状或模式的方法。它通常用于计算机视觉和三维图像处理中&#xff0c;可以应用于物体识别、姿态估计、场景分析等任务。点云模板匹配的基本思想是将一个称为模板的小点云形状与输入的大点云进行匹配&#xff0c;以找到最佳的对应关系。…

Python笔记;库,包,模块

在Python中库没有官方说法。 是其他地方沿用过来的。 姑且认为他是一个包或多个包的集合。 包里有子包和模块。 模块以.py格式存储。 下图是一个例子&#xff0c;对于Robot包&#xff1a; import math a math.sqrt(9) 等价于 from math import * a sqrt(9) from math im…

(Vue3)大事记管理系统 首页

首页 先搭架子-用element-ui中的组件&#xff1a;container组件、layout组件 不知道的属性学会看文档&#xff01; :default-active"$route.path" 配默认高亮菜单项 $route.path 字符串&#xff0c;等于当前路由对象的路径&#xff0c;如“/home/news $route…

分布式锁2:基于redis实现分布式锁

一 redis实现分布式锁 1.1 原理 setnxexpiredel 命令实现redis的分布式锁&#xff1b;其中 setnx 不存在则新增&#xff1b;存在则忽略。即先用setnx来抢锁&#xff0c;如果抢到之后&#xff0c;再用expire给锁设置一个过期时间&#xff0c;防止锁忘记了释放。例如&#xf…

mysql面试题33:Blob和text有什么区别

该文章专注于面试&#xff0c;面试只要回答关键点即可&#xff0c;不需要对框架有非常深入的回答&#xff0c;如果你想应付面试&#xff0c;是足够了&#xff0c;抓住关键点 面试官&#xff1a;Blob和text有什么区别 Blob和text是数据库中存储大文本数据的两种数据类型&#…

# 02 初识Verilog HDL

02 初识Verilog HDL ‍ 对于Verilog的语言的学习&#xff0c;我认为没必要一开始就从头到尾认真的学习这个语言&#xff0c;把这个语言所有细节都搞清楚也不现实&#xff0c;我们能够看懂当前FPGA的代码的程度就可以了&#xff0c;随着学习FPGA深度的增加&#xff0c;再不断的…