Redis网络相关的结构体 和 reactor模式

目录

1. epoll的封装

结构体aeApiStae

 创建epoll fd的封装

epoll_ctl的封装

epoll_wait的封装

2. 结构体aeFileEvent、aeFiredEvent、aeTimeEvent

结构体aeFileEvent

结构体aeFiredEvent 

结构体aeTimeEvent

3. struct aeEventLoop

 aeEventLoop相关的函数

1. 创建eventloop

2. 创建aeFileEvent

3. 删除aeFileEvent

4. 开始事件循环

4.什么时候设置回调函数的?

1. 绑定SleepProc

2. 绑定服务端的读事件回调函数

3. 绑定客户端的读事件回调函数

4. 绑定客户端的写事件回调函数

5.绑定时间事件

5. 有了IO多路复用,为什么还需要reactor模式?

IO多路复用与事件驱动

IO同步与异步的判断

reactor模式的优点

6. 通过Redis网络部分,学到如何实现reactor模式


上一章节讲解了Redis的网络交互流程,没有对关于网络部分的结构体有具体的讲解,所以该文章就主要讲解Redis网络部分使用的结构体。

1. epoll的封装

epoll有三个函数调用:

  • epoll_create:创建epoll实例,返回一个epoll fd,即是用于管理待检测的文件描述符的集合。
  • epoll_ctl:管理红黑树实例上的节点,可以进行添加、修改、删除操作。
  • epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)。等待就绪的fd,并返回就绪的fd,存储在events中,以及个数。

结构体aeApiStae

  • 其内部有变量epfd,即是调用epoll_create创建出来fd
  • *events即是就绪的fd存储的位置(数组)
typedef struct aeApiState {int epfd;struct epoll_event *events;
} aeApiState;

 创建epoll fd的封装

主要流程:

  1. 申请内存空间给结构体aeApieState,申请内存空间给成员变量events。
  2. 使用epoll_create创建出epfd
//现在还没了解到aeEventLoop,可以先不用管aeEventLoop相关的,不影响看代码。
static int aeApiCreate(aeEventLoop *eventLoop) {aeApiState *state = zmalloc(sizeof(aeApiState));if (!state) return -1;state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);if (!state->events) {zfree(state);return -1;}state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */if (state->epfd == -1) {zfree(state->events);zfree(state);return -1;}anetCloexec(state->epfd);eventLoop->apidata = state;return 0;
}//重置aeApiState的events的大小,即是重置存放就绪fd的空间大小
static int aeApiResize(aeEventLoop *eventLoop, int setsize) {aeApiState *state = eventLoop->apidata;state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize);return 0;
}//释放结构体aeApiState
static void aeApiFree(aeEventLoop *eventLoop) {aeApiState *state = eventLoop->apidata;close(state->epfd);zfree(state->events);zfree(state);
}

epoll_ctl的封装

主要是两个:添加和删除。

  • 添加的就是使用epoll_ctl(epfd,EPOLL_CTL_ADD ,....)
  • 删除时使用epoll_ctl(epfd,EPOLL_CTL_MOD,....)
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {aeApiState *state = eventLoop->apidata;struct epoll_event ee = {0}; /* avoid valgrind warning *//* If the fd was already monitored for some event, we need a MOD* operation. Otherwise we need an ADD operation. */int op = eventLoop->events[fd].mask == AE_NONE ?EPOLL_CTL_ADD : EPOLL_CTL_MOD;ee.events = 0;//得到要关注的事件类型maskmask |= eventLoop->events[fd].mask; /* Merge old events */if (mask & AE_READABLE) ee.events |= EPOLLIN;if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;ee.data.fd = fd;if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;return 0;
}static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {aeApiState *state = eventLoop->apidata;struct epoll_event ee = {0}; /* avoid valgrind warning */int mask = eventLoop->events[fd].mask & (~delmask);ee.events = 0;if (mask & AE_READABLE) ee.events |= EPOLLIN;if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;ee.data.fd = fd;if (mask != AE_NONE) {epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);} else {/* Note, Kernel < 2.6.9 requires a non null event pointer even for* EPOLL_CTL_DEL. */epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);}
}

epoll_wait的封装

