02 | 事件驱动框架(Get/Set)

02 | 事件驱动框架(Get/Set)

  • 【程序员的末路诗】
  • 01 aeEventLoop数据结构
    • 1)aeEventLoop
    • 2)aeFileEvent
    • 3)aeTimeEvent
  • 02 epoll 实例创建(epoll_create)
    • 1)创建eventLoop结构体
    • 2)aeApiCreate ->epoll_create
    • 3)设置不监听事件
  • 03 注册或修改事件(epoll_ctl)
    • 1)IO事件创建epoll_ctl参数
    • 2)aeCreateFileEvent函数
    • 3)aeApiAddEvent ->epoll_ctl
  • 04 循环监听事件(epoll_wait)
    • 1)aeMain函数
    • 2)aeProcessEvents事件捕获、分发和处理
    • 3)获取事件aeApiPoll ->epoll_wait
  • 05 读事件处理
    • 1)acceptTcpHandler函数(连接Handler)
    • 2)命令读取 readQueryFromClient
    • 3)命令解析 processInputBuffer
    • 4)命令执行 processCommand
    • 5)数据写回addReply
  • 06 写事件处理
    • 1)数据写回客户端

【程序员的末路诗】

四月职场是非雪,事中曲折似云烟。
栽赃陷害我何惧,怒马鲜衣是少年。
既以段意取终章,蜉蝣撼树有何意?
既然无意共谋事,何必道德高义裁。
道不同不相为谋,煮一副浊酒,笑看庭前花开花落,人世间云起云落。
红尘因果一刀断,一身青衣再入世,看那星火燎原否,看一眼那皓月荧辉。

01 aeEventLoop数据结构

Redis server 一旦和一个客户端建立连接后,就会在事件驱动框架中注册可读事件,这就对应了客户端的命令请求。而对于整个命令处理的过程来说,我认为主要可以分成四个阶段,它们分别对应了 Redis 源码中的不同函数。这里,我把它们对应的入口函数,也就是它们是从哪个函数开始进行执行的,罗列如下:
命令读取,对应 readQueryFromClient 函数;
命令解析,对应 processInputBufferAndReplicate 函数;
命令执行,对应 processCommand 函数;
结果返回,对应 addReply 函数;

1)aeEventLoop

/* State of an event based program */
typedef struct aeEventLoop {int maxfd;   /* highest file descriptor currently registered */int setsize; /* max number of file descriptors tracked */long long timeEventNextId;aeFileEvent *events; //IO事件数组 /* Registered events */aeFiredEvent *fired; //已触发事件数组/* Fired events */aeTimeEvent *timeEventHead; //记录时间事件的链表头int stop;void *apidata; //和API调用接口相关的数据/* This is used for polling API specific data */aeBeforeSleepProc *beforesleep; //进入事件循环流程前执行的函数,void beforeSleep(struct aeEventLoop *eventLoop)aeBeforeSleepProc *aftersleep; //退出事件循环流程后执行的函数,void afterSleep(struct aeEventLoop *eventLoop)int flags;
} aeEventLoop;

2)aeFileEvent

/* File event structure */
typedef struct aeFileEvent {int mask; //掩码标记,包括可读事件、可写事件和屏障事件  /* one of AE_(READABLE|WRITABLE|BARRIER) */aeFileProc *rfileProc;  //处理可读事件的回调函数,也就是 Reactor 模型中的 handler。aeFileProc *wfileProc;  //处理可写事件的回调函数,也就是 Reactor 模型中的 handler。void *clientData;  //指向客户端私有数据的指针。
} aeFileEvent;

3)aeTimeEvent

/* Time event structure */
typedef struct aeTimeEvent {long long id; /* time event identifier. */monotime when;aeTimeProc *timeProc;aeEventFinalizerProc *finalizerProc;void *clientData;struct aeTimeEvent *prev;struct aeTimeEvent *next;int refcount; /* refcount to prevent timer events from being* freed in recursive time event calls. */
} aeTimeEvent;

02 epoll 实例创建(epoll_create)

epoll_create 是用于创建一个 epoll 实例的系统调用。
它返回一个文件描述符,用于标识新创建的 epoll 实例。
该文件描述符可以被用于后续的 epoll 监控操作。

1)创建eventLoop结构体

创建一个 aeEventLoop 结构体类型的变量eventLoop
main函数启动时执行initServer方法,创建了事件集合(eventLoop->events)大小为10000 + 96 +32。
参数 setsize 的大小,其实是由 server 结构的 maxclients 变量和宏定义 CONFIG_FDSET_INCR 共同决定的。其中,maxclients 变量的值大小,可以在Redis 的配置文件 redis.conf 中进行定义,默认值是 10000。而宏定义
CONFIG_FDSET_INCR 的大小,等于宏定义 CONFIG_MIN_RESERVED_FDS 的值再加上96。

void initServer(void) {
...//调用aeCreateEventLoop函数创建aeEventLoop结构体,并赋值给server结构的el变量server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
...
}
#define CONFIG_MIN_RESERVED_FDS 32
#define CONFIG_FDSET_INCR (CONFIG_MIN_RESERVED_FDS+96) 

创建一个事件循环对象,setsize 事件集合的大小,决定了可以处理的文件描述符数量,返回创建的事件循环对象的指针。如果创建失败,则返回NULL。

aeEventLoop *aeCreateEventLoop(int setsize) {aeEventLoop *eventLoop;int i;// 初始化 monotonic 时间戳。monotonicInit();    /* just in case the calling app didn't initialize */// 分配事件循环结构体内存if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;// 分配文件事件数组内存eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);// 分配触发事件数组内存eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);// 检查内存分配是否成功if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;// 初始化事件循环的参数eventLoop->setsize = setsize;eventLoop->timeEventHead = NULL;eventLoop->timeEventNextId = 0;eventLoop->stop = 0;eventLoop->maxfd = -1;eventLoop->beforesleep = NULL;eventLoop->aftersleep = NULL;eventLoop->flags = 0;// 创建API特定的事件循环结构if (aeApiCreate(eventLoop) == -1) goto err;/* Events with mask == AE_NONE are not set. So let's initialize the vector with it. */// 初始化文件事件数组,将没有设置的事件的掩码初始化为AE_NONEfor (i = 0; i < setsize; i++)eventLoop->events[i].mask = AE_NONE;return eventLoop;

2)aeApiCreate ->epoll_create

函数封装了操作系统提供的 IO 多路复用函数
假设 Redis 运行在 Linux 操作系统上,并且 IO 多路复用机制是 epoll,那么此时,aeApiCreate 函数就会调用 epoll_create 创建 epoll 实例,同时会创建 epoll_event 结构的数组,数组大小等于参数 setsize。

aeEventLoop *aeCreateEventLoop(int setsize) {....// 创建API特定的事件循环结构if (aeApiCreate(eventLoop) == -1) goto err;....
}

