poll也是一种linux中的多路转接的方案
- 解决select的fd有上限的问题
- 解决select每次调用都要重新设置关心的fd
poll函数接口
poll, ppoll - wait for some event on a file descriptor#include <poll.h>int poll(struct pollfd *fds, nfds_t nfds, int timeout);DESCRIPTIONpoll() performs a similar task to select(2): it waits for one of a set of file descriptors to become ready to perform I/O.The set of file descriptors to be monitored is specified in the fds argument, which is an array of structures of the following form:
// 要监视的文件描述符集在fds参数中指定,它是一个结构数组,格式如下:
// fd 代表用户告诉内核,要帮我关心一下fd
// events 代表用户告诉内核,要帮我关心一下fd的events事件是否就绪了(如:读事件,写事件) 输入时看 fd + events
// revents 代表内核告诉用户,用户要关心fd的events事件中,有哪些fd已经就绪了(如:读事件,写事件) 输出时看 fd + revents
// 通过这样的设计,输入输出分离,poll不需要对参数进行重新设定struct pollfd {int fd; /* file descriptor */ short events; /* requested events */short revents; /* returned events */};// 参数说明
struct pollfd *fds:
fds是一个poll函数监听的结构列表.每一个元素中,包含了三部分内容:文件描述符,监听的事件集合,返回的事件集合.nfds_t nfds:
nfds表示fds数组的长度.int timeout:
timeout表示poll函数的超时时间,单位是毫秒(ms).// 设置情况// 1. timeout > 0 : 在timeout以内阻塞,否则非阻塞返回一次// 2. timeout = 0 : 非阻塞等待// 3. timeout < 0 : 阻塞等待// 返回值(同select)
返回值大于0:代表就绪的文件描述符的个数(如果是3,代表等待的多个文件描述符中,有三个文件描述符已经就绪)
返回值等于0:超时返回了
返回值小于0:select调用失败了(通过查看错误码errno来查看具体的错误原因)
events和revents的取值(以下宏值,一个宏值在对应的events和revents中占据一个比特位):
poll示例:使用poll监控标准输入
#include <poll.h>
#include <unistd.h>
#include <stdio.h>int main()
{struct pollfd poll_fd;poll_fd.fd = 0;poll_fd.events = POLLIN;for (;;){int ret = poll(&poll_fd, 1, 1000);if (ret < 0){perror("poll");continue;}if (ret == 0){printf("poll timeout\n");continue;}if (poll_fd.revents == POLLIN){char buf[1024] = {0};read(0, buf, sizeof(buf) - 1);printf("stdin:%s", buf);}}
}
poll演示代码(根据select代码改编)
makefile
poll_server: main.ccg++ -o $@ $^ -std=c++11
.PHONY:clean
clean:rm -f poll_server
err.hpp
#pragma once#include <iostream>enum
{USAGE_ERR = 1,SOCKET_ERR,BIND_ERR,LISTEN_ERR
};
log.hpp
#include <iostream>
#include <string>
#include <cstdarg>
#include <ctime>
#include <unistd.h>#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4const char * to_levelstr(int level)
{switch(level){case DEBUG : return "DEBUG";case NORMAL: return "NORMAL";case WARNING: return "WARNING";case ERROR: return "ERROR";case FATAL: return "FATAL";default : return nullptr;}
}void logMessage(int level, const char *format, ...)
{
#define NUM 1024char logprefix[NUM];snprintf(logprefix, sizeof(logprefix), "[%s][%ld][pid: %d]",to_levelstr(level), (long int)time(nullptr), getpid());char logcontent[NUM];va_list arg;va_start(arg, format);// 将变量参数列表中的格式化数据写入大小缓冲区// 也就是写入到缓冲区logcontent中vsnprintf(logcontent, sizeof(logcontent), format, arg);std::cout << logprefix << logcontent << std::endl;
}
sock.hpp
#pragma once#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "log.hpp"
#include "err.hpp"class Sock
{// 表示全连接队列中最多有32+1个连接// 具体请看tcp相关实验const static int backlog = 32;public:static int Socket(){// 1. 创建socket文件套接字对象int sock = socket(AF_INET, SOCK_STREAM, 0);if (sock < 0){logMessage(FATAL, "create socket error");exit(SOCKET_ERR);}logMessage(NORMAL, "create socket success: %d", sock);// int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);// sockfd:套接字描述符,指定要设置选项的套接字// level:选项的级别,通常是 SOL_SOCKET 表示通用套接字选项。// 设置 SO_REUSEADDR 选项,允许地址重用// optval:一个指向包含选项值的缓冲区的指针。// optlen:指定选项值的长度。int opt = 1;// 我们将 `SO_REUSEADDR` 选项设置为1,从而启用了地址重用功能。// 这可以让套接字在绑定地址时可以重用之前被关闭的套接字的地址,// 而不会因为 TIME_WAIT 状态而无法绑定。setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));return sock;}static void Bind(int sock, int port){// 2. bind绑定自己的网络信息struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0){logMessage(FATAL, "bind socket error");exit(BIND_ERR);}logMessage(NORMAL, "bind socket success");}static void Listen(int sock){// 3. 设置socket 为监听状态if (listen(sock, backlog) < 0) {logMessage(FATAL, "listen socket error");exit(LISTEN_ERR);}logMessage(NORMAL, "listen socket success");}// 获取新链接static int Accept(int listensock, std::string *clientip, uint16_t *clientport){// 在结构体中,存在端口号和IP地址struct sockaddr_in peer;socklen_t len = sizeof(peer);// int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);// 将获取的套接字的文件描述符返回int sock = accept(listensock, (struct sockaddr *)&peer, &len);// sock < 0, 获取套接字失败if (sock < 0)logMessage(ERROR, "accept error, next");else{logMessage(NORMAL, "accept a new link success, get new sock: %d", sock); // ?// 获取套接字成功,通过输入输出型参数来查看客户端的端口号和IP地址*clientip = inet_ntoa(peer.sin_addr);*clientport = ntohs(peer.sin_port);}return sock;}
};
PollServer.hpp
#pragma once#include <iostream>
#include <string>
#include <functional>
#include <poll.h>
#include "sock.hpp"namespace Poll_ns
{static const int defaultport = 8081;// 要监视的文件描述符集在rfds参数中指定,它是一个结构数组// 此处,我们设置一共申请2048个结构数组的空间,数组的个数可以自己定义static const int num = 2048;// 默认合法fd的数组中,全部将其初始化为-1static const int defaultfd = -1;using func_t = std::function<std::string(const std::string &)>;class PollServer{public:PollServer(func_t f, int port = defaultport): _func(f), _port(port), _listensock(-1), _rfds(nullptr){}void ResetItem(int i){// 将结构数组中的fd进行初始化_rfds[i].fd = defaultfd;// 将结构数组中的events和revents进行初始化_rfds[i].events = 0;_rfds[i].revents = 0;}void initServer(){// 创建的套接字_listensock = Sock::Socket();// 绑定套接字和端口号(任意IP地址bind,详看tcp套接字)Sock::Bind(_listensock, _port);// 将_listensock设置为监听套接字Sock::Listen(_listensock);// 要监视的文件描述符集在rfds参数中指定,它是一个结构数组_rfds = new struct pollfd[num];// 对这个结构化数组进行初始化for (int i = 0; i < num; i++){ResetItem(i);}// 我们将监听套接字放在数组中,下标为0_rfds[0].fd = _listensock;// POLLIN代表读事件,让poll帮我们关心_listensock的读事件_rfds[0].events = POLLIN;}void Print(){std::cout << "fd list: ";for (int i = 0; i < num; i++){if (_rfds[i].fd != defaultfd)std::cout << _rfds[i].fd << " ";}std::cout << std::endl;}void Recver(int pos){// 1. 读取request// 这里在进行读取的时候,不会被阻塞,因为poll已经监测到事件就绪了// 这里这种方式进行读取是有问题的// a.无法保证一定可以将缓冲区的数据读取完// b.就算将缓冲区的数据读取完,也无法保证能够完整的读取到一个报文// 那么应用层就无法将这个报文数据进行反序列化,转化为结构化数据// 后续在补充(目前先这样演示)char buffer[1024];ssize_t s = recv(_rfds[pos].fd, buffer, sizeof(buffer) - 1, 0);if (s > 0){// s > 0, 读取成功,s代表读取到的字符数量buffer[s] = 0;logMessage(NORMAL, "client# %s", buffer);}else if (s == 0){// s == 0,代表客户端已经关闭了,读取到的字符数为0// 因为我们也要关闭服务端对应的文件描述符close(_rfds[pos].fd);// 并将_rfds[pos]对应的数组中的节点进行初始化// 下次就不会再监测到这个节点了ResetItem(pos);logMessage(NORMAL, "client quit");return;}else{// s < 0, 代表recv()读取出错close(_rfds[pos].fd);ResetItem(pos);logMessage(ERROR, "client quit: %s", strerror(errno));return;}// 2. 处理requeststd::string response = _func(buffer);// 3. 返回responsewrite(_rfds[pos].fd, response.c_str(), response.size());}void Accepter(int listensock){logMessage(DEBUG, "Accepter in");// 此时说明_listensock一定是就绪的std::string clientip;uint16_t clientport = 0;// 获取新套接字的文件描述符,并获取客户端的端口号和IP地址int sock = Sock::Accept(listensock, &clientip, &clientport);// 如果sock < 0 ,说明获取套接字失败,则重新循环获取if (sock < 0)return;logMessage(NORMAL, "accept success [%s:%d]", clientip.c_str(), clientport);// 此时,一定可以获取新的连接,但是还不可以直接recv或者send// 因为获取到的新连接的套接字里面不一定存在数据(如果直接recv,那么recv可能还需要等数据就绪)// 所以,我们需要将新的套接字托管给poll,让poll来帮我们关心事件是否就绪int i = 0;for (; i < num; i++){// 在_rfds中寻找一个空的位置if (_rfds[i].fd != defaultfd)continue;elsebreak;}if (i == num){// 经过上面的循环,如果i == num,说明_rfds整个数组已经存储满了logMessage(WARNING, "server if full, please wait");// _rfds整个数组已经存储满了,因此直接关闭新获取的套接字close(sock);}else{// 代码运行到这里,说明在_rfds中找到了空的位置// 那么就将获取的新连接的文件描述符放入到数组中_rfds[i].fd = sock;_rfds[i].events = POLLIN;}Print();logMessage(DEBUG, "Accepter out");}// 1. rfds中,不仅仅是有一个fd是就绪的,可能存在多个// 通过循环accept(),来获取多个新连接的文件描述符// 2. 我们的select目前只处理了读事件void HandlerReadEvent(){for (int i = 0; i < num; i++){// 过滤掉非法的fdif (_rfds[i].fd == defaultfd)continue;if (!(_rfds[i].events & POLLIN))continue;// 如果_rfds[i].fd 是_listensock,// 并且_rfds[i].fd 的读事件已经就绪(也就是revents & POLLIN 为真)// 那么就代表监听套接字的读事件就绪,我们就从监听套接字中,获取新链接的文件描述符if (_rfds[i].fd == _listensock && (_rfds[i].revents & POLLIN)){Accepter(_listensock);}else if (_rfds[i].revents & POLLIN){// 如果_rfds[i].fd不是监听套接字,那么它就是普通的套接字// 如果这个普通的套接字,我们要求poll帮我们关心它的读事件(_rfds[i].events & POLLIN)// 并且这个套接字的读事件已经就绪(_rfds[i].revents & POLLIN),那么就开始读取Recver(i);}else{}}}void start(){for (;;){// 此处是1000ms,也就是1sint timeout = 1000;// 返回值大于0:代表就绪的文件描述符的个数(如果是3,代表等待的多个文件描述符中,有三个文件描述符已经就绪)int n = poll(_rfds, num, timeout);switch (n){case 0:// 返回值为0,代表在规定的时间内,没有新的连接就绪;logMessage(NORMAL, "timeout...");break;case -1:// 返回值为1,代表poll发生错误logMessage(WARNING, "poll error, code: %d, err string: %s", errno, strerror(errno));break;default:// 代码运行到这里说明有事件就绪了(目前只有一个监听事件就绪了)// 如果这个监听事件不被取走进行处理,那么它就一直处于就绪状态logMessage(NORMAL, "have event ready!");// 取走监听事件进行处理HandlerReadEvent();break;}}}~PollServer(){if (_listensock < 0)close(_listensock);if (_rfds)delete[] _rfds;}private:// 服务器需要绑定自己的端口号int _port;// 服务器需要有自己的监听套接字int _listensock;// int poll(struct pollfd *fds, nfds_t nfds, int timeout);// 要监视的文件描述符集在参数fd中指定,它是一个结构数组,格式如下:// struct pollfd// {// int fd; /* file descriptor */// short events; /* requested events */// short revents; /* returned events */// };// struct pollfd *fds 的底层是一个红黑树struct pollfd *_rfds;// 回调函数func_t _func;};
}
main.cc
#include "PollServer.hpp"
#include <memory>using namespace std;
using namespace Poll_ns;static void usage(std::string proc)
{std::cerr << "Usage:\n\t" << proc << " port" << "\n\n";
}// 回调函数
std::string transaction(const std::string &request)
{return request;
}// ./poll_server 8081
int main(int argc, char *argv[])
{if(argc != 2){usage(argv[0]);exit(USAGE_ERR);}unique_ptr<PollServer> svr(new PollServer(transaction,atoi(argv[1])));svr->initServer();svr->start();return 0;
}
演示结果
poll的优点
不同与select使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现.
- pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式.接口使用比select更方便.
- poll并没有最大数量限制(但是数量过大后性能也是会下降).
poll的缺点
poll中监听的文件描述符数目增多时
- 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符.
- 每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中.
- 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降.
epoll
epoll初识
- 按照man手册的说法:是为处理大批量句柄而作了改进的poll.
- 它是在2.5.44内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44)它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法
epoll的相关系统调用
epoll有3个相关的系统调用.
epoll_create()
epoll_create, epoll_create1 - open an epoll file descriptor// epoll_create的功能就是创建一个epoll模型#include <sys/epoll.h>// size只需要传入一个大于0的数即可
int epoll_create(int size);DESCRIPTION
epoll_create() creates an epoll(7) instance. Since Linux 2.6.8, the size argument is ignored, but must be greater than zero; see NOTES below.// epoll_create() 创建一个 epoll(7) 实例。 自 Linux 2.6.8 起,size 参数将被忽略,但必须大于 0;请参阅下面的注释。// 返回值
RETURN VALUE
On success, these system calls return a nonnegative file descriptor. On error, -1 is returned, and errno is set to indicate the error.// 成功时,这些系统调用会返回一个非负的文件描述符。 如果出错,则返回-1,并设置 errno 表示出错。
epoll_ctl
epoll_ctl - control interface for an epoll descriptor
// 功能:用户告诉内核,你要帮我关心那些文件描述符上的那些事件#include <sys/epoll.h>int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);// epoll的事件注册函数
// 它不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型.// 参数
第一个参数是epoll_create()的返回值(epoll的句柄).
第二个参数表示动作,用三个宏来表示.
第三个参数是需要监听的fd.
第四个参数是告诉内核需要监听什么事.// 第二个参数的取值:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;// 返回值When successful, epoll_ctl() returns zero. When an error occurs, epoll_ctl() returns -1 and errno is set appropriately.
struct epoll_event结构如下:
typedef union epoll_data
{void *ptr;int fd;uint32_t u32;uint64_t u64;
} epoll_data_t;struct epoll_event
{// 注册时 events代表用户告诉内核,你要帮我关心什么事件,返回时代表内核告诉用户,什么事件已经就绪uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable 代表用户的可用数据*/
} __EPOLL_PACKED;
struct epoll_event结构体中的events可以是以下几个宏的集合:
- EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
- EPOLLOUT :表示对应的文件描述符可以写;
- EPOLLPRI :表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
- EPOLLERR :表示对应的文件描述符发生错误;
- EPOLLHUP :表示对应的文件描述符被挂断;
- EPOLLET :将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的.
- EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里.
epoll_wait
epoll_wait, epoll_pwait - wait for an I/O event on an epoll file descriptor#include <sys/epoll.h>int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);// 参数
// 第一个参数是epoll_create()的返回值(epoll的句柄).// 返回值
如果 epoll_wait 返回正整数值(大于0),则表示有事件已经触发并且已经存储在 events 缓冲区中。这个值表示有多少个事件已被检测到。
如果 epoll_wait 返回0,表示等待超时,没有任何事件发生。这通常意味着在指定的超时时间内没有事件触发。
如果 epoll_wait 返回-1,表示发生了错误。可以通过检查 errno 变量来获取具体的错误信息,以了解出现了什么问题。
epoll_wait
函数是 Linux 中用于 I/O 多路复用的系统调用之一,它用于等待一组文件描述符上的事件。这是一个关键的函数,通常与epoll
系统调用一起使用,用于实现高效的非阻塞 I/O 多路复用。以下是
epoll_wait
函数的参数和其含义:
epfd
:这是一个整数,表示已创建的epoll
实例的文件描述符。在调用epoll_create
或epoll_create1
创建epoll
实例后,您将获得一个epoll
实例的文件描述符,将其传递给epfd
。
events
:**用于回传代处理事件的数组;**这是一个指向struct epoll_event
结构的指针,用于存储已触发事件的信息。epoll_event
结构包含有关已触发事件的详细信息,包括文件描述符和事件类型。
maxevents
:**每次能处理的事件数;**这是一个整数,表示events
缓冲区的大小,即可以存储多少个已触发事件。通常,您应该根据需要的事件数量来设置这个值,以确保足够的事件信息可以被存储。
timeout
:这是一个整数,表示等待事件的超时时间(以毫秒为单位)。如果设置为正数,epoll_wait
将等待指定的毫秒数,然后返回。如果设置为0,epoll_wait
将立即返回,不等待事件。如果设置为-1,epoll_wait
将无限期地等待事件,直到有事件触发。
epoll_wait
函数的主要作用是等待文件描述符上的事件触发,并将相关事件信息存储在events
缓冲区中。一旦有事件触发或超时,epoll_wait
将返回已触发事件的数量,并将事件信息存储在events
中。然后,您可以遍历events
数组来处理已触发的事件。此函数常用于实现高性能的服务器,以有效地处理多个套接字上的并发连接和数据交换,而无需使用传统的阻塞 I/O。
epoll模型的底层原理
软硬件交互时,数据流动的整个过程
- 数据从软件内存中拷贝到硬件外设,这个过程其实是比较好理解的,因为数据可以贯穿协议栈,层层向下封装报头,最后由硬件对应的驱动程序将数据包交付给具体的硬件,协议栈的最底层就是物理层。
- 但数据到来时,操作系统是怎么知道网络中有数据到来了呢?这个我们之前从来没有学过,因为这属于计组的知识,我们搞软件的学习他,其实只是为了理解数据在IO流动时的整个过程。
- 当数据到达网卡时,网卡有相应的8259中断设备,该设备用于向CPU的某个针脚发送中断信号,CPU有很多的针脚,一部分的针脚会对应一个硬件的中断设备,当CPU的针脚收到来自网卡中断设备的中断信号时,该针脚就会被点亮,触发高电平信号,该针脚对应的寄存器(CPU的工作台)里面会将该点亮的针脚解释为二进制序列,这个二进制序列就是该针脚对应的序号。
- 接下来CPU处理器就会根据这个序号,查询一个叫做中断向量表的数据结构,中断向量表在CPU启动的时候就已经被加载到了内存的特定位置,中断向量表可以理解为一个数组结构,存储着每个中断序号所对应的处理程序的入口地址,其实就是函数指针,而该函数内部会回调网卡的驱动方法,将数据从硬件网卡拷贝到内存中的操作系统代码内部。(上面一整套的逻辑过程,全部都由操作系统来实现。至此就完成了数据从硬件到软件内存流动的过程,数据到达操作系统内部后,接下来的工作大家也很清楚,就是向上贯穿协议栈,拆分报头和有效载荷,直到最后交给应用层,软件里面的数据流动我们当然是很熟悉的。
- 计算机的硬件中,不仅仅只有网卡有终端设备,像比较常见的硬件键盘,也有他自己的中断设备,我们在键盘上的每一次按键其实就会触发一次硬件中断。还有就是定时器模块,他也有自己的中断设备,可以在计算机整体的层面上,对内核进程进行管理和调度。
————————————————
版权声明:本文为CSDN博主「rygttm」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/erridjsis/article/details/132548582
epoll模型内核结构图
-
当你调用epoll_create时,内核会在底层创建一个epoll模型,该epoll模型主要由三个部分组成,红黑树+就绪队列+底层的回调机制。红黑树中的每个节点其实就是一个struct epoll_event结构体,当上层在调用epoll_ctl进行添加fd关心的事件时,其实就是向红黑树中插入节点,所以epoll_ctl对于fd关心事件的增删改,本质其实就是对内核中创建出来的红黑树进行节点的增删改,所以用户告知内核,你要帮我关心什么fd,底层就是对红黑树进行管理。
-
就绪队列中存放的是已经就绪的struct epoll_event结构体,内核告知用户哪些fd上的事件就绪时,其实就是把就绪队列中的每个节点依次拷贝到(用户调用epoll_wait时,传入的纯输出型参数结构体数组)events中。就绪队列是一个双向链表Doubly Linked List。所以所谓的事件就绪的本质,其实就是将红黑树中的节点链入到就绪队列中,链入的过程其实也很简单,只要在红黑树节点内部多增加一个链表节点类型的指针即可,这个指针可以先初始化为nullptr,当该节点中fd关心的事件就绪时,再将这个指针指向就绪队列中的尾部节点即可。
-
一个节点是可以同时在多个数据结构当中的,做法很简单,只要增加数据结构中元素类型的指针即可,通过修改指针的指向就可以把节点链入到新的数据结构里,在逻辑上我们把就绪队列和红黑树分开了,但在代码实现上,只需要在struct epoll_event结构体内部增加指针就可以了,让一个结构体同时在就绪队列和红黑树中。
-
我们已经知道epoll模型的大概原理了,但还有一个问题,操作系统怎么知道红黑树上的哪些节点就绪了呢?难道操作系统也要遍历整棵红黑树,检测每个节点的就绪情况?操作系统其实并不会这样做,如果这样做的话,那epoll还谈论什么高效呢?你epoll不也得遍历所有的fd吗?和我poll遍历有什么区别呢?红黑树是查找的效率高,不是遍历的效率高,如果遍历所有的节点,红黑树其实和链表遍历在效率上是差不多的,一点都不高效!
-
那操作系统是怎么知道红黑树上的哪个节点就绪了呢?其实是通过底层的回调机制来实现的,这也是epoll接口公认非常高效的重要的一个实现环节!
当数据到达网卡时,我们知道数据会经过硬件中断,CPU执行中断向量表等步骤来让数据到达内存中的操作系统内部,在OS内贯穿网络协议栈时,在传输层数据会被拷贝到struct file结构体中的receive_queue接收缓冲区中,这个struct file结构体对应的文件描述符,其实就是accept上来的用于通信的sockfd,在这个结构体内部有一个非常重要的字段private_data,该指针会指向一个回调函数,这个回调函数就会把该sock对应的struct epoll_event结构体链入到就绪队列中,因为此时数据已经拷贝到内核的socket接收缓冲区了,事件已经就绪了,所以当内核在拷贝数据的同时,还会调用private_data回调方法,将该sock对应的红黑树节点链入到就绪队列中,所以操作系统根本不用遍历什么红黑树来检测哪些节点是否就绪,当数据到来时,底层的回调机制会自动将就绪的红黑树节点链入到就绪队列里。
-
总结一下fd事件就绪时,底层的工作流程。
当数据到达网络设备网卡时,会以硬件中断作为发起点,将中断信号通过中断设备发送到CPU的针脚,接下来CPU会查讯中断向量表,找到中断序号对应的驱动回调方法,在回调方法内部会将数据从硬件设备网卡拷贝到软件OS里。数据包在OS中会向上贯穿协议栈,到达传输层时,数据会被拷贝到struct file的内核缓冲区中,同时OS会执行一个叫做private_data的回调函数指针字段,在该回调函数内部会通过修改红黑树节点中的就绪队列指针的内容,将该节点链入到就绪队列,内核告知用户哪些fd就绪时,只需要将就绪队列中的节点内容拷贝到epoll_wait的输出型参数events即可,这就是epoll模型的底层回调机制!
————————————————
版权声明:本文为CSDN博主「rygttm」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/erridjsis/article/details/132548582
- 当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关.
struct eventpoll
{..../*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/struct rb_root rbr;/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/struct list_head rdlist;....
};
- 每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件.
- 这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度).
- 而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法.
- 这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中.
- 在epoll中,对于每一个事件,都会建立一个epitem结构体.
struct epitem
{struct rb_node rbn; // 红黑树节点struct list_head rdllink; // 双向链表节点struct epoll_filefd ffd; // 事件句柄信息struct eventpoll *ep; // 指向其所属的eventpoll对象struct epoll_event event; // 期待发生的事件类型
}
- 当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可.
- 如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户.这个操作的时间复杂度是O(1).
总结一下, epoll的使用过程就是三部曲:
- 调用epoll_create创建一个epoll句柄;
- 调用epoll_ctl,将要监控的文件描述符进行注册;
- 调用epoll_wait,等待文件描述符就绪;
epoll演示代码(基本框架)
makefile
epoll_server: epollServer.ccg++ -o $@ $^ -std=c++11
.PHONY:clean
clean:rm -f epoll_server
err.hpp
#pragma once#include <iostream>
enum
{USAGE_ERR = 1,SOCKET_ERR,BIND_ERR,LISTEN_ERR,EPOLL_CREATE_ERR
};
log.hpp
#include <iostream>
#include <string>
#include <cstdarg>
#include <ctime>
#include <unistd.h>#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4const char * to_levelstr(int level)
{switch(level){case DEBUG : return "DEBUG";case NORMAL: return "NORMAL";case WARNING: return "WARNING";case ERROR: return "ERROR";case FATAL: return "FATAL";default : return nullptr;}
}void logMessage(int level, const char *format, ...)
{
#define NUM 1024char logprefix[NUM];snprintf(logprefix, sizeof(logprefix), "[%s][%ld][pid: %d]",to_levelstr(level), (long int)time(nullptr), getpid());char logcontent[NUM];va_list arg;va_start(arg, format);// 将变量参数列表中的格式化数据写入大小缓冲区// 也就是写入到缓冲区logcontent中vsnprintf(logcontent, sizeof(logcontent), format, arg);std::cout << logprefix << logcontent << std::endl;
}
sock.hpp
#pragma once#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "log.hpp"
#include "err.hpp"class Sock
{// 表示全连接队列中最多有32+1个连接// 具体请看tcp相关实验const static int backlog = 32;public:static int Socket(){// 1. 创建socket文件套接字对象int sock = socket(AF_INET, SOCK_STREAM, 0);if (sock < 0){logMessage(FATAL, "create socket error");exit(SOCKET_ERR);}logMessage(NORMAL, "create socket success: %d", sock);// int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);// sockfd:套接字描述符,指定要设置选项的套接字// level:选项的级别,通常是 SOL_SOCKET 表示通用套接字选项。// 设置 SO_REUSEADDR 选项,允许地址重用// optval:一个指向包含选项值的缓冲区的指针。// optlen:指定选项值的长度。int opt = 1;// 我们将 `SO_REUSEADDR` 选项设置为1,从而启用了地址重用功能。// 这可以让套接字在绑定地址时可以重用之前被关闭的套接字的地址,// 而不会因为 TIME_WAIT 状态而无法绑定。setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));return sock;}static void Bind(int sock, int port){// 2. bind绑定自己的网络信息struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0){logMessage(FATAL, "bind socket error");exit(BIND_ERR);}logMessage(NORMAL, "bind socket success");}static void Listen(int sock){// 3. 设置socket 为监听状态if (listen(sock, backlog) < 0){logMessage(FATAL, "listen socket error");exit(LISTEN_ERR);}logMessage(NORMAL, "listen socket success");}// 获取新链接static int Accept(int listensock, std::string *clientip, uint16_t *clientport){// 在结构体中,存在端口号和IP地址struct sockaddr_in peer;socklen_t len = sizeof(peer);// int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);// 将获取的套接字的文件描述符返回int sock = accept(listensock, (struct sockaddr *)&peer, &len);// sock < 0, 获取套接字失败if (sock < 0)logMessage(ERROR, "accept error, next");else{logMessage(NORMAL, "accept a new link success, get new sock: %d", sock); // ?// 获取套接字成功,通过输入输出型参数来查看客户端的端口号和IP地址*clientip = inet_ntoa(peer.sin_addr);*clientport = ntohs(peer.sin_port);}return sock;}
};
epollServer.hpp
#pragma once#include <iostream>
#include <string>
#include <cstring>
#include <functional>
#include <sys/epoll.h>
#include "err.hpp"
#include "log.hpp"
#include "sock.hpp"using func_t = std::function<std::string (const std::string&)>;namespace epoll_ns
{static const int defaultport = 8888;// epoll_create大于0的参数static const int size = 128;// 文件描述符将其初始化为-1static const int defaultvalue = -1;// 事件就绪空间的大小static const int defalultnum = 64;class EpollServer{public:EpollServer(func_t f, uint16_t port = defaultport, int num = defalultnum): func_(f), _num(num), _revs(nullptr), _port(port), _listensock(defaultvalue), _epfd(defaultvalue){}void initServer(){// 1. 创建socket_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);// 2. 创建epoll模// _epfd是关联epoll模型的文件描述符_epfd = epoll_create(size);if (_epfd < 0){logMessage(FATAL, "epoll create error: %s", strerror(errno));exit(EPOLL_CREATE_ERR);}// 3. 添加listensock到epoll中// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);// 第二个参数:EPOLL_CTL_ADD:注册新的fd到epfd中// 第三个参数是需要监听的fd.// 第四个参数是结构体:ev.events代表epoll要关心的事件,EPOLLIN代表读事件// ev.data.fd 就是要关心事件对应的文件描述符struct epoll_event ev;ev.events = EPOLLIN;// 当事件就绪,被重新捞取上来的时候,我们要知道是哪一个fd就绪了ev.data.fd = _listensock;epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock, &ev);// 4. 申请就绪事件的空间_revs = new struct epoll_event[_num];logMessage(NORMAL, "init server success");}void HandlerEvent(int readyNum){logMessage(DEBUG, "HandlerEvent in");for (int i = 0; i < readyNum; i++){uint32_t events = _revs[i].events;int sock = _revs[i].data.fd;if (sock == _listensock && (events & EPOLLIN)){//_listensock读事件就绪, 获取新连接std::string clientip;uint16_t clientport;int fd = Sock::Accept(sock, &clientip, &clientport);if (fd < 0){logMessage(WARNING, "accept error");continue;}// 获取新连接的fd成功,可以直接读取吗?// 不可以,因为新的文件描述符的读事件不知道其是否就绪了// 将其放入epoll,让epoll帮我们关心这个文件描述符的读事件是否就绪struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = fd;epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);}else if (events & EPOLLIN){// 普通的读事件就绪// 依旧有问题char buffer[1024];// 把本轮数据读完,就一定能够读到一个完整的请求吗??int n = recv(sock, buffer, sizeof(buffer), 0);if (n > 0){buffer[n] = 0;logMessage(DEBUG, "client# %s", buffer);// TODOstd::string response = func_(buffer);send(sock, response.c_str(), response.size(), 0);}else if (n == 0){// 建议先从epoll移除,才close fd// 如果先关闭fd,再从epoll模型中移除,// 那么epoll模型会检测到这个fd是一个非法的fd,就有可能会报错epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);close(sock);logMessage(NORMAL, "client quit");}else{// 建议先从epoll移除,才close fdepoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);close(sock);logMessage(ERROR, "recv error, code: %d, errstring: %s", errno, strerror(errno));}}else{}}logMessage(DEBUG, "HandlerEvent out");}void start(){// timeout为-1,那么epoll_wait()阻塞式等待// 如果 epoll_wait 返回正整数值(大于0),// 则表示有事件已经触发并且已经存储在 events 缓冲区中。// 这个值表示有多少个事件已被检测到。int timeout = -1;for (;;){int n = epoll_wait(_epfd, _revs, _num, timeout);switch (n){case 0:logMessage(NORMAL, "timeout ...");break;case -1:logMessage(WARNING, "epoll_wait failed, code: %d, errstring: %s", errno, strerror(errno));break;default:logMessage(NORMAL, "have event ready");HandlerEvent(n);break;}}}~EpollServer(){if (_listensock != defaultvalue)close(_listensock);if (_epfd != defaultvalue)close(_epfd);if (_revs)delete[] _revs;}private:uint16_t _port;int _listensock;// epoll模型对应的文件描述符int _epfd;// 存放struct epoll_event结构体的指针struct epoll_event *_revs;// 就绪事件空间的大小int _num;// 回调函数func_t func_;};
}
epollServer.cc
#include "epollServer.hpp"
#include <memory>using namespace std;
using namespace epoll_ns;static void usage(std::string proc)
{std::cerr << "Usage:\n\t" << proc << " port"<< "\n\n";
}// 回调函数
std::string echo(const std::string &message)
{return "I am epollserver, " + message;
}// ./epoll_server port
int main(int argc, char *argv[])
{if (argc != 2){usage(argv[0]);exit(USAGE_ERR);}uint16_t port = atoi(argv[1]);unique_ptr<EpollServer> epollsvr(new EpollServer(echo, port));epollsvr->initServer();epollsvr->start();return 0;
}
演示结果
epoll工作方式(LT和ET模式)
多路转接接口select poll epoll所做的工作其实都是事件通知,只向上层通知事件到来,处理就绪事件的工作并不由这些API来完成,这些接口在进行事件通知时,有没有自己的策略呢?
其实是有的,在网络编程中,select poll 只支持LT工作模式,而epoll除了LT工作模式外,还支持ET工作模式,不同的工作模式对应着不同的就绪事件通知策略,LT模式是这些IO接口的默认工作模式,ET模式是epoll的高效工作模式。
下面来举一个例子帮助大家理解ET和LT模式的区别(送快递的例子)
新上任的快递员小李要给24宿舍楼的张三送快递,张三买了很多的快递,估摸着有6-7个快递,小李到了24宿舍楼楼底,然后就给楼上的张三打电话,通知张三下来拿快递,但是张三正在和他的狐朋狗友开黑打游戏呢,于是张三就嘴上答应着我马上下去,但始终就不下去,老实人小李见张三迟迟不下来拿快递,又给张三打电话,让张三下来拿快递,但张三嘴上又说,我马上下去拿快递,真的马上,但过了一会儿张三依旧还是不下来,小李又只能给张三打电话,张三啊,你的快递到了,你赶快下来取快递吧,终于张三和自己的狐朋狗友推完对面的水晶了,下楼来取快递了,但是张三一个人只拿走了3个快递,还剩下三个快递,张三也没办法了,张三一个人一次只能拿这么多快递啊,于是张三就拿着他的三个快递上楼了,继续和他的舍友开黑打游戏。结果没一会儿,小李又给张三打电话,说张三啊,你的快递没拿完呢,你买了6样东西,你只拿了3样,还剩3个包裹你没拿呢,张三又嘴上说,好的好的,我马上下去拿,但其实又重复着前面的动作,好一会儿才下楼拿走了剩余的3个包裹,当包裹全部被拿走之后,小李才不会给张三打电话了。
老油条快递员小王恰巧也要给24宿舍楼的张三送快递,恰巧的是,张三这次又买了6个快递,所以小王也碰巧要给张三送6个包裹。小王到了张三楼底下,给张三打了一个电话,说 张三啊,我只给你打一次电话,你现在要是不下来取快递,我后面是不会给你打电话的,除非你又买了新的快递,我手上你的快递数量变多的时候,我才会再给你打一个电话,所以你现在要是不下来取走快递,那我就不管你了,我给其他客户送快递去了。张三一听,这不行啊,我要是现在不下来取快递,这个快递员以后就不给我打电话了,那我下楼找不到快递员,拿不到我的快递怎么办。所以张三就立马下楼取快递去了。张三一次拿不了这么多快递啊,**但张三又不能漏下一些快递,因为小王不会再给张三打电话了,除非有小王的新快递,**所以张三刚到楼上放下手中的三个快递,又立马返回楼下取走剩余的三个快递了。
在上面的这两个例子中,其实小李的工作模式就是水平触发Level Triggered模式,简称LT模式,小王的工作模式就是边缘触发Edge Triggered模式,简称ET模式,也是多路转接接口高效的模式。
-
LT对应epoll的工作方式就是,当epoll检测到sock上有就绪的事件时,epoll_wait会立马返回通知程序员事件就绪了,程序员可以选择只读取sock缓冲区的部分数据,剩下的数据暂时不读了,等下次调用recv的时候再读取sock缓冲区中的剩余数据,下次怎么调用recv呢?
-
当然也是通过epoll_wait通知然后再进行调用啦,所以只要sock中的数据程序员没有一次性拿走,那么后续再调用epoll_wait时,epoll_wait依旧会进行就绪事件的通知,告诉程序员来读取sock中的剩余数据,而这样的方式就是LT模式,即只要底层有数据没读完,后续epoll_wait返回时就会一直通知用户读取数据。
-
而ET对应的工作方式是,如果底层有数据没读完,后续epoll_wait不会通知程序员事件就绪了,只有当底层数据增多的时候,epoll_wait才会再通知一次程序员,否则epoll_wait只会通知一次。
————————————————
版权声明:本文为CSDN博主「rygttm」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/erridjsis/article/details/132548615
LT演示
epoll_server的默认工作模式也是LT模式,在下面的代码中我将处理就绪事件的接口HandlerEvent( )屏蔽掉了,当客户端连接到来时,服务器的epoll_wait一定会检测到listensock上的读事件就绪了,所以epoll_wait会返回,告知程序员要处理数据了。但如果程序员一直不处理数据的话,那epoll_wait每次都会告知程序员要处理数据了,所以从显示器的输出结果来看,epoll_wait返回后,根据返回值n,一定是进入到了default分支中,并且每次epoll_wait都会告知程序员事件就绪,所以显示器会一直疯狂打印have events ready,因为只要底层有事件就绪,对于listensock来说,只要内核监听队列有就绪的连接,那就是就绪,epoll_wait就会一直通知程序员事件就绪了,赶快处理吧。(就像小李一样,只要张三不拿走快递,小李就会一直给张三打电话)
————————————————
版权声明:本文为CSDN博主「rygttm」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/erridjsis/article/details/132548615
#pragma once#include <iostream>
#include <string>
#include <cstring>
#include <functional>
#include <sys/epoll.h>
#include "err.hpp"
#include "log.hpp"
#include "sock.hpp"using func_t = std::function<std::string (const std::string&)>;namespace epoll_ns
{static const int defaultport = 8888;// epoll_create大于0的参数static const int size = 128;// 文件描述符将其初始化为-1static const int defaultvalue = -1;// 事件就绪空间的大小static const int defalultnum = 64;class EpollServer{public:EpollServer(func_t f, uint16_t port = defaultport, int num = defalultnum): func_(f), _num(num), _revs(nullptr), _port(port), _listensock(defaultvalue), _epfd(defaultvalue){}void initServer(){// 1. 创建socket_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);// 2. 创建epoll模// _epfd是关联epoll模型的文件描述符_epfd = epoll_create(size);if (_epfd < 0){logMessage(FATAL, "epoll create error: %s", strerror(errno));exit(EPOLL_CREATE_ERR);}// 3. 添加listensock到epoll中// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);// 第二个参数:EPOLL_CTL_ADD:注册新的fd到epfd中// 第三个参数是需要监听的fd.// 第四个参数是结构体:ev.events代表epoll要关心的事件,EPOLLIN代表读事件// ev.data.fd 就是要关心事件对应的文件描述符struct epoll_event ev;ev.events = EPOLLIN;// 当事件就绪,被重新捞取上来的时候,我们要知道是哪一个fd就绪了ev.data.fd = _listensock;epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock, &ev);// 4. 申请就绪事件的空间_revs = new struct epoll_event[_num];logMessage(NORMAL, "init server success");}void HandlerEvent(int readyNum){logMessage(DEBUG, "HandlerEvent in");for (int i = 0; i < readyNum; i++){uint32_t events = _revs[i].events;int sock = _revs[i].data.fd;if (sock == _listensock && (events & EPOLLIN)){//_listensock读事件就绪, 获取新连接std::string clientip;uint16_t clientport;int fd = Sock::Accept(sock, &clientip, &clientport);if (fd < 0){logMessage(WARNING, "accept error");continue;}// 获取新连接的fd成功,可以直接读取吗?// 不可以,因为新的文件描述符的读事件不知道其是否就绪了// 将其放入epoll,让epoll帮我们关心这个文件描述符的读事件是否就绪struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = fd;epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);}else if (events & EPOLLIN){// 普通的读事件就绪// 依旧有问题char buffer[1024];// 把本轮数据读完,就一定能够读到一个完整的请求吗??int n = recv(sock, buffer, sizeof(buffer), 0);if (n > 0){buffer[n] = 0;logMessage(DEBUG, "client# %s", buffer);// TODOstd::string response = func_(buffer);send(sock, response.c_str(), response.size(), 0);}else if (n == 0){// 建议先从epoll移除,才close fd// 如果先关闭fd,再从epoll模型中移除,// 那么epoll模型会检测到这个fd是一个非法的fd,就有可能会报错epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);close(sock);logMessage(NORMAL, "client quit");}else{// 建议先从epoll移除,才close fdepoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);close(sock);logMessage(ERROR, "recv error, code: %d, errstring: %s", errno, strerror(errno));}}else{}}logMessage(DEBUG, "HandlerEvent out");}void start(){// timeout为-1,那么epoll_wait()阻塞式等待// 如果 epoll_wait 返回正整数值(大于0),// 则表示有事件已经触发并且已经存储在 events 缓冲区中。// 这个值表示有多少个事件已被检测到。int timeout = -1;for (;;){int n = epoll_wait(_epfd, _revs, _num, timeout);switch (n){case 0:logMessage(NORMAL, "timeout ...");break;case -1:logMessage(WARNING, "epoll_wait failed, code: %d, errstring: %s", errno, strerror(errno));break;default:logMessage(NORMAL, "have event ready");// HandlerEvent( )屏蔽掉,显示器会一直疯狂打印have events ready// HandlerEvent(n);break;}}}~EpollServer(){if (_listensock != defaultvalue)close(_listensock);if (_epfd != defaultvalue)close(_epfd);if (_revs)delete[] _revs;}private:uint16_t _port;int _listensock;// epoll模型对应的文件描述符int _epfd;// 存放struct epoll_event结构体的指针struct epoll_event *_revs;// 就绪事件空间的大小int _num;// 回调函数func_t func_;};
}
演示结果
ET演示
-
在添加listensock到epoll底层的红黑树中时,不仅仅关心listensock的读事件,同时还让listensock的工作模式是ET(只要将EPOLLIN和EPOLLET按位或即可)。
-
所以当连接到来时,可以看到服务器只会打印一次have event ready,只要没有新连接到来,那么epoll_wait只会通知程序员一次事件就绪,除非到来了新连接,那就说明内核监听队列中就绪的连接变多了,换言之就是listensock底层的数据变多了,此时epoll_wait才会再好心提醒一次程序员,事件就绪了,你赶快处理吧。反过来就是,只要后续listensock底层的数据没有增多,那么epoll_wait就不会在通知程序员了。
-
而由于我们设置的timeout是阻塞式等待,所以你可以看到,只要没有新连接到来,服务器就会阻塞住,epoll_wait调用不会再返回,也就不会再通知程序员。而反观LT模式,虽然每次epoll_wait都是阻塞式等待,但epoll_wait每次都会返回,每次都会告知程序员,这就是两者的不同。边缘触发只会触发一次,水平触发会一直触发。
————————————————
版权声明:本文为CSDN博主「rygttm」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/erridjsis/article/details/132548615
#pragma once#include <iostream>
#include <string>
#include <cstring>
#include <functional>
#include <sys/epoll.h>
#include "err.hpp"
#include "log.hpp"
#include "sock.hpp"using func_t = std::function<std::string (const std::string&)>;namespace epoll_ns
{static const int defaultport = 8888;// epoll_create大于0的参数static const int size = 128;// 文件描述符将其初始化为-1static const int defaultvalue = -1;// 事件就绪空间的大小static const int defalultnum = 64;class EpollServer{public:EpollServer(func_t f, uint16_t port = defaultport, int num = defalultnum): func_(f), _num(num), _revs(nullptr), _port(port), _listensock(defaultvalue), _epfd(defaultvalue){}void initServer(){// 1. 创建socket_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);// 2. 创建epoll模// _epfd是关联epoll模型的文件描述符_epfd = epoll_create(size);if (_epfd < 0){logMessage(FATAL, "epoll create error: %s", strerror(errno));exit(EPOLL_CREATE_ERR);}// 3. 添加listensock到epoll中// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);// 第二个参数:EPOLL_CTL_ADD:注册新的fd到epfd中// 第三个参数是需要监听的fd.// 第四个参数是结构体:ev.events代表epoll要关心的事件,EPOLLIN代表读事件// ev.data.fd 就是要关心事件对应的文件描述符struct epoll_event ev;// EPOLL默认是LT的工作模式// EPOLLET :将EPOLL设为边缘触发(Edge Triggered)模式,// 这是相对于水平触发(Level Triggered)来说的.ev.events = EPOLLIN | EPOLLET;// 当事件就绪,被重新捞取上来的时候,我们要知道是哪一个fd就绪了ev.data.fd = _listensock;epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock, &ev);// 4. 申请就绪事件的空间_revs = new struct epoll_event[_num];logMessage(NORMAL, "init server success");}void HandlerEvent(int readyNum){logMessage(DEBUG, "HandlerEvent in");for (int i = 0; i < readyNum; i++){uint32_t events = _revs[i].events;int sock = _revs[i].data.fd;if (sock == _listensock && (events & EPOLLIN)){//_listensock读事件就绪, 获取新连接std::string clientip;uint16_t clientport;int fd = Sock::Accept(sock, &clientip, &clientport);if (fd < 0){logMessage(WARNING, "accept error");continue;}// 获取新连接的fd成功,可以直接读取吗?// 不可以,因为新的文件描述符的读事件不知道其是否就绪了// 将其放入epoll,让epoll帮我们关心这个文件描述符的读事件是否就绪struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = fd;epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);}else if (events & EPOLLIN){// 普通的读事件就绪// 依旧有问题char buffer[1024];// 把本轮数据读完,就一定能够读到一个完整的请求吗??int n = recv(sock, buffer, sizeof(buffer), 0);if (n > 0){buffer[n] = 0;logMessage(DEBUG, "client# %s", buffer);// TODOstd::string response = func_(buffer);send(sock, response.c_str(), response.size(), 0);}else if (n == 0){// 建议先从epoll移除,才close fd// 如果先关闭fd,再从epoll模型中移除,// 那么epoll模型会检测到这个fd是一个非法的fd,就有可能会报错epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);close(sock);logMessage(NORMAL, "client quit");}else{// 建议先从epoll移除,才close fdepoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);close(sock);logMessage(ERROR, "recv error, code: %d, errstring: %s", errno, strerror(errno));}}else{}}logMessage(DEBUG, "HandlerEvent out");}void start(){// timeout为-1,那么epoll_wait()阻塞式等待// 如果 epoll_wait 返回正整数值(大于0),// 则表示有事件已经触发并且已经存储在 events 缓冲区中。// 这个值表示有多少个事件已被检测到。int timeout = -1;for (;;){int n = epoll_wait(_epfd, _revs, _num, timeout);switch (n){case 0:logMessage(NORMAL, "timeout ...");break;case -1:logMessage(WARNING, "epoll_wait failed, code: %d, errstring: %s", errno, strerror(errno));break;default:logMessage(NORMAL, "have event ready");// HandlerEvent( )屏蔽掉,显示器会一直疯狂打印have events ready// HandlerEvent(n);break;}}}~EpollServer(){if (_listensock != defaultvalue)close(_listensock);if (_epfd != defaultvalue)close(_epfd);if (_revs)delete[] _revs;}private:uint16_t _port;int _listensock;// epoll模型对应的文件描述符int _epfd;// 存放struct epoll_event结构体的指针struct epoll_event *_revs;// 就绪事件空间的大小int _num;// 回调函数func_t func_;};
}
演示结果
ET模式高效的原因(fd必须是非阻塞的)
为什么ET模式是高效的呢?
- 这是非常重要的一个面试题,许多的面试官在问到网络环节时,都会让我们讲一下select poll epoll各自的用法,epoll的底层原理,三个接口的优缺点,还有就是epoll的两种工作模式,以及ET模式高效的原因,ET模式高效的原因也是一个高频的问题。
fd必须是非阻塞的
ET模式下,只有底层数据从无到有,从有到多的时候,才会通知上层一次,通知的机制就是rbtree+ready_queue+cb(红黑树+就绪队列+回调函数),所以ET这种通知机制就会倒逼程序员一次将底层的数据全部读走,如果不一次读走,就可能造成数据丢失,你无法保证对方一定会继续给你发数据啊,如果无法保证这点,那就无法保证epoll_wait还会通知你下一次,如果无法保证这一点,那就有可能你只读取了sock的部分数据,但后续epoll_wait可能不会再通知你了,从而导致后续的数据你永远都读不上来了,所以你必须一次将底层的数据全部读走。
如何保证一次将底层的数据全部读走呢?那就只能循环读取了,如果只调用recv一次,是无法保证一次将底层的数据全部读走的。所以我们可以打个while循环一直读sock接收缓冲区中的数据,直到读取不上来数据,但这里其实就又有一个问题了,如果sock是阻塞的,循环读读到最后一定会没数据,而此时由于sock是阻塞的,那么服务器就会阻塞在最后一次的recv系统调用处,直到有数据到来,而此时服务器就会被挂起,服务器一旦被挂起,那就完蛋了。
服务器被挂起,那就无法运行了,无法给客户提供服务了,这就很有可能造成很多公司盈利上的损失,所以服务器一定不能停下来,更不能被挂起,需要一直运行,以便给客户提供服务。而如果使用非阻塞文件描述符,当recv读取不到数据时,recv会返回-1,同时错误码被设置为EAGAIN和EWOULDBLOCK,这俩错误码的值是一样的,此时就可以判断出,我们一次把底层的数据全部都读走了。所以在工程实践上,epoll以ET模式工作时,文件描述符必须设置为非阻塞,防止服务器由于等待某种资源就绪从而被挂起。
为什么ET模式是高效的呢?
- 解释完ET模式下fd必须是非阻塞的原因后,那为什么ET模式是高效的呢?可能有人会说,因为ET模式只会通知一次,倒逼程序员将数据一次全部读走,所以ET模式就是高效的,如果这个问题满分100分的话,你这样的回答只能得到20分,因为你的回答其实仅仅只是答案的引线,真正最重要的部分你还是没说出来。
- 倒逼程序员一次将数据全部读走,那不就是让上层尽快取走数据吗?尽快取走数据后,就可以给对方发送一个更大的16位窗口大小,让对方更新出更大的滑动窗口大小,提高底层数据发送的效率,更好的使用TCP延迟应答,滑动窗口等策略!!!这才是ET模式高效的最本质的原因!!!
- 因为ET模式可以更好的利用TCP提高数据发送效率的种种策略,例如延迟应答,滑动窗口等。之前在讲TCP的时候,TCP报头有个字段叫做PSH,其实这个字段如果被设置的话,epoll_wait就会将此字段转换为通知机制,再通知一次上层,让其尽快读走数据。
————————————————
版权声明:本文为CSDN博主「rygttm」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/erridjsis/article/details/132548615
LT和ET模式使用时的读取方式
情况1:
在LT模式下,如果fd是阻塞的,那么上次只能读一次,这是出于工程需求,因为我们不能让服务器阻塞挂起,而在文件描述符是阻塞的情况下,如果我们进行循环读,则最后一次肯定会读取不到数据,那么此时服务器进程就会阻塞住,等待fd的skbuff中到来数据,但服务器是不能被阻塞挂起的,所以我们只能读取一行。
情况2:
如果fd是非阻塞的,那其实就不用担心了,我们进行循环读就可以,这样是比较高效的,因为在非阻塞且是LT工作模式的情况下,无论我们是一行读还是循环读服务器都是不会被阻塞挂起的。对于读一次来说,在LT模式下也是不会出问题的,因为只要skbuff中有数据,那么epoll_wait就会一直通知程序员来尽快取走数据,我们不用担心丢失数据的情况发生。
情况3:
在ET模式下,fd必须是非阻塞的,因为出于工程实践的角度考虑,为了让数据被程序员完整的拿到,我们只能进行循环读,而只要你进行循环读,fd万万就不能是阻塞的,因为循环读的最后一次读取一定会读不到数据,只要读不到数据,且fd是阻塞的,那么服务器就被挂起了,这并不是我们想要看到的结果,所以在ET模式下,没得商量,fd必须是非阻塞的,同时程序员在应用层读取数据的方式也必须是循环读,不可以读一行。
————————————————
版权声明:本文为CSDN博主「rygttm」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/erridjsis/article/details/132548615
个问题了,如果sock是阻塞的,循环读读到最后一定会没数据,而此时由于sock是阻塞的,那么服务器就会阻塞在最后一次的recv系统调用处,直到有数据到来,而此时服务器就会被挂起,服务器一旦被挂起,那就完蛋了。
- 服务器被挂起,那就无法运行了,无法给客户提供服务了,这就很有可能造成很多公司盈利上的损失,所以服务器一定不能停下来,更不能被挂起,需要一直运行,以便给客户提供服务。而如果使用非阻塞文件描述符,当recv读取不到数据时,recv会返回-1,同时错误码被设置为EAGAIN和EWOULDBLOCK,这俩错误码的值是一样的,此时就可以判断出,我们一次把底层的数据全部都读走了。所以在工程实践上,epoll以ET模式工作时,文件描述符必须设置为非阻塞,防止服务器由于等待某种资源就绪从而被挂起。
为什么ET模式是高效的呢?
- 解释完ET模式下fd必须是非阻塞的原因后,那为什么ET模式是高效的呢?可能有人会说,因为ET模式只会通知一次,倒逼程序员将数据一次全部读走,所以ET模式就是高效的,如果这个问题满分100分的话,你这样的回答只能得到20分,因为你的回答其实仅仅只是答案的引线,真正最重要的部分你还是没说出来。
- 倒逼程序员一次将数据全部读走,那不就是让上层尽快取走数据吗?尽快取走数据后,就可以给对方发送一个更大的16位窗口大小,让对方更新出更大的滑动窗口大小,提高底层数据发送的效率,更好的使用TCP延迟应答,滑动窗口等策略!!!这才是ET模式高效的最本质的原因!!!
- 因为ET模式可以更好的利用TCP提高数据发送效率的种种策略,例如延迟应答,滑动窗口等。之前在讲TCP的时候,TCP报头有个字段叫做PSH,其实这个字段如果被设置的话,epoll_wait就会将此字段转换为通知机制,再通知一次上层,让其尽快读走数据。
————————————————
版权声明:本文为CSDN博主「rygttm」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/erridjsis/article/details/132548615
LT和ET模式使用时的读取方式
情况1:
在LT模式下,如果fd是阻塞的,那么上次只能读一次,这是出于工程需求,因为我们不能让服务器阻塞挂起,而在文件描述符是阻塞的情况下,如果我们进行循环读,则最后一次肯定会读取不到数据,那么此时服务器进程就会阻塞住,等待fd的skbuff中到来数据,但服务器是不能被阻塞挂起的,所以我们只能读取一行。
情况2:
如果fd是非阻塞的,那其实就不用担心了,我们进行循环读就可以,这样是比较高效的,因为在非阻塞且是LT工作模式的情况下,无论我们是一行读还是循环读服务器都是不会被阻塞挂起的。对于读一次来说,在LT模式下也是不会出问题的,因为只要skbuff中有数据,那么epoll_wait就会一直通知程序员来尽快取走数据,我们不用担心丢失数据的情况发生。
情况3:
在ET模式下,fd必须是非阻塞的,因为出于工程实践的角度考虑,为了让数据被程序员完整的拿到,我们只能进行循环读,而只要你进行循环读,fd万万就不能是阻塞的,因为循环读的最后一次读取一定会读不到数据,只要读不到数据,且fd是阻塞的,那么服务器就被挂起了,这并不是我们想要看到的结果,所以在ET模式下,没得商量,fd必须是非阻塞的,同时程序员在应用层读取数据的方式也必须是循环读,不可以读一行。
————————————————
版权声明:本文为CSDN博主「rygttm」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/erridjsis/article/details/132548615