即是调用epoll_wait(...)。

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {aeApiState *state = eventLoop->apidata;int retval, numevents = 0;//eventLoop->setsize就是state->evnets数组的元素个数retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);if (retval > 0) {int j;numevents = retval;for (j = 0; j < numevents; j++) {    //遍历就绪的fdint mask = 0;    //mask就是该fd就绪的事件struct epoll_event *e = state->events+j;if (e->events & EPOLLIN) mask |= AE_READABLE;if (e->events & EPOLLOUT) mask |= AE_WRITABLE;if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;//把就绪的fd存储到 eventLoop->fired[j]中//可以先不用关注eventLoop,这里就是把就绪的fd存储到另一地方嘛eventLoop->fired[j].fd = e->data.fd;eventLoop->fired[j].mask = mask;}}return numevents;
}

2. 结构体aeFileEvent、aeFiredEvent、aeTimeEvent

结构体aeFileEvent

一个客户端建立连接后,会把该客户端的fd添加到epoll上,并监听其关注的事件。假如当前fd需要监听读事件,那该fd读就绪,那对应的读事件函数就会被调用。

所以,可以把fd关注的事件fd就绪会执行的回调函数封装成一个结构体,Redis叫其是aeFileEvent。

  • mask就是fd关注的事件类型
  • rfileProc是读就绪会执行的回调函数wfileProc就是写就绪会执行的回调函数。这两个都需要后续去定义和注册
  • clientData就是回调函数中的参数
/* File event structure */
typedef struct aeFileEvent {int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */aeFileProc *rfileProc;aeFileProc *wfileProc;void *clientData;
} aeFileEvent;//回调函数的类型
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);//前面的形式可能不好看出,用c++11写的话,如下
//using aeFileProc=std::function<void(aeEventLoop* eventLoop,int fd,void* clientData,int mask)>;

 所以,客户端就会有自己的aeFileEvent,即是一个客户端就对应一个aeFileEvent。 当读或写就绪时,就会调用对应的函数

其实,服务端也是对应一个aeFileEvent,因为也要监听服务端的读事件。当客户端来连接,服务端读事件就绪,就会调用服务端的读事件回调函数进行accept。

所以就会有创建aeFileEvent,每个客户端或者一个服务端都会使用函数aeCreateFileEvent。(后续会详讲,先留个印象)

看到这里读者可能会有疑惑,不是说要对某个fd进行注册监听的吗,怎么该结构体没有fd的呢

那是因为aeFileEvent是存储在数组内。数组的下标就是该aeFileEvent对应的fd,也即是客户端对应的fd。(后续从代码中可以看出的)

结构体aeFiredEvent 

如名字一样,其就是就绪事件。一个就绪事件,就肯定是可以从中知道是哪个fd就绪,还有知道是哪种类型事件就绪。

  • fd表示epoll_wait返回的就绪fd
  • mask表示epol_wait返回的事件类型

epoll_wait返回的一个就绪fd就对应一个aeFireEvent。 

aeApiPoll就是把epoll_wait返回的就绪fd和事件类型 逐个赋值给aeFireEvent。

/* A fired event */
typedef struct aeFiredEvent {int fd;int mask;
} aeFiredEvent;//把epoll_wait返回的就绪fd和事件赋值给fired[j]
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {aeApiState *state = eventLoop->apidata;int retval, numevents = 0;//eventLoop->setsize就是state->evnets数组的元素个数retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);if (retval > 0) {numevents = retval;for (int j = 0; j < numevents; j++) {    //遍历就绪的fdint mask = 0;    //mask就是该fd就绪的事件struct epoll_event *e = state->events+j;if (e->events & EPOLLIN) mask |= AE_READABLE;if (e->events & EPOLLOUT) mask |= AE_WRITABLE;if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;//把就绪的fd存储到 eventLoop->fired[j]中//可以先不用关注eventLoop,这里就是把就绪的fd存储到另一地方嘛eventLoop->fired[j].fd = e->data.fd;eventLoop->fired[j].mask = mask;}}return numevents;
}

结构体aeTimeEvent

其是时间事件,可以认为是个定时器,定时会执行给定的函数。

从代码可以看到其是个双向链表。 

  • 每个时间事件都有一个事件id,aeEventLoop中的timeEventNextId是下一个要注册的时间事件id。
  • when_sec、when_ms是时间事件(定时器任务)的发生时间
  • timeProc是时间事件的处理回调函数。说明:aeTimeProc 需要返回一个 int 值,代表下次该超时事件触发的时间间隔。如果返回 - 1,则说明超时时间不需要再触发了,标记为删除即可
  • finalizerProc是时间事件要删除时的处理函数