state->epfd = epoll_create(1024),这里的1024当作一个初始资源分配的提示值,而不是实际限制。可以把他当做初始化大小。
epfd代表epoll 实例。在服务器端编程中,通常会创建一个 epoll 实例来监听所有客户端的连接请求。当有新的客户端连接进来时,服务器端会为这个新的连接创建一个新的套接字(socket),然后将这个套接字添加到已有的 epoll 实例中进行监听。一个 epoll 实例可以对应多个客户端连接,内部采用的主要数据结构是红黑树(Red-Black Tree)来存储被监控的文件描述符及其事件。每个添加到 epoll 实例中的套接字(即代表客户端连接的文件描述符)都会作为一个节点插入到红黑树中。

/*** 创建并初始化aeApiState结构体,为eventLoop关联API数据。** @param eventLoop 指向aeEventLoop结构体的指针,表示事件循环对象。* @return 成功返回0,内存分配失败或epoll_create调用失败返回-1。*/
static int aeApiCreate(aeEventLoop *eventLoop) {// 分配内存给aeApiState结构体aeApiState *state = zmalloc(sizeof(aeApiState));if (!state) return -1; // 内存分配失败,结束函数并返回-1// 分配内存以存储事件信息,其大小由事件循环的setsize决定state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);if (!state->events) {// 事件信息内存分配失败,释放已分配的state内存并返回-1zfree(state);return -1;}// 调用epoll_create创建一个用于监听事件的描述符state->epfd = epoll_create(1024); /* 1024是给内核的一个提示,表示初始监听容量 */if (state->epfd == -1) {// epoll_create调用失败,释放内存并返回-1zfree(state->events);zfree(state);return -1;}// 将epoll描述符设置为CLOEXEC属性,确保在进程fork后自动关闭anetCloexec(state->epfd);// 将state对象赋值给eventLoop的apidata成员,以备后续使用eventLoop->apidata = state;return 0; // 成功完成初始化
}

epoll_event *events 是一个指向 epoll_event 结构体数组的指针,这个数组用于在调用 epoll_wait 函数时存放那些在 epfd 监控下准备好进行某种操作(如读取或写入)的文件描述符及其相关事件的信息。

typedef struct aeApiState {//aeApiState结构体定义int epfd; //epoll实例的描述符struct epoll_event *events; //epoll_event结构体数组,记录监听事件
} aeApiState;

关联客户端连接与其自身事件的关系过程如下:

  • 创建epoll实例:首先调用 epoll_create 创建一个epoll实例,得到一个 epfd 文件描述符。
  • 注册事件:每当有一个新的客户端连接到来时,服务器会为该连接创建一个新的套接字描述符,并通过调用 epoll_ctl 函数将这个套接字描述符添加到 epfd 监控的事件列表中,并指定关心的事件类型,如 EPOLLIN(可读)、EPOLLOUT(可写)等。
  • 等待事件:之后服务器调用 epoll_wait 函数,传入 epfd 和 events 数组,等待文件描述符上发生的事件。当 epoll_wait 返回时,events 数组会被填充上准备好的事件。其中,数组中的每个元素都代表了一个客户端连接的事件信息,events[i].data.fd 字段即对应着客户端连接的套接字描述符,events[i].events 字段则包含了该描述符上发生的事件类型。

3)设置不监听事件

所有网络 IO 事件对应文件描述符的掩码,初始化为 AE_NONE,表示暂时不对任何事件进行监听。

aeEventLoop *aeCreateEventLoop(int setsize) {....// 初始化文件事件数组,将没有设置的事件的掩码初始化为AE_NONEfor (i = 0; i < setsize; i++)eventLoop->events[i].mask = AE_NONE;....
}

03 注册或修改事件(epoll_ctl)

epoll_ctl 是用于注册或修改 epoll 实例中的事件的系统调用。
通过 epoll_ctl 可以向 epoll 实例添加、修改或删除需要监视的文件描述符及其对应的事件。
可以指定事件类型(如可读、可写)和其他参数,以便 epoll 实例监控指定的文件描述符上的事件。

1)IO事件创建epoll_ctl参数

Redis 的 IO 事件主要包括三类,分别是可读事件、可写事件和屏障事件。
屏障事件的主要作用是用来反转事件的处理顺序。比如在默认情况下,Redis 会先给客户端返回结果,但是如果面临需要把数据尽快写入磁盘的情况,Redis 就会用到屏障事件,把写数据和回复客户端的顺序做下调整,先把数据落盘,再给客户端回复。

2)aeCreateFileEvent函数

负责事件和handler注册,就是处理客户端的连接,创建与管理客户端连接的fd对象。
当 Redis 启动后,服务器程序的 main 函数会调用 initSever 函数来进行初始化,而在初始化的过程中,aeCreateFileEvent 就会被 initServer 函数调用,用于注册要监听的事件,以及相应的事件处理函数。

int main(int argc, char **argv) {***initServer();***
}

创建一个事件处理程序,用于接受 TCP 和 Unix 域套接字中的新连接。

void initServer(void) {***if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {serverPanic("Unrecoverable error creating TCP socket accept handler.");}***
}

创建一个事件处理程序,用于接受 TCP 或 TLS 域套接字中的新连接。这在原子上适用于所有套接字 fds.

int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) {int j;//根据启用的 IP 端口个数,为每个 IP 端口上的网络事件for (j = 0; j < sfd->count; j++) {// 创建对可读事件的监听,并且注册可读事件的handler->如tcp的acceptTcpHandlerif (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,NULL) == AE_ERR) {/* Rollback */for (j = j-1; j >= 0; j--)aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE);return C_ERR;}}return C_OK;
}

创建文件事件,创建对可读事件的监听,并且注册可读事件的handler->如tcp的acceptTcpHandler,它如何实现事件和处理函数的注册呢?

/*** 创建文件事件。** @param eventLoop 事件循环结构体指针。* @param fd 文件描述符。* @param mask 事件类型掩码,标识读写事件。* @param proc 文件事件处理函数指针。* @param clientData 用户自定义数据指针,会在事件触发时传递给事件处理函数。* @return 成功返回AE_OK,失败返回AE_ERR。*/
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
{// 检查文件描述符是否超过事件循环所能处理的范围if (fd >= eventLoop->setsize) {errno = ERANGE;return AE_ERR;}aeFileEvent *fe = &eventLoop->events[fd];// 使用底层API为文件描述符添加事件监控if (aeApiAddEvent(eventLoop, fd, mask) == -1)return AE_ERR;fe->mask |= mask;// 根据事件类型掩码设置读写事件处理函数if (mask & AE_READABLE)fe->rfileProc = proc;if (mask & AE_WRITABLE)fe->wfileProc = proc;fe->clientData = clientData;// 更新最大文件描述符值if (fd > eventLoop->maxfd)eventLoop->maxfd = fd;return AE_OK;
}

3)aeApiAddEvent ->epoll_ctl

调用epoll_ctl,向事件循环中添加或修改事件监听。

