24.多路转接-poll

poll也是一种linux中的多路转接的方案

  1. 解决select的fd有上限的问题
  2. 解决select每次调用都要重新设置关心的fd

poll函数接口

poll, ppoll - wait for some event on a file descriptor#include <poll.h>int poll(struct pollfd *fds, nfds_t nfds, int timeout);DESCRIPTIONpoll() performs a similar task to select(2): it waits for one of a set of file descriptors to become ready to perform I/O.The set of file descriptors to be monitored is specified in the fds argument, which is an array of structures of the following form:
// 要监视的文件描述符集在fds参数中指定,它是一个结构数组,格式如下:
// fd 代表用户告诉内核,要帮我关心一下fd
// events 代表用户告诉内核,要帮我关心一下fd的events事件是否就绪了(如:读事件,写事件)  输入时看 fd + events
// revents 代表内核告诉用户,用户要关心fd的events事件中,有哪些fd已经就绪了(如:读事件,写事件) 输出时看 fd + revents
// 通过这样的设计,输入输出分离,poll不需要对参数进行重新设定struct pollfd {int   fd;         /* file descriptor */   short events;     /* requested events */short revents;    /* returned events */};// 参数说明
struct pollfd *fds:
fds是一个poll函数监听的结构列表.每一个元素中,包含了三部分内容:文件描述符,监听的事件集合,返回的事件集合.nfds_t nfds:    
nfds表示fds数组的长度.int timeout:
timeout表示poll函数的超时时间,单位是毫秒(ms).// 设置情况// 1. timeout > 0 : 在timeout以内阻塞,否则非阻塞返回一次// 2. timeout = 0 : 非阻塞等待// 3. timeout < 0 : 阻塞等待// 返回值(同select)
返回值大于0:代表就绪的文件描述符的个数(如果是3,代表等待的多个文件描述符中,有三个文件描述符已经就绪)
返回值等于0:超时返回了
返回值小于0:select调用失败了(通过查看错误码errno来查看具体的错误原因)

events和revents的取值(以下宏值,一个宏值在对应的events和revents中占据一个比特位):

image-20231027222755803

poll示例:使用poll监控标准输入

#include <poll.h>
#include <unistd.h>
#include <stdio.h>int main()
{struct pollfd poll_fd;poll_fd.fd = 0;poll_fd.events = POLLIN;for (;;){int ret = poll(&poll_fd, 1, 1000);if (ret < 0){perror("poll");continue;}if (ret == 0){printf("poll timeout\n");continue;}if (poll_fd.revents == POLLIN){char buf[1024] = {0};read(0, buf, sizeof(buf) - 1);printf("stdin:%s", buf);}}
}

poll演示代码(根据select代码改编)

makefile

poll_server: main.ccg++ -o $@ $^ -std=c++11
.PHONY:clean
clean:rm -f poll_server

err.hpp

#pragma once#include <iostream>enum
{USAGE_ERR = 1,SOCKET_ERR,BIND_ERR,LISTEN_ERR
};

log.hpp

#include <iostream>
#include <string>
#include <cstdarg>
#include <ctime>
#include <unistd.h>#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4const char * to_levelstr(int level)
{switch(level){case DEBUG : return "DEBUG";case NORMAL: return "NORMAL";case WARNING: return "WARNING";case ERROR: return "ERROR";case FATAL: return "FATAL";default : return nullptr;}
}void logMessage(int level, const char *format, ...)
{
#define NUM 1024char logprefix[NUM];snprintf(logprefix, sizeof(logprefix), "[%s][%ld][pid: %d]",to_levelstr(level), (long int)time(nullptr), getpid());char logcontent[NUM];va_list arg;va_start(arg, format);// 将变量参数列表中的格式化数据写入大小缓冲区// 也就是写入到缓冲区logcontent中vsnprintf(logcontent, sizeof(logcontent), format, arg);std::cout << logprefix << logcontent << std::endl;
}

sock.hpp

#pragma once#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "log.hpp"
#include "err.hpp"class Sock
{// 表示全连接队列中最多有32+1个连接// 具体请看tcp相关实验const static int backlog = 32;public:static int Socket(){// 1. 创建socket文件套接字对象int sock = socket(AF_INET, SOCK_STREAM, 0);if (sock < 0){logMessage(FATAL, "create socket error");exit(SOCKET_ERR);}logMessage(NORMAL, "create socket success: %d", sock);// int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);// sockfd:套接字描述符,指定要设置选项的套接字// level:选项的级别,通常是 SOL_SOCKET 表示通用套接字选项。// 设置 SO_REUSEADDR 选项,允许地址重用// optval:一个指向包含选项值的缓冲区的指针。// optlen:指定选项值的长度。int opt = 1;// 我们将 `SO_REUSEADDR` 选项设置为1,从而启用了地址重用功能。// 这可以让套接字在绑定地址时可以重用之前被关闭的套接字的地址,// 而不会因为 TIME_WAIT 状态而无法绑定。setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));return sock;}static void Bind(int sock, int port){// 2. bind绑定自己的网络信息struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0){logMessage(FATAL, "bind socket error");exit(BIND_ERR);}logMessage(NORMAL, "bind socket success");}static void Listen(int sock){// 3. 设置socket 为监听状态if (listen(sock, backlog) < 0) {logMessage(FATAL, "listen socket error");exit(LISTEN_ERR);}logMessage(NORMAL, "listen socket success");}// 获取新链接static int Accept(int listensock, std::string *clientip, uint16_t *clientport){// 在结构体中,存在端口号和IP地址struct sockaddr_in peer;socklen_t len = sizeof(peer);// int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);// 将获取的套接字的文件描述符返回int sock = accept(listensock, (struct sockaddr *)&peer, &len);// sock < 0, 获取套接字失败if (sock < 0)logMessage(ERROR, "accept error, next");else{logMessage(NORMAL, "accept a new link success, get new sock: %d", sock); // ?// 获取套接字成功,通过输入输出型参数来查看客户端的端口号和IP地址*clientip = inet_ntoa(peer.sin_addr);*clientport = ntohs(peer.sin_port);}return sock;}
};

PollServer.hpp