//时间事件结构
typedef struct aeTimeEvent {long long id; //时间事件的唯一标识符,自增//事件的到达时间(即是执行时间)long when_sec; /* seconds */long when_ms; /* milliseconds *///事件处理函数 (到期执行的回调函数)aeTimeProc *timeProc;//事件释放函数 (回调函数)aeEventFinalizerProc *finalizerProc;void *clientData;               //多路复用库的私有数据//双向链表struct aeTimeEvent *prev;   struct aeTimeEvent *next;int refcount;     //以防止计时器事件在递归时间事件调用中释放
} aeTimeEvent;//时间事件回调函数类型
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);//删除定时事件的回调函数类型
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);

3. struct aeEventLoop

EventLoop是个事件循环,其主要功能就是实现 while(1){ epoll_wait(....) }。所以说是个事件循环,这也对epoll再进一步封装。

aeEventLoop是Reactor模型的具体抽象,把网络读写事件时间事件(定时器任务)可以统一处理。

//ae.h
// 事件循环
typedef struct aeEventLoop {int maxfd;       //已经注册的文件描述符的最大值int setsize;     //setsize是能注册的fd的最大值(epoll_wait函数中的参数maxevents)long long timeEventNextId;    //下一个要注册的时间事件idtime_t lastTime;      //最后一次执行时间事件的时间aeFileEvent *events;     //是数组,已注册的文件事件 (就是IO event)aeFiredEvent *fired;     //数组,已就绪的文件事件aeTimeEvent *timeEventHead;    //定时器链表的头结点(头结点不一定是最早超时的任务)int stop;    //eventLoop的开关void *apidata; //具体的IO多路复用实现,即是前面讲的结构体aeApiState变量。aeBeforeSleepProc *beforesleep;//在处理事件前要执行的回调函数(即是在执行epoll_wait()之前)aeBeforeSleepProc *aftersleep;//在处理事件后要执行的回调函数(即是在执行epoll_wait()之后)int flags;          //设置的标识位
} aeEventLoop;//进入循环等待之前的回调函数类型
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);

从上面的注释知道 aeEventLoop的成员变量的用处。所以这里就只重点讲解两个变量*events 和 *fired

  • *event就是个数组,setsize是数组的元素个数。events的元素是aeFileEvent。前面说了一个客户端对应一个aeFileEvent。所以,events就是存储需要被监听的客户端。events数组下标是aeFileEvent中的fd。
  • fired数组是读写已就绪的 网络事件 数组。数组元素是记录了就绪的fd及其epoll_wait返回的事件类型

 aeEventLoop相关的函数

1. 创建eventloop

aeEventLoop *aeCreateEventLoop(int setsize) {aeEventLoop *eventLoop;int i;//分配内存给eventLoopeventLoop = zmalloc(sizeof(*eventLoop))//分配内存给eventLoop中的events 和 fired 数组eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);//设置其一些成员变量eventLoop->setsize = setsize;eventLoop->timeEventHead = NULL;eventLoop->timeEventNextId = 0;eventLoop->stop = 0;eventLoop->maxfd = -1;eventLoop->beforesleep = NULL;eventLoop->aftersleep = NULL;eventLoop->flags = 0;//创建epoll fd,即是调用epoll_create, 内部把创建出来的apiState赋值给eventLoop->apidataaeApiCreate(eventLoop)//当前每个aeFileEvent都不关注任何事件for (i = 0; i < setsize; i++)eventLoop->events[i].mask = AE_NONE;return eventLoop;//省略了些内存申请失败的处理................
}

2. 创建aeFileEvent

内部会调用aeApiAddEvent

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData)
{if (fd >= eventLoop->setsize) {    //判断客户端fd是否大于events数组元素个数errno = ERANGE;return AE_ERR;        //若是大于,返回错误}//从events数组中获取一个元素aeFileEvent *fe = &eventLoop->events[fd]; //从这可以看出events的下标就是客户端的fdif (aeApiAddEvent(eventLoop, fd, mask) == -1)    //添加fd到epoll上,并关注事件maskreturn AE_ERR;fe->mask |= mask;    //设置结构体aeFileEvent关注的事件//设置回调函数if (mask & AE_READABLE) fe->rfileProc = proc;if (mask & AE_WRITABLE) fe->wfileProc = proc;fe->clientData = clientData;    //设置回调函数的参数//更新已监听的fd的最大值if (fd > eventLoop->maxfd)eventLoop->maxfd = fd;return AE_OK;
}//创建时间事件,即是定时器
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,aeTimeProc *proc, void *clientData,aeEventFinalizerProc *finalizerProc)
{long long id = eventLoop->timeEventNextId++;aeTimeEvent *te;te = zmalloc(sizeof(*te));if (te == NULL) return AE_ERR;te->id = id;te->when = getMonotonicUs() + milliseconds * 1000;//设置定时器的回调函数te->timeProc = proc;te->finalizerProc = finalizerProc;te->clientData = clientData;    //设置回调函数的参数te->prev = NULL;te->next = eventLoop->timeEventHead;    //从这句代码可以看出其是往头部插入的te->refcount = 0;if (te->next)te->next->prev = te;eventLoop->timeEventHead = te;    //更新链表头部为新插入的节点return id;
}