/** @param eventLoop 指向 aeEventLoop 结构体的指针,代表一个事件循环。* @param fd 文件描述符,需要被监视的事件源。* @param mask 指定需要监听的事件类型,如 AE_READABLE 或 AE_WRITABLE。* @return 成功返回 0,失败返回 -1。*/
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {aeApiState *state = eventLoop->apidata;struct epoll_event ee = {0}; /* 初始化结构体,避免 Valgrind 警告 *//* 根据该文件描述符是否已经被监视来决定是添加还是修改操作 */int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;ee.events = 0;mask |= eventLoop->events[fd].mask; /* 合并旧的事件 */if (mask & AE_READABLE) ee.events |= EPOLLIN; /* 设置读事件 */if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; /* 设置写事件 */ee.data.fd = fd;/* 控制 epoll,添加或修改文件描述符的事件监听 */if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; /* 控制操作失败则返回 -1 */return 0;
}

关于epoll_ctl的4个参数:

  • state->epfd:epoll 实例;
aeApiState *state = eventLoop->apidata; 
  • op:要执行的操作类型,是添加还是修改;
int op = eventLoop->events[fd].mask == AE_NONE ? 
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
  • fd:要监听的文件描述符,就是 aeApiAddEvent 函数接收到的参数fd;
    #count就是ipfd的数量
server.ipfd ->->fd[count]
  • &ee:epoll_event 类型变量;
    aeApiAddEvent 函数在调用 epoll_ctl 函数前,会新创建 epoll_event 类型变量 ee。然后,aeApiAddEvent 函数会设置变量 ee 中的监听事件类型和监听文件描述符。
struct epoll_event ee = {0}; /* 初始化结构体,避免 Valgrind 警告 */
/* 根据该文件描述符是否已经被监视来决定是添加还是修改操作 */
int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* 合并旧的事件 */
if (mask & AE_READABLE) ee.events |= EPOLLIN; /* 设置读事件 */
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; /* 设置写事件 */
ee.data.fd = fd;

04 循环监听事件(epoll_wait)

  • epoll_wait 是用于等待就绪事件的系统调用。
  • 当调用 epoll_wait 后,程序会阻塞,直到有文件描述符上发生了注册的事件。
  • 一旦有就绪事件发生,epoll_wait 返回就绪的文件描述符列表,并且可以立即处理这些事件。

1)aeMain函数

是主循环函数,redis的main方法初始化在最后会执行。

#server.c
int main(int argc, char **argv) {***//获取一批一批的IO事件,利用主线程按顺序执行aeMain(server.el);***
}

#ae.c

void aeMain(aeEventLoop *eventLoop) {//用一个循环不停地判断事件循环的停止标记。eventLoop->stop = 0;// 如果事件循环的停止标记被设置为 true,那么针对事件捕获、分发和处理的整个主循环就停止了;否则,主循环会一直执行。while (!eventLoop->stop) {aeProcessEvents(eventLoop, AE_ALL_EVENTS| AE_CALL_BEFORE_SLEEP| AE_CALL_AFTER_SLEEP);}
}

2)aeProcessEvents事件捕获、分发和处理

