【Muduo】三大核心之EventLoop

Muduo网络库的EventLoop模块是网络编程框架中的核心组件,负责事件循环的驱动和管理。以下是对EventLoop模块的详细介绍:

作用与功能

  • EventLoop是网络服务器中负责循环的重要模块,它持续地监听、获取和处理各种事件,如IO事件、定时器事件等。
  • 它通过轮询访问Poller(如EPollPoller),获取激活的Channel列表,然后使Channel根据自身情况调用相应的回调函数来处理事件。
  • EventLoop确保了每个Loop都是相互独立的,拥有自己的事件循环、Poller监听者和Channel监听通道列表。

与Poller的关系

  • Poller负责从事件监听器上获取监听结果,即哪些文件描述符(fd)上发生了哪些事件。
  • EventLoop会轮询访问Poller,以获取这些发生事件的fd及其相关事件。

与Channel的关系

  • Channel类是对文件描述符(fd)以及其相关事件的封装。它保存了fd的感兴趣事件、实际发生的事件以及每种事件对应的处理函数。
  • 当Poller检测到某个fd上有事件发生时,EventLoop会找到对应的Channel,并调用其上的回调函数来处理该事件。

线程模型

  • EventLoop遵循“one loop one thread”的原则,即每个EventLoop都在一个独立的线程上运行。
  • 这种设计使得事件处理更加高效和清晰,避免了多线程环境下的竞态条件和同步问题。

mainLoop和subLoop

在Muduo网络库中,mainLoop和subLoop都是EventLoop的实例,它们分别代表主事件循环和子事件循环。

mainLoop(主事件循环)

  • mainLoop是整个Muduo网络库的核心事件循环。它负责监听服务器套接字(通常是listenfd),并接受来自客户端的连接请求。
  • mainLoop运行一个Accrptor,包含一个Poller,用于监听一个特定的非阻塞的服务器sockfd上的读事件。当Poller检测到有读事件发生时(一般是新用户连接),mainLoop会在线程池中通过轮询算法选择一个subLoop来处理这个连接的读写和关闭事件。Acceptor将在后续阐述。
  • mainLoop遵循 “one loop one thread” 的原则,即每个mainLoop都在一个独立的线程上运行。这确保了事件处理的高效性和清晰性,避免了多线程环境下的竞态条件和同步问题。

subLoop(子事件循环)

  • subLoop是mainLoop的子事件循环,用于处理已建立的连接的读写和关闭事件。每个subLoop都在一个独立的线程上运行,有一个用于唤醒自身的fd和Channel,运行一个Poller,并保存自己管理的多个Channel,以实现并发处理多个连接的目的。
  • 当mainLoop接受到一个新的连接请求时,它会根据EventLoopThreadPool中的线程来选择一个subLoop,将新创建的TcpConnection的Channel放入这个subLoop中。这个subLoop会接管该连接的fd,并监听其上的读写和关闭事件。
  • subLoop中的事件处理逻辑与mainLoop类似,也是通过Poller来监听fd上的事件,并调用相应的回调函数来处理这些事件。
  • 由于subLoop是独立的线程,因此它们可以并行处理多个连接,从而提高了服务器的并发处理能力。

总的来说,mainLoop和subLoop共同构成了Muduo网络库的事件驱动编程框架。mainLoop负责监听服务器套接字并接受连接请求,而subLoop则负责处理已建立的连接的读写和关闭事件。通过合理的线程调度和事件处理机制,Muduo网络库能够高效、稳定地处理大量的并发连接请求。

EventLoop.h

