改造muduo,不依赖boost,用C++11重构

组件的实现

1. 序

1.1. 总述

        muduo库是基于多Reactor-多线程模型实现的TCP网络编程库,性能良好。如libev作者:“One loop per thread is usually a good model”,muduo库的作者陈硕在其《Linux多线程服务端编程》中也力荐这种“One loop per thread”的IO模型,使我们仅需要关注EventLoop的设计与实现,然后每个线程run一个loop即可。不过由于当时C++11并没有进入实用,在这一书中,作者没有谈及C++11,整个muduo库的实现,也依赖了boost库。

        而在项目设计与实现中,按照C++11标准对muduo库中核心部分进行重写,主要涉及了以下模块:Channel、Poller、EventLoop、Thread、EventLoopThread、EventLoopThreadPool、Socket、Acceptor、Buffer、TcpConnection、TcpServer,下面将进行分述。

1.2. One loop per thread

        在多Reactor-多线程模型中,运用one loop per thread的思想,由一个mainReactor负责accept连接,然后把该连接挂载到某个subReactor,多个连接分配到多个线程,充分利用CPU。

2. 核心部分

        在手写muduo库项目之中,存在三个核心部分,分别是Channel类、Poller类和EventLoop类,这三大类的组合,实现了reactor用以监听fd并同时处理相应的回调函数。其中Poller和Channel之间通过EventLoop相互通信。

2.1. Channel
  1. fd_:封装sockfd,两种Channel:listenfd-acceptorChannel,connfd-ConnectionChannel;
  2. events_:fd监听的事件类型;
  3. revents_:Poller返回的具体监听到的事件。
  4. callback:上层设置的各种类型事件回调;
  5. tie_:weak_ptr<void>,在事件监听器返回监听结果后,就会调用Channel中的handleEvent()函数。首先会把tie_这个weak_ptr提升为shared_ptr,它会指向当前的TcpConnection对象,即使外面调用了删除析构了其他所有指向该TcpConnection的智能指针,只要没有handleEvent()完,这个TcpConnection都不会被析构释放堆内存。
2.2. Poller/EpollPoller

        muduo库提供poll和epoll两种IO多路复用方法来实现事件监听,重写时,通过基类Poller和派生类EpollPoller,支持了Epoll。Poller主要扮演Reactor模型中Demultiplex事件分发器(也可以说是事件监听器)的角色。

  1. epollfd_:记录epoll_create返回的句柄
  2. channels_:用来记录注册在其上的Channel的unordered_map。
2.3. EventLoop

        EventLoop扮演Reactor模型中Reactor的角色,是对epoll的封装。EventLoop在epoll_create,注册各个Channel之后,处于epoll_wait阻塞状态,要想唤醒当前的EventLoop去执行新的连接,通过往wakefd上写入一个字符,唤醒当前的EventLoop。(而并非生产者-消费者模型)。

  1. 包含了所有的Channel
  2. 每一个loop都有一个wakeupFd
2.4. 具体方法的部分代码实现
  • EventLoop::loop()——开启事件循环
// 开启事件循环
void EventLoop::loop()
{// ...   while(!quit_){activeChannels_.clear();// 监听两种fd: client的fd、wakeupfdpollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); // epoll_wait发生的位置for(Channel *channel : activeChannels_){   // Poller监听哪些Channel发生事件了,然后上报给EventLoop,EventLoop通知处理相应的事件// handleEevent根据具体事件类型调用相应类型的回调函数channel->handleEvent(pollReturnTime_);}// ...}LOG_INFO("EventLoop %p stop looping. \n", this);// ...
}
  • EpollPoller::poll()——开启Poller事件监听,调用了::epoll_wait()
// 通过epoll_wait监听哪些Channel/fd发生事件
Timestamp EPollPoller::poll(int timeoutMs, ChannelList *activeChannels)
{// ...int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs);int savedErrno = errno;Timestamp now(Timestamp::now());if(numEvents > 0)   // 有事件发生{LOG_DEBUG("%d events happened \n", numEvents);fillActiveChannels(numEvents, activeChannels);if(numEvents == events_.size()){events_.resize(events_.size() * 2);}}else if(numEvents == 0) // 超时{LOG_DEBUG("%s timeout! \n", __FUNCTION__);}else{if(savedErrno != EINTR){errno = savedErrno;LOG_ERROR("EPollPoller::poll() err! \n");}}return now;
}
  • 唤醒机制——通过向eventfd写一个数据

        在Linux操作系统上,可以通过三种方式唤醒fd:1. 通过管道pipe向绑定到epollfd的一端写一个字节;2. 使用Linux内核2.6版本之后的eventfd;3. 使用socketpair。而在本项目中,采用的是创建eventfd然后在需要唤醒的时候写数据(8个字节)来唤醒subLoop。

