Timestamp时间管理类
①:主要提供now函数显示当前时间:自1970年1月1日0点以来经过的秒数,使用time函数
②: toString函数将字符串转化成时间字符串,使用localtime函数将秒数格式化成日历时间
解析tm_time 并以日历格式输出:
// 2022/08/26 16:29:10
// 20220826 16:29:10.773804
③:如果想更景区可以显示微秒:toFormattedString函数
InetAddress地址管理类
封装了socket地址,提供了方便的方法来处理IP地址和端口,以及“sockaddr_in”结构转换的功能。
①:接收提供端口号和IP号两个参数的构造函数,也接受直接用sockaddr_in结构来初始化这个类。
②:网络地址或端口转化成本地字符串给人阅读
toIp()
: 返回IP地址的字符串表示。toPort()
: 返回端口号。toIpPort()
: 返回"IP:端口"格式的字符串
inet_ntoa转IP、ntohs转端口
③:设置sockaddr_in结构
getSockAddr()
: 获取内部的sockaddr_in
结构的指针。setSockAddr()
: 设置内部的sockaddr_in
结构。
三大核心模块:
Muduo库是基于Reactor模型实现的TCP网络编程库。Multi-Reactor模型:
Muduo库有三个核心组件支撑一个reactor实现持续监听一组fd,并根据每个fd上发生的事件调用相应的处理函数。这三个组件分别是Channel类、Poller/EpollPoller类以及EventLoop类。
三大核心模块一:Channel类
Channel类则封装了一个 [fd] 和这个 [fd感兴趣事件] 以及事件监听器监听到 [该fd实际发生的事件]。同时Channel类还提供了设置该fd的感兴趣事件,以及将该fd及其感兴趣事件注册到事件监听器或从事件监听器上移除,以及保存了该fd的每种事件对应的处理函数。
Channel文件描述的保姆!!!
也就是说Channel里主要就是封装了sockfd,感兴趣的事件类型events_,事件的回调函数(用户设置的,传给Channel用)
①:Channel类的主要成员变量:
- int fd_这个Channel对象照看的文件描述符
- int events_代表fd感兴趣的事件类型:kNoneEvent、kReadEvent、kWriteEvent
- int revents_代表事件监听器Poller实际监听到该fd发生的事件类型集合,当事件监听器监听到一个fd发生了什么事件,通过Channel::set_revents()函数来设置revents值。Poller返回的具体发生的事件类型。
- EventLoop* loop这个fd属于哪个EventLoop对象。
- int index表示Channel在Poller中的状态,有kNew未添加、kAdded已添加、kDeleted已删除。
- read_callback_ 、write_callback_、close_callback_、error_callback_:这些是std::function类型,代表着这个Channel为这个文件描述符保存的各事件类型发生时的处理函数。比如这个fd发生了可读事件,需要执行可读事件处理函数,这时候Channel类都替你保管好了这些可调用函数,真是贴心啊,要用执行的时候直接管保姆要就可以了。
②:Channel类重要的成员方法:
--向Channel对象注册各类事件的处理函数:
这里是用户设置的根据Poller返回的发生事件的具体类型设置的
--将Channel中的文件描述符及其感兴趣事件注册事件监听器上或从事件监听器上移除:
外部通过这几个函数来告知Channel你所监管的文件描述符都对哪些事件类型感兴趣,并把这个文件描述符及其感兴趣事件注册到事件监听器(IO多路复用模块)上。update实际上是epoll_ctl。
--index用来标识 channel在poller中的状态
--set_revents Poller监听Channel具体发生了什么事件并返回回去
当事件监听器监听到某个文件描述符发生了什么事件,通过这个函数可以将这个文件描述符实际发生的事件封装进这个Channel中。
--void HandlerEvent根据Poller返回给Channel的revents_判断去执行什么回调函数
Pollr中调用了epoll_wait()得知那些Channel文件描述符发生了那些事件,事件发生后自然就要调用这些Channel对应的处理函数。通过Channel中的revents_
变量得知)和感兴趣的事件(通过Channel中的events_
变量得知)来选择调用read_callback_
和/或write_callback_
和/或close_callback_
和/或error_callback_
。
三大核心模块二:Poller类/EpollPoller类
负责监听文件描述符事件是否触发以及返回发生事件的文件描述符以及具体事件的模块就是Poller。所以一个Poller对象对应一个事件监听器(这里我不确定要不要把Poller就当作事件监听器)。在multi-reactor模型中,有多少reactor就有多少Poller。
muduo提供了epoll和poll两种IO多路复用方法来实现事件监听。不过默认是使用epoll来实现,也可以通过选项选择poll。该项目自己重构的muduo库只支持epoll。
这个Poller是个抽象虚类,由EpollPoller和PollPoller继承实现,与监听文件描述符和返回监听结果的具体方法也基本上是在这两个派生类中实现。
EpollPoller就是封装了用epoll方法实现的与事件监听有关的各种方法。
①:EpollPoller的重要成员变量:
- epollfd_ :就是用epoll_create方法返回的epoll句柄。
- channels_:这个变量是std::unordered_map<int, Channel*>类型,负责记录文件描述符-->Channel的映射,也帮忙保管所有注册在Poller上的Channel。
- ownerLoop_:所属的EventLoop对象
②:EpollPoller给外部提供的最重要的方法:
TimeStamp poll(int timeoutMs, ChannelList *activeChannels)
Poller的核心,底层调用epoll_wait,获取监听器上发生事件的fd及其对应发生的事件,每个fd都是由一个Channel封装的,通过哈希表channels_可以根据fd找到封装fd的Channel。
Poller监听的该fd发生的事件写入Channel中的revents_成员变量。然后通过一个fillActiveChannels函数将该Channel装进activeChannels_(它是一个vector<Channel*>)中表示活跃连接集合。
EventLoop调用玩poll之后就可以拿到监听结果(activeChannels_)
三大核心模块三:EventLoop类
Poller封装了和事件监听有关的方法和成员(epoll_ctl , epoll_wait),调用一次Poller::poll方法它就返回事件监听器的监听结果(发生事件的fd及其发生的事件)。
作为一个网络服务器,需要有持续监听、持续获取监听结果、持续处理监听结果对应的事件的能力,也就是我们需要循环去调用Poller::poll方法获取实际发生事件的Channel集合,然后调用这些Channel里面保管的不同类型事件的处理函数(调用Channel::HandlerEvent方法)
EventLoop就是负责实现“循环”,负责驱动“循环”的重要模块!
Channel和Poller其实相当于EventLoop的手下,EventLoop封装了二者并向上提供了更方便的接口来使用。
One Loop Per Thread:每一个EventLoop都绑定了一个线程(一对一绑定),这种运行模式是Muduo库的特色!充分利用了多核cpu的能力,每一个核的线程负责循环监听一组文件描述符的集合。
①:EventLoop中的重要成员变量:
- atomic_bool looping_ 原子类型bool变量,表示EventLoop是否正在运行
- atomic_bool quit_原子类型bool变量,表示EventLoop是否已被要求停止
- pid_t threadId_ 记录当前loop所在线程tid
- unique_ptr<Poller> poller_ 指向Poller对象的指针,用于多路复用IO事件分发
- unique_ptr<TimerQueue> timerQueue_指向定时器队列指针,执行定时任务
- vector<Channel*> activeChannels_ 一个Channel列表,存储当前活跃的Channels
- vector<Functor> pendingFunctors_ 存储loop需要执行的所有的回调操作
- mutex mutex_ 保护
pendingFunctors_
等可能被多个线程同时访问的成员 - int wakeupFd_ 由linux内核eventfd创建出来的,一个唤醒文件描述符,用于在一个线程中唤醒另一个线程的EventLoop(每一个线程都有自己的wakeupFd,主线程mainLoop想唤醒一个SubLoop可以像SubLoop的wakeupFd写入数据,被唤醒的线程会读取自己的wakeupFd,清空它并处理唤醒事件)
②:EventLoop中重要的成员方法:
- EventLoop重要方法
EventLoop:loop()
:
每个EventLoop对象都唯一绑定了一个线程,这个线程其实就是在执行这个函数中的while循环,这个while循环的大致逻辑比较简单。就是调用Poller::poll方法获取事件监听器上的监听结果,结果会存在activeChannels_中。接下来在loop里面就会调用监听结果中每一个Channel的处理函数HandlerEvent()。每一个Channel的处理函数会根据Channel类中封装的实际发生的事件,执行Channel类中封装的各事件处理函数。
这里while中还会执行doPendingFunctors()函数,执行掉当前Loop事件循环需要处理的回调函数,这些函数是放在std::vector<Functor> pendingFunctors_之中。mainLoop只做accept新用户的连接
- quit(): 用于停止事件循环
如果loop在自己的线程中调用了quit直接退出;
如果在非loop的线程中调用loop需要通过wakeup()函数调用wakefd通知唤醒。
- ⭐⭐⭐eventfd()跨线程调度任务:
'eventfd()'是Linux内核为用户空间程序提供的轻量级事件通知机制。主要代替更为复杂、重量级的通知方式,比如管道等,从而为跨线程或进程间通信提供了一个简单、高效的方式。
创建一个'eventfd'对象时,内核会返回一个文件描述符,这个文件描述符可以用来进行读写操作。内部是一个计数器,写入整数值会被加到计数器上,读取时取出计数器当前值重置为0。
muduo中每一个EventLoop创建时都生成一个由eventfd返回的wakeupFd_,并封装成wakeupChannel_,通过bind函数设置回调函数handleRead,交给Poller监听读事件
handleRead()函数: 读
wakeup()函数:写
-
runInLoop(const Functor& cb): 在事件循环线程中执行给定的回调。如果当前在同一线程中,它会立即执行回调,否则它会排队等待在事件循环中执行。
-
queueInLoop(const Functor& cb): 将回调放入队列中,在事件循环的下一个迭代中执行。
-
doPendingFunctors()执行回调 在loop中调用的方法
这里又开辟了一个临时空间存放回调方法,资源交换把pendingFunctors_释放出来
- Poller接口:
CurrentThread获取线程tid类
服务器肯定有多个线程,一个线程执行一个EventLoop,所以我们会有很多个EventLoop,每个EventLoop都有很多Channel,自己Channel上的事件要在自己的EventLoop线程上去处理,为了控制这些逻辑,EventLoop在这里涉及到获取当前线程ID。
使用 '__thread' 线程局部存储 (TLS) 变量,每个线程都有自己的变量副本,这个变量副本的生命周期和使用它的线程生命周期相同。C++11提供 ‘thread_local’关键字
__thread int t_cachedTid = 0;
获取Tid的过程是一个系统调用,从用户空间切换到内核空间,比较浪费时间,第一次访问就把Tid存储在定义的线程局部变量中t_cachedTid
EventLoop相关的三个线程类
Thread线程类
使用C++11的thread类,创建一个线程这里是
①:thread成员变量
- bool started_; //启动当前线程
- bool joined_; // 当前线程等待其他线完了再运行下去
- std::shared_ptr<std::thread> thread_; // 自己掌控线程对象产生的时机,这里直接调用thread thread_它会立即创建一个新线程
- pid_t tid_;
- ThreadFunc func_; // 存储线程函数
- std::string name_; // 调试的时候打印
- static std::atomic_int numCreated_; // 对线程数量计数
②:thread成员方法
start() 启动当前线程
join() 当前线程等待其他线程完了再运行下去
EventLoopThread事件类
对EventLoop和thread的封装,通过bind绑定器将其绑定,
- 这允许在一个单独的线程中执行 I/O 操作,不影响主线程的执行。
- 这对于某些应用程序来说是有用的,因为它们可能想要在不同的线程中独立地处理 I/O,从而提高性能。
①:EventLoopThread成员变量
- void threadFunc();//线程函数,创建loop
- EventLoop *loop_;
- bool exiting_;//是否退出循环
- Thread thread_;
- std::mutex mutex_;
- std::condition_variable cond_;
- ThreadInitCallback callback_;//初始化操作
②:EventLoopThread成员方法
- EventLoopThread::EventLoopThread构造函数
这里重要的点就是通过bind函数将threadFunc函数和thread_绑定起来,也就是将线程创建和EventLoop创建绑定起来。
- EventLoopThread::startLoop() 开启循环
- EventLoopThread::threadFunc() start()运行后创建新线程,执行绑定的threadFunc函数
⭐⭐⭐这两个函数要放到一起说:
1. 首先在startLoop函数中,调用thread_.start(),启动新线程去执行在EventLoop构造函数中绑定的threadFunc函数;
2. 在startLoop函数中,创建一个EventLoop对象loop初始化为nullptr;
3. 进入一个作用域,unique_lock<std::mutex> lock(mutex_)创建互斥锁;
4. while循环检查loop_是否为nullptr,如果为空,调用cond_.wait(lock)挂起,等待其他线程通过调用cond.notufy_one()唤醒,继续执行;
5. 其他线程调用cond.notufy_one()后,while循环退出,将loop_赋值给loop,返回loop,也即返回新线程中创建的Eventloop对象;
6. 在threadFunc函数中,首先创建一个EventLoop对象,loop,每个线程都用有一个独立的EventLoop;
7. 如果存在callback_,则调用callback_(&loop)去执行回调函数;
8. 进入一个作用域,unique_lock<std::mutex> lock(mutex_)创建互斥锁,并将loop_设置为当前loop,即将新线程中创建的EventLoop对象loop赋值给loop_,然后cond.notufy_one()唤醒等待中的线程。
9. 调用loop.loop()进入事件循环,执行EventLoop中的loop()函数;
10. 事件循环结束后,也就是EventLoop中的loop()函数退出后,再次获取锁,将loop_设置为nullptr,表示线程结束。
11. 返回到startLoop函数将loop返回。
该过程实现了创建一个新线程,并在新线程中运行一个独立的 EventLoop 对象,并通过条件变量等待新线程中的 EventLoop 对象创建完成后返回,从而实现了一个 "one loop per thread" 的设计模式。
EventLoopThreadPool池
EventLoopThreadPool
管理一个线程池,每个线程都有自己的EventLoop
。- 它允许多线程并发处理 I/O 操作。
EventLoopThreadPool
对于利用多核 CPU 构建高性能的网络服务器尤其有用,因为每个线程可以在单独的 CPU 核心上运行,提供真正的并行处理。- 当新的连接到来时,
EventLoopThreadPool
可以选择一个现有的EventLoop
(运行在某个线程上)来处理该连接。
①:EventLoopPoll的成员变量
- vector<EventLoop*> loops_ :件线程EventLoopThread里面的EventLoop指针
- EventLoop *baseLoop_ :最基本的loop mainloop
- bool started_ :表示 EventLoopPool 是否已启动
- int numThreads_; 线程数量
- int next_; 做轮询的下标使用的
- vector<std::unique_ptr<EventLoopThread>> threads_ 所有事件的线程
②:EventLoopPoll的成员函数
- start(const ThreadInitCallback &cb) : 启动EventLoopPoll,启动所有的线程和事件循环
由设置的线程数量numThreads_循环构建EventLoopThread,并将所有的EventLoopThread放到管理容器threads_中去,同时每一个EventLoopThread执行startLoop()开启循环。
- EventLoopThreadPool::getNextLoop():通过轮询选择下一个loop
- EventLoopThreadPool::getAllLoops():返回有所得loop
根据存放所有loop得容器loops_返回
Acceptor类
muduo库在使用得时候需要我们自己顶一个EventLoop,这个是mainLoop,如果用户在使用时没有通过EventLoopThreadPool提供得setThreadNum()函数设置muduo库得底层线程 得个数得话,那么他得IO线程和工作线程实际上是一个线程。这个setThreadNum()设置得实际上是SubReactor得数量。
Acceptor接受新用户连接并分发连接给SubReactor(SubEventLoop),封装了服务器监听套接字fd以及相关处理方法,以及对其他类得方法进行调用。
处理accept,监听新用户连接,新用户连接响应以后,拿到和客户端通信的clientfd,打包成Channel,然后根据muduo库的轮询算法,找一个subloop,将Channel给到subloop,扔给subloop之前,需要将subloop唤醒一下(wakeupfd : 每loop都有一个wakeupfd ,他那个Linux的系统调用eventfd创建的,一个带有线程notify,即带有通知机制的fd),mainloop可以向subloop随便写
个整数,唤醒subloop,将打包好的Channel扔给subloop,也就是注册到subloop的Poller上!
Acceptor运行在我们的baseLoop(mainReactor)里面。需要从监听队列中监听新用户的连接,所以需要有一个listen fd,muduo将这个fd也封装了,就是Socket
Socket类
封装listenfd,包括 listen() 、accept() 都在这里实现。
①:Socket类的成员变量
- const int sockfd_ :对listenfd的封装
②:Socket类的成员方法
- void bindAddress(const InetAddress &localaddr); //调用bind绑定服务器Ip端口
- void listen(); //调用listen监听套接字
- int accept(InetAddress *peeraddr); //调用accept接收新客户连接请求
这里使用了accept4() 主要它可以设置成非阻塞的 ,accept
是阻塞调用,会一直等待直到有新的连接到达;而 accept4
可以设置为非阻塞模式,即使没有新连接到达,它也会立即返回。
- void shutdownWrite(); //调用shutdown关闭服务端写通道
①:Acceptor成员变量
acceptSocket_
:这个是服务器监听套接字的文件描述符 (创建一个非阻塞的fd)acceptChannel_
:这是个Channel类,把acceptSocket_
及其感兴趣事件和事件对应的处理函数都封装进去。EventLoop *loop
:监听套接字的fd由哪个EventLoop负责循环监听以及处理相应事件,其实这个EventLoop就是main EventLoop。newConnectionCallback_:
TcpServer构造函数中将TcpServer::newConnection( )
函数注册给了这个成员变量。这个TcpServer::newConnection
函数的功能是公平的选择一个subEventLoop,并把已经接受的连接分发给这个subEventLoop。
②:Acceptor成员函数
- 构造函数Acceptor::Acceptor
封装成acceptChannel_ 并绑定回调函数handleRead
- 析构函数Acceptor::~Acceptor()
listen( )
:该函数底层调用了linux的函数listen( )
,开启对acceptSocket_
的监听同时将acceptChannel
及其感兴趣事件(可读事件)注册到main EventLoop的事件监听器上。换言之就是让main EventLoop事件监听器去监听acceptSocket_
handleRead( )
:这是一个私有成员方法,这个方法是要注册到acceptChannel_
上的, 同时handleRead( )
方法内部还调用了成员变量newConnectionCallback_
保存的函数。当main EventLoop监听到acceptChannel_
上发生了可读事件时(新用户连接事件),就是调用这个handleRead( )
方法。
简单说一下这个handleRead( )
最终实现的功能是什么,接受新连接,并且以负载均衡的选择方式选择一个sub EventLoop,并把这个新连接分发到这个subEventLoop上(通过TcpServer设置的newConnectionCallback_)。
TcpServer对外提供类
Acceptor是在mainloop中做事情了,做事情的回调函数是由TcpServer给它传递的!
①:TcpServer成员变量
有event loop,有acceptor跟accept相关的操作全部打包进去,事件循环的这个线程池threadPool_,有一系列的回调,再者它就有一个connection map,它维护了所有的连接。
- EventLoop *loop_;//baseLoop 主事件循环 ,用户定义的loop 一个线程一个loop循环
- const std::string ipPort_;//服务器的IP地址端口号
- const std::string name_;//服务器的名称
-
⭐unique_ptr<Acceptor> acceptor_ 运行在mainReactor,监听新连接事件
-
shared_ptr<EventLoopThreadPool> threadPool_ //线程池
-
⭐using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>;
ConnectionMap connections_;//保存所有活动的TcpConnection连接
②:TcpServer成员函数
- 构造函数、析构函数:
TcpServer要传入当前mainLoop
构造函数需要主事件循环、监听地址、服务器名和端口重用选项。 析构函数用于资源清理。
- 设置回调函数,用户设置的:
- 线程与启动:
setThreadNum
:设置线程池中线程的数量。start
:启动服务器,开始监听。启动Loop线程池,并且TcpServer开始监听新用户的连接。
- newConnection : 处理新的TCP连接
-
removeConnection和removeConnectionInLoop移除一个TCP连接。
TcpConnection类
它主要就是用来打包呢,成功连接服务器的客户端的这么一条通信链路
TcpServer通过Acceptor得知有一个新用户连接,通过accept函数拿到connfd;
=》通过TcpConnection 设置回调函数,然后给Channel,然后给Poller判断是什么事件,然后再返回给Channel去执行Channel的回调函数
①:TcpServer成员变量
- EventLoop *loop_ :这里loop一定不是mainloop ,因为TcpConnection都是在subLoop里面管理
- unique_ptr<Socket> socket_;
- unique_ptr<Channel> channel_;
- const InetAddress localAddr_;//当前主机IP地址端口号
- const InetAddress peerAddr_;//对端IP地址端口号
- ConnectionCallback connectionCallback_;//有新连接时的回调
- MessageCallback messageCallback_;//有读写消息时的回调
- WriteCompleteCallback writeCompleteCallback_;//消息发送完成以后的回调
- HighWaterMarkCallback highWaterMarkCallback_;//水位 控制发送数据的速度
- CloseCallback closeCallback_;
- size_t highWaterMark_;//水位标志
- Buffer inputBuffer_;//接收数据的缓冲区
- Buffer outputBuffer_;//发送数据的缓冲区
②:TcpServer成员函数
//发送数据
void send(const std::string &buf);
void send(Buffer *buf);
void shutdown();//关闭连接
void connectEstablished();//连接建立
void connectDestroyed(); //连接销毁
//各个回调函数,给Channel设置的:
handleRead
handleWrite
handleClose
handleError
Buffer类
8字节 包头存放要解析数据长度 | 可读缓冲区 | 可写缓冲区
kCheapPrepend 8 | kInitialSize 1024
缓冲区对于非阻塞IO非常重要,TCP编程中经常会出现粘包问题,一般在通讯数据中加一个包头表示读取数据的大小,每一次根据数据的长度来截取相应的包的大小。
- Buffer类的成员变量:
- std::vector<char> buffer_ //vector数组 扩容方便
- size_t readerIndex_; //可读数据的下标位置
- size_t writerIndex_; //写数据的下标位置
- Buffer类的成员函数:
- peek :返回缓冲区中可读数据的起始地址
- retrieve:判断数据有没有一次性读完,设置好两个数据位置
- retrieveAllAsString:onMessage函数上报的Buffer数据,转成string类型的数据返回
- makeSpace扩容函数:
- readFd:从fd上读取数据;
底层是readv,可以从fd上读取数据并可以按照一组散布缓冲区 (iovec 结构数组) 进行存储。
- writeFd:向fd上写数据;
底层是writev,可以将散布缓冲区的数据写到指定文件。
Logger异步日志
红黑树定时器
Echo服务器使用
我们需要做的是,设置注册回调函数,onConnection和onMessage
创建EventLoop loop主事件循环;
创建InerAddress addr(8000)地址类;
构建EchoServer类:
包含TcpServer对象 server_
包含EventLoop
文章借鉴:
万字长文梳理Muduo库核心代码及优秀编程细节思想剖析 - 知乎 (zhihu.com)