负责事件捕获、分发和处理,就是处理客户端的读、写请求。
为了适配不同的操作系统,Redis 对不同操作系统实现的网络 IO 多路复用函数,都进行了统一的封装,封装后的代码分别通过以下四个文件中实现:

  • ae_epoll.c,对应 Linux 上的 IO 复用函数 epoll;
  • ae_evport.c,对应 Solaris 上的 IO 复用函数 evport;
  • ae_kqueue.c,对应 macOS 或 FreeBSD 上的 IO 复用函数 kqueue;
  • ae_select.c,对应 Linux(或 Windows)的 IO 复用函数 select。
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{int processed = 0, numevents;/* 情况一:既没有时间事件,也没有网络事件,则立刻返回*/if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;/* 情况二:有IO事件或者有需要紧急处理的时间事件,则开始处理 */if (eventLoop->maxfd != -1 ||((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {***/* 调用多路复用 API,将仅在超时或某些事件触发时返回。 *///调用aeApiPoll函数捕获事件,依赖于操作系统底层提供的IO多路复用机制获取一批事件,检查是否有新的连接、读写事件发生。numevents = aeApiPoll(eventLoop, tvp);/* After sleep callback. */if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)eventLoop->aftersleep(eventLoop);for (j = 0; j < numevents; j++) {aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];int mask = eventLoop->fired[j].mask;int fd = eventLoop->fired[j].fd;int fired = 0; /* Number of events fired for current fd. *//* 通常我们先执行可读事件,然后再执行可写事件。这很有用,因为有时我们可以在处理查询后立即提供查询的回复。* 但是,如果在掩码中设置了AE_BARRIER,则我们的应用程序会要求我们执行相反的操作:永远不要在可读事件之后触发可写事件。* 在这种情况下,我们反转调用。例如,当我们想在 beforeSleep() 钩子中执行某些操作时,这很有用,例如在回复客户端之前将文件同步到磁盘。 */int invert = fe->mask & AE_BARRIER;/* 注意“fe->mask & mask & ...”代码:也许一个已经处理的事件删除了一个触发的元素,但我们仍然没有处理,所以我们检查该事件是否仍然有效。如果调用序列未反转,则触发可读事件。 *///即使使用了 IO 多路复用机制,命令的整个处理过程仍然可以由 IO 主线程来完成,也仍然可以保证命令执行的原子性。if (!invert && fe->mask & mask & AE_READABLE) {fe->rfileProc(eventLoop,fd,fe->clientData,mask);fired++;fe = &eventLoop->events[fd]; /* Refresh in case of resize. */}/* 触发可写事件。 */if (fe->mask & mask & AE_WRITABLE) {if (!fired || fe->wfileProc != fe->rfileProc) {fe->wfileProc(eventLoop,fd,fe->clientData,mask);fired++;}}/* 如果我们必须反转调用,请在可写事件之后立即触发可读事件。*/if (invert) {fe = &eventLoop->events[fd]; /* Refresh in case of resize. */if ((fe->mask & mask & AE_READABLE) &&(!fired || fe->wfileProc != fe->rfileProc)){fe->rfileProc(eventLoop,fd,fe->clientData,mask);fired++;}}processed++;}}/* 情况三:只有普通的时间事件,则开始处理 */if (flags & AE_TIME_EVENTS)processed += processTimeEvents(eventLoop);/* 返回已经处理的文件或时间*/return processed;
}

3)获取事件aeApiPoll ->epoll_wait

调用epoll_wait检测网络IO事件,并处理这些事件。

/*** @param eventLoop 指向当前事件循环结构的指针。* @param tvp 指向一个timeval结构的指针,用于指定epoll_wait的超时时间。如果为NULL,则表示无限等待。* @return 返回检测到的事件数量。如果没有事件发生,或者遇到错误,则返回0。*/
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {aeApiState *state = eventLoop->apidata;int retval, numevents = 0;// 调用epoll_wait等待并收集事件retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);if (retval > 0) {int j;// 获取事件数量并遍历每个事件numevents = retval;for (j = 0; j < numevents; j++) {int mask = 0; // 初始化maskstruct epoll_event *e = state->events+j; // 获取当前事件// 根据epoll事件类型更新maskif (e->events & EPOLLIN) mask |= AE_READABLE;if (e->events & EPOLLOUT) mask |= AE_WRITABLE;if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;// 更新事件循环中的触发事件信息eventLoop->fired[j].fd = e->data.fd;eventLoop->fired[j].mask = mask;}}return numevents;
}

05 读事件处理

IO 多路复用对命令原子性保证的影响,IO 多路复用机制是在 readQueryFromClient 函数执行前发挥作用的。

它实际是在事件驱动框架中调用 aeApiPoll 函数,获取一批已经就绪的 socket 描述符。然后执行一个循环,针对每个就绪描述符上的读事件,触发执行 readQueryFromClient 函数。
这样一来,即使 IO 多路复用机制同时获取了多个就绪 socket 描述符,在实际处理时,Redis 的主线程仍然是针对每个事件逐一调用回调函数进行处理的。而且对于写事件来说,IO 多路复用机制也是针对每个事件逐一处理的。

1)acceptTcpHandler函数(连接Handler)

当 Redis server 接收到客户端的连接请求时,就会使用注册好的acceptTcpHandler 函数进行处理。
acceptTcpHandler 函数是在networking.c文件中,它会接受客户端连接,并创建已连接套接字 cfd。然后,acceptCommonHandler 函数(在 networking.c 文件中)会被调用,同时,刚刚创建的已连接套接字 cfd 会作为参数,传递给 acceptCommonHandler函数。

/*** 处理TCP连接的接受请求。** @param el 指向事件循环结构的指针,用于处理事件。* @param fd 文件描述符,表示正在监听的TCP套接字。* @param privdata 私有数据,此处未使用。* @param mask 标志位,表示触发事件的类型,此处未使用。*/
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {int cport, cfd, max = MAX_ACCEPTS_PER_CALL; // cport: 连接的客户端端口;cfd: 接受的客户端文件描述符;max: 一次调用中最大接受的连接数char cip[NET_IP_STR_LEN]; // cip: 客户端IP地址字符串UNUSED(el); // 确认el参数未被使用UNUSED(mask); // 确认mask参数未被使用UNUSED(privdata); // 确认privdata参数未被使用while(max--) { // 循环,尝试接受多个客户端连接,直到达到最大接受数或失败cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); // 尝试接受一个TCP连接if (cfd == ANET_ERR) { // 如果接受失败if (errno != EWOULDBLOCK) // 如果错误不是因为资源暂时不可用(EWOULDBLOCK),则记录日志serverLog(LL_WARNING,"Accepting client connection: %s", server.neterr);return; // 结束函数}anetCloexec(cfd); // 设置文件描述符的CLOEXEC标志,确保在fork/exec后关闭该描述符serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); // 记录成功接受的客户端连接日志// 创建并初始化新客户端连接,注册相应的事件处理函数acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);}
}
  • anetTcpAccept创建cfd的socket连接
    创建客户端cfd文件描述符
/*** 接受TCP连接请求。** @param err 用于存储错误信息的字符数组。* @param s 监听套接字。* @param ip 用于存储客户端IP地址的字符数组。* @param ip_len ip字符数组的最大长度。* @param port 用于存储客户端端口号的整型指针。* @return 成功时返回新连接的文件描述符,失败时返回ANET_ERR。*/
int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {int fd;struct sockaddr_storage sa; // 用于存储客户端地址信息的结构体socklen_t salen = sizeof(sa); // sa结构体的长度// 尝试接受连接,出错时将错误信息存入errif ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)return ANET_ERR;// 判断连接是IPv4还是IPv6if (sa.ss_family == AF_INET) { // IPv4struct sockaddr_in *s = (struct sockaddr_in *)&sa;// 将IPv4地址转换为字符串形式if (ip) inet_ntop(AF_INET,(void*)&(s->sin_addr),ip,ip_len);// 端口号转换为主机字节序if (port) *port = ntohs(s->sin_port);} else { // IPv6struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;// 将IPv6地址转换为字符串形式if (ip) inet_ntop(AF_INET6,(void*)&(s->sin6_addr),ip,ip_len);// 端口号转换为主机字节序if (port) *port = ntohs(s->sin6_port);}return fd; // 返回新连接的文件描述符
}

使用Linux底层accept函数接受客户端连接创建出客户端连接的cfd文件描述符。

/*** 尝试接受一个网络连接。** @param err 用于存储错误信息的字符数组。* @param s 监听套接字的文件描述符。* @param sa 用于接收客户端地址的结构体指针。* @param len 指向客户端地址结构体长度的指针。* @return 成功时返回新创建的客户端连接的文件描述符,失败时返回ANET_ERR。*/
static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {int fd;while(1) {fd = accept(s,sa,len); // 尝试接受一个连接。if (fd == -1) { // 如接受失败,则根据错误码处理。if (errno == EINTR) // 如果是因为系统调用被中断而失败,则继续尝试。continue;else {anetSetError(err, "accept: %s", strerror(errno)); // 设置错误信息。return ANET_ERR; // 返回错误码。}}break; // 成功接受连接,退出循环。}return fd; // 返回新创建的文件描述符。
}

初始化cfd客户端连接,注册相应的事件处理函数,创建出对应客户端cfd的socket连接,并设置为accepting状态

/*** 创建一个已与接受的连接关联的新套接字类型连接。* 在调用 connAccept() 并执行连接级接受处理程序之前,套接字尚未准备好用于 IO。* 调用方应使用 connGetState() 并验证创建的连接未处于错误状态(这对于套接字连接是不可能的,但对于其他协议来说可能是可能的)。** @param fd 文件描述符,表示已接受的连接的套接字。* @return 返回创建的连接对象的指针。该连接对象与给定的文件描述符关联,并处于接受状态。*/
connection *connCreateAcceptedSocket(int fd) {// 创建一个新的连接对象connection *conn = connCreateSocket();// 将文件描述符赋值给连接对象conn->fd = fd;// 设置连接对象的状态为接受中conn->state = CONN_STATE_ACCEPTING;return conn;
}

acceptCommonHandler创建客户端

acceptCommonHandler 函数会调用 createClient 函数(在 networking.c 文件中)创建客户端。
/** 处理普通连接请求。* * @param conn 连接对象,包含连接的相关信息和操作函数。* @param flags 连接标志,用于配置连接的行为或属性。* @param ip 客户端的IP地址,该参数在此函数中未使用。*/
static void acceptCommonHandler(connection *conn, int flags, char *ip) {***// 尝试创建客户端连接对象。if ((c = createClient(conn)) == NULL) {serverLog(LL_WARNING,"Error registering fd event for the new client: %s (conn: %s)",connGetLastError(conn),connGetInfo(conn, conninfo, sizeof(conninfo)));connClose(conn); /* May be already closed, just ignore errors */return;}***
}

createClient函数会设置客户端连接的回调函数readQueryFromClient。好了,到这里,事件驱动框架就增加了对一个客户端已连接套接字的监听。一旦客户端有请求发送到 server,框架就会回调 readQueryFromClient 函数处理请求。这样一来,客户端请求就能通过事件驱动框架进行处理了。

client *createClient(connection *conn) {client *c = zmalloc(sizeof(client));/* passing NULL as conn it is possible to create a non connected client.* This is useful since all the commands needs to be executed* in the context of a client. When commands are executed in other* contexts (for instance a Lua script) we need a non connected client. */if (conn) {//将连接设置为非阻塞模式。connNonBlock(conn);//启用TCP_NODELAY选项,禁用Nagle算法。connEnableTcpNoDelay(conn);//如果服务器的tcpkeepalive选项为真,则设置连接的KeepAlive属性。if (server.tcpkeepalive)connKeepAlive(conn,server.tcpkeepalive);//设置连接的读取处理器为readQueryFromClient。connSetReadHandler(conn, readQueryFromClient);//设置客户端管道的管理对象对c指针connSetPrivateData(conn, c);}***   
}

2)命令读取 readQueryFromClient

readQueryFromClient 函数会从客户端连接的 socket 中,读取最大为 readlen(16KB)长度的数据。如果当前客户端是主从复制中的主节点,此函数还会把读取的数据,追加到用于主从节点命令同步的缓冲区中。最后,此函数会调用processInputBuffer 函数,这进入到了命令处理的下一个阶段,也就是命令解析阶段。

从客户端读取查询请求。

/*** 从客户端读取查询请求** 该函数用于处理来自客户端的查询请求,将请求数据读入到连接的查询缓冲区中,并在适当的情况下执行进一步的处理。** @param conn 连接结构指针,指向与客户端建立的连接。*/
void readQueryFromClient(connection *conn) {// 获取客户端结构体client *c = connGetPrivateData(conn);int nread, readlen;size_t qblen;/* 检查是否需要在退出事件循环后从客户端读取数据,这在启用线程I/O时会发生。 */if (postponeClientRead(c)) return;/* 更新服务器读取次数统计 */atomicIncr(server.stat_total_reads_processed, 1);readlen = PROTO_IOBUF_LEN;/* 如果是多bulk请求,并且我们正在处理一个足够大的bulk回复,尝试最大化查询缓冲区中恰好包含* SDS字符串表示对象的概率,即使这可能导致更多的read(2)调用。这样,函数processMultiBulkBuffer()* 可以避免复制缓冲区来创建表示参数的Redis Object。 */if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1&& c->bulklen >= PROTO_MBULK_BIG_ARG){ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);/* 注意,'remaining'变量在某些边界情况下可能为零,* 例如在我们恢复被CLIENT PAUSE阻塞的客户端之后。 */if (remaining > 0 && remaining < readlen) readlen = remaining;}qblen = sdslen(c->querybuf);if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;// 为查询缓冲区预留更多空间c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);nread = connRead(c->conn, c->querybuf+qblen, readlen);// 根据读取结果进行相应处理if (nread == -1) {if (connGetState(conn) == CONN_STATE_CONNECTED) {return;} else {serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));freeClientAsync(c);return;}} else if (nread == 0) {serverLog(LL_VERBOSE, "Client closed connection");freeClientAsync(c);return;} else if (c->flags & CLIENT_MASTER) {/* 将查询缓冲区追加到主服务器的待处理(未应用)缓冲区中。我们稍后将使用这个缓冲区,* 以获取最后执行的命令应用的字符串副本。 */c->pending_querybuf = sdscatlen(c->pending_querybuf,c->querybuf+qblen,nread);}sdsIncrLen(c->querybuf,nread);c->lastinteraction = server.unixtime;if (c->flags & CLIENT_MASTER) c->read_reploff += nread;// 更新网络输入字节统计atomicIncr(server.stat_net_input_bytes, nread);// 检查查询缓冲区长度是否超过最大值if (sdslen(c->querybuf) > server.client_max_querybuf_len) {sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();bytes = sdscatrepr(bytes,c->querybuf,64);serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);sdsfree(ci);sdsfree(bytes);freeClientAsync(c);return;}/* 如果客户端输入缓冲区中还有更多数据,继续解析以检查是否存在完整的命令需要执行。 */processInputBuffer(c);
}

3)命令解析 processInputBuffer

首先,processInputBuffer 函数会执行一个 while 循环,不断地从客户端的输入缓冲区中读取数据。然后,它会判断读取到的命令格式,是否以“*”开头。

如果命令是以“*”开头,那就表明这个命令是 PROTO_REQ_MULTIBULK 类型的命令请求,也就是符合 RESP 协议(Redis 客户端与服务器端的标准通信协议)的请求。那么,processInputBuffer 函数就会进一步调用 processMultibulkBuffer(在 networking.c 文件中)函数,来解析读取到的命令。

void processInputBuffer(client *c) {***if (!c->reqtype) {if (c->querybuf[c->qb_pos] == '*') {c->reqtype = PROTO_REQ_MULTIBULK;} else {c->reqtype = PROTO_REQ_INLINE;}}***
}

而如果命令不是以“*”开头,那则表明这个命令是 PROTO_REQ_INLINE 类型的命令请求,并不是 RESP 协议请求。这类命令也被称为管道命令,命令和命令之间是使用换行符“\r\n”分隔开来的。比如,我们使用 Telnet 发送给 Redis 的命令,就是属于PROTO_REQ_INLINE 类型的命令。在这种情况下,processInputBuffer 函数会调用processInlineBuffer(在 networking.c 文件中)函数,来实际解析命令。

void processInputBuffer(client *c) {***/* 处理内联缓冲区 */if (c->reqtype == PROTO_REQ_INLINE) {// 处理内联缓冲区。if (processInlineBuffer(c) != C_OK)break;/* 如果启用了Gopher模式且我们得到了零个或一个参数,那么以Gopher模式处理请求 */if (server.gopher_enabled && !server.io_threads_do_reads &&((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||c->argc == 0)){processGopherRequest(c);resetClient(c);c->flags |= CLIENT_CLOSE_AFTER_REPLY;break;}}***
}

4)命令执行 processCommand

等命令解析完成后,processInputBuffer 函数就会调用 processCommand 函数,开始进入命令处理的第三个阶段,也就是命令执行阶段。

void processInputBuffer(client *c) {/* 最终准备执行命令 */if (processCommandAndResetClient(c) == C_ERR) {/* 如果客户端不再有效,我们避免退出此循环并在稍后修剪客户端缓冲区。* 因此,在这种情况下,我们尽快返回。 */return;}
=  
}

此函数调用 processCommand(),但在该上下文中还为客户端执行了一些有用的任务:

  1. 将当前客户端设置为客户端 ‘c’。
  2. 如果命令被处理,则调用 commandProcessed()。
    如果作为处理命令的副作用释放了客户端,则函数返回 C_ERR,否则返回 C_OK。
    @param c 待处理命令的客户端指针。
    @return 如果客户端因处理命令而被释放,则返回 C_ERR,否则返回 C_OK。
int processCommandAndResetClient(client *c) {int deadclient = 0; // 标记客户端是否被释放client *old_client = server.current_client; // 保存当前客户端,以便后续恢复server.current_client = c; // 将当前客户端设置为指定的客户端 cif (processCommand(c) == C_OK) {commandProcessed(c); // 命令处理成功,调用 commandProcessed() 函数}if (server.current_client == NULL) deadclient = 1; // 检查当前客户端是否被设置为 NULL,即是否被释放// 恢复原来的客户端设置,因为如果脚本超时,从 processEventsWhileBlocked 调用此处时// 需要恢复 current_client,否则可能错误地指示客户端已死,并停止读取其缓冲区。server.current_client = old_client;// performEvictions 可能会刷新从属输出缓冲区,这可能导致从属(可能为当前活动客户端)被释放。return deadclient ? C_ERR : C_OK; // 根据 deadclient 的值返回 C_ERR 或 C_OK
}
  • processCommand命令执行
    如果调用此函数,我们已经读取了整个命令,参数位于客户端 argvargc 字段中。processCommand() 执行命令或准备服务器以从客户端批量读取。
int processCommand(client *c) {*****// 第一步,调用模块命令过滤器,将 Redis 命令替换成 module 中想要替换的命令。moduleCallCommandFilters(c);/* 第二步,特殊处理QUIT命令。普通命令进程将检查复制情况,而 QUIT 在启用FORCE_REPLICATION时会引起麻烦,并将在常规命令进程中实现。 */if (!strcasecmp(c->argv[0]->ptr,"quit")) {/* 第三步,现在查找命令并尽快检查有关小错误情况,例如错误的 arity、错误的命令名称等。 在全局变量 server的 commands 成员变量中查找相关的命令。*/c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);    *****//如果客户端有CLIENT_MULTI标记,并且当前不是exec、discard、multi和watch命令//集群节点可能收到 MULTI 命令,而 MULTI 命令表示紧接着它的多条命令是需要作为一个事务来执行的。当 Redis server 收到客户端发送的 MULTI 命令后,它会调用 MULTI命令的处理函数 multiCommandif (c->flags & CLIENT_MULTI &&c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&c->cmd->proc != multiCommand && c->cmd->proc != watchCommand &&c->cmd->proc != resetCommand){//将命令入队保存,等待后续一起处理queueMultiCommand(c);//将待返回结果写入客户端输出缓冲区。addReply(c,shared.queued);} else {//调用call函数执行命令call(c,CMD_CALL_FULL);c->woff = server.master_repl_offset;if (listLength(server.ready_keys))handleClientsBlockedOnKeys();}return C_OK;
}

首先,我们要知道,processCommand 函数是在server.c文件中实现的。它在实际执行命令前的主要逻辑可以分成三步:

  • 第一步,processCommand 函数会调用 moduleCallCommandFilters 函数(在module.c文件),将 Redis 命令替换成 module 中想要替换的命令。
  • 第二步,processCommand 函数会判断当前命令是否为 quit 命令,并进行相应处理。
  • 第三步,processCommand 函数会调用 lookupCommand 函数,在全局变量 server的 commands 成员变量中查找相关的命令。

这里,你需要注意下,全局变量 server 的 commands 成员变量是一个哈希表,它的定义是在server.h文件中的 redisServer 结构体里面,如下所示:

struct redisServer {dict *commands;
}

另外,commands 成员变量的初始化是在 initServerConfig 函数中,通过调用
dictCreate 函数完成哈希表创建,再通过调用 populateCommandTable 函数,将 Redis提供的命令名称和对应的实现函数,插入到哈希表中的。
redisCommandTable 数组是在 server.c 文件中定义的,它的每一个元素是一个redisCommand 结构体类型的记录,对应了 Redis 实现的一条命令。也就是说,redisCommand 结构体中就记录了当前命令所对应的实现函数是什么。

struct redisCommand redisCommandTable[] = {...{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},...
}

比如setCommand,SET 命令对应的实现函数是 setCommand,这是在t_string.c文件中定义的。
setCommand 函数首先会对命令参数进行判断,比如参数是否带有 NX、EX、XX、PX 等这类命令选项,如果有的话,setCommand 函数就会记录下这些标记。

然后,setCommand 函数会调用 setGenericCommand 函数,这个函数也是在t_string.c 文件中实现的。setGenericCommand 函数会根据刚才 setCommand 函数记录的命令参数的标记,来进行相应处理。比如,如果命令参数中有 NX 选项,那么,setGenericCommand 函数会调用lookupKeyWrite 函数(在db.c文件中),查找要执行 SET 命令的 key 是否已经存在。

如果这个 key 已经存在了,那么 setGenericCommand 函数就会调用 addReply 函数,返回 NULL 空值,而这也正是符合分布式锁的语义的。

那么如果 SET 命令可以正常执行的话,也就是说命令带有 NX 选项但是 key 并不存在,或者带有 XX 选项但是 key 已经存在,这样 setGenericCommand 函数就会调用setKey 函数(在 db.c 文件中)来

完成键值对的实际插入,如下所示:

setKey(c->db,key,val); 

然后,如果命令设置了过期时间,setGenericCommand 函数还会调用 setExpire 函数设置过期时间。最后,setGenericCommand 函数会调用 addReply 函数,将结果返回给客户端,如下所示:

addReply(c, ok_reply ? ok_reply : shared.ok);
  • commandProcessed重置客户端
    重置客户端,使其准备好处理下一个命令。
    如果这个请求命令是从服务器发送的命令,需要将主服务器缓冲区中的数据复制到从服务器。
/* 执行命令后执行必要的任务:** 1. 除非有理由避免这样做,否则客户端会重置。* 2. 对于主客户端,将更新复制偏移量。* 3. 将我们从主服务器获得的命令传播到副本。*/
void commandProcessed(client *c) {/* 如果客户端被阻止(包括暂停),只需返回避免重置和复制。** 1. 不要重置被阻止客户端的客户端结构,以便回复回调仍能够访问客户端 argv 和 argc 字段。客户端将在 unblockClient() 中重置。* 2. 不要更新复制偏移量或将命令传播到副本,因为我们尚未应用该命令。*/if (c->flags & CLIENT_BLOCKED)return;//resetClient 准备客户端以处理下一个命令resetClient(c);long long prev_offset = c->reploff;if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {/* Update the applied replication offset of our master. */c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;}/* 如果客户端是主客户端,我们需要计算处理缓冲区之前和之后应用的偏移量之间的差异,以了解实际应用于主状态的复制流的量:* 此数量及其复制流的相应部分将传播到子副本和复制积压工作。 */if (c->flags & CLIENT_MASTER) {long long applied = c->reploff - prev_offset;// 如果操作已应用,则从主服务器的流中复制数据给从服务器,并更新待处理查询缓冲区if (applied) {// 向从服务器复制数据replicationFeedSlavesFromMasterStream(server.slaves, c->pending_querybuf, applied);// 更新查询缓冲区,去除已经应用的部分sdsrange(c->pending_querybuf,applied,-1);}}
}

5)数据写回addReply

addReply 函数是在 networking.c 文件中定义的。它的执行逻辑比较简单,主要是调用prepareClientToWrite 函数,并在 prepareClientToWrite 函数中调用clientInstallWriteHandler 函数,将待写回客户端加入到全局变量 server 的clients_pending_write 列表中。

然后,addReply 函数会调用 _addReplyToBuffer 等函数(在 networking.c 中),将要返回的结果添加到客户端的输出缓冲区中。

好,现在你就了解一条命令是如何从读取,经过解析、执行等步骤,最终将结果返回给客户端的了。下图展示了这个过程以及涉及的主要函数,你可以再回顾下。

06 写事件处理

而在 Redis 事件驱动框架每次循环进入事件处理函数前,也就是在框架主函数 aeMain 中调用 aeProcessEvents,来处理监听到的已触发事件或是到时的时间事件之前,都会调用server.c 文件中的 beforeSleep 函数,进行一些任务处理,这其中就包括了调用handleClientsWithPendingWrites 函数,它会将 Redis sever 客户端缓冲区中的数据写回客户端。
#启动时候定义

void initServer(void) {****/** 设置在事件循环进入睡眠前调用的函数* 参数:*     server.el - 指向事件循环系统的指针*     beforeSleep - 睡眠前调用的函数指针*/aeSetBeforeSleepProc(server.el,beforeSleep);/** 设置在事件循环醒来后调用的函数* 参数:*     server.el - 指向事件循环系统的指针*     afterSleep - 睡眠后调用的函数指针*/aeSetAfterSleepProc(server.el,afterSleep);****
}

1)数据写回客户端

