我们的服务器缺少了一个内容:超时。每个网络应用程序都需要处理超时,因为网络的另一边可能会消失。不要只进行持续的IO操作,如读/写需要超时,但启动空闲的TCP连接也是一个好主意。要实现超时,必须修改事件循环,因为轮询是唯一被阻塞的东西。
我们的代码如下:
int rv=poll(poll_args.data(),(nfds_t)poll_args.size(),1000);
poll系统调用接受一个timeout参数,该参数规定了用于poll系统调用的时间上限。超时值目前是1000毫秒的任意值。如果我们根据计时器设置超时值,poll应该在过期时唤醒,或在此之前唤醒;然后我们就有机会在适当的时候启动计时器。
问题是我们可能有多个计时器,poll的超时值应该是最近的计时器的超时值。需要一些数据结构来查找最近的计时器。堆数据结构是查找最小/最大值的常用选择,通常用于此目的。此外,还可以使用任何用于排序的数据结构。例如,我们可以使用AVL树来排序计时器,并可能扩展树来跟踪最小值。
让我们从添加计时器来踢出空闲的TCP连接开始。对于每个连接都有一个计时器,设置为一个固定的超时,每次在连接上有IO活动时,计时器都会更新为一个固定的超时。注意,当我们更新计时器时,它变成了最遥远的一个;因此,我们可以利用这一事实来简化数据结构;一个简单的链表足以保持计时器的顺序:新的或更新的计时器只是到列表的末尾,列表保持有序的顺序。同样,链表上的操作是O(1),哪个比排序数据结构更好
定义链表是一个微不足道的任务:
struct DList {DList *prev = NULL;DList *next = NULL;
};
inline void dlist_init(DList *node) {node->prev = node->next = node;
}
inline bool dlist_empty(DList *node) {return node->next == node;
}
inline void dlist_detach(DList *node) {DList *prev = node->prev;DList *next = node->next;prev->next = next;next->prev = prev;
}
inline void dlist_insert_before(DList *target, DList *rookie) {DList *prev = target->prev;prev->next = rookie;rookie->prev = prev;rookie->next = target;target->prev = rookie;
}
get_monotonic_usec是获取时间的函数。注意时间戳必须是单调的。时间戳向后跳转会给计算机系统带来各种各样的麻烦。
static uint64_t get_monotonic_usec() {timespec tv = {0, 0};clock_gettime(CLOCK_MONOTONIC, &tv);return uint64_t(tv.tv_sec) * 1000000 + tv.tv_nsec / 1000;
}
下一步是将列表添加到服务器和连接结构中。
static struct {HMap db;// a map of all client connections, keyed by fdstd::vector<Conn *> fd2conn;// timers for idle connectionsDList idle_list;
} g_data;
struct Conn {int fd = -1;uint32_t state = 0; // either STATE_REQ or STATE_RES// buffer for readingsize_t rbuf_size = 0;uint8_t rbuf[4 + k_max_msg];// buffer for writingsize_t wbuf_size = 0;size_t wbuf_sent = 0;uint8_t wbuf[4 + k_max_msg];uint64_t idle_start = 0;// timerDList idle_list;
};
修改后的事件循环概述:
int main() {// some initializationsdlist_init(&g_data.idle_list);int fd = socket(AF_INET, SOCK_STREAM, 0);// bind, listen & other miscs// code omitted...// the event loopstd::vector<struct pollfd> poll_args;while (true) {// prepare the arguments of the poll()// code omitted...// poll for active fdsint timeout_ms = (int)next_timer_ms();int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), timeout_ms);if (rv < 0) {die("poll");}// process active connections
for (size_t i = 1; i < poll_args.size(); ++i) {if (poll_args[i].revents) {Conn *conn = g_data.fd2conn[poll_args[i].fd];connection_io(conn);if (conn->state == STATE_END) {// client closed normally, or something bad happened.// destroy this connectionconn_done(conn);}}
}// handle timersprocess_timers();// try to accept a new connection if the listening fd is active
if (poll_args[0].revents) {
(void)accept_new_conn(fd);}}return 0;
}
修改了几件事:
1. poll的超时参数由next_timer_ms函数计算。
2. 销毁连接的代码被移到了conn_done函数中。
3. 添加了用于触发计时器的process_timers函数。
4. 计时器在connection_io中更新,在accept_new_conn中初始化。