muduo源码剖析之EventLoop事件循环类

简介

EventLoop.cc就相当于一个reactor,多线程之间的函数调用(用eventfd唤醒),epoll处理,超时队列处理,对channel的处理。运行loop的进程被称为IO线程,EventLoop提供了一些API确保相应函数在IO线程中调用,确保没有用互斥量保护的变量只能在IO线程中使用,也封装了超时队列的基本操作。

成员及属性解析

一个事件循环,注意,一个创建了EventLoop对象的线程是workloop线程

主要接口

loop

死循环,阻塞在Poller的poll函数,等待唤醒唤醒后执行ChannelList中每个Channel的回调最后执行任务队列中的Functor

runInLoop

在IO线程中执行用户回调Functor,若调用者非IO线程,则会调用queueInLoop

queueInLoop

当调用者并非当前EventLoop所在线程时,将Functor存入EventLoop的任务队列从而保证Functor由IO线程执行,这是线程安全的保证之一

updateChannel与removeChannel

核心中的核心,通过这个公有接口建立起Channel和Poller沟通的桥梁Channel通过这个接口向Poller注册或者移除自己的fd实现了Poller和Channel两端的解耦

核心实现:handleEvent

遍历所有的activeChannelList_,并依次执行这些Channel中注册的回调函数这个环节非常非常关键,是一切事件派发机制中回调执行的地方

主要成员

wakeupchannel_

通过eventfd唤醒的channel

EventLoop可以通过这个Channel唤醒自己执行定时任务

activeChannelList_

通过一次poll获得的所有发生事件的Channel指针列表

pendingFunctors_

所有非IO线程调用的用户回调都会存放在这个队列中,通过mutex互斥量保护

poller_

一个多路复用实例

源码剖析

EventLoop.h

#ifndef MUDUO_NET_EVENTLOOP_H
#define MUDUO_NET_EVENTLOOP_H#include <atomic>
#include <functional>
#include <vector>#include <boost/any.hpp>#include "muduo/base/Mutex.h"
#include "muduo/base/CurrentThread.h"
#include "muduo/base/Timestamp.h"
#include "muduo/net/Callbacks.h"
#include "muduo/net/TimerId.h"namespace muduo
{
namespace net
{class Channel;
class Poller;
class TimerQueue;///
/// Reactor, at most one per thread.
///
/// This is an interface class, so don't expose too much details.
class EventLoop : noncopyable
{public:typedef std::function<void()> Functor;EventLoop();~EventLoop();  // force out-line dtor, for std::unique_ptr members.//开启事件循环void loop();//退出事件循环void quit();//轮询返回的时间,通常意味着数据到达。Timestamp pollReturnTime() const { return pollReturnTime_; }int64_t iteration() const { return iteration_; }/// Runs callback immediately in the loop thread./// It wakes up the loop, and run the cb./// If in the same loop thread, cb is run within the function./// Safe to call from other threads.///在当前loop中执行cbvoid runInLoop(Functor cb);/// Queues callback in the loop thread./// Runs after finish pooling./// Safe to call from other threads.///将cb放入队列中,唤醒loop所在的线程执行void queueInLoop(Functor cb);size_t queueSize() const;// timers////// Runs callback at 'time'./// Safe to call from other threads.///TimerId runAt(Timestamp time, TimerCallback cb);////// Runs callback after @c delay seconds./// Safe to call from other threads.///TimerId runAfter(double delay, TimerCallback cb);////// Runs callback every @c interval seconds./// Safe to call from other threads.///TimerId runEvery(double interval, TimerCallback cb);////// Cancels the timer./// Safe to call from other threads.///void cancel(TimerId timerId);// internal usage//唤醒loop所在的线程void wakeup();//调用poller的方法void updateChannel(Channel* channel);void removeChannel(Channel* channel);bool hasChannel(Channel* channel);// pid_t threadId() const { return threadId_; }void assertInLoopThread(){if (!isInLoopThread()){abortNotInLoopThread();}}//判断eventloop对象是否在自己的线程bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }// bool callingPendingFunctors() const { return callingPendingFunctors_; }bool eventHandling() const { return eventHandling_; }void setContext(const boost::any& context){ context_ = context; }const boost::any& getContext() const{ return context_; }boost::any* getMutableContext(){ return &context_; }static EventLoop* getEventLoopOfCurrentThread();private:void abortNotInLoopThread();void handleRead();  // waked upvoid doPendingFunctors();//在loop一次后执行pendingFunctors_中的所有方法(会清空队列)void printActiveChannels() const; // DEBUGtypedef std::vector<Channel*> ChannelList;bool looping_; /* atomic */std::atomic<bool> quit_;//标识loop的退出bool eventHandling_; /* atomic *///标识当前loop是否需要有执行的回调操作bool callingPendingFunctors_; /* atomic */int64_t iteration_;const pid_t threadId_;//记录thread所在的线程pidTimestamp pollReturnTime_;std::unique_ptr<Poller> poller_;std::unique_ptr<TimerQueue> timerQueue_;//主要作用,当mainLoop获取到一个accept新用户的channel,通过轮询算法选择一个subloop,通过该成员唤醒subloop处理,使用eventfdint wakeupFd_;// unlike in TimerQueue, which is an internal class,// we don't expose Channel to client.std::unique_ptr<Channel> wakeupChannel_;boost::any context_;// scratch variablesChannelList activeChannels_;Channel* currentActiveChannel_;mutable MutexLock mutex_;//保证pendingFunctors_的线程安全操作std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);//存储loop需要执行的所有操作
};}  // namespace net
}  // namespace muduo#endif  // MUDUO_NET_EVENTLOOP_H