aeProcessEvents调用的方法内部使用,beforesleep、aftersleep

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{***if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)eventLoop->beforesleep(eventLoop);/* 调用多路复用 API,将仅在超时或某些事件触发时返回。 */numevents = aeApiPoll(eventLoop, tvp);/* After sleep callback. */if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)eventLoop->aftersleep(eventLoop);***
}

beforeSleep 函数调用的 handleClientsWithPendingWrites 函数,会遍
历每一个待写回数据的客户端,然后调用 writeToClient 函数,将客户端输出缓冲区中的数据写回。

/** 在Redis进入事件驱动库的主要循环之前调用此函数,即在为就绪文件描述符休眠之前。* * 注意:此函数目前由两个函数调用:* 1. aeMain - 主服务器循环* 2. processEventsWhileBlocked - 在RDB/AOF加载期间处理客户端** 如果是从processEventsWhileBlocked调用的,我们不希望执行所有操作(例如,我们不想过期键),但我们确实需要执行一些操作。** 最重要的是freeClientsInAsyncFreeQueue,但我们也调用一些其他低风险的函数。 */
void beforeSleep(struct aeEventLoop *eventLoop) {UNUSED(eventLoop); // 忽略传入的eventLoop参数,未使用。size_t zmalloc_used = zmalloc_used_memory();if (zmalloc_used > server.stat_peak_memory)server.stat_peak_memory = zmalloc_used; // 更新内存使用峰值。/* 如果我们是在processEventsWhileBlocked()中重新进入事件循环,* 只调用一组关键函数。注意,在这种情况下,我们跟踪处理的事件数量,* 因为processEventsWhileBlocked()希望在不再有事件处理时尽快停止。 */if (ProcessingEventsWhileBlocked) {// 用于累计处理的事件数量uint64_t processed = 0;// 使用线程处理有待读取数据的客户端processed += handleClientsWithPendingReadsUsingThreads();// 处理待处理的TLS数据processed += tlsProcessPendingData();// 处理有待写入数据的客户端processed += handleClientsWithPendingWrites();// 释放异步自由队列中的客户端processed += freeClientsInAsyncFreeQueue();// 更新服务器统计的处理事件数量server.events_processed_while_blocked += processed;return; }***
}

