macOS支持、完善逻辑
目前的代码可以在Linux上完美运行编译,在Windows上也可以通过WSL编译运行源代码,但是在MacBook上却无法运行编译,这主要是由于macOS上没有epoll,取而代之的很相似的kqueue。由于操作系统不同,我们需要的是在面向操作系统即从用户态向内核态转变时进行修改。目前我们面向这一过程的仅有Epoll,所以在面向不同操作系统的过程中仅需要关心这部分。
为了完善逻辑处理机制,即将不同事件类型进行不同的注册,事件注册在Channel中完成,因为之前是在Channel对不同事件的回调函数进行设置的。
1、错误检测机制
2、Macros(宏定义,去除类移动和复制)
3、Socket(创建地址和socket)
4、Epoll->Poller(事件注册分发从以及红黑树上删除)
声明部分就可以很直观地看出来,两者同属一系,仅仅是多了一部分适用于macOS的定义。
//Epoll
#include "Macros.h"
#include <vector>
#ifdef OS_LINUX
#include <sys/epoll.h>
#endif
class Channel;
class Epoll {public:Epoll();~Epoll();DISALLOW_COPY_AND_MOVE(Epoll);void UpdateChannel(Channel *ch);void DeleteChannel(Channel *ch);std::vector<Channel *> Poll(int timeout = -1);private:int epfd_{1};struct epoll_event *events_{nullptr};
};
//Poller
#include "Macros.h"
#ifdef OS_LINUX
#include <sys/epoll.h>
#endif
#ifdef OS_MACOS
#include <sys/event.h>
#endif
class Channel;
class Poller {public:Poller();~Poller();DISALLOW_COPY_AND_MOVE(Poller);void UpdateChannel(Channel *ch);void DeleteChannel(Channel *ch);std::vector<Channel *> Poll(int timeout = -1);private:int fd_{1};
#ifdef OS_LINUXstruct epoll_event *events_{nullptr};
#endif
#ifdef OS_MACOSstruct kevent *events_{nullptr};
#endif
};
为了完善逻辑,能够为不同事件类型注册不同的事件处理方式,这里采用标志位的方案进行设置,由于多了一种系统所以代码量会多出一倍,这是因为需要对在macOS系统下也进行定义。
在poll中对就绪事件进行分发,利用Channel能够在使用fd的同时获得事件的处理方式。
std::vector<Channel *> Poller::Poll(int timeout) {std::vector<Channel *> active_channels;int nfds = epoll_wait(fd_, events_, MAX_EVENTS, timeout);ErrorIf(nfds == -1, "epoll wait error");for (int i = 0; i < nfds; ++i) {Channel *ch = (Channel *)events_[i].data.ptr;int events = events_[i].events;if (events & EPOLLIN) {ch->SetReadyEvents(Channel::READ_EVENT);}if (events & EPOLLOUT) {ch->SetReadyEvents(Channel::WRITE_EVENT);}if (events & EPOLLET) {ch->SetReadyEvents(Channel::ET);}active_channels.push_back(ch);}return active_channels;
}
之后是对Channel中的监听事件类型标志位进行设置,如果事件标志位判断为真则将该事件设置成对应标志,注意是对红黑树树上的通道进行设置,不在树上都需要放到树上。
注意就绪事件和监听事件之间的关系,
注册监听:程序首先通过 listen_events_ 向内核注册自己感兴趣的事件。
内核监控:内核不断监控这些事件,并在事件发生时通知程序。
事件就绪:当内核检测到某些事件发生时,会将这些事件标记为就绪,并通过 I/O 多路复用机制返回给应用程序。
处理事件:程序读取 ready_events_,了解具体哪些事件已发生,然后进行相应的处理。
void Poller::UpdateChannel(Channel *ch) {int sockfd = ch->GetSocket()->GetFd();struct epoll_event ev {};ev.data.ptr = ch;if (ch->GetListenEvents() & Channel::READ_EVENT) {ev.events |= EPOLLIN | EPOLLPRI;}if (ch->GetListenEvents() & Channel::WRITE_EVENT) {ev.events |= EPOLLOUT;}if (ch->GetListenEvents() & Channel::ET) {ev.events |= EPOLLET;}if (!ch->GetExist()) {ErrorIf(epoll_ctl(fd_, EPOLL_CTL_ADD, sockfd, &ev) == -1, "epoll add error");ch->SetExist();} else {ErrorIf(epoll_ctl(fd_, EPOLL_CTL_MOD, sockfd, &ev) == -1, "epoll modify error");}
}
在macOS系统上的差别不大,主要是函数的使用不同,需要注意一下。
#ifdef OS_MACOSPoller::Poller() {fd_ = kqueue();ErrorIf(fd_ == -1, "kqueue create error");events_ = new struct kevent[MAX_EVENTS];memset(events_, 0, sizeof(*events_) * MAX_EVENTS);
}Poller::~Poller() {if (fd_ != -1) {close(fd_);}
}std::vector<Channel *> Poller::Poll(int timeout) {std::vector<Channel *> active_channels;struct timespec ts;memset(&ts, 0, sizeof(ts));if (timeout != -1) {ts.tv_sec = timeout / 1000;ts.tv_nsec = (timeout % 1000) * 1000 * 1000;}int nfds = 0;if (timeout == -1) {nfds = kevent(fd_, NULL, 0, events_, MAX_EVENTS, NULL);} else {nfds = kevent(fd_, NULL, 0, events_, MAX_EVENTS, &ts);}for (int i = 0; i < nfds; ++i) {Channel *ch = (Channel *)events_[i].udata;int events = events_[i].filter;if (events == EVFILT_READ) {ch->SetReadyEvents(ch->READ_EVENT | ch->ET);}if (events == EVFILT_WRITE) {ch->SetReadyEvents(ch->WRITE_EVENT | ch->ET);}active_channels.push_back(ch);}return active_channels;
}void Poller::UpdateChannel(Channel *ch) {struct kevent ev[2];memset(ev, 0, sizeof(*ev) * 2);int n = 0;int fd = ch->GetSocket()->GetFd();int op = EV_ADD;if (ch->GetListenEvents() & ch->ET) {op |= EV_CLEAR;}if (ch->GetListenEvents() & ch->READ_EVENT) {EV_SET(&ev[n++], fd, EVFILT_READ, op, 0, 0, ch);}if (ch->GetListenEvents() & ch->WRITE_EVENT) {EV_SET(&ev[n++], fd, EVFILT_WRITE, op, 0, 0, ch);}int r = kevent(fd_, ev, n, NULL, 0, NULL);ErrorIf(r == -1, "kqueue add event error");
}void Poller::DeleteChannel(Channel *ch) {struct kevent ev[2];int n = 0;int fd = ch->GetSocket()->GetFd();if (ch->GetListenEvents() & ch->READ_EVENT) {EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, 0, 0, ch);}if (ch->GetListenEvents() & ch->WRITE_EVENT) {EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, ch);}int r = kevent(fd_, ev, n, NULL, 0, NULL);ErrorIf(r == -1, "kqueue delete event error");
}
#endif
5、Channel(根据fd设置对应回调函数并调用,包括了事件标志位)
今天Channel将正式开始处理不同类型的事件,通过上面Poller的事件类型的设置将通过判断进行不同的工作,调用不同的函数。
声明部分多出了EnableWrite、SetWriteCallback方法,正式将写回调函数用上了。
class Socket;
class EventLoop;
class Channel {public:Channel(EventLoop *loop, Socket *socket);~Channel();DISALLOW_COPY_AND_MOVE(Channel);void HandleEvent();void EnableRead();void EnableWrite();Socket *GetSocket();int GetListenEvents();int GetReadyEvents();bool GetExist();void SetExist(bool in = true);void UseET();void SetReadyEvents(int ev);void SetReadCallback(std::function<void()> const &callback);void SetWriteCallback(std::function<void()> const &callback);static const int READ_EVENT; // NOLINTstatic const int WRITE_EVENT; // NOLINTstatic const int ET; // NOLINTprivate:EventLoop *loop_;Socket *socket_;int listen_events_{0};int ready_events_{0};bool exist_{false};std::function<void()> read_callback_;std::function<void()> write_callback_;
};
在实现中,从析构函数就发生了变化,现在由EventLoop进行事件的回收工作,而不是像之前一样将fd置为-1;
const int Channel::READ_EVENT = 1;
const int Channel::WRITE_EVENT = 2;
const int Channel::ET = 4;
Channel::Channel(EventLoop *loop, Socket *socket) : loop_(loop), socket_(socket) {}Channel::~Channel() { loop_->DeleteChannel(this); }
在调用回调函数的方法上没发生什么变化,只不过有了标志位不再需要之前更靠近底层的判断方式。
void Channel::HandleEvent() {if (ready_events_ & READ_EVENT) {read_callback_();}if (ready_events_ & WRITE_EVENT) {write_callback_();}
}
根据事件的标志位将树上的通道中的处理方式进行更新。
void Channel::EnableRead() {listen_events_ |= READ_EVENT;loop_->UpdateChannel(this);
}void Channel::EnableWrite() {listen_events_ |= WRITE_EVENT;loop_->UpdateChannel(this);
}void Channel::UseET() {listen_events_ |= ET;loop_->UpdateChannel(this);
}
获取设置一些基础信息,包括将就绪事件的标志位设置为对应处理方式。
Socket *Channel::GetSocket() { return socket_; }int Channel::GetListenEvents() { return listen_events_; }
int Channel::GetReadyEvents() { return ready_events_; }bool Channel::GetExist() { return exist_; }void Channel::SetExist(bool in) { exist_ = in; }void Channel::SetReadyEvents(int ev) {if (ev & READ_EVENT) {ready_events_ |= READ_EVENT;}if (ev & WRITE_EVENT) {ready_events_ |= WRITE_EVENT;}if (ev & ET) {ready_events_ |= ET;}
}void Channel::SetReadCallback(std::function<void()> const &callback) { read_callback_ = callback; }
void Channel::SetWriteCallback(std::function<void()> const &callback) { write_callback_ = callback; }
6、EventLoop(对树上的通道进行轮询)
在事件处理的类中多了对通道回收的操作并多了一个Quit方法。
#include "Macros.h"
#include <functional>class Poller;
class Channel;
class EventLoop {public:EventLoop();~EventLoop();DISALLOW_COPY_AND_MOVE(EventLoop);void Loop();void UpdateChannel(Channel *ch);void DeleteChannel(Channel *ch);void Quit();private:Poller *poller_{nullptr};bool quit_{false};
};
quit_标志位作用不清楚,而DeleteChannel是调用poller中的deleteChannel方法完成的,这是因为由于不同系统中poller的对于并发Epoll库方法不同。
7、Acceptor(创建连接)
8、Connection(连接上发生的事件)
声明中State枚举类型中将Handshaking转变为了Connecting(从握手变成连接中…),多了Send、SetOnMessageCallback、Business、OnMessage以及on_message_callback_回调函数。
class EventLoop;
class Socket;
class Channel;
class Buffer;
class Connection {public:enum State {Invalid = 1,Connecting,Connected,Closed,Failed,};Connection(EventLoop *loop, Socket *sock);~Connection();DISALLOW_COPY_AND_MOVE(Connection);void Read();void Write();void Send(std::string msg);void SetDeleteConnectionCallback(std::function<void(Socket *)> const &callback);void SetOnConnectCallback(std::function<void(Connection *)> const &callback);void SetOnMessageCallback(std::function<void(Connection *)> const &callback);void Business();State GetState();void Close();void SetSendBuffer(const char *str);Buffer *GetReadBuffer();const char *ReadBuffer();Buffer *GetSendBuffer();const char *SendBuffer();void GetlineSendBuffer();Socket *GetSocket();void OnConnect(std::function<void()> fn);void OnMessage(std::function<void()> fn);private:EventLoop *loop_;Socket *sock_;Channel *channel_{nullptr};State state_{State::Invalid};Buffer *read_buffer_{nullptr};Buffer *send_buffer_{nullptr};std::function<void(Socket *)> delete_connectioin_callback_;std::function<void(Connection *)> on_connect_callback_;std::function<void(Connection *)> on_message_callback_;void ReadNonBlocking();void WriteNonBlocking();void ReadBlocking();void WriteBlocking();
};
从实现上看看发生了那些改变,注意在ReadNonBlocking中开始在连接断开的情况下内部调用Close而在昨天的项目中是在测试程序中进行的调用。
在这多出来的实现中,为外部设置了写数据的接口,封装更为简单的对外接口。SetOnMessageCallback设置on_message_callback_ 并进行回显调用
void Connection::Send(std::string msg){SetSendBuffer(msg.c_str());Write();
}void Connection::Business(){Read();on_message_callback_(this);
}void Connection::SetOnMessageCallback(std::function<void(Connection *)> const &callback) {on_message_callback_ = callback;std::function<void()> bus = std::bind(&Connection::Business, this);channel_->SetReadCallback(bus);
}
9、Buffer(缓冲区,用以存放双工过程中发送的数据)
10、ThreadPool(线程池,用以管理复用线程)
11、服务器类
从声明来看是将回显、新建连接进行了封装
class EventLoop;
class Socket;
class Acceptor;
class Connection;
class ThreadPool;
class Server {private:EventLoop *main_reactor_;Acceptor *acceptor_;std::map<int, Connection *> connections_;std::vector<EventLoop *> sub_reactors_;ThreadPool *thread_pool_;std::function<void(Connection *)> on_connect_callback_;std::function<void(Connection *)> on_message_callback_;std::function<void(Connection *)> new_connect_callback_;public:explicit Server(EventLoop *loop);~Server();DISALLOW_COPY_AND_MOVE(Server);void NewConnection(Socket *sock);void DeleteConnection(Socket *sock);void OnConnect(std::function<void(Connection *)> fn);void OnMessage(std::function<void(Connection *)> fn);void NewConnect(std::function<void(Connection *)> fn);
};