#pragma once
#include "noncopyable.h"
#include "Timestamp.h"
#include "CurrentThread.h"
#include "LogStream.h"#include <functional>
#include <vector>
#include <atomic>
#include <memory>
#include <mutex>
#include <sys/types.h>class Channel;
class Poller;/*** 事件循环类  两大模型:Channel  Poller* mainLoop只负责处理IO,并返回client的fd* subLoop负责监听poll,并处理相应的回调* 两者之间通过weakupfd进行通信
*/
class EventLoop : noncopyable
{
public:using Functor = std::function<void()>;EventLoop();~EventLoop();// 开启loopvoid loop();// 退出loopvoid quit();Timestamp pollReturnTime() const { return pollReturnTime_; }// 在当前loop执行cbvoid runInLoop(Functor cb);// 把cb放入队列,唤醒subloop所在的线程,执行cbvoid queueInLoop(Functor cb);size_t queueSize() const;// 唤醒loop所在的线程,EventLoop::queueInLoop中调用void wakeup();// EventLoop方法 =》 Poller的方法void updateChannel(Channel *channel);void removeChannel(Channel *channel);bool hasChannel(Channel *channel);// 判断EventLoop对象是否在自己的线程中bool isInLoopThread() const {return threadId_ == CurrentThread::tid();}private:// waked up后的一个操作 void handleRead();       // 执行回调void doPendingFunctors(); using ChannelList = std::vector<Channel *>;std::atomic_bool looping_; // 原子操作,通过CAS实现std::atomic_bool quit_;    // 标识退出loop循环const pid_t threadId_; // 记录当前loop所属的线程idTimestamp pollReturnTime_; // poller返回发生事件的channels的时间点std::unique_ptr<Poller> poller_;int wakeupFd_; // 当mainLoop获取一个新用户的channel,通过轮询算法选择一个subloop,通过该成员唤醒subloop处理channel。使用eventfd// unlike in TimerQueue, which is an internal class,// we don't expose Channel to client.std::unique_ptr<Channel> wakeupChannel_;// scratch variablesChannelList activeChannels_;std::atomic_bool callingPendingFunctors_; // 标识当前loop是否有需要执行的回调操作,正在执行则为truestd::vector<Functor> pendingFunctors_;    // 存储loop需要执行的所有回调操作std::mutex mutex_;                        // 保护pendingFunctors_线程安全
};

EventLoop.cc