#pragma once#include <iostream>
#include <string>
#include <functional>
#include <poll.h>
#include "sock.hpp"namespace Poll_ns
{static const int defaultport = 8081;// 要监视的文件描述符集在rfds参数中指定,它是一个结构数组// 此处,我们设置一共申请2048个结构数组的空间,数组的个数可以自己定义static const int num = 2048;// 默认合法fd的数组中,全部将其初始化为-1static const int defaultfd = -1;using func_t = std::function<std::string(const std::string &)>;class PollServer{public:PollServer(func_t f, int port = defaultport): _func(f), _port(port), _listensock(-1), _rfds(nullptr){}void ResetItem(int i){// 将结构数组中的fd进行初始化_rfds[i].fd = defaultfd;// 将结构数组中的events和revents进行初始化_rfds[i].events = 0;_rfds[i].revents = 0;}void initServer(){// 创建的套接字_listensock = Sock::Socket();// 绑定套接字和端口号(任意IP地址bind,详看tcp套接字)Sock::Bind(_listensock, _port);// 将_listensock设置为监听套接字Sock::Listen(_listensock);// 要监视的文件描述符集在rfds参数中指定,它是一个结构数组_rfds = new struct pollfd[num];// 对这个结构化数组进行初始化for (int i = 0; i < num; i++){ResetItem(i);}// 我们将监听套接字放在数组中,下标为0_rfds[0].fd = _listensock;// POLLIN代表读事件,让poll帮我们关心_listensock的读事件_rfds[0].events = POLLIN;}void Print(){std::cout << "fd list: ";for (int i = 0; i < num; i++){if (_rfds[i].fd != defaultfd)std::cout << _rfds[i].fd << " ";}std::cout << std::endl;}void Recver(int pos){// 1. 读取request// 这里在进行读取的时候,不会被阻塞,因为poll已经监测到事件就绪了// 这里这种方式进行读取是有问题的// a.无法保证一定可以将缓冲区的数据读取完// b.就算将缓冲区的数据读取完,也无法保证能够完整的读取到一个报文// 那么应用层就无法将这个报文数据进行反序列化,转化为结构化数据// 后续在补充(目前先这样演示)char buffer[1024];ssize_t s = recv(_rfds[pos].fd, buffer, sizeof(buffer) - 1, 0);if (s > 0){// s > 0, 读取成功,s代表读取到的字符数量buffer[s] = 0;logMessage(NORMAL, "client# %s", buffer);}else if (s == 0){// s == 0,代表客户端已经关闭了,读取到的字符数为0// 因为我们也要关闭服务端对应的文件描述符close(_rfds[pos].fd);// 并将_rfds[pos]对应的数组中的节点进行初始化// 下次就不会再监测到这个节点了ResetItem(pos);logMessage(NORMAL, "client quit");return;}else{// s < 0, 代表recv()读取出错close(_rfds[pos].fd);ResetItem(pos);logMessage(ERROR, "client quit: %s", strerror(errno));return;}// 2. 处理requeststd::string response = _func(buffer);// 3. 返回responsewrite(_rfds[pos].fd, response.c_str(), response.size());}void Accepter(int listensock){logMessage(DEBUG, "Accepter in");// 此时说明_listensock一定是就绪的std::string clientip;uint16_t clientport = 0;// 获取新套接字的文件描述符,并获取客户端的端口号和IP地址int sock = Sock::Accept(listensock, &clientip, &clientport);// 如果sock < 0 ,说明获取套接字失败,则重新循环获取if (sock < 0)return;logMessage(NORMAL, "accept success [%s:%d]", clientip.c_str(), clientport);// 此时,一定可以获取新的连接,但是还不可以直接recv或者send// 因为获取到的新连接的套接字里面不一定存在数据(如果直接recv,那么recv可能还需要等数据就绪)// 所以,我们需要将新的套接字托管给poll,让poll来帮我们关心事件是否就绪int i = 0;for (; i < num; i++){// 在_rfds中寻找一个空的位置if (_rfds[i].fd != defaultfd)continue;elsebreak;}if (i == num){// 经过上面的循环,如果i == num,说明_rfds整个数组已经存储满了logMessage(WARNING, "server if full, please wait");// _rfds整个数组已经存储满了,因此直接关闭新获取的套接字close(sock);}else{// 代码运行到这里,说明在_rfds中找到了空的位置// 那么就将获取的新连接的文件描述符放入到数组中_rfds[i].fd = sock;_rfds[i].events = POLLIN;}Print();logMessage(DEBUG, "Accepter out");}// 1. rfds中,不仅仅是有一个fd是就绪的,可能存在多个// 通过循环accept(),来获取多个新连接的文件描述符// 2. 我们的select目前只处理了读事件void HandlerReadEvent(){for (int i = 0; i < num; i++){// 过滤掉非法的fdif (_rfds[i].fd == defaultfd)continue;if (!(_rfds[i].events & POLLIN))continue;// 如果_rfds[i].fd 是_listensock,// 并且_rfds[i].fd 的读事件已经就绪(也就是revents & POLLIN 为真)// 那么就代表监听套接字的读事件就绪,我们就从监听套接字中,获取新链接的文件描述符if (_rfds[i].fd == _listensock && (_rfds[i].revents & POLLIN)){Accepter(_listensock);}else if (_rfds[i].revents & POLLIN){// 如果_rfds[i].fd不是监听套接字,那么它就是普通的套接字// 如果这个普通的套接字,我们要求poll帮我们关心它的读事件(_rfds[i].events & POLLIN)// 并且这个套接字的读事件已经就绪(_rfds[i].revents & POLLIN),那么就开始读取Recver(i);}else{}}}void start(){for (;;){// 此处是1000ms,也就是1sint timeout = 1000;// 返回值大于0:代表就绪的文件描述符的个数(如果是3,代表等待的多个文件描述符中,有三个文件描述符已经就绪)int n = poll(_rfds, num, timeout);switch (n){case 0:// 返回值为0,代表在规定的时间内,没有新的连接就绪;logMessage(NORMAL, "timeout...");break;case -1:// 返回值为1,代表poll发生错误logMessage(WARNING, "poll error, code: %d, err string: %s", errno, strerror(errno));break;default:// 代码运行到这里说明有事件就绪了(目前只有一个监听事件就绪了)// 如果这个监听事件不被取走进行处理,那么它就一直处于就绪状态logMessage(NORMAL, "have event ready!");// 取走监听事件进行处理HandlerReadEvent();break;}}}~PollServer(){if (_listensock < 0)close(_listensock);if (_rfds)delete[] _rfds;}private:// 服务器需要绑定自己的端口号int _port;// 服务器需要有自己的监听套接字int _listensock;// int poll(struct pollfd *fds, nfds_t nfds, int timeout);// 要监视的文件描述符集在参数fd中指定,它是一个结构数组,格式如下:// struct pollfd// {//     int   fd;         /* file descriptor *///     short events;  /* requested events *///     short revents; /* returned events */// };// struct pollfd *fds 的底层是一个红黑树struct pollfd *_rfds;// 回调函数func_t _func;};
}

main.cc

#include "PollServer.hpp"
#include <memory>using namespace std;
using namespace Poll_ns;static void usage(std::string proc)
{std::cerr << "Usage:\n\t" << proc << " port" << "\n\n";
}// 回调函数
std::string transaction(const std::string &request)
{return request;
}// ./poll_server 8081
int main(int argc, char *argv[])
{if(argc != 2){usage(argv[0]);exit(USAGE_ERR);}unique_ptr<PollServer> svr(new PollServer(transaction,atoi(argv[1])));svr->initServer();svr->start();return 0;
}

演示结果

image-20231028183603973

poll的优点

不同与select使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现.

  • pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式.接口使用比select更方便.
  • poll并没有最大数量限制(但是数量过大后性能也是会下降).

poll的缺点

poll中监听的文件描述符数目增多时

  • 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符.
  • 每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中.
  • 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降.

epoll

epoll初识

  • 按照man手册的说法:是为处理大批量句柄而作了改进的poll.
  • 它是在2.5.44内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44)它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法

epoll的相关系统调用

epoll有3个相关的系统调用.

epoll_create()

epoll_create, epoll_create1 - open an epoll file descriptor// epoll_create的功能就是创建一个epoll模型#include <sys/epoll.h>// size只需要传入一个大于0的数即可
int epoll_create(int size);DESCRIPTION
epoll_create() creates an epoll(7) instance.  Since Linux 2.6.8, the size argument is ignored, but must be greater than zero; see NOTES below.// epoll_create() 创建一个 epoll(7) 实例。 自 Linux 2.6.8 起,size 参数将被忽略,但必须大于 0;请参阅下面的注释。// 返回值
RETURN VALUE
On success, these system calls return a nonnegative file descriptor.  On error, -1 is returned, and errno is set to indicate the error.// 成功时,这些系统调用会返回一个非负的文件描述符。 如果出错,则返回-1,并设置 errno 表示出错。

epoll_ctl

epoll_ctl - control interface for an epoll descriptor
// 功能:用户告诉内核,你要帮我关心那些文件描述符上的那些事件#include <sys/epoll.h>int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);// epoll的事件注册函数
// 它不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型.// 参数
第一个参数是epoll_create()的返回值(epoll的句柄).
第二个参数表示动作,用三个宏来表示.
第三个参数是需要监听的fd.
第四个参数是告诉内核需要监听什么事.// 第二个参数的取值:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;//  返回值When successful, epoll_ctl() returns zero.  When an error occurs, epoll_ctl() returns -1 and errno is set appropriately.

struct epoll_event结构如下:

typedef union epoll_data
{void *ptr;int fd;uint32_t u32;uint64_t u64;
} epoll_data_t;struct epoll_event
{// 注册时  events代表用户告诉内核,你要帮我关心什么事件,返回时代表内核告诉用户,什么事件已经就绪uint32_t events;	/* Epoll events */ epoll_data_t data;	/* User data variable 代表用户的可用数据*/
} __EPOLL_PACKED;

struct epoll_event结构体中的events可以是以下几个宏的集合:

  • EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
  • EPOLLOUT :表示对应的文件描述符可以写;
  • EPOLLPRI :表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
  • EPOLLERR :表示对应的文件描述符发生错误;
  • EPOLLHUP :表示对应的文件描述符被挂断;
  • EPOLLET :将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的.
  • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里.

epoll_wait