// 创建wakeupfd,用来notify唤醒subReactor处理新的Channel
//O_CLOEXEC避免文件描述符被继承到子进程中
int createEventFd()
{int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);// ...return evtfd;
}// 用于唤醒loop所在线程: 向wakefd写一个数据
//wakeupFd_在构造函数中通过createsEventFd()函数初始化
void EventLoop::wakeup()
{uint64_t one = 1;ssize_t n = write(wakeupFd_, &one, sizeof(one));if(n != sizeof(one)){LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n", n);}
}

3. 其他部分

3.1. EventLoopThreadPool

        EventLoopThreadPool类,可以理解为subLoop池,主要是对EventLoopThread的封装,而EventLoopThread又是对EventLoop(Reactor)和Thread(记录线程的详细信息)的封装。

        其中,初始化时,会提供一个baseLoop(mainLoop)来进行基本的事件循环。通过设置numthreads_来创建对应数量的subReactor,每当创建一个线程,就会生成一个EventLoop。

        在工作方式上,通过getNextLoop()方法,实现对subReactor的轮询。

// ...
class EventLoopThreadPool : noncopyable
{
public:using ThreadInitCallback = std::function<void(EventLoop *)>;EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg);~EventLoopThreadPool();void setThreadNum(int numThreads) { numThreads_ = numThreads; }void start(const ThreadInitCallback &cb = ThreadInitCallback());// 如果工作在多线程中,baseLoop默认以轮询的方式分配Channel给subLoopEventLoop *getNextLoop();std::vector<EventLoop *> getAllLoops();bool started() const { return started_; }const std::string &name() const { return name_;}private:EventLoop *baseLoop_;   //EventLoop loop 用户线程std::string name_;bool started_;int numThreads_;int next_;std::vector<std::unique_ptr<EventLoopThread>> threads_;std::vector<EventLoop *> loops_;
};
3.2. Acceptor

        Acceptor类,封装的是服务器监听socketfd和相关处理函数。接收新用户连接后,通过轮询来选择subReactor并给它分发连接。

3.3. TcpConnection

        每个连接进来的客户端,对应一个TcpConnection,封装了一个connfd,一个Channel,各种回调函数(Callback)和读写缓冲区(Buffer)。

        state_:记录当前连接状态,一共有四种:kConnected、kConnecting、kDisconnecting、kDisconnected。

整个TcpConnection的工作流程

  1. TcpServer通过Acceptor监听用户新连接,用accept拿到connfd
  2. TcpConnection设置回调给Channel,Channel注册到Poller
  3. Poller监听到事件就通知调用Channel的回调
3.4. Buffer

        Buffer缓冲区通过vector来实现,空间不足时,通过vector类的成员函数resize()即可实现扩容。在空间的设计上,主要分为如下图三个区域(和Netty中Buffer的设计类似?)

3.5. TcpServer

        在TcpServer类中,有一个Acceptor,一个EventLoopThreadPool,一些回调函数,一个记录所有连接的unordered_map<string, TcpConnectionPtr>。