eventloop.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.// Author: Shuo Chen (chenshuo at chenshuo dot com)#include "muduo/net/EventLoop.h"#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/Channel.h"
#include "muduo/net/Poller.h"
#include "muduo/net/SocketsOps.h"
#include "muduo/net/TimerQueue.h"#include <algorithm>#include <signal.h>
#include <sys/eventfd.h>
#include <unistd.h>using namespace muduo;
using namespace muduo::net;namespace
{
//保证一个线程只有一个loop
__thread EventLoop* t_loopInThisThread = 0;
//poll超时时间
const int kPollTimeMs = 10000;int createEventfd()
{int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);if (evtfd < 0){LOG_SYSERR << "Failed in eventfd";abort();}return evtfd;
}#pragma GCC diagnostic ignored "-Wold-style-cast"
class IgnoreSigPipe
{public:IgnoreSigPipe(){::signal(SIGPIPE, SIG_IGN);// LOG_TRACE << "Ignore SIGPIPE";}
};
#pragma GCC diagnostic error "-Wold-style-cast"IgnoreSigPipe initObj;
}  // namespaceEventLoop* EventLoop::getEventLoopOfCurrentThread()
{return t_loopInThisThread;
}//创建了EventLoop对象的线程称为IO线程
EventLoop::EventLoop(): looping_(false),                                     //判断是否在loopquit_(false),                                        //判断是否退出的标志eventHandling_(false),                               //处理handevent的标志callingPendingFunctors_(false),                      //判断当前是不是在执行方法队列iteration_(0),threadId_(CurrentThread::tid()),                     //当前线程IDpoller_(Poller::newDefaultPoller(this)),             //创建一个 poll 或 epoll 对象timerQueue_(new TimerQueue(this)),                   //创建一个计时器wakeupFd_(createEventfd()),                          //发送唤醒loop消息的描述符,随便写点消息即可唤醒wakeupChannel_(new Channel(this, wakeupFd_)),        //wakeupChannel_用来自己给自己通知的一个通道,该通道会纳入到poller来管理currentActiveChannel_(NULL)                          //当前活跃的channel链表指针
{LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;if (t_loopInThisThread)                                //判断是否是本线程的loop,是一个loop类型的指针{LOG_FATAL << "Another EventLoop " << t_loopInThisThread<< " exists in this thread " << threadId_;        //用LOG_FATAL终止abort它}else{t_loopInThisThread = this; //this赋给线程局部数据指针}//设定wakeupChannel的回调函数,即EventLoop自己的的handleRead函数wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));          //channel->handleEventWithGuard会调用到handleRead// we are always reading the wakeupfdwakeupChannel_->enableReading();    //注册wakeupFd_到poller
}EventLoop::~EventLoop()
{LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_<< " destructs in thread " << CurrentThread::tid();wakeupChannel_->disableAll(); //从监听队列fd里移除wakeupChannel_->remove();  //移除epoll里面的channel::close(wakeupFd_);t_loopInThisThread = NULL;
}void EventLoop::loop()
{assert(!looping_);assertInLoopThread(); //事件循环必须在IO线程中,即创建该evenloop的线程looping_ = true; //是否正在循环quit_ = false;  // FIXME: what if someone calls quit() before loop() ?LOG_TRACE << "EventLoop " << this << " start looping";while (!quit_){activeChannels_.clear();                            //activeChannels_是一个vector//等待io复用函数返回pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); //调用poll返回活动的事件,有可能是唤醒返回的++iteration_;//根据设置的日志等级打印跟踪信息if (Logger::logLevel() <= Logger::TRACE){printActiveChannels();}// TODO sort channel by priority  按优先级排序//处理IO事件eventHandling_ = true;for (Channel* channel : activeChannels_)            //遍历通道来进行处理{currentActiveChannel_ = channel;currentActiveChannel_->handleEvent(pollReturnTime_);  //pollReturnTime_是poll返回的时刻}currentActiveChannel_ = NULL;                       //处理完了赋空eventHandling_ = false;//执行方法队列中的方法[方法队列functors,我们可以跨线程的往里面添加新的方法,这些方法会在处理完io事件后执行]doPendingFunctors();                                //这个设计也能够进行计算任务}LOG_TRACE << "EventLoop " << this << " stop looping";looping_ = false;
}void EventLoop::quit()
{quit_ = true;  //设置退出标志// There is a chance that loop() just executes while(!quit_) and exits,// then EventLoop destructs, then we are accessing an invalid object.// Can be fixed using mutex_ in both places.if (!isInLoopThread()){wakeup(); //唤醒}
}//在I/O线程中调用某个函数
//实际上就是如果是I/O线程主动调用该函数想要执行,那就同步执行该函数。如果是其他线程施加给I/O线程的任务,那么其他线程就需要把回调函数加入I/O线程的队列,等待异步执行
void EventLoop::runInLoop(Functor cb) 
{if (isInLoopThread())  //判断是否是本线程的loop{cb();}else{queueInLoop(std::move(cb)); }
}void EventLoop::queueInLoop(Functor cb)//把方法添加到队列中,该方法会出现在多个线程中,操作要加锁
{{MutexLockGuard lock(mutex_);pendingFunctors_.push_back(std::move(cb));//std::function支持移动初始化,所以这里用move提升性能。(减少一次拷贝)}if (!isInLoopThread() || callingPendingFunctors_)//如果调用的queneInLoop的线程不是IO线程,那么唤醒{//如果在IO线程调用queueInLoop(),而此时正在调用pending functor,由于doPendingFunctors()调用的Functor可能再次调用queueInLoop(cb),这是queueInLoop()就必须wakeup(),否则新增的cb可能就不能及时调用了wakeup();}
}size_t EventLoop::queueSize() const
{MutexLockGuard lock(mutex_);return pendingFunctors_.size();
}TimerId EventLoop::runAt(Timestamp time, TimerCallback cb)//在指定的时间调用callback
{return timerQueue_->addTimer(std::move(cb), time, 0.0);
}TimerId EventLoop::runAfter(double delay, TimerCallback cb)//等一段时间调用callback
{Timestamp time(addTime(Timestamp::now(), delay));//微妙return runAt(time, std::move(cb));
}TimerId EventLoop::runEvery(double interval, TimerCallback cb)//以固定的间隔反复的调用callback
{Timestamp time(addTime(Timestamp::now(), interval));return timerQueue_->addTimer(std::move(cb), time, interval);
}void EventLoop::cancel(TimerId timerId) //取消timer
{return timerQueue_->cancel(timerId);
}void EventLoop::updateChannel(Channel* channel)   //更新通道,用epoll_ctl更新fd
{assert(channel->ownerLoop() == this);  //判断channel的loop是不是当前loopassertInLoopThread();  poller_->updateChannel(channel);
}void EventLoop::removeChannel(Channel* channel) //移除通道,将channel从ChannelMap移除并EPOLL_CTL_DEL掉fd
{assert(channel->ownerLoop() == this);  //表示当前的loopassertInLoopThread();if (eventHandling_)  //正在处理channel{assert(currentActiveChannel_ == channel ||   //当前的channel或不是活跃的channelstd::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end());}poller_->removeChannel(channel);
}bool EventLoop::hasChannel(Channel* channel)//查找事件分发器是否在channels_中
{assert(channel->ownerLoop() == this);assertInLoopThread();return poller_->hasChannel(channel);
}void EventLoop::abortNotInLoopThread()
{LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this<< " was created in threadId_ = " << threadId_<< ", current thread id = " <<  CurrentThread::tid();
}void EventLoop::wakeup()
{uint64_t one = 1;ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);   //随便写点数据进去就唤醒了if (n != sizeof one){LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";}
}void EventLoop::handleRead()      //读取唤醒的数据
{uint64_t one = 1;ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);if (n != sizeof one){LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";}
}// 1. 不是简单的在临界区内依次调用functor,而是把回调列表swap到functors中,这一方面减小了
//临界区的长度,意味着不会阻塞其他线程的queueInLoop(),另一方面也避免了死锁(因为Functor可能再次调用quueInLoop)
// 2. 由于doPendingFunctors()调用的Functor可能再次调用queueInLoop(cb),这是queueInLoop()就必须wakeup(),否则新增的cb可能就不能及时调用了
// 3. muduo没有反复执行doPendingFunctors()直到pendingFunctors为空,这是有意的,否则I/O线程可能陷入死循环,无法处理I/O事件
void EventLoop::doPendingFunctors()
{std::vector<Functor> functors;callingPendingFunctors_ = true;//注意这里的临界区,这里使用了一个栈上变量functors和pendingFunctors交换{MutexLockGuard lock(mutex_);functors.swap(pendingFunctors_);  //pendingFunctors_是存放Functor的vector}//此处其它线程就可以往pendingFunctors添加任务for (const Functor& functor : functors){functor();}callingPendingFunctors_ = false;
}void EventLoop::printActiveChannels() const
{for (const Channel* channel : activeChannels_){LOG_TRACE << "{" << channel->reventsToString() << "} ";}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/118195.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux | 1.Linux环境与版本

目录 1.以下哪个命令输出Linux内核的版本信息&#xff1a; 2.linux 2.6.* 内核默认支持的文件系统有哪些&#xff1f;[多选] 3.linux查看cpu占用的命令是什么&#xff1f; 4.在Linux系统中, 为找到文件try_grep含有以a字母为行开头的内容, 可以使用命令&#xff1f; 5.在使…

力扣每日一题67:二进制求和

题目描述&#xff1a; 给你两个二进制字符串 a 和 b &#xff0c;以二进制字符串的形式返回它们的和。 示例 1&#xff1a; 输入:a "11", b "1" 输出&#xff1a;"100" 示例 2&#xff1a; 输入&#xff1a;a "1010", b "…

Java实现Fisher‘s Exact Test 的置信区间的计算

实现代码 package com.bgi.aigi.common.utils;public class FisherExactUtils {public static double[] getConfidenceInterval(double[][] data) {if (datanull||data.length!2||data[0].length!2||data[1].length!2) {return null;}double[] intervalnew double[2];double …

神经网络中epoch、batch、batchsize区别

目录 1 epoch 2 batch 3 batchsize 4 区别 1 epoch 当数据集中的全部数据样本通过神经网络一次并且返回一次的过程即完成一次训练称为一个epoch。 当我们分批学习时,每次使用过全部训练数据完成一次Forword运算以及一次BP运算,称为完成了一次epoch。 epoch时期 = 所有训练…

C++ 流程控制(分支、循环、跳转)

#include<iostream>using namespace std;int main() {// 单分支和双分支cout << "please enter your age:" << endl;int age;cin >> age;if(age > 18){cout << "welcome! adult." << endl;}else{cout << &qu…

excel常用的几个函数

1、MID函数 通常用来返回返回指定字符串中的子串。 函数公式&#xff1a; MID(string, starting_at, extract_length) String&#xff08;必填&#xff09;&#xff1a;包含要提取字符的文本字符串 starting_at&#xff08;必填&#xff09;&#xff1a;文本中要提取的第一个字…

解决el-tooltip滚动时悬浮框相对位置发生变化

获取最外层box的class&#xff0c;并在内层添加el-scrollbar <template><div class"ChartsBottom"><el-scrollbar><ul class""><li v-for"(item, index) in list" :key"index"><div class"con…

YOLO目标检测——行人数据集【含对应voc、coco和yolo三种格式标签+划分脚本】

实际项目应用&#xff1a;智能监控、人机交互、行为分析、安全防护数据集说明&#xff1a;行人检测数据集&#xff0c;真实场景的高质量图片数据&#xff0c;数据场景丰富标签说明&#xff1a;使用lableimg标注软件标注&#xff0c;标注框质量高&#xff0c;含voc(xml)、coco(j…

docker-compose安装ES7.14和Kibana7.14(有账号密码)

一、docker-compose安装ES7.14.0和kibana7.14.0 1、下载镜像 1.1、ES镜像 docker pull elasticsearch:7.14.0 1.2、kibana镜像 docker pull kibana:7.14.0 2、docker-compose安装ES和kibana 2.1、创建配置文件目录和文件 #创建目录 mkdir -p /home/es-kibana/config mkdir…

音视频技术开发周刊 | 316

每周一期&#xff0c;纵览音视频技术领域的干货。 新闻投稿&#xff1a;contributelivevideostack.com。 日程揭晓&#xff01;速览深圳站大会专题议程详解 LiveVideoStackCon 2023 音视频技术大会深圳站&#xff0c;保持着往届强大的讲师阵容以及高水准的演讲质量。两天的参会…

【解锁未来】探索Web3的无限可能性-01

文章目录 前言什么是Web3&#xff1f; 前言 还记得你第一次听说比特币吗&#xff1f;也许那只是一个关于新技术将改变一切的微弱嗡嗡声。也许你会有一种 "FOMO "的感觉&#xff0c;因为那些早早入场的人突然积累了一大笔财富–尽管你并不清楚这些 "钱 "可…

flutter开发的一个小小小问题,内网依赖下不来

问题 由于众所周知的原因&#xff0c;flutter编译时&#xff0c;经常出现Could not get resource https://storage.googleapis.com/download.flutter.io…的问题&#xff0c;如下&#xff1a; * What went wrong: Could not determine the dependencies of task :app:lintVit…

ip报头和ip报文切片组装问题

在tcp层将数据打包封装向下传递后&#xff0c;网络层将其整个看为一个数据&#xff0c;然后对其数据加网络报头操作&#xff0c;在网络层最具有代表的协议就是ip协议。在这里我们探究ipv4的报头。 ip报头 4位版本&#xff1a;指定ip的版本号&#xff0c;对于ipv4来说就是4。 …

利用jupyter进行分类

Jupyter Notebook是一个非常强大的工具&#xff0c;可以用于各种数据分析和机器学习任务&#xff0c;包括分类问题。在Jupyter Notebook中进行分类通常需要以下步骤&#xff1a; 导入所需的库&#xff1a;首先&#xff0c;你需要导入必要的Python库&#xff0c;例如NumPy、Pand…

java 读取pdf文件内容

一、引入maven <dependency><groupId>org.apache.pdfbox</groupId><artifactId>pdfbox</artifactId><version>2.0.25</version> </dependency>二、代码工具类 package com.jiayou.peis.utils;//import com.itextpdf.text.pd…

软考 系统架构设计师系列知识点之设计模式(1)

所属章节&#xff1a; 老版&#xff08;第一版&#xff09;教材 第7章. 设计模式 第1节. 设计模式概述 7.1.4 设计模式的分类 设计模式的分类 软件模式主要可分为设计模式、分析模式、组织和过程模式等&#xff0c;每一类又可细分为若干个子类。在此着重介绍设计模式&#x…

asp.net文档管理系统VS开发sqlserver数据库web结构c#编程Microsoft Visual Studio

一、源码特点 asp.net文档管理系统是一套完善的web设计管理系统&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为vs2010&#xff0c;数据库为sqlserver2008&#xff0c;使用c#语言开发 asp.net文档管理系统 二、功能介绍 (1…

C++ list 的使用

目录 1. 构造函数 1.1 list () 1.2 list (size_t n, const T& val T()) 1.3 list (InputIterator first, InputIterator last) 2. bool empty() const 3. size_type size() const 4. T& front() 4. T& back() 5. void push_front (const T& val) 6.…

FL Studio 21 for Mac中文破解版百度网盘免费下载安装激活

FL Studio 21 for Mac中文破解版是Mac系统中的一款水果音乐编辑软件&#xff0c;提供多种插件&#xff0c;包括采样器、合成器和效果器&#xff0c;可编辑不同风格的音乐作品&#xff0c;Pattern/Song双模式&#xff0c;可兼容第三方插件和音效包&#xff0c;为您的创意插上翅膀…

java _JDBC 开发

目录 一.封装JDBCUtiles 二.事务 三.批处理 四.数据库连接池 C3P0 Druidf(德鲁伊&#xff09;阿里 五.Apache-DBUtiles 六.Apache-DBUtils 七.DAO 和增删改查 通用方法 - BasicDao 一.封装JDBCUtiles 说明&#xff1a;在jdbc操作中&#xff0c;获取连接和释放资源&#…