遍历列表中的每个客户端,尝试将缓冲区数据写入客户端套接字。

/** 在进入事件循环之前调用此函数,希望可以直接将回复写入客户端输出缓冲区,* 而无需使用系统调用来安装可写事件处理程序、调用它等。** 参数:无* 返回值:处理的具有待写数据的客户端数量。*/
int handleClientsWithPendingWrites(void) {listIter li;listNode *ln;int processed = listLength(server.clients_pending_write); // 初始化已处理的客户端数量为待写客户端列表的长度listRewind(server.clients_pending_write,&li); // 重置待写客户端列表的迭代器while((ln = listNext(&li))) { // 遍历列表中的每个客户端client *c = listNodeValue(ln);c->flags &= ~CLIENT_PENDING_WRITE; // 清除客户端的待写标志listDelNode(server.clients_pending_write,ln); // 从列表中删除当前客户端/* 如果客户端被保护,则不进行任何操作,以避免触发写错误或重新创建处理程序 */if (c->flags & CLIENT_PROTECTED) continue;/* 如果客户端即将关闭,则不对其进行写操作 */if (c->flags & CLIENT_CLOSE_ASAP) continue;/* 尝试将缓冲区数据写入客户端套接字 */if (writeToClient(c,0) == C_ERR) continue; // 如果写入失败,则继续处理下一个客户端/* 如果在上述同步写入后仍有数据需要输出到客户端,则需要安装可写事件处理程序 */if (clientHasPendingReplies(c)) {int ae_barrier = 0;/* 对于fsync=always策略,我们希望在接收查询和向客户端提供服务的同一事件循环迭代中,* 确保给定的文件描述符既不会被用于读也不会被用于写,这样在中间阶段调用beforeSleep()* 时才能真正将AOF数据同步到磁盘。写屏障确保了这一点。 */if (server.aof_state == AOF_ON &&server.aof_fsync == AOF_FSYNC_ALWAYS){ae_barrier = 1;}//继续通过sendReplyToClient调用writeToClient写回客户端if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {freeClientAsync(c); // 如果设置写处理程序失败,则异步释放客户端}}}return processed; // 返回处理的客户端数量
}