// 对外服务器编程需要使用的类
class TcpServer : noncopyable
{
public:using ThreadInitCallback = std::function<void(EventLoop *)>;enum Option{kNoReusePort,kReusePort,};TcpServer(EventLoop *loop, const InetAddress &listenAddr, const std::string &nameArg,Option option = kNoReusePort);~TcpServer();void setThreadInitCallback(const ThreadInitCallback &cb) { threadInitCallback_ = cb; }void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; }void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; }void setWriteCompleteCallback(const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; }// 设置subLoop个数void setThreadNum(int numThreads);// 开启服务器监听void start();
private:void newConnection(int sockfd, const InetAddress &peerAddr);void removeConnection(const TcpConnectionPtr &conn);void removeConnectionInLoop(const TcpConnectionPtr &conn);using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>;EventLoop *loop_;   // baseLoopconst std::string ipPort_;const std::string name_;std::unique_ptr<Acceptor> acceptor_;    // 运行在mainLoop,监听新连接事件std::shared_ptr<EventLoopThreadPool> threadPool_; // one loop per threadConnectionCallback connectionCallback_;         // 有新连接时的回调MessageCallback messageCallback_;               // 有读写消息时的回调WriteCompleteCallback writeCompleteCallback_;   // 消息发送完成后的回调ThreadInitCallback threadInitCallback_;         // loop线程初始化的回调std::atomic_int started_; int nextConnId_;ConnectionMap connections_; // 保存连接的HashMap
};

        start():启动EventLoopThreadPool,调用acceptor_的listen()方法,监听客户端的连接套接字。

        newConnection():该方法被注册到了acceptor_中,当acceptor_监听到新用户连接时会执行该回调,轮询选择一个subReactor;根据连接成功的sockfd,创建一个连接对象并加入到TcpServer的存储连接信息的connections_中;给这个连接设置回调;然后在mainLoop执行connectEstablished();

        上面提到的关闭连接的回调函数,真实的调用过程:TcpConnection::setCloseCallBack() --> TcpServer::removeConnection() --> TcpServer::removeConnectionInLoop() --> TcpConnection::connectionDestroyed()

4. 工作流程

4.1. 安装

下载到文件夹后,sudo ./autobuild.sh,运行编译和安装脚本,相关头文件也会添加到系统路径。

4.2. 测试代码

下面的内容是一个回射服务器,可以编译运行后,使用telnet、netcat等工具进行简单测试。