#include "EventLoop.h"
#include "LogStream.h"
#include "Poller.h"
#include "Channel.h"#include <sys/eventfd.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <iostream>// 防止一个线程创建多个EventLoop    threadLocal
__thread EventLoop *t_loopInThisThread = nullptr;// 定义Poller超时时间
const int kPollTimeMs = 10000;// 创建weakupfd,用来notify唤醒subReactor处理新来的channel
int createEventfd()
{int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);if (evtfd < 0){LOG_FATAL << "Failed in eventfd" << errno;}return evtfd;
}EventLoop::EventLoop(): looping_(false),quit_(false),callingPendingFunctors_(false),threadId_(CurrentThread::tid()),poller_(Poller::newDefaultPoller(this)),wakeupFd_(createEventfd()),wakeupChannel_(new Channel(this, wakeupFd_))
{LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;if (t_loopInThisThread){LOG_FATAL << "Another EventLoop " << t_loopInThisThread<< " exists in this thread " << threadId_;}else{t_loopInThisThread = this;}// 设置weakupfd的事件类型以及发生事件后的回调操作wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));// we are always reading the wakeupfd// 每一个EventLoop都将监听weakupChannel的EPOLLIN读事件了// 作用是subloop在阻塞时能够被mainLoop通过weakupfd唤醒wakeupChannel_->enableReading();
}EventLoop::~EventLoop()
{LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_<< " destructs in thread " << CurrentThread::tid();wakeupChannel_->disableAll();wakeupChannel_->remove();::close(wakeupFd_);t_loopInThisThread = NULL;
}void EventLoop::handleRead()
{uint64_t one = 1;ssize_t n = read(wakeupFd_, &one, sizeof one);if (n != sizeof one){LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";}
}void EventLoop::loop()
{looping_ = true;quit_ = false;LOG_INFO << "EventLoop " << this << " start looping";while (!quit_){activeChannels_.clear();// 当前EventLoop的Poll,监听两类fd,client的fd(正常通信的,在baseloop中)和 weakupfd(mainLoop 和 subLoop 通信用来唤醒sub的)pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);for (Channel *channel : activeChannels_){// Poller监听哪些channel发生事件了,然后上报给EventLoop,通知channel处理相应的事件channel->handleEvent(pollReturnTime_);}// 执行当前EventLoop事件循环需要处理的回调操作/*** IO线程 mainLoop 只 accept 然后返回client通信用的fd <= 用channel打包 并分发给 subloop* mainLoop事先注册一个回调cb(需要subLoop来执行),weakup subloop后,* 执行下面的方法,执行之前mainLoop注册的cb操作(一个或多个)*/doPendingFunctors();}LOG_INFO << "EventLoop " << this << " stop looping";looping_ = false;
}/*** 退出事件循环* 1、loop在自己的线程中 调用quit,此时肯定没有阻塞在poll中* 2、在其他线程中调用quit,如在subloop(woker)中调用mainLoop(IO)的qiut**                  mainLoop* *      Muduo库没有 生产者-消费者线程安全的队列 存储Channel*      直接使用wakeupfd进行线程间的唤醒       ** subLoop1         subLoop2        subLoop3*/
void EventLoop::quit()
{quit_ = true;// 2中,此时,若当前woker线程不等于mainLoop线程,将本线程在poll中唤醒if (!isInLoopThread()){wakeup();}
}void EventLoop::runInLoop(Functor cb)
{// LOG_DEBUG<<"EventLoop::runInLoop  cb:" << (cb != 0);if (isInLoopThread()) // 产生段错误{ // 在当前loop线程中 执行cbLOG_DEBUG << "在当前loop线程中 执行cb";cb();}else{ // 在其他loop线程执行cb,需要唤醒其loop所在线程,执行cbLOG_DEBUG << "在其他loop线程执行cb,需要唤醒其loop所在线程,执行cb";queueInLoop(cb);}
}void EventLoop::queueInLoop(Functor cb)
{{std::unique_lock<std::mutex> ulock(mutex_);pendingFunctors_.emplace_back(cb);}// 唤醒相应的,需要执行上面回调操作的loop线程// 若当前线程正在执行回调doPendingFunctors,但是又有了新的回调cb// 防止执行完回调后又阻塞在poll上无法执行新cb,所以预先wakeup写入一个数据if (!isInLoopThread() || callingPendingFunctors_) {wakeup(); // 唤醒loop所在线程}
}// 用来唤醒loop所在的线程,向wakeupfd写一个数据,wakeupChannel就发生读事件,当前loop线程就会被唤醒
void EventLoop::wakeup()
{uint64_t one = 1;ssize_t n = ::write(wakeupFd_, &one, sizeof one);if (n != sizeof one){LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";}
}void EventLoop::updateChannel(Channel *channel)
{// channel是发起方,通过loop调用pollpoller_->updateChannel(channel);
}void EventLoop::removeChannel(Channel *channel)
{// channel是发起方,通过loop调用pollpoller_->removeChannel(channel);
}bool EventLoop::hasChannel(Channel *channel)
{return poller_->hasChannel(channel);
}// 执行回调,由TcpServer提供的回调函数
void EventLoop::doPendingFunctors()
{std::vector<Functor> functors;callingPendingFunctors_ = true; // 正在执行回调操作{ // 使用swap,将原pendingFunctors_置空并且释放,其他线程不会因为pendingFunctors_阻塞std::unique_lock<std::mutex> lock(mutex_);functors.swap(pendingFunctors_);}for (const Functor &functor : functors){functor(); // 执行当前loop需要的回调操作}callingPendingFunctors_ = false;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/13792.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

10个最佳Android数据恢复工具,用于恢复已删除的文件

由于我们现在在智能手机上存储了许多重要文件&#xff0c;因此了解数据恢复工具变得很重要。您永远不会知道何时需要使用适用于Android的数据恢复工具。 由于不乏Windows数据恢复工具&#xff0c;因此从崩溃的计算机中恢复文件很容易。但是&#xff0c;当涉及到从Android恢复数…

兆原数通基于Apache SeaTunnel的探索实践

随着大数据技术的不断发展&#xff0c;数据同步工具在企业中的应用变得愈发重要。为了满足复杂多样的业务需求&#xff0c;找到一款高效、灵活的数据同步工具变得尤为关键。 在这篇文章中&#xff0c;我们将分享兆原数通研发经理李洪军对Apache SeaTunnel的选择、应用及经验。这…

jinkens打包前端依赖下载失败怎么办

不知道有没有小伙伴遇见这种问题&#xff0c;项目在本地可以正常下载、运行打包&#xff0c;但在jinkens上就不行了&#xff0c;配置了几种镜像也还是不行&#xff0c;这要如何解决呢&#xff1f; 那就只能去到jinkens配置的工作空间那里&#xff0c;找到对应的项目 &#xff…

Django使用Celery实现异步和定时任务功能

1、装库 celery==4.4.2 django-celery-beat==2.2.0 django-celery-results==2.0.12、添加应用 安装完依赖之后,把上面的两个应用添加到Django配置文件settings.py的INSTALLED_APPS里面: 添加celery配置信息 在项目的配置文件中添加如下配置信息,具体的配置作用见注释 I…

YOLOv5/v7 应用轻量级通用上采样算子CARAFE

1. 介绍 CARAFE&#xff08;Content-Aware ReAssembly of FEatures&#xff09;是一种轻量级的通用上采样算子&#xff0c;它可以用于提高卷积神经网络&#xff08;CNN&#xff09;中特征图的分辨率。CARAFE 的特点是计算量小、参数少、易于实现&#xff0c;因此非常适合用于移…

开放重定向漏洞

开放重定向漏洞 1.开放重定向漏洞概述2.攻击场景&#xff1a;开放重定向上传 svg 文件3.常见的注入参数 1.开放重定向漏洞概述 开放重定向漏洞&#xff08;Open Redirect&#xff09;是指Web应用程序接受用户提供的输入&#xff08;通常是URL参数&#xff09;&#xff0c;并将…

代码随想录算法训练营第四十五天|139.单词拆分

139.单词拆分 这题首先肯定是排序问题而不是组合问题&#xff0c;因为单词的顺序是一定的&#xff0c;所以组成单词的字符串顺序也是一定的。所以应该先循环背包容量&#xff0c;再循环物品个数。 背包容量就是s字符串&#xff0c;物品就是数组中的单词&#xff0c;字符串的循…

Kubernetes 文档 / 概念 / 工作负载 / Pod / Init 容器

Kubernetes 文档 / 概念 / 工作负载 / Pod / Init 容器 此文档从 Kubernetes 官网摘录 中文地址 英文地址 本页提供了 Init 容器的概览。Init 容器是一种特殊容器&#xff0c;在 Pod 内的应用容器启动之前运行。Init 容器可以包括一些应用镜像中不存在的实用工具和安装脚本。…

总结优秀的prompt案例,学习更有效的prompt提示词工程写法,值得收藏

Prompt 提示词工程大多数人都在用&#xff0c;而且都会用&#xff0c;但是不一定写的好&#xff1f;很多人都在想怎么写好&#xff0c;更能满足自己的业务需求&#xff0c;或者实际场景。 我最近工作中也写了很多的prompt&#xff0c;像zero-shot、few-shot、COT这些都尝试过、…

FSC认证是什么?森林认证的好处是什么?

FSC认证&#xff08;Forest Stewardship Council&#xff0c;森林管理委员会认证&#xff09;是一种运用市场机制来促进森林可持续经营&#xff0c;实现生态、社会和经济目标的工具。以下是关于FSC认证的详细介绍&#xff1a; 一、FSC认证包括两个方面&#xff1a; 森林经营认…

乡村振兴与农业现代化:以现代农业科技为引领,提升农业综合生产能力,打造高产高效、生态安全的美丽乡村

目录 一、引言 二、现代农业科技在乡村振兴中的作用 &#xff08;一&#xff09;提高农业生产效率 &#xff08;二&#xff09;促进农业产业升级 &#xff08;三&#xff09;改善农村生态环境 三、提升农业综合生产能力的途径 &#xff08;一&#xff09;加强农业科技研…

spring中依赖注入(DI)是什么?

好的&#xff0c;让我以尽可能通俗易懂的方式来解释什么是依赖注入&#xff08;DI&#xff0c;Dependency Injection&#xff09;。 假设你正在制作一款游戏&#xff0c;游戏中有个角色需要使用武器。在没有依赖注入的情况下&#xff0c;这个角色可能需要自己创建一个武器。这…

计算理论基础:4、复杂性理论

复杂性理论 c e n t e r p r o b l e m : P ≠ N P center\ \ problem:P \ne NP center problem:PNP 1.P、EXP、NP 定义1 D T I M E DTIME DTIME ​ T : N → N T:\N\rightarrow \N T:N→N,语言 L ∈ D T I M E ( T ( n ) ) L\in DTIME(T(n)) L∈DTIME(T(n)),当且仅当存…

Diffusion Policy:基于扩散模型的机器人动作生成策略

项目地址&#xff1a; Diffusion Policy (columbia.edu) 一、摘要 本文介绍了 "扩散策略"&#xff0c;这是一种生成机器人行为的新方法&#xff0c;它将机器人的视觉运动策略&#xff08;visuomotor policy&#xff09;表示为条件去噪扩散过程&#xff08;conditi…

添加、修改和删除列表元素

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 添加、修改和删除列表元素也称为更新列表。在实际开发时&#xff0c;经常需要对列表进行更新。下面我们介绍如何实现列表元素的添加、修改和删除。 …

TypeScript进阶 类型演算与高级内置类型

简介&#xff1a; TypeScript 是一种静态类型检查的 JavaScript 超集&#xff0c;它通过类型注解和类型推断来提供更强大的类型系统。在 TypeScript 中&#xff0c;类型演算是一种重要的概念&#xff0c;它允许我们在编译时对类型进行操作和计算。本文将深入探讨 TypeScript 类…

如何使用Matlab进行三角剖分(自定义函数实现delaunayTriangulation 使用Bowyer-Watson 算法)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 前言 一、Delaunay三角形 二、使用步骤 1.Bowyer-Watson算法 2.算法步骤 三、动画演示 四、核心代码 五、对比matlab自带函数和我们的算法&#xff1a; 总结 前…

谷歌开源项目BERT源码解读与应用实例

数据及代码见文末 基于BERT的中文情感分析实战:基于BERT的中文情感分析实战-CSDN博客 基于BERT的中文命名实体识别识别实战:基于BERT的中文命名实体识别识别实战-CSDN博客 1.项目配置文件 GLUE/BERT_BASE_DIR是项目的预训练权重,预训练权重主要包含3个部分:参数配置文件…

打气球小游戏

1.气球往上飘 我们声明两个符号常量来作为窗体的长和宽,接着就是常规操作 #define WINDOW_WIDTH 800 #define WINDOW_HEIGHT 600#include<easyx.h> #include<stdio.h> int main() {initgraph(WINDOW_WIDTH, WINDOW_HEIGHT);setbkcolor(WHITE);cleardevice();get…

网关过滤器使用及其原理分析

1.网关过滤器介绍 网关过滤器的用途一般是修改请求或响应信息,例如编解码、Token验证、流量复制等 官方文档地址:Spring Cloud Gateway 网关过滤器分为GloablFilter、GatewayFilter及DefaultFilter 过滤器的执行顺序由Order决定,Order值越小,优先级越高,越先执行 1.1…