Table of Contents
1. struct connection
The ConnectionType field
Creating a connection
2. struct client
3. Flow of binding the client callback functions
3.1. Setting the read event callback
3.2. Setting the write event callback
3.3. The connSocketEventHandler function
3.4. How Redis 5 sets the callbacks
3.5. Some thoughts of my own: modifying the source
4. Summary: how the client callbacks are set
Setting the read event callback
Setting the write event callback
5. How the callbacks are invoked
Client side
Server side
This article covers the following two topics:
- Two structs: connection and client
- Key points about setting the callbacks, and the connSocketEventHandler function
1. struct connection
When is a connection used? When a client is created, a connection is created at the same time and bound to that fd.
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        .............
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

//create the connection
connection *connCreateAcceptedSocket(int fd) {
    connection *conn = connCreateSocket();
    conn->fd = fd;
    conn->state = CONN_STATE_ACCEPTING;
    return conn;
}
This struct represents a complete connection: the client's fd is wrapped into a connection.
struct connection {
    ConnectionType *type;
    ConnectionState state;      //current connection state of this client
    short int flags;
    short int refs;             //number of references to this connection
    int last_errno;             //last error on this connection
    void *private_data;         //for the networking code, this is effectively the struct client
    //the associated callback functions
    ConnectionCallbackFunc conn_handler;
    ConnectionCallbackFunc write_handler;
    ConnectionCallbackFunc read_handler;
    int fd;                     //the fd of this client
};

//type of the callback functions
typedef void (*ConnectionCallbackFunc)(struct connection *conn);

//values of connection->flags
#define CONN_FLAG_CLOSE_SCHEDULED (1<<0) /* Closed scheduled by a handler */
#define CONN_FLAG_WRITE_BARRIER   (1<<1) /* Write barrier requested */
//Normally the read event is handled first and the write event afterwards;
//setting flags to CONN_FLAG_WRITE_BARRIER swaps that order.

//current connection state of the client
typedef enum {
    CONN_STATE_NONE = 0,
    CONN_STATE_CONNECTING,
    CONN_STATE_ACCEPTING,
    CONN_STATE_CONNECTED,
    CONN_STATE_CLOSED,
    CONN_STATE_ERROR
} ConnectionState;
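To request that swapped order, callers go through a wrapper that passes a barrier flag down to the type-specific setter. A minimal sketch of what that wrapper looks like (based on connection.h; treat the exact form as an approximation):

/* Sketch (based on connection.h): register a write handler and optionally
 * request the write barrier, i.e. run the write callback before the read one. */
static inline int connSetWriteHandlerWithBarrier(connection *conn,
                                                 ConnectionCallbackFunc func,
                                                 int barrier) {
    return conn->type->set_write_handler(conn, func, barrier);
}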
The ConnectionType field
connection has a ConnectionType field, which is a set of interface functions (the first parameter of each is a connection), and struct connection is the object they operate on. The two structs work together: connections with different ConnectionTypes get different implementations of these interfaces.
typedef struct ConnectionType {
    void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
    int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
    int (*write)(struct connection *conn, const void *data, size_t data_len);
    int (*read)(struct connection *conn, void *buf, size_t buf_len);
    void (*close)(struct connection *conn);
    int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
    int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
    int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
    .........................
    int (*get_type)(struct connection *conn);
} ConnectionType;

//One point to note: every function type here takes a struct connection* as its first parameter,
//except ae_handler, which has no connection parameter; the connection arrives through clientData.
Why does ConnectionType exist? Because Redis ships with two kinds of connection. It feels object-oriented, like inheritance: connections of different types get different methods.
Redis supports SSL/TLS starting with version 6, as an optional feature that must be enabled at build time. That is why there are two types.
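The rest of the networking code reaches these interfaces through small generic wrappers rather than touching the function pointers directly. A minimal sketch of what those wrappers look like (based on connection.h; simplified):

/* Sketch (based on connection.h): generic wrappers dispatch through conn->type,
 * so the caller does not care whether the connection is plain TCP or TLS. */
static inline int connRead(connection *conn, void *buf, size_t buf_len) {
    return conn->type->read(conn, buf, buf_len);
}

static inline int connWrite(connection *conn, const void *data, size_t data_len) {
    return conn->type->write(conn, data, data_len);
}

Below are the two concrete ConnectionType instances.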
//Keep these interfaces in mind; they are what the rest of the code uses
ConnectionType CT_Socket = {
    //these are all functions, e.g. connSocketEventHandler is assigned to ae_handler
    .ae_handler = connSocketEventHandler,
    .close = connSocketClose,
    .write = connSocketWrite,
    .read = connSocketRead,
    .accept = connSocketAccept,
    .connect = connSocketConnect,
    .set_write_handler = connSocketSetWriteHandler,
    .set_read_handler = connSocketSetReadHandler,
    .............................................
    .get_type = connSocketGetType
};

//tls.c
#ifdef USE_OPENSSL
//CT_TLS only takes effect when USE_OPENSSL is defined
ConnectionType CT_TLS = {
    .ae_handler = tlsEventHandler,
    .accept = connTLSAccept,
    .connect = connTLSConnect,
    .blocking_connect = connTLSBlockingConnect,
    .read = connTLSRead,
    .write = connTLSWrite,
    .close = connTLSClose,
    .set_write_handler = connTLSSetWriteHandler,
    .set_read_handler = connTLSSetReadHandler,
    ..........................................................
    .get_type = connTLSGetType
};
Creating a connection
If TLS is not enabled at build time, every connection created has type CT_Socket, as the source below shows. So from here on we only need to follow the CT_Socket interfaces.
connection *connCreateAcceptedSocket(int fd) {
    connection *conn = connCreateSocket();
    conn->fd = fd;                       //set the fd
    conn->state = CONN_STATE_ACCEPTING;  //set the state
    return conn;
}

connection *connCreateSocket() {
    connection *conn = zcalloc(sizeof(connection));
    conn->type = &CT_Socket;             //the key point: the type is CT_Socket
    conn->fd = -1;
    return conn;
}

//tls.c
static connection *createTLSConnection(int client_side) {
    SSL_CTX *ctx = redis_tls_ctx;
    if (client_side && redis_tls_client_ctx)
        ctx = redis_tls_client_ctx;
    tls_connection *conn = zcalloc(sizeof(tls_connection));
    conn->c.type = &CT_TLS;              //this one is of type CT_TLS
    conn->c.fd = -1;
    conn->ssl = SSL_new(ctx);
    return (connection *) conn;
}
2. struct client
When is client used? Again, start from acceptTcpHandler, where a client is created.
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    ........................
    acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}

static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    ..............................
    client *c = createClient(conn);   //create the client
}

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));
    if (conn) {
        .........................
        connSetReadHandler(conn, readQueryFromClient); //set the read event callback
        connSetPrivateData(conn, c);                   //store the client c in connection->private_data
    }
    //initialize the client's fields
    ................
    if (conn) linkClient(c);  //add this client to the server.clients list
}
Redis stores all the information about a client connection in struct client, which includes the client's connection.
//Redis 5 kept an fd here; Redis 6 replaced the fd with a connection
typedef struct client {
    uint64_t id;                /* Client incremental unique ID. */
    connection *conn;           //the connection of this client
    int resp;                   /* RESP protocol version. Can be 2 or 3. */
    redisDb *db;                //database object chosen with the SELECT command
    //input buffer: where data read from the client is stored
    sds querybuf;               /* Buffer we use to accumulate client queries. */
    size_t qb_pos;              /* The position we have read in querybuf. */
    //the command and its arguments
    int argc;                   /* Num of arguments of current command. */
    robj **argv;                /* Arguments of current command. */
    struct redisCommand *cmd;   //the command to be executed
    int reqtype;                /* Request protocol type: PROTO_REQ_* */
    int multibulklen;           /* Number of multi bulk arguments left to read. */
    long bulklen;               /* Length of bulk argument in multi bulk request. */
    //list of reply objects to send back to the client
    list *reply;                /* List of reply objects to send to the client. */
    unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
    size_t sentlen;             /* Amount of bytes already sent in the current
                                   buffer or object being sent. */
    uint64_t flags;             /* Client flags: CLIENT_* macros. */
    ..............
    /* Response buffer */       //output buffer; when it runs out of space, replies go into the reply list
    int bufpos;
    char buf[PROTO_REPLY_CHUNK_BYTES];
} client;
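Because createClient stored the client into the connection's private data, the read callback can recover it later. A simplified sketch of how readQueryFromClient starts (only the private-data lookup matters here; the rest is omitted):

/* Sketch (simplified from networking.c): the callback only receives the
 * connection and gets its owning client back from private_data. */
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);  /* the c stored by connSetPrivateData(conn, c) */
    /* ... read from conn into c->querybuf, then parse and execute the command ... */
}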
3. Flow of binding the client callback functions
3.1. Setting the read event callback
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    ..........
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        ........................
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    ..............................
    /* Create connection and client */
    client *c = createClient(conn);
}

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));
    if (conn) {
        .........................
        connSetReadHandler(conn, readQueryFromClient); //set the read event callback
        connSetPrivateData(conn, c);                   //store the client c in connection->private_data
    }
}

static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    return conn->type->set_read_handler(conn, func);
}
The read event callback is set by calling connSetReadHandler. Its implementation may look puzzling at first; relate it to the ConnectionType field described above, which here is CT_Socket. In CT_Socket, set_read_handler is the function connSocketSetReadHandler.
So connSetReadHandler effectively becomes the code below. In the end it calls aeCreateFileEvent to create a file event and, one would expect, bind func to the corresponding fileProc.
static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    //return conn->type->set_read_handler(conn, func);
    //i.e. it calls connSocketSetReadHandler
    return connSocketSetReadHandler(conn, func);
}

static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    if (func == conn->read_handler) return C_OK;
    conn->read_handler = func;
    if (!conn->read_handler)
        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
    else
        if (aeCreateFileEvent(server.el,conn->fd,
                    AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;
}

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    aeFileEvent *fe = &eventLoop->events[fd];
    ....................
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
}
Now another question arises: weren't we supposed to bind func? Why is the argument passed to aeCreateFileEvent conn->type->ae_handler instead? Hold that question until we have looked at how the write event callback is set.
3.2. Setting the write event callback
void beforeSleep(struct aeEventLoop *eventLoop) {
    .................
    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWritesUsingThreads();
}

int handleClientsWithPendingWritesUsingThreads(void) {
    .....................
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        //set the write event callback
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
            ..................
    }
}

static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
    return conn->type->set_write_handler(conn, func, 0);
}
conn->type->set_write_handler is bound to connSocketSetWriteHandler, so connSetWriteHandler effectively becomes:
static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
    //return conn->type->set_write_handler(conn, func, 0);
    return connSocketSetWriteHandler(conn, func, 0);
}

static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
    if (func == conn->write_handler) return C_OK;
    conn->write_handler = func;
    if (barrier)
        conn->flags |= CONN_FLAG_WRITE_BARRIER;
    else
        conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
    if (!conn->write_handler)
        aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
    else
        if (aeCreateFileEvent(server.el,conn->fd,AE_WRITABLE,
                    conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;
}
Setting the write event callback also uses conn->type->ae_handler.
So both read and write event callbacks are registered uniformly through conn->type->ae_handler, and ae_handler corresponds to connSocketEventHandler.
3.3. The connSocketEventHandler function
static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
{
    connection *conn = clientData;

    //when the connection was created, state was set to CONN_STATE_ACCEPTING, so this branch is not taken
    if (conn->state == CONN_STATE_CONNECTING &&
            (mask & AE_WRITABLE) && conn->conn_handler) {
        int conn_error = connGetSocketError(conn);
        if (conn_error) {
            conn->last_errno = conn_error;
            conn->state = CONN_STATE_ERROR;
        } else {
            conn->state = CONN_STATE_CONNECTED;
        }
        if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
        if (!callHandler(conn, conn->conn_handler)) return;
        conn->conn_handler = NULL;
    }

    //a bitwise AND is 1 only when both bits are 1; flags starts at 0, so & CONN_FLAG_WRITE_BARRIER is 0
    //only after CONN_FLAG_WRITE_BARRIER has been set in flags does the AND become non-zero
    int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;

    int call_write = (mask & AE_WRITABLE) && conn->write_handler;
    int call_read = (mask & AE_READABLE) && conn->read_handler;

    //invoke the corresponding callbacks
    /* Handle normal I/O flows */
    if (!invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
    /* Fire the writable event. */
    if (call_write) {
        if (!callHandler(conn, conn->write_handler)) return;
    }
    /* If we have to invert the call, fire the readable event now
     * after the writable one. */
    if (invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
}

static inline int callHandler(connection *conn, ConnectionCallbackFunc handler) {
    connIncrRefs(conn);          //increment refs to protect the connection
    if (handler) handler(conn);  //invoke the callback, e.g. readQueryFromClient
    connDecrRefs(conn);          //decrement refs once the callback has returned
    if (conn->flags & CONN_FLAG_CLOSE_SCHEDULED) {
        if (!connHasRefs(conn)) connClose(conn);  //if refs == 0, perform the deferred close
        return 0;
    }
    return 1;
}

static inline void connIncrRefs(connection *conn) {
    conn->refs++;
}
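The refs counter in callHandler matters because a callback may try to close its own connection while the handler is still running. A simplified sketch of how the socket close function defers in that case (based on connSocketClose in connection.c; details omitted):

/* Sketch (simplified from connection.c): closing from inside a handler is
 * deferred while refs > 0; callHandler performs the real close afterwards. */
static void connSocketClose(connection *conn) {
    if (conn->fd != -1) {
        aeDeleteFileEvent(server.el, conn->fd, AE_READABLE | AE_WRITABLE);
        close(conn->fd);
        conn->fd = -1;
    }
    if (connHasRefs(conn)) {
        conn->flags |= CONN_FLAG_CLOSE_SCHEDULED;  /* picked up later in callHandler */
        return;
    }
    zfree(conn);
}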
Now we finally see why conn->type->ae_handler was registered as the callback.
When the read and write callbacks were set earlier, readQueryFromClient was bound to conn->read_handler and sendReplyToClient to conn->write_handler.
So connSocketEventHandler has both the read event callback and the write event callback available, and we can think of it as the processing hub of the connection. The epoll-based main framework never calls a client's read/write callbacks directly; it always calls connSocketEventHandler, which effectively decouples the framework from the connection.
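A hypothetical sketch of what this decoupling buys (CT_MyTransport and myEventHandler are made-up names, not Redis code): a new connection type only has to supply its own ae_handler, while the event-loop registration and dispatch code stays untouched.

/* Hypothetical sketch: a new transport plugs in by providing its own ae_handler.
 * The ae event loop still only ever calls the handler registered for the fd. */
static void myEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask) {
    connection *conn = clientData;
    if ((mask & AE_READABLE) && conn->read_handler) conn->read_handler(conn);
    if ((mask & AE_WRITABLE) && conn->write_handler) conn->write_handler(conn);
}

ConnectionType CT_MyTransport = {
    .ae_handler = myEventHandler,
    /* .read, .write, .close, ... the rest of the interface ... */
};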
3.4. How Redis 5 sets the callbacks
I went back and looked at Redis 5.0.10: it registers readQueryFromClient directly as the callback. That version has no struct connection; it works with client directly.
//Redis 5.0.10
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        ...........
        acceptCommonHandler(cfd,0,cip);
    }
}

static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c = createClient(fd);
    .................
}

client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));
    if (fd != -1) {
        aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c)
        ......................
    }
}
From Redis 6 on, SSL/TLS is supported and connection was introduced, with its two types. That is presumably why the design changed.
3.5. Some thoughts of my own: modifying the source
When an event fires, connSocketEventHandler is always what gets executed, and it already has both the read and the write callbacks.
So in my view, struct aeFileEvent could hold just a single aeFileProc.
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

//could be rewritten as:
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *fileProc; //a single callback is enough
    void *clientData;
} aeFileEvent;
And aeProcessEvents would no longer need to distinguish read events from write events; it could be rewritten like this:
//only the skeleton is shown; the main change is inside the for loop:
//whatever the event type, fe->fileProc(....) is called uniformly
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
    ...............................
    if (eventLoop->maxfd != -1 ||
            ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        struct timeval tv, *tvp;

        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);

        for (int j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;

            if (fe->mask) //this fd has events we care about, so the read or write handling can run
                //this call ends up in connSocketEventHandler
                fe->fileProc(eventLoop,fd,fe->clientData,mask);
        }
    }
}
This works because connSocketEventHandler holds both the read and the write callbacks; passing the event mask in is enough to decide which one to use, which makes the code more uniform.
So why don't the Redis authors do it this way? Is it for compatibility with earlier versions, or is there some detail I missed that makes it impossible? If you have insights, feel free to point them out in the comments.
4. Summary: how the client callbacks are set
Setting the read event callback
When the connection is created, the client's read callback needs to be set.
createClient ---> connSetReadHandler ---> (conn->type->set_read_handler) ---> (connSocketSetReadHandler, which assigns the read callback readQueryFromClient to read_handler) ---> (aeCreateFileEvent, which assigns conn->type->ae_handler to rfileProc).
Setting the write event callback
In the single-threaded case:
aeProcessEvents ---> beforesleep ---> handleClientsWithPendingWritesUsingThreads ---> handleClientsWithPendingWrites ---> connSetWriteHandlerWithBarrier ---> (connSocketSetWriteHandler, which assigns sendReplyToClient to write_handler) ---> (aeCreateFileEvent, which assigns conn->type->ae_handler to wfileProc).
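In that single-threaded path the write handler is only installed when a direct write cannot flush everything. A simplified sketch of the idea (loosely based on handleClientsWithPendingWrites in networking.c; error handling and the barrier logic are omitted):

/* Sketch (simplified): try to write synchronously first; register the
 * sendReplyToClient callback only if output is still pending afterwards. */
int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);

    listRewind(server.clients_pending_write, &li);
    while ((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write, ln);

        if (writeToClient(c, 0) == C_ERR) continue;   /* synchronous attempt */

        if (clientHasPendingReplies(c)) {
            /* still data left: fall back to the event-driven path */
            if (connSetWriteHandler(c->conn, sendReplyToClient) == C_ERR)
                freeClientAsync(c);
        }
    }
    return processed;
}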
5. How the callbacks are invoked
Client side
When an event is ready (say a read event), fe->rfileProc is executed, which is connSocketEventHandler; internally it calls callHandler, and callHandler invokes conn->read_handler.
In fact, whether it is a read event or a write event, connSocketEventHandler is what gets executed.
That is: aeProcessEvents ---> rfileProc ---> connSocketEventHandler ---> callHandler ---> (conn->read_handler, i.e. readQueryFromClient).
Server side
aeProcessEvents ---> rfileProc ---> acceptTcpHandler.
Comparison:
On the server side, the callback acceptTcpHandler is invoked directly.
On the client side, connSocketEventHandler is invoked first, and only inside connSocketEventHandler, once the event turns out to be a read event, is the callback readQueryFromClient executed.