#include <ee_muduo_cpp11/TcpServer.h>
#include <ee_muduo_cpp11/Logger.h>#include <string>
#include <functional>class EchoServer
{
public:EchoServer(EventLoop *loop, const InetAddress &addr, const std::string &name): server_(loop, addr, name), loop_(loop){// 注册回调函数server_.setConnectionCallback(std::bind(&EchoServer::onConnection, this, std::placeholders::_1));server_.setMessageCallback(std::bind(&EchoServer::onMessage, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 设置合适的loop线程数量 loopthreadserver_.setThreadNum(3);}void start(){server_.start();}
private:// 连接建立或者断开的回调void onConnection(const TcpConnectionPtr &conn){if (conn->connected()){LOG_INFO("Connection UP : %s", conn->peerAddress().toIpPort().c_str());}else{LOG_INFO("Connection DOWN : %s", conn->peerAddress().toIpPort().c_str());}}// 可读写事件回调void onMessage(const TcpConnectionPtr &conn,Buffer *buf,Timestamp time){std::string msg = buf->retrieveAllAsString();conn->send(msg);conn->shutdown(); // 写端   EPOLLHUP =》 closeCallback_}EventLoop *loop_;TcpServer server_;
};int main()
{EventLoop loop;InetAddress addr(8000);EchoServer server(&loop, addr, "EchoServer"); // Acceptor non-blocking listenfd  create bind server.start(); // listen  loopthread  listenfd => acceptChannel => mainLoop =>loop.loop(); // 启动mainLoop的底层Pollerreturn 0;
}
4.3. 工作流程

对于整个库:

  1. 用户创建mainLoop,主线程作为mainReactor,主要用来接收/断开用户连接。
  2. 给TcpServer设置连接和读写事件回调,TcpServer再给TcpConnection设置回调(用户设置的),TcpConnection再给Channel设置回调(先执行这个,再执行用户回调)。
  3. TcpServer根据用户设置传入的线程数,去ThreadPool中开启几个线程。如果没有设置,mainLoop还要负责读写事件的任务。
  4. 当有新连接进来,创建一个TcpConnection,然后由Acceptor轮询唤醒subLoop来提供服务。
  5. 每个subLoop在服务时,其所包含的Poller没有事件就会处于循环阻塞状态,发生事件之后,根据类型再去执行相应的回调操作。

5. 参考资料

  • 《高性能服务结构设计思想——one-thread-one-loop》,张小方,CppGuide,05. 高性能服务结构设计思想——one-thread-one-loop
  • 《Linux多线程服务器编程:使用muduo C++网络库》,陈硕
  • 《Muduo网络库源代码分析:EventLoopThread和EventLoopThreadPool的封装》,blfbuaa,https://www.cnblogs.com/blfbuaa/p/7263398.html
  • 《图解操作系统》,小林coding,https://mp.weixin.qq.com/mp/appmsgalbum?action=getalbum&__biz=MzUxODAzNDg4NQ==&scene=1&album_id=1408057986861416450&count=3#wechat_redirect

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/719926.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

每日五道java面试题之mysql数据库篇(四)

目录&#xff1a; 第一题&#xff1a; Hash索引和B树所有有什么区别或者说优劣呢?第二题&#xff1a;数据库为什么使用B树而不是B树&#xff1f;第三题&#xff1a;B树在满足聚簇索引和覆盖索引的时候不需要回表查询数据&#xff1f;第四题&#xff1a;什么是聚簇索引&#xf…

浅谈WPF之Binding数据校验和类型转换

在WPF开发中&#xff0c;Binding实现了数据在Source和Target之间的传递和流通&#xff0c;就像现实生活中的一条条道路&#xff0c;建立起了城镇与城镇之间的衔接&#xff0c;而数据校验和类型转换&#xff0c;就像高速公路之间的收费站和安检站。那在WPF开发中&#xff0c;如何…

Redis核心数据结构之SDS(一)

数据结构与对象 简单动态字符串 概述 Redis没有直接使用C语言传统的字符串表示(以空字符结尾的字符数组&#xff0c;简称C字符串)&#xff0c;而是自己构建了一种名为简单动态字符串(Simple Dynamic String, SDS)的后向类型&#xff0c;并将SDS用作Redis的默认字符串表示。在…

数据库学习案例20240304-mysql数据库案例总结(碎片,统计信息)

1 表中的碎片 在InnoDB中删除行的时候&#xff0c;这些行只是被标记为“已删除”&#xff0c;而不是真正从物理存储上进行了删除&#xff0c;因而存储空间也没有真正被释放回收。InnoDB的Purge线程会异步地来清理这些没用的索引键和行。但是依然没有把这些释放出来的空间还给操…

基于SSM的农业电商服务系统(农产品销售管理系统)(有报告)。Javaee项目。ssm项目。

演示视频&#xff1a; 基于SSM的农业电商服务系统&#xff08;农产品销售管理系统&#xff09;&#xff08;有报告&#xff09;。Javaee项目。ssm项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#…

五、软考-系统架构设计师笔记-信息安全技术基础知识

信息安全技术基础知识 1、信息安全基础知识概述 信息安全的概念 信息安全包括 5 个基本要素&#xff1a; 机密性:确保信息不暴露给未授权的实体或进程。完整性:只有得到允许的人才能修改数据&#xff0c;并且能够判别出数据是否已被篡改。可用性:得到授权的实体在需要时可以…

SpringBoot源码解读与原理分析(四十)基于jar/war包的运行机制

文章目录 前言第14章 运行SpringBoot应用14.1 部署打包的两种方式14.1.1 以可独立运行jar包的方式14.1.2 以war包的方式 14.2 基于jar包的独立运行机制14.2.1 可独立运行jar包的相关知识14.2.2 SpringBoot的可独立运行jar包结构14.2.3 JarLauncher的设计及工作原理14.2.3.1 Jar…

06、MongoDB -- MongoDB 基本用法(删除文档、查询文档、查询运算符)

目录 MongoDB 基本用法演示前提&#xff1a;登录单机模式的 mongodb 服务器命令登录【admin】数据库的 mongodb 客户端命令登录【test】数据库的 mongodb 客户端命令 删除文档语法格式两个变体版本&#xff1a;1、remove&#xff1a;根据【name】字段删除一条文档2、deleteOne&…

代码工具APEX的入门使用(未包含安装)

第一次使用APEX是2019年&#xff0c;这个技术成名已久只是我了解的比较晚。请看Oracle ACE的网站&#xff0c;这就是用APEX做的。实际上有一次我看O记的人操作他们的办公流程&#xff0c;都是用APEX做的。 那一年&#xff0c;我用APEX做了一个CMDB的管理系统。那时候还没有流行…

从0搭建Azure DevOps Server

Windows虚拟机搭建DevOps 服务器 背景资源准备安装软件需求流程版本兼容性安装SQL ServerSSMS安装visual StudioAzure DevOps Server测试本地访问端口更改及外界访问 背景 搭建一台Azure DevOps Server 供我们运维项目开发&#xff0c;现在DevOps运维已成为一个主流&#xff0…

C向C++的一个过渡

思维导图 输入输出&#xff0c;以及基础头文件 在c语言中我们常用scanf("%d",&n);和printf("%d\n",n);来输出一些变量和常量&#xff0c;在C中我们可以用cin;和cout;来表示输入输出。 在C语言中输入输出有头文件&#xff0c;在C也有头文件&#xff0…

软件应用,财务收支系统试用版操作教程,佳易王记录账单的软件系统

软件应用&#xff0c;财务收支系统试用版操作教程&#xff0c;佳易王记录账单的软件系统 一、前言 以下软件操作教程以 佳易王账单记账统计管理系统V17.0为例说明 软件文件下载可以点击最下方官网卡片——软件下载——试用版软件下载 如上图&#xff0c;统计报表包含 收支汇…

在vue前端开发中基于refreshToken和axios拦截器实现token的无感刷新

文章目录 一、需求背景二、token刷新的方案1、根据过期时间重新获取2、定时刷新token接口3、使用了RefreshToken 三、关于RefreshToken四、Refresh Token的优点五、Refresh Token的工作原理六、Refresh Token的使用流程七、Refresh Token的实现步骤1、登录成功后保存AccessToke…

前端CSS常考问题总结

目录 CSS盒模型 CSS选择器的优先级 隐藏元素的方法 px和rem的区别是什么? 重绘重排有什么区别? 重排&#xff08;回流&#xff09;&#xff1a; 重绘&#xff1a; 浏览器的渲染机制: 浏览器如何解析CSS&#xff1f; 元素水平垂直居中的方式 CSS的哪些属性哪些可以…

php开发项目 docx,pptx,excel表格上传阿里云,腾讯云存储后截取第一页生成缩略图

服务器或者存储上传的word,ppt和excel表格需要截取内容展示的时候,就需要管理后台每次上传文件时根据不同文件类型截取图片保存起来,并讲图片的地址保存到数据字段中.网上搜索了很多相关文章遇到的坑不少,经过2天时间终于完成了,将代码和遇到的问题完整记录下来. 本文用的…

【前端寻宝之路】总结学习使用CSS的引入方式

&#x1f308;个人主页: Aileen_0v0 &#x1f525;热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法|MySQL| ​&#x1f4ab;个人格言:“没有罗马,那就自己创造罗马~” #mermaid-svg-BNJBIEvpN0GHNeJ1 {font-family:"trebuchet ms",verdana,arial,sans-serif;f…

IDEA自动导入provided的依赖

最近在学习flink 流程序&#xff0c;在写demo程序的时候依赖flink依赖&#xff0c;依赖的包在flink集群里面是自己已经提供了的&#xff0c;在导入的时候配置为provided&#xff0c;像下面这样&#xff0c;以使打包的时候不用打到最终的程序包里面。 <dependency><gro…

Java8,函数式编程应用:

持续更新中&#xff1a; 函数式(Functional)接口 什么是函数式(Functional)接口 只包含一个抽象方法的接口&#xff0c;称为函数式接口。 你可以通过 Lambda 表达式来创建该接口的对象。&#xff08;若 Lambda 表达式 抛出一个受检异常(即&#xff1a;非运行时异常)&#xff0c…

js创建对象方式总结

js创建对象方式总结 字面量方式 使用大括号 {} 创建一个新对象&#xff0c;这是最简单直接的方式。适用于创建单个对象&#xff0c;可以直接在大括号内定义属性和方法。 let person {name: John,age: 30,gender: male};let preson2 {name: John,age: 30,gender: male};cons…

光伏发电预测

XGB、LGB在datacamp(学习网站) data fountain与国家电投系列赛,光伏发电预测 题目:给一组特征,预测瞬时发电量,训练集9000个点,测试集8000个点,特征包含光伏板的属性和外部环境等。 数据字段:ID、光伏电池板背侧温度、光伏电站现场温度、计算得到的平均转换效率、数…