目录
1. epoll的封装
结构体aeApiStae
创建epoll fd的封装
epoll_ctl的封装
epoll_wait的封装
2. 结构体aeFileEvent、aeFiredEvent、aeTimeEvent
结构体aeFileEvent
结构体aeFiredEvent
结构体aeTimeEvent
3. struct aeEventLoop
aeEventLoop相关的函数
1. 创建eventloop
2. 创建aeFileEvent
3. 删除aeFileEvent
4. 开始事件循环
4.什么时候设置回调函数的?
1. 绑定SleepProc
2. 绑定服务端的读事件回调函数
3. 绑定客户端的读事件回调函数
4. 绑定客户端的写事件回调函数
5.绑定时间事件
5. 有了IO多路复用,为什么还需要reactor模式?
IO多路复用与事件驱动
IO同步与异步的判断
reactor模式的优点
6. 通过Redis网络部分,学到如何实现reactor模式
上一章节讲解了Redis的网络交互流程,没有对关于网络部分的结构体有具体的讲解,所以该文章就主要讲解Redis网络部分使用的结构体。
1. epoll的封装
epoll有三个函数调用:
- epoll_create:创建epoll实例,返回一个epoll fd,即是用于管理待检测的文件描述符的集合。
- epoll_ctl:管理红黑树实例上的节点,可以进行添加、修改、删除操作。
- epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)。等待就绪的fd,并返回就绪的fd,存储在events中,以及个数。
结构体aeApiStae
- 其内部有变量epfd,即是调用epoll_create创建出来fd
- *events即是就绪的fd存储的位置(数组)
typedef struct aeApiState {int epfd;struct epoll_event *events;
} aeApiState;
创建epoll fd的封装
主要流程:
- 申请内存空间给结构体aeApieState,申请内存空间给成员变量events。
- 使用epoll_create创建出epfd
//现在还没了解到aeEventLoop,可以先不用管aeEventLoop相关的,不影响看代码。
static int aeApiCreate(aeEventLoop *eventLoop) {aeApiState *state = zmalloc(sizeof(aeApiState));if (!state) return -1;state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);if (!state->events) {zfree(state);return -1;}state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */if (state->epfd == -1) {zfree(state->events);zfree(state);return -1;}anetCloexec(state->epfd);eventLoop->apidata = state;return 0;
}//重置aeApiState的events的大小,即是重置存放就绪fd的空间大小
static int aeApiResize(aeEventLoop *eventLoop, int setsize) {aeApiState *state = eventLoop->apidata;state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize);return 0;
}//释放结构体aeApiState
static void aeApiFree(aeEventLoop *eventLoop) {aeApiState *state = eventLoop->apidata;close(state->epfd);zfree(state->events);zfree(state);
}
epoll_ctl的封装
主要是两个:添加和删除。
- 添加的就是使用epoll_ctl(epfd,EPOLL_CTL_ADD ,....)
- 删除时使用epoll_ctl(epfd,EPOLL_CTL_MOD,....)
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {aeApiState *state = eventLoop->apidata;struct epoll_event ee = {0}; /* avoid valgrind warning *//* If the fd was already monitored for some event, we need a MOD* operation. Otherwise we need an ADD operation. */int op = eventLoop->events[fd].mask == AE_NONE ?EPOLL_CTL_ADD : EPOLL_CTL_MOD;ee.events = 0;//得到要关注的事件类型maskmask |= eventLoop->events[fd].mask; /* Merge old events */if (mask & AE_READABLE) ee.events |= EPOLLIN;if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;ee.data.fd = fd;if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;return 0;
}static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {aeApiState *state = eventLoop->apidata;struct epoll_event ee = {0}; /* avoid valgrind warning */int mask = eventLoop->events[fd].mask & (~delmask);ee.events = 0;if (mask & AE_READABLE) ee.events |= EPOLLIN;if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;ee.data.fd = fd;if (mask != AE_NONE) {epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);} else {/* Note, Kernel < 2.6.9 requires a non null event pointer even for* EPOLL_CTL_DEL. */epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);}
}
epoll_wait的封装
即是调用epoll_wait(...)。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {aeApiState *state = eventLoop->apidata;int retval, numevents = 0;//eventLoop->setsize就是state->evnets数组的元素个数retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);if (retval > 0) {int j;numevents = retval;for (j = 0; j < numevents; j++) { //遍历就绪的fdint mask = 0; //mask就是该fd就绪的事件struct epoll_event *e = state->events+j;if (e->events & EPOLLIN) mask |= AE_READABLE;if (e->events & EPOLLOUT) mask |= AE_WRITABLE;if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;//把就绪的fd存储到 eventLoop->fired[j]中//可以先不用关注eventLoop,这里就是把就绪的fd存储到另一地方嘛eventLoop->fired[j].fd = e->data.fd;eventLoop->fired[j].mask = mask;}}return numevents;
}
2. 结构体aeFileEvent、aeFiredEvent、aeTimeEvent
结构体aeFileEvent
一个客户端建立连接后,会把该客户端的fd添加到epoll上,并监听其关注的事件。假如当前fd需要监听读事件,那该fd读就绪,那对应的读事件函数就会被调用。
所以,可以把fd关注的事件和fd就绪会执行的回调函数封装成一个结构体,Redis叫其是aeFileEvent。
- mask就是fd关注的事件类型
- rfileProc是读就绪会执行的回调函数,wfileProc就是写就绪会执行的回调函数。这两个都需要后续去定义和注册
- clientData就是回调函数中的参数
/* File event structure */
typedef struct aeFileEvent {int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */aeFileProc *rfileProc;aeFileProc *wfileProc;void *clientData;
} aeFileEvent;//回调函数的类型
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);//前面的形式可能不好看出,用c++11写的话,如下
//using aeFileProc=std::function<void(aeEventLoop* eventLoop,int fd,void* clientData,int mask)>;
所以,客户端就会有自己的aeFileEvent,即是一个客户端就对应一个aeFileEvent。 当读或写就绪时,就会调用对应的函数。
其实,服务端也是对应一个aeFileEvent,因为也要监听服务端的读事件。当客户端来连接,服务端读事件就绪,就会调用服务端的读事件回调函数进行accept。
所以就会有创建aeFileEvent,每个客户端或者一个服务端都会使用函数aeCreateFileEvent。(后续会详讲,先留个印象)
看到这里读者可能会有疑惑,不是说要对某个fd进行注册监听的吗,怎么该结构体没有fd的呢?
那是因为aeFileEvent是存储在数组内。数组的下标就是该aeFileEvent对应的fd,也即是客户端对应的fd。(后续从代码中可以看出的)
结构体aeFiredEvent
如名字一样,其就是就绪事件。一个就绪事件,就肯定是可以从中知道是哪个fd就绪,还有知道是哪种类型事件就绪。
- fd表示epoll_wait返回的就绪fd
- mask表示epol_wait返回的事件类型
epoll_wait返回的一个就绪fd就对应一个aeFireEvent。
aeApiPoll就是把epoll_wait返回的就绪fd和事件类型 逐个赋值给aeFireEvent。
/* A fired event */
typedef struct aeFiredEvent {int fd;int mask;
} aeFiredEvent;//把epoll_wait返回的就绪fd和事件赋值给fired[j]
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {aeApiState *state = eventLoop->apidata;int retval, numevents = 0;//eventLoop->setsize就是state->evnets数组的元素个数retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);if (retval > 0) {numevents = retval;for (int j = 0; j < numevents; j++) { //遍历就绪的fdint mask = 0; //mask就是该fd就绪的事件struct epoll_event *e = state->events+j;if (e->events & EPOLLIN) mask |= AE_READABLE;if (e->events & EPOLLOUT) mask |= AE_WRITABLE;if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;//把就绪的fd存储到 eventLoop->fired[j]中//可以先不用关注eventLoop,这里就是把就绪的fd存储到另一地方嘛eventLoop->fired[j].fd = e->data.fd;eventLoop->fired[j].mask = mask;}}return numevents;
}
结构体aeTimeEvent
其是时间事件,可以认为是个定时器,定时会执行给定的函数。
从代码可以看到其是个双向链表。
- 每个时间事件都有一个事件id,aeEventLoop中的timeEventNextId是下一个要注册的时间事件id。
- when_sec、when_ms是时间事件(定时器任务)的发生时间
- timeProc是时间事件的处理回调函数。说明:aeTimeProc 需要返回一个 int 值,代表下次该超时事件触发的时间间隔。如果返回 - 1,则说明超时时间不需要再触发了,标记为删除即可
- finalizerProc是时间事件要删除时的处理函数
//时间事件结构
typedef struct aeTimeEvent {long long id; //时间事件的唯一标识符,自增//事件的到达时间(即是执行时间)long when_sec; /* seconds */long when_ms; /* milliseconds *///事件处理函数 (到期执行的回调函数)aeTimeProc *timeProc;//事件释放函数 (回调函数)aeEventFinalizerProc *finalizerProc;void *clientData; //多路复用库的私有数据//双向链表struct aeTimeEvent *prev; struct aeTimeEvent *next;int refcount; //以防止计时器事件在递归时间事件调用中释放
} aeTimeEvent;//时间事件回调函数类型
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);//删除定时事件的回调函数类型
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
3. struct aeEventLoop
EventLoop是个事件循环,其主要功能就是实现 while(1){ epoll_wait(....) }。所以说是个事件循环,这也对epoll再进一步封装。
aeEventLoop是Reactor模型的具体抽象,把网络读写事件和时间事件(定时器任务)可以统一处理。
//ae.h
// 事件循环
typedef struct aeEventLoop {int maxfd; //已经注册的文件描述符的最大值int setsize; //setsize是能注册的fd的最大值(epoll_wait函数中的参数maxevents)long long timeEventNextId; //下一个要注册的时间事件idtime_t lastTime; //最后一次执行时间事件的时间aeFileEvent *events; //是数组,已注册的文件事件 (就是IO event)aeFiredEvent *fired; //数组,已就绪的文件事件aeTimeEvent *timeEventHead; //定时器链表的头结点(头结点不一定是最早超时的任务)int stop; //eventLoop的开关void *apidata; //具体的IO多路复用实现,即是前面讲的结构体aeApiState变量。aeBeforeSleepProc *beforesleep;//在处理事件前要执行的回调函数(即是在执行epoll_wait()之前)aeBeforeSleepProc *aftersleep;//在处理事件后要执行的回调函数(即是在执行epoll_wait()之后)int flags; //设置的标识位
} aeEventLoop;//进入循环等待之前的回调函数类型
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
从上面的注释知道 aeEventLoop的成员变量的用处。所以这里就只重点讲解两个变量*events 和 *fired。
- *event就是个数组,setsize是数组的元素个数。events的元素是aeFileEvent。前面说了一个客户端对应一个aeFileEvent。所以,events就是存储需要被监听的客户端。events数组下标是aeFileEvent中的fd。
- fired数组是读写已就绪的 网络事件 数组。数组元素是记录了就绪的fd及其epoll_wait返回的事件类型。
aeEventLoop相关的函数
1. 创建eventloop
aeEventLoop *aeCreateEventLoop(int setsize) {aeEventLoop *eventLoop;int i;//分配内存给eventLoopeventLoop = zmalloc(sizeof(*eventLoop))//分配内存给eventLoop中的events 和 fired 数组eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);//设置其一些成员变量eventLoop->setsize = setsize;eventLoop->timeEventHead = NULL;eventLoop->timeEventNextId = 0;eventLoop->stop = 0;eventLoop->maxfd = -1;eventLoop->beforesleep = NULL;eventLoop->aftersleep = NULL;eventLoop->flags = 0;//创建epoll fd,即是调用epoll_create, 内部把创建出来的apiState赋值给eventLoop->apidataaeApiCreate(eventLoop)//当前每个aeFileEvent都不关注任何事件for (i = 0; i < setsize; i++)eventLoop->events[i].mask = AE_NONE;return eventLoop;//省略了些内存申请失败的处理................
}
2. 创建aeFileEvent
内部会调用aeApiAddEvent。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData)
{if (fd >= eventLoop->setsize) { //判断客户端fd是否大于events数组元素个数errno = ERANGE;return AE_ERR; //若是大于,返回错误}//从events数组中获取一个元素aeFileEvent *fe = &eventLoop->events[fd]; //从这可以看出events的下标就是客户端的fdif (aeApiAddEvent(eventLoop, fd, mask) == -1) //添加fd到epoll上,并关注事件maskreturn AE_ERR;fe->mask |= mask; //设置结构体aeFileEvent关注的事件//设置回调函数if (mask & AE_READABLE) fe->rfileProc = proc;if (mask & AE_WRITABLE) fe->wfileProc = proc;fe->clientData = clientData; //设置回调函数的参数//更新已监听的fd的最大值if (fd > eventLoop->maxfd)eventLoop->maxfd = fd;return AE_OK;
}//创建时间事件,即是定时器
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,aeTimeProc *proc, void *clientData,aeEventFinalizerProc *finalizerProc)
{long long id = eventLoop->timeEventNextId++;aeTimeEvent *te;te = zmalloc(sizeof(*te));if (te == NULL) return AE_ERR;te->id = id;te->when = getMonotonicUs() + milliseconds * 1000;//设置定时器的回调函数te->timeProc = proc;te->finalizerProc = finalizerProc;te->clientData = clientData; //设置回调函数的参数te->prev = NULL;te->next = eventLoop->timeEventHead; //从这句代码可以看出其是往头部插入的te->refcount = 0;if (te->next)te->next->prev = te;eventLoop->timeEventHead = te; //更新链表头部为新插入的节点return id;
}
3. 删除aeFileEvent
通过传入的fd找到evnets数组中的元素,之后调用aeApiDelEvent进行删除。之后吧该位置的
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{if (fd >= eventLoop->setsize) return;//通过传入的fd找到events数组中的元素aeFileEvent *fe = &eventLoop->events[fd];if (fe->mask == AE_NONE) return;/* We want to always remove AE_BARRIER if set when AE_WRITABLE* is removed. */if (mask & AE_WRITABLE) mask |= AE_BARRIER;aeApiDelEvent(eventLoop, fd, mask); //进行删除fe->mask = fe->mask & (~mask); //取消监听mask事件if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {/* Update the max fd */ int j;for (j = eventLoop->maxfd-1; j >= 0; j--)if (eventLoop->events[j].mask != AE_NONE) break;eventLoop->maxfd = j; //更新已注册的fd的最大值}
}
4. 开始事件循环
aeMain就是一个while()循环,循环内部是aeProcessEvents函数。
void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;while (!eventLoop->stop) {aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);}
}
aeProcessEvents的主要步骤:
回调函数beforsleep和aftersleep可以先省略不关注的。因为目前其还没有用途,先知道有这两个回调函数就行。
- 计算epoll_wait()需要的阻塞时间。
- 执行beforsleep
- epoll_wait等待事件发生。
- 处理发生的IO事件,根据发生的事件类型来调用对应的回调函数:
- 若是AE_READABLE类型调用rfileProc,
- 若是AE_WRITABLE类型调用wfileProc。
- 处理时间事件。若该时间事件是周期性的,执行完后会再添加到时间事件链表的。
//为了能更好理解读懂,该函数只展示了主体流程,有些细节是没有展示
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{int processed = 0, numevents;if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))){struct timeval tv, *tvp;//省略了epoll_wait函数参数timeout的计算过程,其不是框架的重点,后续可以再详细了解...............................//执行befroesleep回调函数,这是在初始化server时绑定的,可以先不关注if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)eventLoop->beforesleep(eventLoop);//即是调用epoll_wait, 等待就绪的fdnumevents = aeApiPoll(eventLoop, tvp);...................//遍历执行已就绪的fd的回调函数for (int j = 0; j < numevents; j++) {//在aeApiPoll中把就绪的fd和事件赋值给了fired[j]aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];//得到就绪的fd和事件类型int mask = eventLoop->fired[j].mask;int fd = eventLoop->fired[j].fd;int fired = 0; /* Number of events fired for current fd. *///下面就执行对应的回调函数if (!invert && fe->mask & mask & AE_READABLE) { //读就绪,执行读回调函数fe->rfileProc(eventLoop,fd,fe->clientData,mask);fired++;fe = &eventLoop->events[fd]; /* Refresh in case of resize. */}if (fe->mask & mask & AE_WRITABLE) { //写就绪,执行写回调函数if (!fired || fe->wfileProc != fe->rfileProc) {fe->wfileProc(eventLoop,fd,fe->clientData,mask);fired++;}}......................processed++;}}if (flags & AE_TIME_EVENTS) //flags有需要,就执行时间事件,即是执行定时器任务processed += processTimeEvents(eventLoop);return processed; /* return the number of processed file/time events */
}
4.什么时候设置回调函数的?
看aeProcessEvents时候可能会有疑惑,调用fe->rfileProc,但是都不知道该函数是什么内容的。
因为这是通过绑定的。就是把一个函数赋值给fe->rfileProc。
从main()入手。
//server.c
int main(int argc, char **argv) {.......................initServer();
}//initServer函数中就有绑定回调函数的
void initServer(void) {//省略了很多其他部分的内容//socket(...),bind(...),listen(...),创建服务端的流程................................................................aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);/* Create an event handler for accepting new connections in TCP sockets. */createSocketAcceptHandler(&server.ipfd, acceptTcpHandler)/* Register before and after sleep handlers (note this needs to be done* before loading persistence since it is used by processEventsWhileBlocked. */aeSetBeforeSleepProc(server.el,beforeSleep);aeSetAfterSleepProc(server.el,afterSleep);................................
}
1. 绑定SleepProc
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {eventLoop->beforesleep = beforesleep;
}void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) {eventLoop->aftersleep = aftersleep;
}
在服务器初始化时候就绑定了这两个函数,那么在运行aeProcessEvents时候,就会调用函数beforesleep和aftersleep。
2. 绑定服务端的读事件回调函数
就是在createSocketAcceptHandler,其就是调用aeCreateFileEvent。sfd->fd[j]就是服务端fd,AE_READABLE就是需要监听的事件,即是监听服务端的读事件。回调函数是accept_handler,即是把accept_handler赋值给fe->rfileProc。
从函数aeProcessEvents,会返回就绪的fd和就绪的事件类型。当服务端fd返回的时候,是读事件就绪,就会调用fe->rfileProc。而这时rfileProc就是accpet_handler,即是会调用accpet_handler。
//sfd是个数组,我们看源码的话,目前就当其是一个元素的即可,就不用使用for
//就把这个函数当做就是使用aeCreateFileEvent即可。
int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) {for (int j = 0; j < sfd->count; j++) {aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,NULL)}//错误处理没有展示return C_OK;
}int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData)
{................fe->mask |= mask;if (mask & AE_READABLE) fe->rfileProc = proc;if (mask & AE_WRITABLE) fe->wfileProc = proc;fe->clientData = clientData;.....................
}
回调函数acceptTcpHandler很明显会调用accpet去对客户建立连接的。
3. 绑定客户端的读事件回调函数
继续跟着函数acceptTcpHandler。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {int cport, cfd, max = MAX_ACCEPTS_PER_CALL;while(max--) {//调用acceptcfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);............................acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);}
}static void acceptCommonHandler(connection *conn, int flags, char *ip) {................./* Create connection and client */client *c = createClient(conn))/* Last chance to keep flags */c->flags |= flags;
}client *createClient(connection *conn) {client *c = zmalloc(sizeof(client));if (conn) {connSetReadHandler(conn, readQueryFromClient); //设置客户端的读回调函数connSetPrivateData(conn, c);}...................if (conn) linkClient(c); //把该客户端添加到服务器server.client链表中保存
}static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {return conn->type->set_read_handler(conn, func);
}
set_read_handler也是个回调函数,其设置成了是函数connSocketSetReadHandler。
这里set_read_handler也是个回调函数是因为有两种类型的connection,Redis是把一个客户端封装成一个connnection(该结构体后续会讲解)。
//当前就认为conn->type->ae_handler是参数func即可,内部有很多兜兜转转
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {if (func == conn->read_handler) return C_OK;conn->read_handler = func;if (!conn->read_handler)aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);elseif (aeCreateFileEvent(server.el,conn->fd,AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;return C_OK;
}
这样就把readQueryFromClient设置给了fe->rfileProc。当epoll_wait返回就绪的fd和事件,若该fd是客户端,也是读事件,那就会执行fe->rfileProc,即是执行readQueryFromClient。
4. 绑定客户端的写事件回调函数
这个是在绑定的beforeSleep函数中。
void beforeSleep(struct aeEventLoop *eventLoop) {........................../* Handle writes with pending output buffers. */handleClientsWithPendingWritesUsingThreads();.............................
}int handleClientsWithPendingWritesUsingThreads(void) {...................listRewind(server.clients_pending_write,&li);while((ln = listNext(&li))) {client *c = listNodeValue(ln);//如果缓冲区中还有回复客户端的数据,就需要设置写回调函数,当epoll_wait返回客户端fd的写事件时候,就会执行sendReplyToClientif (clientHasPendingReplies(c) &&connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR){}}......................
}static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {return conn->type->set_write_handler(conn, func, 0);
}
和set_read_handler一样,set_write_handler也是后面设置的。该函数是connSocketSetWriteHandler。
static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {if (func == conn->write_handler) return C_OK;conn->write_handler = func;if (barrier)conn->flags |= CONN_FLAG_WRITE_BARRIER;elseconn->flags &= ~CONN_FLAG_WRITE_BARRIER;if (!conn->write_handler)aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);elseif (aeCreateFileEvent(server.el,conn->fd,AE_WRITABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;return C_OK;
}
这样就把 sendReplyToClient设置给了fe->wfileProc。当epoll_wait返回客户端fd的写事件时候,就会调用fe->wfileProc,即是执行sendReplyToClient。
5.绑定时间事件
使用aeCreateTimeEvent来绑定时间事件的回调函数。在aeProcessEvents函数内部会调用processTimeEvents去执行定时器任务。
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,aeTimeProc *proc, void *clientData,aeEventFinalizerProc *finalizerProc)
{long long id = eventLoop->timeEventNextId++;aeTimeEvent *te;te = zmalloc(sizeof(*te));if (te == NULL) return AE_ERR;te->id = id;te->when = getMonotonicUs() + milliseconds * 1000;//设置事件的回调函数te->timeProc = proc;te->finalizerProc = finalizerProc;te->clientData = clientData;te->prev = NULL;te->next = eventLoop->timeEventHead;te->refcount = 0;if (te->next)te->next->prev = te;eventLoop->timeEventHead = te;return id;
}
5. 有了IO多路复用,为什么还需要reactor模式?
IO多路复用与事件驱动
首先要明确一点,reactor模式就是基于IO多路复用的。事件驱动也是IO多路复用的,不是说使用了reactor模式才是使用了事件驱动。
以事件为连接点,当有IO事件准备就绪时,就会通知用户,并且告知用户返回的是什么类型的事件,进而用户可以执行相对应的任务。这样就不用在IO等待上浪费资源,这便是事件驱动
的核心思想。
比如你点了两份外卖,外卖A,外卖B。之后你无需时刻打电话去问外卖到了没。外卖到的时候,外卖员会打电话通知你。这中途你就可以做自己的事,不用纯纯等待。还有可以知道是外卖A到了还是外卖B到了,外卖员会告知是哪个外卖到的。
这个就是事件驱动。IO事件准备就绪时,会自动通知用户,并会告知其事件类型。
所以应该是,IO多路复用 + 回调机制 构成了 reactor模式。
IO同步与异步的判断
还有reactor模式是同步的。因为其是使用IO多路复用的,而IO多路复用是同步的。
IO操作是同步还是异步,关键看数据在内核空间与用户空间的拷贝过程(数据读写的IO操作)
reactor模式的优点
网上很多说其可以很好处理高并发,但是我觉得IO多路复用也可以处理的。
还有说可扩展性,通过增加Reactor实例个数来充分利用CPU资源;那通过在其他线程再创建epoll也行的。
所以,我觉得一个很大的优势是:
- 应用处理请求的逻辑,与事件分发框架完全分离,即是解耦,有很高的复用性
即写应用处理时,只需关注如何处理业务逻辑。Reactor框架本身与具体事件处理逻辑无关。
假如是把reactor模式做成一个网络库给用户使用。那用户就只需要关注处理请求的逻辑即可。该网络库对外开放setCallback函数。
假设,用户写服务器服务,收到客户端发来的数据(一串数字),想对数字做加法 或者想对数字做乘法都行。只要使用setCallback函数设置好处理请求的逻辑就行。这就是应用处理请求的逻辑,与事件分发框架完全分离,这是很方便的。
假如该框架是不对外开放的(就是用户不能使用setCallback),像Redis一样,为什么还需要使用reactor模式,为什么不可以只用IO多路复用,不用回调机制呢?
我认为,也是解耦的好处。Redis编写人员需要改变处理请求的逻辑时候,就只改变某个函数即可,不需要深入到epoll框架去改变,这就是很方便的。
6. 通过Redis网络部分,学到如何实现reactor模式
- 先对epoll进行封装,方便后续使用
- 需要对fd进行监听,并注册需要关注的事件类型,并注册关注的事件类型就绪时,需要执行的函数。所以,可以把fd,关注的事件类型,事件就绪执行的函数三者封装在一个结构体aeFileEvent内。
- 我们简单使用epoll时候,肯定是会写 while(1){ epoll_wait(....); }。这就是个事件循环,所以可以再封装个结构体EventLoop, 其也是对epoll的再次封装,即是会调用前面封装好的epoll的函数。EventLoop内部应该有存储aeFileEvent的数组。
- 在调用epoll_wait时,会返回就绪的fd和事件类型,把返回的(就绪fd和事件类型)数组 赋值给EventLoop的aeFileEvent数组。之后就是使用该aeFileEvent,根据事件类型执行对应的回调函数
- 提供setCallback函数,设置好对应的回调函数,即是把需要执行的函数 赋值给 aeFileEvent中的 事件就绪执行函数。
Redis的reactor模式没有使用到结构体event_data_t的指针变量ptr。
若是想逐步实现recactor模式,这里推荐下本人写的0.仿造muduo,实现linux服务器开发思路
该文章专栏有详细的逐步实现的reactor模式的代码流程。欢迎查看。