将数据从输出缓冲区写入客户端。

int writeToClient(client *c, int handler_installed) {/* 更新服务器上的总写次数 */atomicIncr(server.stat_total_writes_processed, 1);ssize_t nwritten = 0, totwritten = 0;size_t objlen;clientReplyBlock *o;/* 当客户端有等待回复时循环处理 */while(clientHasPendingReplies(c)) {if (c->bufpos > 0) {/* 尝试写入缓冲区中未发送的部分 */nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);if (nwritten <= 0) break;***
}

未完待续…

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/816895.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

香港科技大学广州|智能制造学域博士招生宣讲会—广州大学城专场

香港科技大学广州&#xff5c;智能制造学域博士招生宣讲会—广州大学城专场 时间&#xff1a;2024年4月18日&#xff08;星期四&#xff09;14:30 地点&#xff1a;广州市大学城雅乐轩酒店二楼策略2厅&#xff08;地铁大学城南站C口&#xff09; 报名链接&#xff1a;https:/…

记录西门子:增量编码器使用

编码器功能实现&#xff1a; 1、显示角度0~360 2、显示编码器速度 3、掉电保持当前角度 4、一键定位功能---改变当前角度为180 5、通过Z相不断纠偏角度 实物编码器&#xff1a; 接线图&#xff1a; 接到PLC的高速计数点位 方案一&#xff1a;200-Smart 方案二&#xff1a;1…

AI大模型探索之路-提升篇2:一文掌握AI大模型的核心-注意力机制

目录 前言 一、注意力机制简介 二、注意力机制的工作原理 三、注意力机制的变体 1、自注意力&#xff08;Self-Attention&#xff09; 2、双向注意力&#xff08;Bidirectional Attention&#xff09; 3、多头注意力&#xff08;Multi-Head Attention&#xff09; ​4、…

数据结构课程设计选做(三)---公共钥匙盒(线性表,栈,队列)

2.3.1 题目内容 2.3.1-A [问题描述] 有一个学校的老师共用N个教室&#xff0c;按照规定&#xff0c;所有的钥匙都必须放在公共钥匙盒里&#xff0c;老师不能带钥匙回家。每次老师上课前&#xff0c;都从公共钥匙盒里找到自己上课的教室的钥匙去开门&#xff0c;上完课后&…

pycharm debug 的时候 waiting for process detach

当你使用pycharm debug或者run的时候&#xff0c;突然出现了点不动&#xff0c;然后一直显示&#xff1a;waiting for process detach 可能是以下问题&#xff1a; 1、需要设置Gevent compatible pycharm一直没显示运行步骤&#xff0c;只是出现waiting for process detach-C…

利用Sentinel解决雪崩问题(一)流量控制

1、解决雪崩问题的常见方式有四种: 超时处理:设定超时时间&#xff0c;请求超过一定时间没有响应就返回错误信息&#xff0c;不会无休止等待;舱壁模式:限定每个业务能使用的线程数&#xff0c;避免耗尽整个tomcat的资源&#xff0c;因此也叫线程隔离;熔断降级:由断路器统计业务…

demo(四)nacosgateway(2)gatewayspringsercurity

一、思路 1、整体思路 用户通过客户端访问项目时&#xff0c;前端项目会部署在nginx上&#xff0c;加载静态文件时直接从nginx上返回即可。当用户在客户端操作时&#xff0c;需要调用后端的一些服务接口。这些接口会通过Gateway网关&#xff0c;网关进行一定的处理&#xff0…

Thingsboard PE 白标的使用

只有专业版支持白标功能。 使用 ThingsBoard Cloud 或安装您自己的平台实例。 一、介绍 ThingsBoard Web 界面提供了简便的操作,让您能够轻松配置您的公司或产品标识和配色方案,无需进行编码工作或重新启动服务。 系统管理员、租户和客户管理员可以根据需要自定义配色方案、…

精通技术写作:如何写出高质量技术文章?

CSDN 的朋友你们好&#xff0c;我是未来&#xff0c;今天给大家带来专栏【程序员博主教程&#xff08;完全指南&#xff09;】的第 7 篇文章“如何撰写高质量技术文章”。本文深入探讨了如何写好一篇技术文章。文章给出了好的技术文章的定义和分析&#xff0c;并提供了从选题、…

Day103:漏洞发现-漏扫项目篇Poc开发Rule语法反链判断不回显检测Yaml生成

目录 Xray&Afrog-Poc开发-环境配置&编写流程 Xray-Poc开发-数据回显&RCE不回显&实验室 Afrog-Poc开发-数据回显&RCE不回显&JDNI注入 HTTP/S数据回显Poc开发-CVE-2023-28432 HTTP/S不回显RCE-Poc开发-CVE-2022-30525 HTTP/S不回显JNDI-Poc开发 知…

Upload-labs(Pass-14 - Pass-16)

Pass-14 &#xff08;图片马&#xff0c;判断文件类型&#xff09; 图片的格式在防护中通常是不会使用后缀进行判断的依据&#xff0c;文件头是文件开头的一段二进制码&#xff0c;不同类型的图片也就会有不同的二进制头。   JPEG (jpg)&#xff0c;文件头&#xff1a;FF D…

便携式污水采样器的工作环境要求

便携式污水采样器的工作环境要求极为严格&#xff0c;以确保其能够准确、稳定地采集和分析水样。首先&#xff0c;该采样器必须在干燥、通风良好的环境中工作&#xff0c;以避免潮湿和高温对其内部电子元件的损害。同时&#xff0c;为了保证采样器的稳定性和精度&#xff0c;工…

【数据结构(六)】队列

❣博主主页: 33的博客❣ ▶️文章专栏分类:数据结构◀️ &#x1f69a;我的代码仓库: 33的代码仓库&#x1f69a; &#x1faf5;&#x1faf5;&#x1faf5;关注我带你学更多数据结构知识 目录 1.前言2.概念3.队列的使用4.循环队列5.双端队列6.经典习题6.1队列实现栈6.2栈实现队…

一款挺不错网站维护页面HTML源码

一款挺不错网站维护页面源码&#xff0c;单HTML不需要数据库&#xff0c;上传到你的虚拟机就可以用做维护页面还不错&#xff0c;用处多。。 源码下载 一款挺不错网站维护页面源码

LangChain LangServe 学习笔记

LangChain LangServe 学习笔记 0. 引言1. LangServe 概述2. 特性3. 限制4. 安装5. 示例应用程序6. OpenAPI文档7. Python SDK 客户端8. Playground9. 聊天可运行页面 0. 引言 使用 LangServe 可以立即将您的LLM应用程序变成 API 服务器。 LangServe 使用 FastAPI 构建&#x…

three.js(1):three.js简介

1 什么是three.js three.js&#xff0c;一个WebGL引擎&#xff0c;基于JavaScript&#xff0c;可直接运行GPU驱动游戏与图形驱动应用于浏览器。其库提供的特性与API以绘制3D场景于浏览器。 2 下载地址 three.js下载地址:https://github.com/mrdoob/three.js 3 目录介绍 下载…

【题目】【信息安全管理与评估】2022年国赛高职组“信息安全管理与评估”赛项样题5

【题目】【信息安全管理与评估】2022年国赛高职组“信息安全管理与评估”赛项样题5 第一阶段竞赛项目试题 本文件为信息安全管理与评估项目竞赛-第一阶段试题&#xff0c;第一阶段内容包括&#xff1a;网络平台搭建与设备安全防护。 本次比赛时间为180分钟。 介绍 竞赛阶段…

浅谈函数 fscanf/sscanf 和 fprintf/sprintf

目录 一&#xff0c;fprintf 的介绍和使用1. 函数介绍2. 函数使用 二&#xff0c;fscanf 的介绍和使用1. 函数介绍2. 函数使用 三&#xff0c;sprintf 的介绍和使用1. 函数介绍2. 函数使用 四&#xff0c;sscanf 的介绍和使用1&#xff0c;函数介绍2&#xff0c;函数使用 五&am…

SSL Pinning之双向认证

双向认证处理流程 概述获取证书逆向app 获取证书的KeyStore的 key通过jadx 反编译 app 获取证书&#xff1a;frida hook 证书转换命令行转换portecle 工具使用 charles 配置 p12 格式证书 概述 本篇只介绍怎么解决ssl pinning&#xff0c; 不讲ssl/tls 原理。 为了解决ssl pinn…

RT-Thread 多级目录 scons 构建

前言 RT-Thread 默认使用 scons 进行工程的构建&#xff0c;虽然 RT-Thread BSP 中的 hello world 例程比较简单&#xff0c;实际项目开发&#xff0c;可能源码的工程会由多级目录&#xff0c;如何让多级的目录参与构建&#xff1f; scons 构建时&#xff0c;除了依赖工程的根…