3. 删除aeFileEvent

通过传入的fd找到evnets数组中的元素,之后调用aeApiDelEvent进行删除。之后吧该位置的

void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{if (fd >= eventLoop->setsize) return;//通过传入的fd找到events数组中的元素aeFileEvent *fe = &eventLoop->events[fd];if (fe->mask == AE_NONE) return;/* We want to always remove AE_BARRIER if set when AE_WRITABLE* is removed. */if (mask & AE_WRITABLE) mask |= AE_BARRIER;aeApiDelEvent(eventLoop, fd, mask);    //进行删除fe->mask = fe->mask & (~mask);    //取消监听mask事件if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {/* Update the max fd */    int j;for (j = eventLoop->maxfd-1; j >= 0; j--)if (eventLoop->events[j].mask != AE_NONE) break;eventLoop->maxfd = j;    //更新已注册的fd的最大值}
}

4. 开始事件循环

aeMain就是一个while()循环,循环内部是aeProcessEvents函数。

void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;while (!eventLoop->stop) {aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);}
}

aeProcessEvents的主要步骤:

回调函数beforsleep和aftersleep可以先省略不关注的。因为目前其还没有用途,先知道有这两个回调函数就行。

  • 计算epoll_wait()需要的阻塞时间。
  • 执行beforsleep
  • epoll_wait等待事件发生。
  • 处理发生的IO事件,根据发生的事件类型来调用对应的回调函数:
    • 若是AE_READABLE类型调用rfileProc
    • 若是AE_WRITABLE类型调用wfileProc
  • 处理时间事件。若该时间事件是周期性的,执行完后会再添加到时间事件链表的。
//为了能更好理解读懂,该函数只展示了主体流程,有些细节是没有展示
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{int processed = 0, numevents;if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))){struct timeval tv, *tvp;//省略了epoll_wait函数参数timeout的计算过程,其不是框架的重点,后续可以再详细了解...............................//执行befroesleep回调函数,这是在初始化server时绑定的,可以先不关注if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)eventLoop->beforesleep(eventLoop);//即是调用epoll_wait, 等待就绪的fdnumevents = aeApiPoll(eventLoop, tvp);...................//遍历执行已就绪的fd的回调函数for (int j = 0; j < numevents; j++) {//在aeApiPoll中把就绪的fd和事件赋值给了fired[j]aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];//得到就绪的fd和事件类型int mask = eventLoop->fired[j].mask;int fd = eventLoop->fired[j].fd;int fired = 0; /* Number of events fired for current fd. *///下面就执行对应的回调函数if (!invert && fe->mask & mask & AE_READABLE) {  //读就绪,执行读回调函数fe->rfileProc(eventLoop,fd,fe->clientData,mask);fired++;fe = &eventLoop->events[fd]; /* Refresh in case of resize. */}if (fe->mask & mask & AE_WRITABLE) {    //写就绪,执行写回调函数if (!fired || fe->wfileProc != fe->rfileProc) {fe->wfileProc(eventLoop,fd,fe->clientData,mask);fired++;}}......................processed++;}}if (flags & AE_TIME_EVENTS)    //flags有需要,就执行时间事件,即是执行定时器任务processed += processTimeEvents(eventLoop);return processed; /* return the number of processed file/time events */
}

4.什么时候设置回调函数的?

看aeProcessEvents时候可能会有疑惑,调用fe->rfileProc,但是都不知道该函数是什么内容的。

因为这是通过绑定的。就是把一个函数赋值给fe->rfileProc。

从main()入手。