epoll_wait, epoll_pwait - wait for an I/O event on an epoll file descriptor#include <sys/epoll.h>int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);// 参数
// 第一个参数是epoll_create()的返回值(epoll的句柄).// 返回值
如果 epoll_wait 返回正整数值(大于0),则表示有事件已经触发并且已经存储在 events 缓冲区中。这个值表示有多少个事件已被检测到。
如果 epoll_wait 返回0,表示等待超时,没有任何事件发生。这通常意味着在指定的超时时间内没有事件触发。
如果 epoll_wait 返回-1,表示发生了错误。可以通过检查 errno 变量来获取具体的错误信息,以了解出现了什么问题。

epoll_wait 函数是 Linux 中用于 I/O 多路复用的系统调用之一,它用于等待一组文件描述符上的事件。这是一个关键的函数,通常与 epoll 系统调用一起使用,用于实现高效的非阻塞 I/O 多路复用。

以下是 epoll_wait 函数的参数和其含义:

  • epfd:这是一个整数,表示已创建的 epoll 实例的文件描述符。在调用 epoll_createepoll_create1 创建 epoll 实例后,您将获得一个 epoll 实例的文件描述符,将其传递给 epfd

  • events:**用于回传代处理事件的数组;**这是一个指向 struct epoll_event 结构的指针,用于存储已触发事件的信息。epoll_event 结构包含有关已触发事件的详细信息,包括文件描述符和事件类型。

  • maxevents:**每次能处理的事件数;**这是一个整数,表示 events 缓冲区的大小,即可以存储多少个已触发事件。通常,您应该根据需要的事件数量来设置这个值,以确保足够的事件信息可以被存储。

  • timeout:这是一个整数,表示等待事件的超时时间(以毫秒为单位)。如果设置为正数,epoll_wait 将等待指定的毫秒数,然后返回。如果设置为0,epoll_wait 将立即返回,不等待事件。如果设置为-1,epoll_wait 将无限期地等待事件,直到有事件触发。

epoll_wait 函数的主要作用是等待文件描述符上的事件触发,并将相关事件信息存储在 events 缓冲区中。一旦有事件触发或超时,epoll_wait 将返回已触发事件的数量,并将事件信息存储在 events 中。然后,您可以遍历 events 数组来处理已触发的事件。

此函数常用于实现高性能的服务器,以有效地处理多个套接字上的并发连接和数据交换,而无需使用传统的阻塞 I/O。

epoll模型的底层原理

