我刚开始看这个模块时,也是看不明白,什么是事件管理模块。
此时此刻,大领导的背影,还是那么清晰。结合故事模块,慢慢理。
EventLoop模块 成员:
绿色:
利用智能指针对new出来的对象进行管理,这里就应该这样做,否则会内存泄露。
蓝色:
就是事件监控模块,一个EventLoop对象对应一个Poller对象,Poller负责文件描述符的监控,可能有许多客户端的文件描述符,定时管理文件描述符,事件管理文件描述符。在没其他文件描述符。
灰色:
就是定时任务模块,Connection模块调用 EventLoop模块共有接口,将定时任务插入到TimerWheel模块中。在这个项目中,定时器模块只是对客户发起的连接进行了管理(非活跃连接销毁功能)。
红色:
就是EventLoop绑定的线程,线程函数是Start()死循环。
绿色:就是EventLoop模块对任务池所做的通知机制,向任务池中插入任务时,会向事件管理文件描述符中写入一个1。
如果短时间内,有许多任务,但是事件管理文件描述符来不及读取,就会不断对之前那个数字加1,每插入一次就加一。但是将数字读取之后,并没有像TimerWheel模块中的,定时管理文件描述符那样,用读取到的数字去执行定时任务。而是这样做。
Poller模块 对文件描述符进行事件监控,他是阻塞是等待,有事件就绪,处理就绪事件,哪怕只有一个事件。
这里有两种情况:一,其他文件描述符有就绪事件,但是任务池中也有事件,首先,将其他文件描述符中就绪事件处理完,在处理任务池中的事件。
二,其他文件描述符中没有就绪事件,但是任务池中有事件,这时候 事件管理文件描述符就产生作用,他将Poller从,1,阻塞模式中唤醒,2去执行事件管理文件描述符上的事件
3,再执行任务池中的任务。
任务池是如何运行的
首先创建一个新的任务池对象,将旧的任务池中的任务放到新的任务池中。这样做,不仅没有将下一次到来的任务 和这次要执行的任务搞混,还达到了运行任务的目的。
线程安全问题:
首先,线程安全就是只线程会修改进程中的一些资源,如果其他线程用到这个资源,就会产生,错误。因为这个资源被修改了。
你想,主线程 将建立好的连接给子线程。这句话中就有两对象:主线程,子线程。
所以我当时就没理解,为什么会有两个线程,由于没有掌握GDB调试,我就硬看和猜测。
我也询问别人,别人提醒了一句,我才恍然大悟。下面就是我用打印呈现出来的现象。
结果发现,客户端刚已连接,服务端就但因这句话,并且出现两个线程id。这就真是了我的猜想。你如果不理解,可以去打印。
剩下的函数有关模块的讲过,这个模块单拿出来将,也就是大白话,没什么讲的。
EventLoop整体代码:
class EventLoop {private:using Functor = std::function<void()>;std::thread::id _thread_id;//线程IDint _event_fd;//eventfd唤醒IO事件监控有可能导致的阻塞std::unique_ptr<Channel> _event_channel;Poller _poller;//进行所有描述符的事件监控std::vector<Functor> _tasks;//任务池std::mutex _mutex;//实现任务池操作的线程安全TimerWheel _timer_wheel;//定时器模块public://执行任务池中的所有任务void RunAllTask() {std::vector<Functor> functor;{std::unique_lock<std::mutex> _lock(_mutex);_tasks.swap(functor);}for (auto &f : functor) {f();}return ;}static int CreateEventFd() {int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if (efd < 0) {ERR_LOG("CREATE EVENTFD FAILED!!");abort();//让程序异常退出}return efd;}void ReadEventfd() {uint64_t res = 0;int ret = read(_event_fd, &res, sizeof(res));if (ret < 0) {//EINTR -- 被信号打断; EAGAIN -- 表示无数据可读if (errno == EINTR || errno == EAGAIN) {return;}ERR_LOG("READ EVENTFD FAILED!");abort();}return ;}void WeakUpEventFd() {uint64_t val = 1;int ret = write(_event_fd, &val, sizeof(val));if (ret < 0) {if (errno == EINTR) {return;}ERR_LOG("READ EVENTFD FAILED!");abort();}return ;}public:EventLoop():_thread_id(std::this_thread::get_id()), _event_fd(CreateEventFd()), _event_channel(new Channel(this, _event_fd)),_timer_wheel(this) {//给eventfd添加可读事件回调函数,读取eventfd事件通知次数_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));//启动eventfd的读事件监控_event_channel->EnableRead();}//三步走--事件监控-》就绪事件处理-》执行任务void Start() {while(1) {//1. 事件监控, std::vector<Channel *> actives;_poller.Poll(&actives);//2. 事件处理。 for (auto &channel : actives) {channel->HandleEvent();}//3. 执行任务RunAllTask();}}//用于判断当前线程是否是EventLoop对应的线程;bool IsInLoop() {return (_thread_id == std::this_thread::get_id());}void AssertInLoop() {assert(_thread_id == std::this_thread::get_id());}//判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列。void RunInLoop(const Functor &cb) {if (IsInLoop()) {return cb();}return QueueInLoop(cb);}//将操作压入任务池void QueueInLoop(const Functor &cb) {{std::unique_lock<std::mutex> _lock(_mutex);_tasks.push_back(cb);}//唤醒有可能因为没有事件就绪,而导致的epoll阻塞;//其实就是给eventfd写入一个数据,eventfd就会触发可读事件WeakUpEventFd();}//添加/修改描述符的事件监控void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); }//移除描述符的监控void RemoveEvent(Channel *channel) { return _poller.RemoveEvent(channel); }void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { return _timer_wheel.TimerAdd(id, delay, cb); }void TimerRefresh(uint64_t id) { return _timer_wheel.TimerRefresh(id); }void TimerCancel(uint64_t id) { return _timer_wheel.TimerCancel(id); }bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); }
};