//server.c
int main(int argc, char **argv) {.......................initServer();
}//initServer函数中就有绑定回调函数的
void initServer(void) {//省略了很多其他部分的内容//socket(...),bind(...),listen(...),创建服务端的流程................................................................aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);/* Create an event handler for accepting new connections in TCP  sockets. */createSocketAcceptHandler(&server.ipfd, acceptTcpHandler)/* Register before and after sleep handlers (note this needs to be done* before loading persistence since it is used by processEventsWhileBlocked. */aeSetBeforeSleepProc(server.el,beforeSleep);aeSetAfterSleepProc(server.el,afterSleep);................................
}

1. 绑定SleepProc

void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {eventLoop->beforesleep = beforesleep;
}void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) {eventLoop->aftersleep = aftersleep;
}

在服务器初始化时候就绑定了这两个函数,那么在运行aeProcessEvents时候,就会调用函数beforesleepaftersleep

2. 绑定服务端的读事件回调函数

就是在createSocketAcceptHandler,其就是调用aeCreateFileEvent。sfd->fd[j]就是服务端fd,AE_READABLE就是需要监听的事件,即是监听服务端的读事件。回调函数是accept_handler,即是把accept_handler赋值给fe->rfileProc。

从函数aeProcessEvents,会返回就绪的fd和就绪的事件类型。当服务端fd返回的时候,是读事件就绪,就会调用fe->rfileProc而这时rfileProc就是accpet_handler,即是会调用accpet_handler。

//sfd是个数组,我们看源码的话,目前就当其是一个元素的即可,就不用使用for
//就把这个函数当做就是使用aeCreateFileEvent即可。
int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) {for (int j = 0; j < sfd->count; j++) {aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,NULL)}//错误处理没有展示return C_OK;
}int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData)
{................fe->mask |= mask;if (mask & AE_READABLE) fe->rfileProc = proc;if (mask & AE_WRITABLE) fe->wfileProc = proc;fe->clientData = clientData;.....................
}

回调函数acceptTcpHandler很明显会调用accpet去对客户建立连接的。

3. 绑定客户端的读事件回调函数