软硬件交互时,数据流动的整个过程

  • 数据从软件内存中拷贝到硬件外设,这个过程其实是比较好理解的,因为数据可以贯穿协议栈,层层向下封装报头,最后由硬件对应的驱动程序将数据包交付给具体的硬件,协议栈的最底层就是物理层。
  • 但数据到来时,操作系统是怎么知道网络中有数据到来了呢?这个我们之前从来没有学过,因为这属于计组的知识,我们搞软件的学习他,其实只是为了理解数据在IO流动时的整个过程。
  • 当数据到达网卡时,网卡有相应的8259中断设备,该设备用于向CPU的某个针脚发送中断信号,CPU有很多的针脚,一部分的针脚会对应一个硬件的中断设备,当CPU的针脚收到来自网卡中断设备的中断信号时,该针脚就会被点亮,触发高电平信号,该针脚对应的寄存器(CPU的工作台)里面会将该点亮的针脚解释为二进制序列,这个二进制序列就是该针脚对应的序号。
  • 接下来CPU处理器就会根据这个序号,查询一个叫做中断向量表的数据结构,中断向量表在CPU启动的时候就已经被加载到了内存的特定位置,中断向量表可以理解为一个数组结构,存储着每个中断序号所对应的处理程序的入口地址,其实就是函数指针,而该函数内部会回调网卡的驱动方法,将数据从硬件网卡拷贝到内存中的操作系统代码内部。(上面一整套的逻辑过程,全部都由操作系统来实现。至此就完成了数据从硬件到软件内存流动的过程,数据到达操作系统内部后,接下来的工作大家也很清楚,就是向上贯穿协议栈,拆分报头和有效载荷,直到最后交给应用层,软件里面的数据流动我们当然是很熟悉的。
  • 计算机的硬件中,不仅仅只有网卡有终端设备,像比较常见的硬件键盘,也有他自己的中断设备,我们在键盘上的每一次按键其实就会触发一次硬件中断。还有就是定时器模块,他也有自己的中断设备,可以在计算机整体的层面上,对内核进程进行管理和调度。
    ————————————————
    版权声明:本文为CSDN博主「rygttm」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/erridjsis/article/details/132548582

img

epoll模型内核结构图

  • 当你调用epoll_create时,内核会在底层创建一个epoll模型,该epoll模型主要由三个部分组成,红黑树+就绪队列+底层的回调机制。红黑树中的每个节点其实就是一个struct epoll_event结构体,当上层在调用epoll_ctl进行添加fd关心的事件时,其实就是向红黑树中插入节点,所以epoll_ctl对于fd关心事件的增删改,本质其实就是对内核中创建出来的红黑树进行节点的增删改,所以用户告知内核,你要帮我关心什么fd,底层就是对红黑树进行管理。

  • 就绪队列中存放的是已经就绪的struct epoll_event结构体,内核告知用户哪些fd上的事件就绪时,其实就是把就绪队列中的每个节点依次拷贝到(用户调用epoll_wait时,传入的纯输出型参数结构体数组)events中。就绪队列是一个双向链表Doubly Linked List。所以所谓的事件就绪的本质,其实就是将红黑树中的节点链入到就绪队列中,链入的过程其实也很简单,只要在红黑树节点内部多增加一个链表节点类型的指针即可,这个指针可以先初始化为nullptr,当该节点中fd关心的事件就绪时,再将这个指针指向就绪队列中的尾部节点即可。

  • 一个节点是可以同时在多个数据结构当中的,做法很简单,只要增加数据结构中元素类型的指针即可,通过修改指针的指向就可以把节点链入到新的数据结构里,在逻辑上我们把就绪队列和红黑树分开了,但在代码实现上,只需要在struct epoll_event结构体内部增加指针就可以了,让一个结构体同时在就绪队列和红黑树中。

  • 我们已经知道epoll模型的大概原理了,但还有一个问题,操作系统怎么知道红黑树上的哪些节点就绪了呢?难道操作系统也要遍历整棵红黑树,检测每个节点的就绪情况?操作系统其实并不会这样做,如果这样做的话,那epoll还谈论什么高效呢?你epoll不也得遍历所有的fd吗?和我poll遍历有什么区别呢?红黑树是查找的效率高,不是遍历的效率高,如果遍历所有的节点,红黑树其实和链表遍历在效率上是差不多的,一点都不高效!

  • 那操作系统是怎么知道红黑树上的哪个节点就绪了呢?其实是通过底层的回调机制来实现的,这也是epoll接口公认非常高效的重要的一个实现环节!

    当数据到达网卡时,我们知道数据会经过硬件中断,CPU执行中断向量表等步骤来让数据到达内存中的操作系统内部,在OS内贯穿网络协议栈时,在传输层数据会被拷贝到struct file结构体中的receive_queue接收缓冲区中,这个struct file结构体对应的文件描述符,其实就是accept上来的用于通信的sockfd,在这个结构体内部有一个非常重要的字段private_data,该指针会指向一个回调函数,这个回调函数就会把该sock对应的struct epoll_event结构体链入到就绪队列中,因为此时数据已经拷贝到内核的socket接收缓冲区了,事件已经就绪了,所以当内核在拷贝数据的同时,还会调用private_data回调方法,将该sock对应的红黑树节点链入到就绪队列中,所以操作系统根本不用遍历什么红黑树来检测哪些节点是否就绪,当数据到来时,底层的回调机制会自动将就绪的红黑树节点链入到就绪队列里。

  • 总结一下fd事件就绪时,底层的工作流程。

    当数据到达网络设备网卡时,会以硬件中断作为发起点,将中断信号通过中断设备发送到CPU的针脚,接下来CPU会查讯中断向量表,找到中断序号对应的驱动回调方法,在回调方法内部会将数据从硬件设备网卡拷贝到软件OS里。数据包在OS中会向上贯穿协议栈,到达传输层时,数据会被拷贝到struct file的内核缓冲区中,同时OS会执行一个叫做private_data的回调函数指针字段,在该回调函数内部会通过修改红黑树节点中的就绪队列指针的内容,将该节点链入到就绪队列,内核告知用户哪些fd就绪时,只需要将就绪队列中的节点内容拷贝到epoll_wait的输出型参数events即可,这就是epoll模型的底层回调机制!
    ————————————————
    版权声明:本文为CSDN博主「rygttm」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/erridjsis/article/details/132548582

image-20231028224020195

  • 当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关.
struct eventpoll
{..../*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/struct rb_root rbr;/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/struct list_head rdlist;....
};
  • 每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件.
  • 这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度).
  • 而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法.
  • 这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中.
  • 在epoll中,对于每一个事件,都会建立一个epitem结构体.
struct epitem
{struct rb_node rbn;       // 红黑树节点struct list_head rdllink; // 双向链表节点struct epoll_filefd ffd;  // 事件句柄信息struct eventpoll *ep;     // 指向其所属的eventpoll对象struct epoll_event event; // 期待发生的事件类型
}
  • 当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可.
  • 如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户.这个操作的时间复杂度是O(1).

总结一下, epoll的使用过程就是三部曲:

  • 调用epoll_create创建一个epoll句柄;
  • 调用epoll_ctl,将要监控的文件描述符进行注册;
  • 调用epoll_wait,等待文件描述符就绪;

epoll演示代码(基本框架)

makefile

epoll_server: epollServer.ccg++ -o $@ $^ -std=c++11
.PHONY:clean
clean:rm -f epoll_server

err.hpp

#pragma once#include <iostream>
enum
{USAGE_ERR = 1,SOCKET_ERR,BIND_ERR,LISTEN_ERR,EPOLL_CREATE_ERR
};

log.hpp

#include <iostream>
#include <string>
#include <cstdarg>
#include <ctime>
#include <unistd.h>#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4const char * to_levelstr(int level)
{switch(level){case DEBUG : return "DEBUG";case NORMAL: return "NORMAL";case WARNING: return "WARNING";case ERROR: return "ERROR";case FATAL: return "FATAL";default : return nullptr;}
}void logMessage(int level, const char *format, ...)
{
#define NUM 1024char logprefix[NUM];snprintf(logprefix, sizeof(logprefix), "[%s][%ld][pid: %d]",to_levelstr(level), (long int)time(nullptr), getpid());char logcontent[NUM];va_list arg;va_start(arg, format);// 将变量参数列表中的格式化数据写入大小缓冲区// 也就是写入到缓冲区logcontent中vsnprintf(logcontent, sizeof(logcontent), format, arg);std::cout << logprefix << logcontent << std::endl;
}

sock.hpp

#pragma once#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "log.hpp"
#include "err.hpp"class Sock
{// 表示全连接队列中最多有32+1个连接// 具体请看tcp相关实验const static int backlog = 32;public:static int Socket(){// 1. 创建socket文件套接字对象int sock = socket(AF_INET, SOCK_STREAM, 0);if (sock < 0){logMessage(FATAL, "create socket error");exit(SOCKET_ERR);}logMessage(NORMAL, "create socket success: %d", sock);// int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);// sockfd:套接字描述符,指定要设置选项的套接字// level:选项的级别,通常是 SOL_SOCKET 表示通用套接字选项。// 设置 SO_REUSEADDR 选项,允许地址重用// optval:一个指向包含选项值的缓冲区的指针。// optlen:指定选项值的长度。int opt = 1;// 我们将 `SO_REUSEADDR` 选项设置为1,从而启用了地址重用功能。// 这可以让套接字在绑定地址时可以重用之前被关闭的套接字的地址,// 而不会因为 TIME_WAIT 状态而无法绑定。setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));return sock;}static void Bind(int sock, int port){// 2. bind绑定自己的网络信息struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0){logMessage(FATAL, "bind socket error");exit(BIND_ERR);}logMessage(NORMAL, "bind socket success");}static void Listen(int sock){// 3. 设置socket 为监听状态if (listen(sock, backlog) < 0){logMessage(FATAL, "listen socket error");exit(LISTEN_ERR);}logMessage(NORMAL, "listen socket success");}// 获取新链接static int Accept(int listensock, std::string *clientip, uint16_t *clientport){// 在结构体中,存在端口号和IP地址struct sockaddr_in peer;socklen_t len = sizeof(peer);// int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);// 将获取的套接字的文件描述符返回int sock = accept(listensock, (struct sockaddr *)&peer, &len);// sock < 0, 获取套接字失败if (sock < 0)logMessage(ERROR, "accept error, next");else{logMessage(NORMAL, "accept a new link success, get new sock: %d", sock); // ?// 获取套接字成功,通过输入输出型参数来查看客户端的端口号和IP地址*clientip = inet_ntoa(peer.sin_addr);*clientport = ntohs(peer.sin_port);}return sock;}
};

epollServer.hpp

#pragma once#include <iostream>
#include <string>
#include <cstring>
#include <functional>
#include <sys/epoll.h>
#include "err.hpp"
#include "log.hpp"
#include "sock.hpp"using func_t = std::function<std::string (const std::string&)>;namespace epoll_ns
{static const int defaultport = 8888;// epoll_create大于0的参数static const int size = 128;// 文件描述符将其初始化为-1static const int defaultvalue = -1;// 事件就绪空间的大小static const int defalultnum = 64;class EpollServer{public:EpollServer(func_t f,  uint16_t port = defaultport, int num = defalultnum): func_(f), _num(num), _revs(nullptr), _port(port), _listensock(defaultvalue), _epfd(defaultvalue){}void initServer(){// 1. 创建socket_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);// 2. 创建epoll模// _epfd是关联epoll模型的文件描述符_epfd = epoll_create(size);if (_epfd < 0){logMessage(FATAL, "epoll create error: %s", strerror(errno));exit(EPOLL_CREATE_ERR);}// 3. 添加listensock到epoll中// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);// 第二个参数:EPOLL_CTL_ADD:注册新的fd到epfd中// 第三个参数是需要监听的fd.// 第四个参数是结构体:ev.events代表epoll要关心的事件,EPOLLIN代表读事件// ev.data.fd 就是要关心事件对应的文件描述符struct epoll_event ev;ev.events = EPOLLIN;// 当事件就绪,被重新捞取上来的时候,我们要知道是哪一个fd就绪了ev.data.fd = _listensock;epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock, &ev);// 4. 申请就绪事件的空间_revs = new struct epoll_event[_num];logMessage(NORMAL, "init server success");}void HandlerEvent(int readyNum){logMessage(DEBUG, "HandlerEvent in");for (int i = 0; i < readyNum; i++){uint32_t events = _revs[i].events;int sock = _revs[i].data.fd;if (sock == _listensock && (events & EPOLLIN)){//_listensock读事件就绪, 获取新连接std::string clientip;uint16_t clientport;int fd = Sock::Accept(sock, &clientip, &clientport);if (fd < 0){logMessage(WARNING, "accept error");continue;}// 获取新连接的fd成功,可以直接读取吗?// 不可以,因为新的文件描述符的读事件不知道其是否就绪了// 将其放入epoll,让epoll帮我们关心这个文件描述符的读事件是否就绪struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = fd;epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);}else if (events & EPOLLIN){// 普通的读事件就绪// 依旧有问题char buffer[1024];// 把本轮数据读完,就一定能够读到一个完整的请求吗??int n = recv(sock, buffer, sizeof(buffer), 0);if (n > 0){buffer[n] = 0;logMessage(DEBUG, "client# %s", buffer);// TODOstd::string response = func_(buffer);send(sock, response.c_str(), response.size(), 0);}else if (n == 0){// 建议先从epoll移除,才close fd// 如果先关闭fd,再从epoll模型中移除,// 那么epoll模型会检测到这个fd是一个非法的fd,就有可能会报错epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);close(sock);logMessage(NORMAL, "client quit");}else{// 建议先从epoll移除,才close fdepoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);close(sock);logMessage(ERROR, "recv error, code: %d, errstring: %s", errno, strerror(errno));}}else{}}logMessage(DEBUG, "HandlerEvent out");}void start(){// timeout为-1,那么epoll_wait()阻塞式等待// 如果 epoll_wait 返回正整数值(大于0),// 则表示有事件已经触发并且已经存储在 events 缓冲区中。// 这个值表示有多少个事件已被检测到。int timeout = -1;for (;;){int n = epoll_wait(_epfd, _revs, _num, timeout);switch (n){case 0:logMessage(NORMAL, "timeout ...");break;case -1:logMessage(WARNING, "epoll_wait failed, code: %d, errstring: %s", errno, strerror(errno));break;default:logMessage(NORMAL, "have event ready");HandlerEvent(n);break;}}}~EpollServer(){if (_listensock != defaultvalue)close(_listensock);if (_epfd != defaultvalue)close(_epfd);if (_revs)delete[] _revs;}private:uint16_t _port;int _listensock;// epoll模型对应的文件描述符int _epfd;// 存放struct epoll_event结构体的指针struct epoll_event *_revs;// 就绪事件空间的大小int _num;// 回调函数func_t func_;};
}

epollServer.cc

#include "epollServer.hpp"
#include <memory>using namespace std;
using namespace epoll_ns;static void usage(std::string proc)
{std::cerr << "Usage:\n\t" << proc << " port"<< "\n\n";
}// 回调函数
std::string echo(const std::string &message)
{return "I am epollserver, " + message;
}// ./epoll_server port
int main(int argc, char *argv[])
{if (argc != 2){usage(argv[0]);exit(USAGE_ERR);}uint16_t port = atoi(argv[1]);unique_ptr<EpollServer> epollsvr(new EpollServer(echo, port));epollsvr->initServer();epollsvr->start();return 0;
}

演示结果

image-20231029162512198

epoll工作方式(LT和ET模式)

  • 多路转接接口select poll epoll所做的工作其实都是事件通知,只向上层通知事件到来,处理就绪事件的工作并不由这些API来完成,这些接口在进行事件通知时,有没有自己的策略呢?

  • 其实是有的,在网络编程中,select poll 只支持LT工作模式,而epoll除了LT工作模式外,还支持ET工作模式,不同的工作模式对应着不同的就绪事件通知策略,LT模式是这些IO接口的默认工作模式,ET模式是epoll的高效工作模式。

下面来举一个例子帮助大家理解ET和LT模式的区别(送快递的例子)

  • 新上任的快递员小李要给24宿舍楼的张三送快递,张三买了很多的快递,估摸着有6-7个快递,小李到了24宿舍楼楼底,然后就给楼上的张三打电话,通知张三下来拿快递,但是张三正在和他的狐朋狗友开黑打游戏呢,于是张三就嘴上答应着我马上下去,但始终就不下去,老实人小李见张三迟迟不下来拿快递,又给张三打电话,让张三下来拿快递,但张三嘴上又说,我马上下去拿快递,真的马上,但过了一会儿张三依旧还是不下来,小李又只能给张三打电话,张三啊,你的快递到了,你赶快下来取快递吧,终于张三和自己的狐朋狗友推完对面的水晶了,下楼来取快递了,但是张三一个人只拿走了3个快递,还剩下三个快递,张三也没办法了,张三一个人一次只能拿这么多快递啊,于是张三就拿着他的三个快递上楼了,继续和他的舍友开黑打游戏。结果没一会儿,小李又给张三打电话,说张三啊,你的快递没拿完呢,你买了6样东西,你只拿了3样,还剩3个包裹你没拿呢,张三又嘴上说,好的好的,我马上下去拿,但其实又重复着前面的动作,好一会儿才下楼拿走了剩余的3个包裹,当包裹全部被拿走之后,小李才不会给张三打电话了。

  • 老油条快递员小王恰巧也要给24宿舍楼的张三送快递,恰巧的是,张三这次又买了6个快递,所以小王也碰巧要给张三送6个包裹。小王到了张三楼底下,给张三打了一个电话,说 张三啊,我只给你打一次电话,你现在要是不下来取快递,我后面是不会给你打电话的,除非你又买了新的快递,我手上你的快递数量变多的时候,我才会再给你打一个电话,所以你现在要是不下来取走快递,那我就不管你了,我给其他客户送快递去了。张三一听,这不行啊,我要是现在不下来取快递,这个快递员以后就不给我打电话了,那我下楼找不到快递员,拿不到我的快递怎么办。所以张三就立马下楼取快递去了。张三一次拿不了这么多快递啊,**但张三又不能漏下一些快递,因为小王不会再给张三打电话了,除非有小王的新快递,**所以张三刚到楼上放下手中的三个快递,又立马返回楼下取走剩余的三个快递了。

在上面的这两个例子中,其实小李的工作模式就是水平触发Level Triggered模式,简称LT模式,小王的工作模式就是边缘触发Edge Triggered模式,简称ET模式,也是多路转接接口高效的模式。

  • LT对应epoll的工作方式就是,当epoll检测到sock上有就绪的事件时,epoll_wait会立马返回通知程序员事件就绪了,程序员可以选择只读取sock缓冲区的部分数据,剩下的数据暂时不读了,等下次调用recv的时候再读取sock缓冲区中的剩余数据,下次怎么调用recv呢?

  • 当然也是通过epoll_wait通知然后再进行调用啦,所以只要sock中的数据程序员没有一次性拿走,那么后续再调用epoll_wait时,epoll_wait依旧会进行就绪事件的通知,告诉程序员来读取sock中的剩余数据,而这样的方式就是LT模式,即只要底层有数据没读完,后续epoll_wait返回时就会一直通知用户读取数据。

  • 而ET对应的工作方式是,如果底层有数据没读完,后续epoll_wait不会通知程序员事件就绪了,只有当底层数据增多的时候,epoll_wait才会再通知一次程序员,否则epoll_wait只会通知一次。

————————————————
版权声明:本文为CSDN博主「rygttm」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/erridjsis/article/details/132548615

LT演示

epoll_server的默认工作模式也是LT模式,在下面的代码中我将处理就绪事件的接口HandlerEvent( )屏蔽掉了,当客户端连接到来时,服务器的epoll_wait一定会检测到listensock上的读事件就绪了,所以epoll_wait会返回,告知程序员要处理数据了。但如果程序员一直不处理数据的话,那epoll_wait每次都会告知程序员要处理数据了,所以从显示器的输出结果来看,epoll_wait返回后,根据返回值n,一定是进入到了default分支中,并且每次epoll_wait都会告知程序员事件就绪,所以显示器会一直疯狂打印have events ready,因为只要底层有事件就绪,对于listensock来说,只要内核监听队列有就绪的连接,那就是就绪,epoll_wait就会一直通知程序员事件就绪了,赶快处理吧。(就像小李一样,只要张三不拿走快递,小李就会一直给张三打电话)
————————————————
版权声明:本文为CSDN博主「rygttm」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/erridjsis/article/details/132548615

#pragma once#include <iostream>
#include <string>
#include <cstring>
#include <functional>
#include <sys/epoll.h>
#include "err.hpp"
#include "log.hpp"
#include "sock.hpp"using func_t = std::function<std::string (const std::string&)>;namespace epoll_ns
{static const int defaultport = 8888;// epoll_create大于0的参数static const int size = 128;// 文件描述符将其初始化为-1static const int defaultvalue = -1;// 事件就绪空间的大小static const int defalultnum = 64;class EpollServer{public:EpollServer(func_t f,  uint16_t port = defaultport, int num = defalultnum): func_(f), _num(num), _revs(nullptr), _port(port), _listensock(defaultvalue), _epfd(defaultvalue){}void initServer(){// 1. 创建socket_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);// 2. 创建epoll模// _epfd是关联epoll模型的文件描述符_epfd = epoll_create(size);if (_epfd < 0){logMessage(FATAL, "epoll create error: %s", strerror(errno));exit(EPOLL_CREATE_ERR);}// 3. 添加listensock到epoll中// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);// 第二个参数:EPOLL_CTL_ADD:注册新的fd到epfd中// 第三个参数是需要监听的fd.// 第四个参数是结构体:ev.events代表epoll要关心的事件,EPOLLIN代表读事件// ev.data.fd 就是要关心事件对应的文件描述符struct epoll_event ev;ev.events = EPOLLIN;// 当事件就绪,被重新捞取上来的时候,我们要知道是哪一个fd就绪了ev.data.fd = _listensock;epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock, &ev);// 4. 申请就绪事件的空间_revs = new struct epoll_event[_num];logMessage(NORMAL, "init server success");}void HandlerEvent(int readyNum){logMessage(DEBUG, "HandlerEvent in");for (int i = 0; i < readyNum; i++){uint32_t events = _revs[i].events;int sock = _revs[i].data.fd;if (sock == _listensock && (events & EPOLLIN)){//_listensock读事件就绪, 获取新连接std::string clientip;uint16_t clientport;int fd = Sock::Accept(sock, &clientip, &clientport);if (fd < 0){logMessage(WARNING, "accept error");continue;}// 获取新连接的fd成功,可以直接读取吗?// 不可以,因为新的文件描述符的读事件不知道其是否就绪了// 将其放入epoll,让epoll帮我们关心这个文件描述符的读事件是否就绪struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = fd;epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);}else if (events & EPOLLIN){// 普通的读事件就绪// 依旧有问题char buffer[1024];// 把本轮数据读完,就一定能够读到一个完整的请求吗??int n = recv(sock, buffer, sizeof(buffer), 0);if (n > 0){buffer[n] = 0;logMessage(DEBUG, "client# %s", buffer);// TODOstd::string response = func_(buffer);send(sock, response.c_str(), response.size(), 0);}else if (n == 0){// 建议先从epoll移除,才close fd// 如果先关闭fd,再从epoll模型中移除,// 那么epoll模型会检测到这个fd是一个非法的fd,就有可能会报错epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);close(sock);logMessage(NORMAL, "client quit");}else{// 建议先从epoll移除,才close fdepoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);close(sock);logMessage(ERROR, "recv error, code: %d, errstring: %s", errno, strerror(errno));}}else{}}logMessage(DEBUG, "HandlerEvent out");}void start(){// timeout为-1,那么epoll_wait()阻塞式等待// 如果 epoll_wait 返回正整数值(大于0),// 则表示有事件已经触发并且已经存储在 events 缓冲区中。// 这个值表示有多少个事件已被检测到。int timeout = -1;for (;;){int n = epoll_wait(_epfd, _revs, _num, timeout);switch (n){case 0:logMessage(NORMAL, "timeout ...");break;case -1:logMessage(WARNING, "epoll_wait failed, code: %d, errstring: %s", errno, strerror(errno));break;default:logMessage(NORMAL, "have event ready");// HandlerEvent( )屏蔽掉,显示器会一直疯狂打印have events ready// HandlerEvent(n);break;}}}~EpollServer(){if (_listensock != defaultvalue)close(_listensock);if (_epfd != defaultvalue)close(_epfd);if (_revs)delete[] _revs;}private:uint16_t _port;int _listensock;// epoll模型对应的文件描述符int _epfd;// 存放struct epoll_event结构体的指针struct epoll_event *_revs;// 就绪事件空间的大小int _num;// 回调函数func_t func_;};
}

演示结果

image-20231029172029422

ET演示

  • 在添加listensock到epoll底层的红黑树中时,不仅仅关心listensock的读事件,同时还让listensock的工作模式是ET(只要将EPOLLIN和EPOLLET按位或即可)。

  • 所以当连接到来时,可以看到服务器只会打印一次have event ready,只要没有新连接到来,那么epoll_wait只会通知程序员一次事件就绪,除非到来了新连接,那就说明内核监听队列中就绪的连接变多了,换言之就是listensock底层的数据变多了,此时epoll_wait才会再好心提醒一次程序员,事件就绪了,你赶快处理吧。反过来就是,只要后续listensock底层的数据没有增多,那么epoll_wait就不会在通知程序员了。

  • 而由于我们设置的timeout是阻塞式等待,所以你可以看到,只要没有新连接到来,服务器就会阻塞住,epoll_wait调用不会再返回,也就不会再通知程序员。而反观LT模式,虽然每次epoll_wait都是阻塞式等待,但epoll_wait每次都会返回,每次都会告知程序员,这就是两者的不同。边缘触发只会触发一次,水平触发会一直触发。

————————————————
版权声明:本文为CSDN博主「rygttm」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/erridjsis/article/details/132548615

#pragma once#include <iostream>
#include <string>
#include <cstring>
#include <functional>
#include <sys/epoll.h>
#include "err.hpp"
#include "log.hpp"
#include "sock.hpp"using func_t = std::function<std::string (const std::string&)>;namespace epoll_ns
{static const int defaultport = 8888;// epoll_create大于0的参数static const int size = 128;// 文件描述符将其初始化为-1static const int defaultvalue = -1;// 事件就绪空间的大小static const int defalultnum = 64;class EpollServer{public:EpollServer(func_t f,  uint16_t port = defaultport, int num = defalultnum): func_(f), _num(num), _revs(nullptr), _port(port), _listensock(defaultvalue), _epfd(defaultvalue){}void initServer(){// 1. 创建socket_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);// 2. 创建epoll模// _epfd是关联epoll模型的文件描述符_epfd = epoll_create(size);if (_epfd < 0){logMessage(FATAL, "epoll create error: %s", strerror(errno));exit(EPOLL_CREATE_ERR);}// 3. 添加listensock到epoll中// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);// 第二个参数:EPOLL_CTL_ADD:注册新的fd到epfd中// 第三个参数是需要监听的fd.// 第四个参数是结构体:ev.events代表epoll要关心的事件,EPOLLIN代表读事件// ev.data.fd 就是要关心事件对应的文件描述符struct epoll_event ev;// EPOLL默认是LT的工作模式// EPOLLET  :将EPOLL设为边缘触发(Edge Triggered)模式,// 这是相对于水平触发(Level Triggered)来说的.ev.events = EPOLLIN | EPOLLET;// 当事件就绪,被重新捞取上来的时候,我们要知道是哪一个fd就绪了ev.data.fd = _listensock;epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock, &ev);// 4. 申请就绪事件的空间_revs = new struct epoll_event[_num];logMessage(NORMAL, "init server success");}void HandlerEvent(int readyNum){logMessage(DEBUG, "HandlerEvent in");for (int i = 0; i < readyNum; i++){uint32_t events = _revs[i].events;int sock = _revs[i].data.fd;if (sock == _listensock && (events & EPOLLIN)){//_listensock读事件就绪, 获取新连接std::string clientip;uint16_t clientport;int fd = Sock::Accept(sock, &clientip, &clientport);if (fd < 0){logMessage(WARNING, "accept error");continue;}// 获取新连接的fd成功,可以直接读取吗?// 不可以,因为新的文件描述符的读事件不知道其是否就绪了// 将其放入epoll,让epoll帮我们关心这个文件描述符的读事件是否就绪struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = fd;epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);}else if (events & EPOLLIN){// 普通的读事件就绪// 依旧有问题char buffer[1024];// 把本轮数据读完,就一定能够读到一个完整的请求吗??int n = recv(sock, buffer, sizeof(buffer), 0);if (n > 0){buffer[n] = 0;logMessage(DEBUG, "client# %s", buffer);// TODOstd::string response = func_(buffer);send(sock, response.c_str(), response.size(), 0);}else if (n == 0){// 建议先从epoll移除,才close fd// 如果先关闭fd,再从epoll模型中移除,// 那么epoll模型会检测到这个fd是一个非法的fd,就有可能会报错epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);close(sock);logMessage(NORMAL, "client quit");}else{// 建议先从epoll移除,才close fdepoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);close(sock);logMessage(ERROR, "recv error, code: %d, errstring: %s", errno, strerror(errno));}}else{}}logMessage(DEBUG, "HandlerEvent out");}void start(){// timeout为-1,那么epoll_wait()阻塞式等待// 如果 epoll_wait 返回正整数值(大于0),// 则表示有事件已经触发并且已经存储在 events 缓冲区中。// 这个值表示有多少个事件已被检测到。int timeout = -1;for (;;){int n = epoll_wait(_epfd, _revs, _num, timeout);switch (n){case 0:logMessage(NORMAL, "timeout ...");break;case -1:logMessage(WARNING, "epoll_wait failed, code: %d, errstring: %s", errno, strerror(errno));break;default:logMessage(NORMAL, "have event ready");// HandlerEvent( )屏蔽掉,显示器会一直疯狂打印have events ready// HandlerEvent(n);break;}}}~EpollServer(){if (_listensock != defaultvalue)close(_listensock);if (_epfd != defaultvalue)close(_epfd);if (_revs)delete[] _revs;}private:uint16_t _port;int _listensock;// epoll模型对应的文件描述符int _epfd;// 存放struct epoll_event结构体的指针struct epoll_event *_revs;// 就绪事件空间的大小int _num;// 回调函数func_t func_;};
}

演示结果

image-20231029173340833

ET模式高效的原因(fd必须是非阻塞的)

为什么ET模式是高效的呢?

  • 这是非常重要的一个面试题,许多的面试官在问到网络环节时,都会让我们讲一下select poll epoll各自的用法,epoll的底层原理,三个接口的优缺点,还有就是epoll的两种工作模式,以及ET模式高效的原因,ET模式高效的原因也是一个高频的问题。

fd必须是非阻塞的

  1. ET模式下,只有底层数据从无到有,从有到多的时候,才会通知上层一次,通知的机制就是rbtree+ready_queue+cb(红黑树+就绪队列+回调函数),所以ET这种通知机制就会倒逼程序员一次将底层的数据全部读走,如果不一次读走,就可能造成数据丢失,你无法保证对方一定会继续给你发数据啊,如果无法保证这点,那就无法保证epoll_wait还会通知你下一次,如果无法保证这一点,那就有可能你只读取了sock的部分数据,但后续epoll_wait可能不会再通知你了,从而导致后续的数据你永远都读不上来了,所以你必须一次将底层的数据全部读走。

  2. 如何保证一次将底层的数据全部读走呢?那就只能循环读取了,如果只调用recv一次,是无法保证一次将底层的数据全部读走的。所以我们可以打个while循环一直读sock接收缓冲区中的数据,直到读取不上来数据,但这里其实就又有一个问题了,如果sock是阻塞的,循环读读到最后一定会没数据,而此时由于sock是阻塞的,那么服务器就会阻塞在最后一次的recv系统调用处,直到有数据到来,而此时服务器就会被挂起,服务器一旦被挂起,那就完蛋了。

  3. 服务器被挂起,那就无法运行了,无法给客户提供服务了,这就很有可能造成很多公司盈利上的损失,所以服务器一定不能停下来,更不能被挂起,需要一直运行,以便给客户提供服务。而如果使用非阻塞文件描述符,当recv读取不到数据时,recv会返回-1,同时错误码被设置为EAGAIN和EWOULDBLOCK,这俩错误码的值是一样的,此时就可以判断出,我们一次把底层的数据全部都读走了。所以在工程实践上,epoll以ET模式工作时,文件描述符必须设置为非阻塞,防止服务器由于等待某种资源就绪从而被挂起。

为什么ET模式是高效的呢?

  • 解释完ET模式下fd必须是非阻塞的原因后,那为什么ET模式是高效的呢?可能有人会说,因为ET模式只会通知一次,倒逼程序员将数据一次全部读走,所以ET模式就是高效的,如果这个问题满分100分的话,你这样的回答只能得到20分,因为你的回答其实仅仅只是答案的引线,真正最重要的部分你还是没说出来。
  • 倒逼程序员一次将数据全部读走,那不就是让上层尽快取走数据吗?尽快取走数据后,就可以给对方发送一个更大的16位窗口大小,让对方更新出更大的滑动窗口大小,提高底层数据发送的效率,更好的使用TCP延迟应答,滑动窗口等策略!!!这才是ET模式高效的最本质的原因!!!
  • 因为ET模式可以更好的利用TCP提高数据发送效率的种种策略,例如延迟应答,滑动窗口等。之前在讲TCP的时候,TCP报头有个字段叫做PSH,其实这个字段如果被设置的话,epoll_wait就会将此字段转换为通知机制,再通知一次上层,让其尽快读走数据。

————————————————
版权声明:本文为CSDN博主「rygttm」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/erridjsis/article/details/132548615

LT和ET模式使用时的读取方式

情况1:

在LT模式下,如果fd是阻塞的,那么上次只能读一次,这是出于工程需求,因为我们不能让服务器阻塞挂起,而在文件描述符是阻塞的情况下,如果我们进行循环读,则最后一次肯定会读取不到数据,那么此时服务器进程就会阻塞住,等待fd的skbuff中到来数据,但服务器是不能被阻塞挂起的,所以我们只能读取一行。

情况2:

如果fd是非阻塞的,那其实就不用担心了,我们进行循环读就可以,这样是比较高效的,因为在非阻塞且是LT工作模式的情况下,无论我们是一行读还是循环读服务器都是不会被阻塞挂起的。对于读一次来说,在LT模式下也是不会出问题的,因为只要skbuff中有数据,那么epoll_wait就会一直通知程序员来尽快取走数据,我们不用担心丢失数据的情况发生。

情况3:

在ET模式下,fd必须是非阻塞的,因为出于工程实践的角度考虑,为了让数据被程序员完整的拿到,我们只能进行循环读,而只要你进行循环读,fd万万就不能是阻塞的,因为循环读的最后一次读取一定会读不到数据,只要读不到数据,且fd是阻塞的,那么服务器就被挂起了,这并不是我们想要看到的结果,所以在ET模式下,没得商量,fd必须是非阻塞的,同时程序员在应用层读取数据的方式也必须是循环读,不可以读一行。

image-20231029174738445

————————————————
版权声明:本文为CSDN博主「rygttm」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/erridjsis/article/details/132548615
个问题了,如果sock是阻塞的,循环读读到最后一定会没数据,而此时由于sock是阻塞的,那么服务器就会阻塞在最后一次的recv系统调用处,直到有数据到来,而此时服务器就会被挂起,服务器一旦被挂起,那就完蛋了。

  1. 服务器被挂起,那就无法运行了,无法给客户提供服务了,这就很有可能造成很多公司盈利上的损失,所以服务器一定不能停下来,更不能被挂起,需要一直运行,以便给客户提供服务。而如果使用非阻塞文件描述符,当recv读取不到数据时,recv会返回-1,同时错误码被设置为EAGAIN和EWOULDBLOCK,这俩错误码的值是一样的,此时就可以判断出,我们一次把底层的数据全部都读走了。所以在工程实践上,epoll以ET模式工作时,文件描述符必须设置为非阻塞,防止服务器由于等待某种资源就绪从而被挂起。

为什么ET模式是高效的呢?

  • 解释完ET模式下fd必须是非阻塞的原因后,那为什么ET模式是高效的呢?可能有人会说,因为ET模式只会通知一次,倒逼程序员将数据一次全部读走,所以ET模式就是高效的,如果这个问题满分100分的话,你这样的回答只能得到20分,因为你的回答其实仅仅只是答案的引线,真正最重要的部分你还是没说出来。
  • 倒逼程序员一次将数据全部读走,那不就是让上层尽快取走数据吗?尽快取走数据后,就可以给对方发送一个更大的16位窗口大小,让对方更新出更大的滑动窗口大小,提高底层数据发送的效率,更好的使用TCP延迟应答,滑动窗口等策略!!!这才是ET模式高效的最本质的原因!!!
  • 因为ET模式可以更好的利用TCP提高数据发送效率的种种策略,例如延迟应答,滑动窗口等。之前在讲TCP的时候,TCP报头有个字段叫做PSH,其实这个字段如果被设置的话,epoll_wait就会将此字段转换为通知机制,再通知一次上层,让其尽快读走数据。

————————————————
版权声明:本文为CSDN博主「rygttm」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/erridjsis/article/details/132548615

LT和ET模式使用时的读取方式

情况1:

在LT模式下,如果fd是阻塞的,那么上次只能读一次,这是出于工程需求,因为我们不能让服务器阻塞挂起,而在文件描述符是阻塞的情况下,如果我们进行循环读,则最后一次肯定会读取不到数据,那么此时服务器进程就会阻塞住,等待fd的skbuff中到来数据,但服务器是不能被阻塞挂起的,所以我们只能读取一行。

情况2:

如果fd是非阻塞的,那其实就不用担心了,我们进行循环读就可以,这样是比较高效的,因为在非阻塞且是LT工作模式的情况下,无论我们是一行读还是循环读服务器都是不会被阻塞挂起的。对于读一次来说,在LT模式下也是不会出问题的,因为只要skbuff中有数据,那么epoll_wait就会一直通知程序员来尽快取走数据,我们不用担心丢失数据的情况发生。

情况3:

在ET模式下,fd必须是非阻塞的,因为出于工程实践的角度考虑,为了让数据被程序员完整的拿到,我们只能进行循环读,而只要你进行循环读,fd万万就不能是阻塞的,因为循环读的最后一次读取一定会读不到数据,只要读不到数据,且fd是阻塞的,那么服务器就被挂起了,这并不是我们想要看到的结果,所以在ET模式下,没得商量,fd必须是非阻塞的,同时程序员在应用层读取数据的方式也必须是循环读,不可以读一行。

image-20231029174738445

————————————————
版权声明:本文为CSDN博主「rygttm」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/erridjsis/article/details/132548615

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/76500.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux 基础入门操作 前言 linux操作指令介绍

1 linux 目录介绍 Linux 文件系统采用层次化的目录结构&#xff0c;所有目录都从根目录 / 开始 1.1 核心目录 / (根目录) 整个文件系统的起点、包含所有其他目录和文件 /bin (基本命令二进制文件) 存放系统最基本的shell命令&#xff1a;如 ls, cp, mv, rm, cat 等&#…

Chrome开发者工具实战:调试三剑客

在前端开发的世界里&#xff0c;Chrome开发者工具就是我们的瑞士军刀&#xff0c;它集成了各种强大的功能&#xff0c;帮助我们快速定位和解决代码中的问题。今天&#xff0c;就让我们一起来看看如何使用Chrome开发者工具中的“调试三剑客”&#xff1a;断点调试、调用栈跟踪和…

函数柯里化(Currying)介绍(一种将接受多个参数的函数转换为一系列接受单一参数的函数的技术)

文章目录 柯里化的特点示例普通函数柯里化实现使用Lodash进行柯里化 应用场景总结 函数柯里化&#xff08;Currying&#xff09;是一种将接受多个参数的函数转换为一系列接受单一参数的函数的技术。换句话说&#xff0c;柯里化将一个多参数函数转化为一系列嵌套的单参数函数。 …

torch.nn中的非线性激活介绍合集——Pytorch中的非线性激活

1、nn.ELU 基本语法&#xff1a; class torch.nn.ELU(alpha1.0, inplaceFalse)按元素应用 Exponential Linear Unit &#xff08;ELU&#xff09; 函数。 论文中描述的方法&#xff1a;通过指数线性单元 &#xff08;ELU&#xff09; 进行快速准确的深度网络学习。 ELU 定义为…

Databend Cloud Dashboard 全新升级:直击痛点,释放数据价值

自 Databend Cloud 上线以来&#xff0c;我们一直致力于为用户提供高效的数据处理与可视化体验。早期&#xff0c;我们在工作区的“图表”区域推出了轻量级可视化功能&#xff0c;支持积分卡、饼图、柱状图和折线图四种展示方式。这些功能简单易用&#xff0c;基本满足了用户对…

Android Fresco 框架扩展模块源码深度剖析(四)

Android Fresco 框架扩展模块源码深度剖析 一、引言 在 Android 开发领域&#xff0c;图片处理一直是一个重要且具有挑战性的任务。Fresco 作为 Facebook 开源的强大图片加载框架&#xff0c;在图片的加载、缓存和显示等方面已经提供了非常完善的功能。然而&#xff0c;为了满…

蓝桥杯最后十天冲刺 day 2 双指针的思想

双指针思想介绍 双指针&#xff08;Two Pointers&#xff09;是一种在数组或链表等线性结构中常用的算法技巧&#xff0c;通过使用两个指针&#xff08;索引或引用&#xff09;以不同的速度或方向遍历数据结构&#xff0c;从而高效解决问题。双指针通常用于优化暴力解法&#…

Axure 使用笔记

1.Axure如何制作页面弹窗 https://blog.csdn.net/SDTechnology/article/details/143948691 2.axure 怎么点击按钮打开新页面 &#xff08;1&#xff09;新建交互 &#xff08;2&#xff09;单击是触发 &#xff08;3&#xff09;选择打开链接 &#xff08;4&#xff09;选择…

STM32实现一个简单电灯

新建工程的步骤 建立工程文件夹&#xff0c;Keil中新建工程&#xff0c;选择型号工程文件夹里建立Start、Library、User等文件夹&#xff0c;复制固件库里面的文件到工程文件夹工程里对应建立Start、Library、User等同名称的分组&#xff0c;然后将文件夹内的文件添加到工程分组…

html5炫酷图片悬停效果实现详解

html5炫酷图片悬停效果实现详解 这里写目录标题 html5炫酷图片悬停效果实现详解项目介绍技术栈核心功能实现1. 页面布局2. 图片容器样式3. 炫酷悬停效果缩放效果倾斜效果模糊效果旋转效果 4. 悬停文字效果5. 性能优化6. 响应式设计 项目亮点总结 项目介绍 本文将详细介绍如何使…

Playwright与Browser Use:领略AI赋能UI自动化测试的魔法魅力

目录 Browser Use是什么&#xff1f; Playwright简介 框架设计的核心目标与原则 Playwright 在 UI 自动化测试中的优势 如何高效拦截错误 实现视频录制 UI自动化框架设计的挑战 测试框架的结构与模块化设计 自动化测试不是银弹 走进Browser Use 横空出世的背景与意义…

Uniapp 实现微信小程序滑动面板功能详解

文章目录 前言一、功能概述二、实现思路三、代码实现总结 前言 Uniapp 实现微信小程序滑动面板功能详解 一、功能概述 滑动面板是移动端常见的交互组件&#xff0c;通常用于在页面底部展开内容面板。本文将介绍如何使用 Uniapp 开发一个支持手势滑动的底部面板组件&#xff0…

【FAQ】HarmonyOS SDK 闭源开放能力 —Push Kit(12)

1.问题描述&#xff1a; pushdeviceid的长度是固定的吗&#xff1f; 解决方案&#xff1a; 在鸿蒙系统中&#xff0c;设备ID的长度是固定的。 2.问题描述&#xff1a; 通过REST API三方推送IM类消息&#xff0c;如何实现应用处于前台时不展示三方推送通知。 解决方案&…

【小兔鲜】day02 Pinia、项目起步、Layout

【小兔鲜】day02 Pinia、项目起步、Layout 1. Pinia2. 添加Pinia到Vue项目3. 案例&#xff1a;Pinia-counter基础使用3.1 Store 是什么&#xff1f;3.2 应该在什么时候使用 Store? 4. Pinia-getters和异步action4.1 getters4.2 action如何实现异步 1. Pinia Pinia 是 Vue 的专…

Android学习之计算器app(java + 详细注释 + 源码)

运行结果&#xff1a; 基础的四则运算&#xff1a; 可能会出现的问题以及解决方法&#xff1a; 问题1&#xff1a;出现多个操作符。 例子&#xff1a;12 解决方法&#xff1a; 在用户点击操作符之后&#xff0c;去检查之前的最后一位&#xff0c;如果最后一位也是操作符的话…

GMap.NET + WPF:构建高性能 ADS-B 航空器追踪平台

ADS-B 简介 ADS - B&#xff08;Automatic Dependent Surveillance - Broadcast&#xff0c;广播式自动相关监视&#xff09;是一种先进的航空监视技术。它依靠飞机上的机载设备&#xff0c;自动收集诸如飞机的位置、高度、速度、航向等关键数据&#xff0c;并周期性地以广播的…

关于testng.xml无法找到类的问题

问题&#xff1a;testng.xml添加测试类的时候飘红 解决办法&#xff1a; 1.试图通过自动生成testng.xml插件去解决&#xff0c;感觉也不是这个问题&#xff0c;没有尝试&#xff1b; 2.以为是创建包的方式不对&#xff0c;重新删除后新建--还是找不到 想新建类的时候发现从m…

数据在内存中存储(C语言)

文章目录 前言一、整数在内存中的存储1.1 计算机存储数据的基本单位示例代码 1.2 无符号整数的存储1.3 有符号整数的存储&#xff08;补码&#xff09;示例代码 二、大小端字节序和字节序判断2.1 什么是大小端&#xff1f;示例代码 2.2 为什么会有大小端&#xff1f;2.3 字节序…

Python爬虫第2节-网页基础和爬虫基本原理

目录 一、网页基础 1.1 网页的组成 1.2 网页的结构 1.3 节点树及节点间的关系 1.4 选择器 二、爬虫的基本原理 2.1 爬虫概述 2.2 能抓怎样的数据 2.3 JavaScript 渲染页面 一、网页基础 使用浏览器访问网站时&#xff0c;我们会看到各式各样的页面。你是否思考过&…

python-leetcode 64.在排序数组中查找元素的第一个和最后一个位置

题目&#xff1a; 给一个按照非递减顺序排列的整数数组nums,和一个目标值target,请找出给定目标值在数组中的开始位置和结束位置。 如果数组中不存在目标值target,返回[-1,-1] 方法一&#xff1a;二分查找 直观的思路肯定是从前往后遍历一遍。用两个变量记录第一次和最后一次…