继续跟着函数acceptTcpHandler。

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {int cport, cfd, max = MAX_ACCEPTS_PER_CALL;while(max--) {//调用acceptcfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);............................acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);}
}static void acceptCommonHandler(connection *conn, int flags, char *ip) {................./* Create connection and client */client *c = createClient(conn))/* Last chance to keep flags */c->flags |= flags;
}client *createClient(connection *conn) {client *c = zmalloc(sizeof(client));if (conn) {connSetReadHandler(conn, readQueryFromClient);    //设置客户端的读回调函数connSetPrivateData(conn, c);}...................if (conn) linkClient(c);    //把该客户端添加到服务器server.client链表中保存
}static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {return conn->type->set_read_handler(conn, func);
}

set_read_handler也是个回调函数,其设置成了是函数connSocketSetReadHandler。

这里set_read_handler也是个回调函数是因为有两种类型的connection,Redis是把一个客户端封装成一个connnection(该结构体后续会讲解)

//当前就认为conn->type->ae_handler是参数func即可,内部有很多兜兜转转
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {if (func == conn->read_handler) return C_OK;conn->read_handler = func;if (!conn->read_handler)aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);elseif (aeCreateFileEvent(server.el,conn->fd,AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;return C_OK;
}

这样就把readQueryFromClient设置给了fe->rfileProc。当epoll_wait返回就绪的fd和事件,若该fd是客户端,也是读事件,那就会执行fe->rfileProc,即是执行readQueryFromClient

4. 绑定客户端的写事件回调函数

这个是在绑定的beforeSleep函数中。

void beforeSleep(struct aeEventLoop *eventLoop) {........................../* Handle writes with pending output buffers. */handleClientsWithPendingWritesUsingThreads();.............................
}int handleClientsWithPendingWritesUsingThreads(void) {...................listRewind(server.clients_pending_write,&li);while((ln = listNext(&li))) {client *c = listNodeValue(ln);//如果缓冲区中还有回复客户端的数据,就需要设置写回调函数,当epoll_wait返回客户端fd的写事件时候,就会执行sendReplyToClientif (clientHasPendingReplies(c) &&connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR){}}......................
}static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {return conn->type->set_write_handler(conn, func, 0);
}

和set_read_handler一样,set_write_handler也是后面设置的。该函数是connSocketSetWriteHandler。

static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {if (func == conn->write_handler) return C_OK;conn->write_handler = func;if (barrier)conn->flags |= CONN_FLAG_WRITE_BARRIER;elseconn->flags &= ~CONN_FLAG_WRITE_BARRIER;if (!conn->write_handler)aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);elseif (aeCreateFileEvent(server.el,conn->fd,AE_WRITABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;return C_OK;
}

这样就把 sendReplyToClient设置给了fe->wfileProc。当epoll_wait返回客户端fd的写事件时候,就会调用fe->wfileProc,即是执行sendReplyToClient。

5.绑定时间事件

使用aeCreateTimeEvent来绑定时间事件的回调函数。在aeProcessEvents函数内部会调用processTimeEvents去执行定时器任务。

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,aeTimeProc *proc, void *clientData,aeEventFinalizerProc *finalizerProc)
{long long id = eventLoop->timeEventNextId++;aeTimeEvent *te;te = zmalloc(sizeof(*te));if (te == NULL) return AE_ERR;te->id = id;te->when = getMonotonicUs() + milliseconds * 1000;//设置事件的回调函数te->timeProc = proc;te->finalizerProc = finalizerProc;te->clientData = clientData;te->prev = NULL;te->next = eventLoop->timeEventHead;te->refcount = 0;if (te->next)te->next->prev = te;eventLoop->timeEventHead = te;return id;
}

5. 有了IO多路复用,为什么还需要reactor模式?

IO多路复用与事件驱动

首先要明确一点,reactor模式就是基于IO多路复用的。事件驱动也是IO多路复用的,不是说使用了reactor模式才是使用了事件驱动。

以事件为连接点,当有IO事件准备就绪时,就会通知用户,并且告知用户返回的是什么类型的事件,进而用户可以执行相对应的任务。这样就不用在IO等待上浪费资源,这便是事件驱动的核心思想。

比如你点了两份外卖,外卖A,外卖B。之后你无需时刻打电话去问外卖到了没。外卖到的时候,外卖员会打电话通知你。这中途你就可以做自己的事,不用纯纯等待。还有可以知道是外卖A到了还是外卖B到了,外卖员会告知是哪个外卖到的。

这个就是事件驱动。IO事件准备就绪时,会自动通知用户,并会告知其事件类型。

所以应该是,IO多路复用 + 回调机制 构成了 reactor模式

IO同步与异步的判断

还有reactor模式是同步的。因为其是使用IO多路复用的,而IO多路复用是同步的。

IO操作是同步还是异步,关键看数据在内核空间与用户空间的拷贝过程(数据读写的IO操作)

reactor模式的优点

网上很多说其可以很好处理高并发,但是我觉得IO多路复用也可以处理的。

还有说可扩展性,通过增加Reactor实例个数来充分利用CPU资源;那通过在其他线程再创建epoll也行的。

所以,我觉得一个很大的优势是:

  • 应用处理请求的逻辑,与事件分发框架完全分离,即是解耦,有很高的复用性

即写应用处理时,只需关注如何处理业务逻辑。Reactor框架本身与具体事件处理逻辑无关。

假如是把reactor模式做成一个网络库给用户使用。那用户就只需要关注处理请求的逻辑即可。该网络库对外开放setCallback函数。

假设,用户写服务器服务,收到客户端发来的数据(一串数字),想对数字做加法 或者想对数字做乘法都行。只要使用setCallback函数设置好处理请求的逻辑就行。这就是应用处理请求的逻辑,与事件分发框架完全分离,这是很方便的。

假如该框架是不对外开放的(就是用户不能使用setCallback),像Redis一样,为什么还需要使用reactor模式,为什么不可以只用IO多路复用,不用回调机制呢?

我认为,也是解耦的好处。Redis编写人员需要改变处理请求的逻辑时候,就只改变某个函数即可,不需要深入到epoll框架去改变,这就是很方便的。

6. 通过Redis网络部分,学到如何实现reactor模式

  1. 先对epoll进行封装,方便后续使用
  2. 需要对fd进行监听,并注册需要关注的事件类型,并注册关注的事件类型就绪时,需要执行的函数。所以,可以把fd关注的事件类型事件就绪执行的函数三者封装在一个结构体aeFileEvent内。
  3. 我们简单使用epoll时候,肯定是会写 while(1){ epoll_wait(....); }。这就是个事件循环,所以可以再封装个结构体EventLoop, 其也是对epoll的再次封装,即是会调用前面封装好的epoll的函数。EventLoop内部应该有存储aeFileEvent的数组。
    1. 在调用epoll_wait时,会返回就绪的fd和事件类型,把返回的(就绪fd和事件类型)数组 赋值给EventLoop的aeFileEvent数组。之后就是使用该aeFileEvent,根据事件类型执行对应的回调函数
  4. 提供setCallback函数,设置好对应的回调函数,即是把需要执行的函数 赋值给 aeFileEvent中的 事件就绪执行函数

Redis的reactor模式没有使用到结构体event_data_t的指针变量ptr。

若是想逐步实现recactor模式,这里推荐下本人写的0.仿造muduo,实现linux服务器开发思路

该文章专栏有详细的逐步实现的reactor模式的代码流程。欢迎查看。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/3589.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Rust HTTP 客户端:易于使用、功能强大 | 开源日报 No.228

seanmonstar/reqwest Stars: 8.9k License: Apache-2.0 reqwest 是一个易于使用且功能强大的 Rust HTTP 客户端。 异步和阻塞客户端支持普通数据、JSON、urlencoded 和 multipart 数据格式可定制的重定向策略支持 HTTP 代理和系统原生 TLS 或 rustls 的 HTTPSCookie 存储功能…

JVM学习笔记(五)内存模型

目录 1、原子性 1.1 问题分析 1.2 解决方法 2、可见性 2.1 退不出的循环 2.2 解决办法 3、有序性 3.1 诡异的结果 3.2 解决办法 3.3 有序性理解 3.4 happens-before 4、CAS与原子类 4.1 CAS 4.2 乐观锁与悲观锁 4.3 原子操作类 5、synchronized 优化 5.1 轻量…

17(第十六章,数据管理组织与角色期望)

目录 概述 基本概念 首席数据官 个人角色 执行官角色 业务角色 IT角色 混合角色 组织架构模式 概述 本章书上的重点内容其实不多&#xff0c;了解的地方例如数据管理组织的结构包括分散、网络、混合、联邦、集中等运营模式。 之前也提过的RACI&#xff08;谁负责&am…

AWTK MODBUS Client channel 模型

名称&#xff1a;modbus_client_channel 功能&#xff1a;通过 modbus 协议访问远程 slave 设备上的数据&#xff0c;需要配合 modbus_client模型一起使用。用于将 modbus client 中的 channel 包装成view_model或者view_model_array 一般来说不需要&#xff0c;直接使用modbus…

CentOS 系统的优缺点

CentOS &#xff08;社区企业操作系统的缩写&#xff09;是一个基于红帽企业 Linux (RHEL)的免费开源发行版&#xff0c; 旨在为服务器和工作站提供稳定、可靠和安全的平台。 不应将其与CentOS Stream 混淆&#xff0c;后者是即将发布的 RHEL 版本的上游开发平台。 CentOS Li…

C++初阶之入门

零、什么是C C是基于C语言而产生的&#xff0c;它既可以进行C语言的过程化程序设计&#xff0c;又可以进行以抽象数据类型为特点的基于对象的程序设计&#xff0c;还可以进行面向对象的程序设计。 C缺点之一&#xff0c;是相对许多语言复杂&#xff0c;而且难学难精。许多人说学…

一般神经网络的微分与网络参数的初始化

(文章的主要内容来自电科的顾亦奇老师的 Mathematical Foundation of Deep Learning, 有部分个人理解) 一般深度神经网络的微分 上周讨论的前向和反向传播算法可以推广到任意深度神经网络的微分。 对于一般的网络来说&#xff0c;可能无法逐层分割&#xff0c;但仍然可以用流…

抖音小店运营过程中,每天必须要做的7件事情!少一件都不行!

大家好&#xff0c;我是电商小V 最近很多新手小伙伴刚做店&#xff0c;操作抖音小店每天需要做的事情都是哪些呢&#xff1f;感觉自己一天很忙&#xff0c;但是也没有忙出来结果&#xff0c;思绪很乱&#xff0c;关于这个问题咱们今天就来详细的说一下&#xff0c;给你们梳理总…

02 贪吃蛇

前言 呵呵 这是不知道 在哪里看到的 别人做的一个贪吃蛇 因此 也把我 之前的 贪吃蛇 移植上来了 当然 这个不过是为了 简单的入门了解, 呵呵 然后 c版本的贪吃蛇 需要先移植成 c 版本, 然后 再根据 单片机相关 设计调整 比如 led 点阵的输出, 比如 c99 语法的一些不兼容…

12 c++版本的坦克大战

前言 呵呵 这大概是 大学里面的 c 贪吃蛇了吧 有一些 面向对象的理解, 但是不多 这里 具体的实现 就不赘述, 仅仅是 发一下代码 以及 具体的使用 坦克大战 #include<iostream> #include<windows.h> #include<conio.h> #include<ctime> #include…

Python编程的终极十大工具

Python一直以来都是程序员们的首选编程语言之一&#xff0c;其灵活性和功能强大的库使其成为解决各种问题的理想选择。在本文中&#xff0c;我们将介绍Python编程的终极十大工具&#xff0c;这些工具可以帮助您在各种领域取得成功&#xff0c;无论您是初学者还是经验丰富的开发…

【C++】C++的内存管理

目录 内存分布 动态内存管理 C语言的动态内存管理 C的动态内存管理 对内置类型操作 对自定义类型操作 new[]和delete[] 开空间的细节 探讨匹配问题 定位new表达式 内存分布 栈&#xff1a;存放非静态局部变量&#xff0c;函数参数&#xff0c;返回值等。栈是向下增长…

Vim学习笔记01~04

第01章&#xff1a; 遁入空门&#xff0c;模式当道 1.什么是vim Vim是一个高效的文本编辑工具&#xff0c;并且可以在编程开发过程中发挥越来越重要的作用。 事实上&#xff0c;有不少编程高手使用他们来进行代码的开发&#xff0c;并且对此赞不绝口。 2.本系列目的 但是让…

微信小程序按钮点击时的样式hover-class=“hover“

小程序的button组件很好用&#xff0c;按钮点击的时候会显示点击状态&#xff0c;默认的就是颜色加深 但是我们改变了button的背景色之后&#xff0c;就看不出点击效果了&#xff0c;解决起来也很简单 关键代码就是小程序的 hover-class 属性&#xff0c;需要注意的是&#xff…

代码随想录算法训练营Day8 | ● 344.反转字符串● 541. 反转字符串II● 54.替换数字● 151.翻转字符串里的单词● 55.右旋转字符串

&#xff08;记得重学&#xff09; ● 344.反转字符串 题目&#xff1a;编写一个函数&#xff0c;其作用是将输入的字符串反转过来。输入字符串以字符数组 s 的形式给出。 不要给另外的数组分配额外的空间&#xff0c;你必须原地修改输入数组、使用 O(1) 的额外空间解决这一…

Qt [获取Dump] 使用WindowsAPI实现生成MiniDump文件

说明 客户现场的软件偶发崩溃是程序开发者&#xff0c;比较头疼的事情。如何更快速的定位到问题点和解决掉&#xff0c;是开发应该具备的基本能力。 Windows提供了一系列的API&#xff0c;可以记录软件崩溃前的堆栈信息。下面就实现一个生成Dump文件的程序实例。 主要代码 回…

计算机系列之输入输出、中断、总线、可靠性、操作系统、进程管理、同步互斥

9、输入输出-中断-总线-可靠性 1、输入输出技术、中断 1、内存与接口地址的编址方法&#xff08;了解概念即可&#xff09; 计算机系统中存在多种内存与接口地址的编址方法&#xff0c;常见的是下面两种&#xff1a;&#xff08;了解概念即可&#xff09; 1&#xff09;内存…

ai大模型应用开发

随着人工智能技术的飞速发展&#xff0c;AI大模型应用开发已成为一个日益重要的领域。本文将从专业角度深入探讨AI大模型的应用开发&#xff0c;并思考其未来的深度影响和逻辑性。 编辑搜图 请点击输入图片描述&#xff08;最多18字&#xff09; ​【一、AI大模型的定义与特点…

ASP.NET Core 3 高级编程(第8版) 学习笔记 03

本篇介绍原书的第 18 章&#xff0c;为 19 章 Restful Service 编写基础代码。本章实现了如下内容&#xff1a; 1&#xff09;使用 Entity Framework Core 操作 Sql Server 数据库 2&#xff09;Entity Framework Core 数据库迁移和使用种子数据的方法 3&#xff09;使用中间件…

Babylon.js 读取GLB模型元数据

如果你熟悉将 3D 资源导出到游戏引擎的过程&#xff0c;那么无疑也会熟悉 3D 资源的 PBR 和 GLB 导出过程。 这是我们之前概述的内容&#xff0c;也是我们交互式工作的所有资产准备的基石。 然而&#xff0c;从传统的管道意义上来说&#xff0c;能够用元数据标记网格有很多逻辑…