高级IO(Linux)

高级IO

  • 五种IO模型
  • 高级IO重要概念
    • 同步通信 vs 异步通信
    • 阻塞 vs 非阻塞
  • 非阻塞IO
    • fcntl
    • 实现函数SetNoBlock
    • 轮询方式读取标准输入
  • I/O多路转接之select
    • 初识select
    • select函数原型
      • 参数解释
      • 参数timeout取值
      • 关于fd_set结构
      • 关于timeval结构
      • 函数返回值
      • 三级目录
    • 理解select执行过程
    • socket就绪条件
      • 读就绪
      • 写就绪
    • select使用示例
    • select的特点
      • select缺点
  • I/O多路转接之epoll
    • epoll的相关系统调用
      • epoll_create
      • epoll_ctl
      • epoll_wait
    • epoll工作原理
    • epoll的实例
    • epoll的优点
    • epoll工作方式
      • 水平触发Level Triggered 工作模式
      • 边缘触发Edge Triggered工作模式
    • 对比LT和ET
    • epoll示例: epoll服务器(ET模式)
  • Reactor

五种IO模型

什么是IO?
就拿 read系统调用来说,从缓冲区中读取数据;首先要保证缓冲区有数据,若没有,操作就会被阻塞,也就是等待资源就绪;若有,将数据拷贝完之后直接返回;所以,IO分为两部分:等待资源就绪+拷贝数据;其中等待资源就绪在整个IO过程中占比非常大,如何降低等待的占比,也就是需要学习的目的

举个栗子:钓鱼的故事
张三,一个非常固执的钓鱼佬,在钓鱼的时候,只盯着鱼竿,身边发生什么都不在乎;
李四,一个随心所欲的钓鱼佬,在等待鱼上钩的时候,做着其他的事情,是不是地观察一个鱼竿,有没有鱼上钩;
王五,一个“非常懒”的钓鱼佬,把鱼竿放好,在上面安装一个报警器,一个有鱼上钩,直接把杆;
赵六,一个“多金”的钓鱼佬,每次都拿上十只鱼竿来钓鱼,没有都忙的不亦乐乎;
田七,一个爱吃鱼不爱钓鱼的有钱人,雇了一个小王,让他去钓鱼;

这五个人的钓鱼方式可以类比为五种IO方式:
张三-阻塞式IO,两耳不闻窗外事,一心只观水中浮
李四-非阻塞IO,闲来没事看两眼
王五-信号驱动IO,报警之后,立刻提杆
赵六-多路转接/多路复用
田七-异步IO

在整个钓鱼的过程中,鱼就是数据,河就是内核空间,鱼漂表明事件就绪,鱼竿就是文件描述符,钓鱼的动作也就是系统调用操作

钓鱼的人,等的占比越低,单位时间,钓鱼的效率就越高

多路转接/多路复用是高级IO的原因就是等待资源就绪的占比低

高级IO重要概念

同步通信 vs 异步通信

同步和异步关注的是消息通信机制

  • 所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回. 但是一旦调用返回,就得到返回值了; 换句话说,就是由调用者主动等待这个调用的结果
  • 异步则是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果; 换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果; 而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用

多进程多线程中, 也提到同步和互斥. 这里的同步通信和进程之间的同步是完全不想干的概念

  • 进程/线程同步也是进程/线程之间直接的制约关系
  • 是为完成某种任务而建立的两个或多个线程,这个线程需要在某些位置上协调他们的工作次序而等待、传递信息所产生的制约关系. 尤其是在访问临界资源的时候

阻塞 vs 非阻塞

阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态

  • 阻塞调用是指调用结果返回之前,当前线程会被挂起. 调用线程只有在得到结果之后才会返回
  • 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程

非阻塞IO

fcntl

一个文件描述符, 默认都是阻塞IO

int fcntl(int fd, int cmd, ... /* arg */ );

fcntl函数有5种功能

  • 复制一个现有的描述符(cmd=F_DUPFD)
  • 获得/设置文件描述符标记(cmd=F_GETFD或F_SETFD)
  • 获得/设置文件状态标记(cmd=F_GETFL或F_SETFL)
  • 获得/设置异步I/O所有权(cmd=F_GETOWN或F_SETOWN)
  • 获得/设置记录锁(cmd=F_GETLK,F_SETLK或F_SETLKW)

此处只是用第三种功能, 获取/设置文件状态标记, 就可以将一个文件描述符设置为非阻塞

实现函数SetNoBlock

基于fcntl, 我们实现一个SetNoBlock函数, 将文件描述符设置为非阻塞

void SetNonBlock(int fd)
{int fl=fcntl(fd,F_GETFL);if(fl<0){std::cerr<<"fcntl: "<<strerror(errno)<<std::endl;return ;}fcntl(fd,F_SETFL,fl|O_NONBLOCK);
}

轮询方式读取标准输入

void SetNonBlock(int fd)
{int fl = fcntl(fd, F_GETFL);if (fl < 0){std::cerr << "fcntl: " << strerror(errno) << std::endl;return;}fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}int main()
{SetNonBlock(0);char buffer[1024];while (true){printf(">>>> ");fflush(stdout);ssize_t s = read(0, buffer, sizeof(buffer) - 1);if (s > 0){buffer[s - 1] = 0;std::cout << "echo# " << buffer << std::endl;}else if (s == 0){std::cout << "read end" << std::endl;break;}else{}sleep(1);}
}

在这里插入图片描述

结果与预期的一致,不过这里还存在一个问题,当s<0时,数据读取失败,打印结果又是怎么样的呢?

在这里插入图片描述

资源准备未就绪

I/O多路转接之select

初识select

系统提供select函数来实现多路复用输入/输出模型

  • select系统调用是用来让我们的程序监视多个文件描述符的状态变化的
  • 程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变

select函数原型

select的函数原型如下:

int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);

参数解释

  • 参数nfds是需要监视的最大的文件描述符值+1
  • rdset,wrset,exset分别对应于需要检测的可读文件描述符的集合,可写文件描述符的集 合及异常文件描述符的集合,其本质是位图结构
  • 参数timeout为结构timeval,用来设置select()的等待时间

参数timeout取值

  • NULL:则表示select()没有timeout,select将一直被阻塞,直到某个文件描述符上发生了事件
  • 0:仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生
  • 特定的时间值:如果在指定的时间段里没有事件发生,select将超时返回

关于fd_set结构

在这里插入图片描述

其实这个结构就是一个整数数组, 更严格的说, 是一个 “位图”. 使用位图中对应的位来表示要监视的文件描述符
提供了一组操作fd_set的接口, 来比较方便的操作位图

// 用来清除描述词组set中相关fd 的位
void FD_CLR(int fd, fd_set *set); 
// 用来测试描述词组set中相关fd 的位是否为真
int FD_ISSET(int fd, fd_set *set); 
// 用来设置描述词组set中相关fd的位
void FD_SET(int fd, fd_set *set); 
// 用来清除描述词组set的全部位
void FD_ZERO(fd_set *set); 

关于timeval结构

timeval结构用于描述一段时间长度,如果在这个时间内,需要监视的描述符没有事件发生则函数返回,返回值为0

函数返回值

  • 执行成功则返回文件描述词状态已改变的个数
  • 如果返回0代表在描述词状态改变前已超过timeout时间,没有返回
  • 当有错误发生时则返回-1,错误原因存于errno,此时参数readfds,writefds, exceptfds和timeout的值变成不可预测

错误值可能是:

  • EBADF 文件描述词为无效的或该文件已关闭
  • EINTR 此调用被信号所中断
  • EINVAL 参数n 为负值
  • ENOMEM 核心内存不足

三级目录

理解select执行过程

取fd_set长度为1字节,fd_set中的每一bit可以对应一个文件描
述符fd。则1字节长的fd_set最大可以对应8个fd;
作为输入时:表示用户告知内核,关心一下,集合中所有的fd事件;
比特位的位置,表示fd的数值;比特位的内容,表示是否关心

作为输出时:内核告知用户,所关心的多个fd中,有哪些已经就绪;
比特位的位置,表示fd的数值;比特位的内容,表示fd对应的事件已经就绪

socket就绪条件

读就绪

  • socket内核中, 接收缓冲区中的字节数, 大于等于低水位标记SO_RCVLOWAT. 此时可以无阻塞的读该文件描述符, 并且返回值大于0
  • socket TCP通信中, 对端关闭连接, 此时对该socket读, 则返回0
  • 监听的socket上有新的连接请求
  • socket上有未处理的错误

写就绪

  • socket内核中, 发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小), 大于等于低水位标记SO_SNDLOWAT, 此时可以无阻塞的写, 并且返回值大于0
  • socket的写操作被关闭(close或者shutdown). 对一个写操作被关闭的socket进行写操作, 会触发SIGPIPE信号
  • socket使用非阻塞connect连接成功或失败之后
  • socket上有未读取的错误

select使用示例

Tcpserver:只处理读取,只获取数据的server
首先,监听端口可以交付给select,监听端口的连接就绪事件,其实就是读事件就绪

err.hpp

enum{USAGE_ERR=1,SOCKET_ERR,BIND_ERR,LISTEN_ERR
};

log.hpp

#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4const char*to_levelstr(int level)
{switch(level){case DEBUG: return "DEBUG";case NORMAL: return "NORMAL";case WARNING: return "WARNING";case ERROR: return "ERROR";case FATAL: return "FATAL";default: return nullptr;}
}void logMessage(int level,const char* format,...)
{
#define NUM 1024char logprefix[NUM];snprintf(logprefix,sizeof(logprefix),"[%s][%ld][pid:%d]",to_levelstr(level),(long int)time(nullptr),getpid());char logcontent[NUM];va_list arg;va_start(arg,format);vsnprintf(logcontent,sizeof(logcontent),format,arg);std::cout<<logprefix<<logcontent<<std::endl;
}

Sock.hpp

class Sock
{const static int backlog=30;
public://创建socket文件套接字对象static int Socket(){int sock=socket(AF_INET,SOCK_STREAM,0);if(sock<0){logMessage(FATAL,"创建套接字文件对象失败!");exit(SOCKET_ERR);}logMessage(NORMAL,"创建套接字文件对象成功: %d",sock);//设置地址复用int opt=1;setsockopt(sock,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT,&opt,sizeof(opt));return sock;}//绑定自己的网络信息static void Bind(int sock,int port){struct sockaddr_in local;memset(&local,0,sizeof(local));local.sin_family=AF_INET;local.sin_port=htons(port);local.sin_addr.s_addr=INADDR_ANY;if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0){logMessage(FATAL,"绑定失败!");exit(BIND_ERR);}logMessage(NORMAL,"绑定成功!");}//设置socket为监听状态static void Listen(int sock){if(listen(sock,backlog)<0){logMessage(FATAL,"监听失败!");exit(LISTEN_ERR);}logMessage(NORMAL,"监听成功!");}static int Accept(int listensock,std::string*clientip,uint16_t*clientport){struct sockaddr_in peer;socklen_t len=sizeof(peer);int sock=accept(listensock,(struct sockaddr*)&peer,&len);if(sock<0){logMessage(ERROR,"接收失败!");}else{logMessage(NORMAL,"接收一个新连接,sock: %d",sock);*clientip=inet_ntoa(peer.sin_addr);*clientport=ntohs(peer.sin_port);}return sock;}
};

SelectServer.hpp

namespace select_yjm
{static const int defaultport=8082;static const int fdnum=sizeof(fd_set)*8;static const int defaultfd=-1;using func_t =std::function<std::string(const std::string&)>;class SelectServer{public:SelectServer(func_t f,int port=defaultport):_func(f),_port(port),_listensock(-1),fdarray(nullptr){}~SelectServer(){if(_listensock<0) close(_listensock);if(fdarray) delete[] fdarray;}void initServer(){_listensock=Sock::Socket();Sock::Bind(_listensock,_port);Sock::Listen(_listensock);fdarray=new int[fdnum];for(int i=0;i<fdnum;i++) fdarray[i]=defaultfd;fdarray[0]=_listensock;}void Print(){std::cout<<"fd list: ";for(int i=0;i<fdnum;i++){if(fdarray[i]!=defaultfd) std::cout<<fdarray[i]<<" ";}std::cout<<std::endl;}void Accepter(int listensock){logMessage(DEBUG,"Accepter in");//此时,listen不会阻塞,已经就绪std::string clientip;uint16_t clinetport=0;int sock=Sock::Accept(listensock,&clientip,&clinetport);if(sock<0){return ;}logMessage(NORMAL,"accept success [%s:%d]",clientip.c_str(),clinetport);//此时不能直接读取,只有select有资格检测事件是否就绪//将新的sock交给select,本质就是将sock添加到fdarray数组中即可int i=0;for(;i<fdnum;i++){if(fdarray[i]!=defaultfd) continue;else break;}if(i==fdnum){logMessage(WARNING,"server id full,please wait!");close(sock);}else{fdarray[i]=sock;}Print();logMessage(DEBUG,"Accepter out");}void Recver(int sock,int pos){logMessage(DEBUG,"in Recver");//读取请求,并不会阻塞char buffer[1024];ssize_t s=recv(sock,buffer,sizeof(buffer)-1,0);if(s>0){buffer[s]=0;logMessage(NORMAL,"client# %s",buffer);}else if(s==0){close(sock);fdarray[pos]=defaultfd;logMessage(NORMAL,"client quit");return;}else{close(sock);fdarray[pos]=defaultfd;logMessage(ERROR,"client quit: %s",strerror(errno));return;}//处理响应std::string response=_func(buffer);//返回responsewrite(sock,response.c_str(),response.size());logMessage(DEBUG,"out Recver");}//void HandlerReadEvent(fd_set& rfds){for(int i=0;i<fdnum;i++){//过滤非法的fdif(fdarray[i]==defaultfd) continue;//正常的fd,不一定就绪,需要判断//此时只有监听事件就绪if(FD_ISSET(fdarray[i],&rfds)&&fdarray[i]==_listensock) Accepter(_listensock);//获取新连接之后,进行读取else if(FD_ISSET(fdarray[i],&rfds)) Recver(fdarray[i],i);else {}}}void start(){for(;;){fd_set rfds;FD_ZERO(&rfds);int maxfd=fdarray[0];for(int i=0;i<fdnum;i++){if(fdarray[i]==defaultfd) continue;//合法的fd全部添加到读文件描述符集中FD_SET(fdarray[i],&rfds);//更新所有fd中最大的fdif(maxfd<fdarray[i]) maxfd=fdarray[i];}logMessage(NORMAL,"max fd is: %d",maxfd);//使用select,需要程序员维护一个保存所有合法的fd的数组int n=select(maxfd+1,&rfds,nullptr,nullptr,nullptr);switch(n){case 0:logMessage(NORMAL,"timeout...");break;case -1:logMessage(WARNING,"select error,code: %d,err string: %s",errno,strerror(errno));break;default://表明有事件就绪,目前只有监听事件就绪logMessage(NORMAL,"have event ready!");HandlerReadEvent(rfds);break;}}}private:int _port;int _listensock;int *fdarray;func_t _func;};
}

在这里插入图片描述

select的特点

  1. select能够同时等待的文件fd是有上限的
  2. 必须借助第三方数组来维护合法的fd
  3. select的大部分参数都是输入输出型的,调用select之前,要重新设置所有的fd;调用之后,还要检查更新所有的fd,带来了遍历的成本
  4. select第一个参数的目的是为了确定遍历范围
  5. select采取位图的方式,用户到内核,内核到用户,来回地进行数据拷贝,造成拷贝成本的问题

select缺点

  1. select的fd有上限的问题
  2. 每次调用都要重新设置关心的fd

I/O多路转接之epoll

epoll的相关系统调用

epoll 有3个相关的系统调用

epoll_create

int epoll_create(int size);

在这里插入图片描述

创建一个epoll的句柄或者说是创建一个epoll模型(下面介绍):用完之后, 必须调用close()关闭

epoll_ctl

用户告知内核需要关心哪个文件描述符上的什么事件

int epoll_ctl(int epfd, int op, int fd,struct epoll_event *event);

epoll的事件注册函数

  • 它不同于select()是在监听事件时告诉内核要监听什么类型的事件, 而是在这里先注册要监听的事件类型.
  • 第一个参数是epoll_create()的返回值(epoll的句柄)
  • 第二个参数表示动作,用三个宏来表示(增删改)
  • 第三个参数是需要监听的fd
  • 第四个参数是告诉内核需要监听什么事

第二个参数的取值:

  • EPOLL_CTL_ADD:注册新的fd到epfd中
  • EPOLL_CTL_MOD:修改已经注册的fd的监听事件
  • EPOLL_CTL_DEL:从epfd中删除一个fd

struct epoll_event结构如下:
在这里插入图片描述
events可以是以下几个宏的集合:

  • EPOLLIN:表示对应的文件描述符可以读 (包括对端SOCKET正常关闭)
  • EPOLLOUT:表示对应的文件描述符可以写
  • EPOLLPRI:表示对应的文件描述符有紧急的数据可读
  • EPOLLERR:: 表示对应的文件描述符发生错误
  • EPOLLHUP:表示对应的文件描述符被挂断
  • EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的
  • EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里

epoll_wait

内核告知用户,那些文件描述符上的什么事件已经就绪

int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);

收集在epoll监控的事件中已经发送的事件

  • 参数events是分配好的epoll_event结构体数组
  • epoll将会把发生的事件赋值到events数组中 (events不可以是空指针,内核只负责把数据复制到这个events数组中,不会去帮助我们在用户态中分配内存)
  • maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size
  • 参数timeout是超时时间 (毫秒,0会立即返回,-1是永久阻塞)
  • 如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时, 返回小于0表示函数失败

epoll工作原理

在这里插入图片描述

  • 当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体(其实就是红黑树),这个结构体中有两个成员与epoll的使用方式密切相关
struct eventpoll{ .... /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/ struct rb_root rbr; /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/ struct list_head rdlist; .... 
}
  • 每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件;当使用epoll_ctl方法时相等于向红黑树中的节点中添加数据,而这个数据是由文件描述符和事件组成的键值对
  • 而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法;红黑树的每个结点中还存在着链表的结构,每当文件描述符上的事件就绪之后,这些结点就会前后连接组成一个链表
  • 这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中
  • 在epoll中,对于每一个事件,都会建立一个epitem结构体
struct epitem{ struct rb_node rbn;//红黑树节点 struct list_head rdllink;//双向链表节点 struct epoll_filefd ffd; //事件句柄信息 struct eventpoll *ep; //指向其所属的eventpoll对象 struct epoll_event event; //期待发生的事件类型 
}
  • 当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可
  • 如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户

epoll的使用过程就是三部曲

  • 调用epoll_create创建一个epoll句柄
  • 调用epoll_ctl, 将要监控的文件描述符进行注册
  • 调用epoll_wait, 等待文件描述符就绪

epoll的实例

namespace epoll_yjm
{static const int defaultport = 8083;static const int size = 128;static const int defaultvalue = -1;static const int defaultnum = 64;using func_t = std::function<std::string(const std::string &)>;class EpollServer{public:EpollServer(func_t f, uint16_t port = defaultport, int num = defaultnum): _func(f), _num(num), _revs(nullptr), _port(port), _listensock(defaultport), _epfd(defaultvalue){}~EpollServer(){if (_listensock != defaultvalue)close(_listensock);if (_epfd != defaultvalue)close(_epfd);if (_revs)delete[] _revs;}void initserver(){// 1.创建socket_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);// 2.创建epoll模型_epfd = epoll_create(size);if (_epfd < 0){logMessage(FATAL, "epoll create error: %s", strerror(errno));exit(EPOLL_CREATE_ERR);}// 3.添加listen到epoll模型中struct epoll_event ev;ev.events = EPOLLIN;// 当事件就绪,被重新捞上来时,需要知道哪一个fd就绪ev.data.fd = _listensock;epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock, &ev);// 4.申请就绪事件的空间_revs = new struct epoll_event[_num];logMessage(NORMAL, "init server success!");}void HandlerEvent(int readynum){logMessage(DEBUG, "HandlerEvent in");for (int i = 0; i < readynum; i++){uint32_t events = _revs[i].events;int sock = _revs[i].data.fd;if (sock == _listensock && events & EPOLLIN){// listensock读事件就绪,获取新连接std::string clientip;uint16_t clientport;int fd = Sock::Accept(sock, &clientip, &clientport);if (fd < 0){logMessage(WARNING, "accept error");continue;}// 获取fd成功,不可以直接读,数据可能还没就绪// 将fd放入epoll等待就绪struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = fd;epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);}else if (events & EPOLLIN){// 普通事件就绪char buffer[1024];// 把本轮数据读完,不一定能够读取完整的请求int n = recv(sock, buffer, sizeof(buffer), 0);if (n > 0){buffer[n] = 0;logMessage(DEBUG, "client# %s", buffer);std::string response = _func(buffer);send(sock, response.c_str(), response.size(), 0);}else if (n == 0){// 建议先将fd从epoll中移除,再关闭epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);close(sock);logMessage(NORMAL, "client quit");}else{// 建议先将fd从epoll中移除,再关闭epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);close(sock);logMessage(ERROR,"recv error,code: %d,errstring: %s",errno,strerror(errno));}}else{}}logMessage(DEBUG,"HandlerEvent out");}void start(){int timeout = -1;for (;;){int n = epoll_wait(_epfd, _revs, _num, timeout);switch (n){case 0:logMessage(NORMAL, "timeout ...");break;case -1:logMessage(WARNING, "epoll_wait failed,code: %d,errstring: %s", errno, strerror(errno));break;default:logMessage(NORMAL, "have event ready!");HandlerEvent(n);break;}}}private:uint16_t _port;int _listensock;int _epfd;struct epoll_event *_revs;int _num;func_t _func;};
}

在这里插入图片描述

epoll的优点

  • 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开
  • 数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而select/poll都是每次循环都要进行拷贝)
  • 事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中, epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述符数目很多, 效率也不会受到影响
  • 没有数量限制: 文件描述符数目无上限

epoll工作方式

epoll有2种工作方式-水平触发(LT)和边缘触发(ET)
假如有这样一个例子

你网购了好几个快递,张三快递员先给你派发了几个快递,你并没有去拿,因为你知道张三肯定会一直通知你,直到把你所有的快递都给你为止,因此你一直在忙别的事,直到很晚采取拿快递;又过了几天,你又网购了几个快递,不过这次给你派发快递的小哥李四不同于张三,他打电话通过你,并告知你:如果你不抓紧拿快递,那么就再也不通知你,你没有办法只能一次性将所有的快递都拿走;如果你这次没有将所有的快递都拿走,当李四再次拿到需要给你派发的快递时,就会再次通知你一次

水平触发Level Triggered 工作模式

epoll默认状态下就是LT工作模式

  • 在上面的栗子中,张三的派发模式就是LT工作模式
  • 当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分
  • 直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回
  • 支持阻塞读写和非阻塞读写

边缘触发Edge Triggered工作模式

如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式

  • 在上面的栗子中,李四的派发模式就是ET工作模式
  • 当epoll检测到socket上事件就绪时, 必须立刻处理
  • ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会
  • ET的性能比LT性能更高( epoll_wait 返回的次数少了很多). Nginx默认采用ET模式使用epol
  • 只支持非阻塞的读写

epoll既可以支持LT, 也可以支持ET

对比LT和ET

LT是 epoll 的默认行为. 使用 ET 能够减少 epoll 触发的次数. 但是代价就是强逼着程序猿一次响应就绪过程中就把所有的数据都处理完;ET的高效不仅仅体现在通知机制上,为了尽快让上层将数据取走,TCP可以给发送方提供一个更大的窗口大小,让对方更新出更大的滑动窗口,提高底层的数据发送效率

相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比 LT 更高效一些. 但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理, 不让这个就绪被重复提示的话, 其实性能也是一样的

epoll示例: epoll服务器(ET模式)

Reactor

Reactor=IO+协议定制+业务处理,基于ET模式下的Reactor,可以处理所有的IO

处理几个小细节:
在epollserver接收到数据之后,并不能确定数据的完整性,因为ET保证了数据被读取完,而协议才能保证数据的完整性;因此在epollserver还需要自己的接收缓冲区以保证数据的完整性

在epollserver同样也需要发送缓冲区,服务器刚开始启动,或者很多情况下,发送事件一直都是就绪的,可以直接发送;如果用户一次并没有把所有数据发送完,还需要再次发送;同样文件描述符上的发送事件也要注册到epoll模型中,一般按需设置

接下来就编写代码

tcpserver.hpp

namespace tcpserver
{class Connection;class Tcpserver;static const uint16_t defaultport = 8084;static const int num = 64;using func_t = std::function<void(Connection *)>;class Connection{public:Connection(int sock,Tcpserver*tsp):_sock(sock),_tsp(tsp){}void Register(func_t r,func_t s,func_t e){_recver=r;_sender=s;_excepter=e;}~Connection(){}void Close(){close(_sock);}public:int _sock;std::string _inbuffer;  // 输入缓冲区std::string _outbuffer; // 输出缓冲区func_t _recver;   // 从sock中读取func_t _sender;   // 向sock中写入func_t _excepter; // 处理sockIO的异常事件Tcpserver *_tsp; // 回指指针uint64_t lasttime;};class Tcpserver{private:void Recver(Connection *conn){conn->lasttime = time(nullptr);char buffer[1024];while (true){ssize_t s = recv(conn->_sock, buffer, sizeof(buffer) - 1, 0);if (s > 0){buffer[s] = 0;conn->_inbuffer += buffer; // 将读到的数据放入缓冲区中logMessage(DEBUG, "\n%s", conn->_inbuffer);_service(conn);}else if (s == 0){if (conn->_excepter){conn->_excepter(conn);return;}}else{if (errno == EAGAIN || errno == EWOULDBLOCK)break;else if (errno == EINTR)continue;else{if (conn->_excepter){conn->_excepter(conn);return;}}}}}void Sender(Connection *conn){conn->lasttime = time(nullptr);while (true){ssize_t s = send(conn->_sock, conn->_outbuffer.c_str(), conn->_outbuffer.size(), 0);if (s > 0){if (conn->_outbuffer.empty())break;elseconn->_outbuffer.erase(0, s);}else{if (errno == EAGAIN || errno == EWOULDBLOCK)break;else if (errno == EINTR)continue;else{if (conn->_excepter){conn->_excepter(conn);return;}}}}//如果没有发送完毕,需要对对应的sock开启写事件的关心//如果发送完毕,需要关闭对写事件的关心if(!conn->_outbuffer.empty())conn->_tsp->EnableReadWrite(conn,true,true);elseconn->_tsp->EnableReadWrite(conn,true,false);}void Excepter(Connection*conn){logMessage(DEBUG,"Excepter begin");_epoller.Control(conn->_sock,0,EPOLL_CTL_DEL);conn->Close();_connections.erase(conn->_sock);logMessage(DEBUG,"关闭%d 文件描述符的所有资源",conn->_sock);delete conn;}void Accepter(Connection *conn){for (;;){std::string clientip;uint16_t clientport;int err;int sock = _sock.Accept(&clientip, &clientport, &err);if (sock > 0){AddConnection(sock, EPOLLIN | EPOLLET,std::bind(&Tcpserver::Recver, this, std::placeholders::_1),std::bind(&Tcpserver::Sender, this, std::placeholders::_1),std::bind(&Tcpserver::Excepter, this, std::placeholders::_1));logMessage(DEBUG, "get a new link,info: [%s:%d]", clientip.c_str(), clientport);}else{if (err == EAGAIN || err == EWOULDBLOCK)break;else if (err == EINTR)continue;elsebreak;}}}void AddConnection(int sock, uint32_t events, func_t recver, func_t sender, func_t excepter){// 1.首先给该sock创建connection,并初始化,并添加到_connections中// 将该sock设置为非阻塞if (events & EPOLLET)Util::SetNonBlock(sock);Connection *conn = new Connection(sock, this);// 2.给对应的sock设置对应的回调处理方法conn->Register(recver, sender, excepter);// 3.将sock与其要关心的事件注册到epoll中bool r = _epoller.AddEvent(sock, events);assert(r);(void)r;// 4.将kv添加到_connections中_connections.insert(std::pair<int, Connection *>(sock, conn));logMessage(DEBUG, "add new sock: %d in epoll and unordered_map", sock);}bool IsConnectionExists(int sock){auto iter = _connections.find(sock);return iter != _connections.end();}void Loop(int timeout){// 获得已就绪事件int n = _epoller.Wait(_revs, _num, timeout);for (int i = 0; i < n; i++){int sock = _revs[i].data.fd;uint32_t events = _revs[i].events;// 将所有异常问题,全部转化为读写问题if (events & EPOLLERR)events |= (EPOLLIN | EPOLLOUT);if (events & EPOLLHUP)events |= (EPOLLIN | EPOLLOUT);// listen事件就绪if ((events & EPOLLIN) && IsConnectionExists(sock) && _connections[sock]->_recver)_connections[sock]->_recver(_connections[sock]);if ((events & EPOLLOUT) && IsConnectionExists(sock) && _connections[sock]->_sender)_connections[sock]->_sender(_connections[sock]);}}public:Tcpserver(func_t func, uint16_t port = defaultport): _service(func), _port(port), _revs(nullptr){}~Tcpserver(){_sock.Close();_epoller.Close();if (_revs == nullptr)delete[] _revs;}void initserver(){// 1.创建socket_sock.Socket();_sock.Bind(_port);_sock.Listen();// 2.创建epoll模型_epoller.Create();// 先将listenock设置为非阻塞// 3.将目前唯一一个sock,添加到epoll模型中AddConnection(_sock.Fd(), EPOLLIN | EPOLLET,std::bind(&Tcpserver::Accepter, this, std::placeholders::_1), nullptr, nullptr);_revs = new struct epoll_event[num];_num = num;}void EnableReadWrite(Connection *conn, bool readable, bool writeable){uint32_t event = (readable ? EPOLLIN : 0) | (writeable ? EPOLLIN : 0) | EPOLLET;_epoller.Control(conn->_sock, event, EPOLL_CTL_MOD);}// 事件派发void Dispatcher(){int timeout = 1000;while (true){Loop(timeout);}}private:uint16_t _port;Sock _sock;Epoll _epoller;std::unordered_map<int, Connection *> _connections;struct epoll_event *_revs;int _num;func_t _service;};
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/97839.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微信小程序——CSS3渐变

SS3 渐变&#xff08;gradients&#xff09;可以在两个或多个指定的颜色之间显示平稳的过渡。CSS3 定义了两种类型的渐变&#xff08;gradients&#xff09;&#xff1a; 说明 1、线性渐变&#xff08;Linear Gradients&#xff09;- 向下/向上/向左/向右/对角方向&#xff1…

Java版本企业电子招投标采购系统源码之项目说明和开发类型源码

项目说明 随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大&#xff0c;公司对内部招采管理的提升提出了更高的要求。在企业里建立一个公平、公开、公正的采购环境&#xff0c;最大限度控制采购成本至关重要。符合国家电子招投标法律法规及相关规范&#xff0c;以及…

练[FBCTF2019]RCEService

[FBCTF2019]RCEService 文章目录 [FBCTF2019]RCEService掌握知识解题思路关键paylaod 掌握知识 ​ json字符串格式&#xff0c;命令失效(修改环境变量)–绝对路径使用linux命令&#xff0c;%0a绕过preg_match函数&#xff0c;代码审计 解题思路 打开题目链接&#xff0c;发现…

Python爬虫——爬虫基础模块和类库(附实践项目)

一、简单介绍 Python爬虫是使用Python编程语言开发的一种自动化程序&#xff0c;用于从互联网上获取信息。通过模拟浏览器的行为&#xff0c;爬虫可以访问网页、解析网页内容&#xff0c;并提取所需的数据。 python的爬虫大致可以分为通用爬虫和专用爬虫&#xff1a; 通用爬虫…

【Linux】Vim使用总结

【Linux】Vim使用总结 Vim 的三种模式命令行模式1. 移动2.复制&#xff0c;粘贴&#xff0c;剪切3.撤销4.大小写切换&#xff0c;替换&#xff0c;删除 插入模式底行模式 Vim 的三种模式 一进入VIM就是处于一般模式&#xff08;命令模式&#xff09;&#xff0c;该模式下只能输…

ES 关于 remote_cluster 的一记小坑

最近有小伙伴找到我们说 Kibana 上添加不了 Remote Cluster&#xff0c;填完信息点 Save 直接跳回原界面了。具体页面&#xff0c;就和没添加前一样。 我们和小伙伴虽然隔着网线但还是进行了深入、详细的交流&#xff0c;梳理出来了如下信息&#xff1a; 两个集群&#xff1a;…

架构师-软件工程习题选择题

架构师-软件工程习题选择题

不同数据类型在单片机内存中占多少字节?

文章目录 前言一、不同编译器二、C51* 指针型 三、sizeof结构体联合体 前言 在C语言中&#xff0c;数据类型指的是用于声明不同类型的变量或者函数的一个广泛的系统。变量的类型决定了变量存储占用的空间 一、不同编译器 类型16位编译器大小32位编译器大小64位编译器大小char…

HTTPS工作过程,国家为什么让http为什么要换成https,Tomcat在MAC M1电脑如何安装,Tomcat的详细介绍

目录 引言 一、HTTPS工作过程 二、Tomcat 在访达中找到下载好的Tomcat文件夹&#xff08;这个要求按顺序&#xff09; zsh: permission denied TOMCAT的各部分含义&#xff1a; 引言 在密码中一般是&#xff1a;明文密钥->密文&#xff08;加密&#xff09; &#xff…

机器学习笔记 - 深入研究spaCy库及其使用技巧

一、简述 spaCy 是一个用于 Python 中高级自然语言处理的开源库。它专为生产用途而设计,这意味着它不仅功能强大,而且快速高效。spaCy 在学术界和工业界广泛用于各种 NLP 任务,例如标记化、词性标注、命名实体识别等。 安装,这里使用阿里的源。 pip install spacy…

三十二、【进阶】hash索引结构

1、hash索引结构 &#xff08;1&#xff09;简述&#xff1a; hash索引&#xff0c;就是采用一定的hash算法&#xff0c;将键值换算成新的hash值&#xff0c;映射到对应的槽位上&#xff0c;然后存储在hash表中。 &#xff08;2&#xff09;图示&#xff1a; 2、hash索引结构…

elasticsearch深度分页问题

一、深度分页方式from size es 默认采用的分页方式是 from size 的形式&#xff0c;在深度分页的情况下&#xff0c;这种使用方式效率是非常低的&#xff0c;比如我们执行如下查询 1 GET /student/student/_search 2 { 3 "query":{ 4 "match_all":…

【算法练习Day15】平衡二叉树二叉树的所有路径左叶子之和

​&#x1f4dd;个人主页&#xff1a;Sherry的成长之路 &#x1f3e0;学习社区&#xff1a;Sherry的成长之路&#xff08;个人社区&#xff09; &#x1f4d6;专栏链接&#xff1a;练题 &#x1f3af;长路漫漫浩浩&#xff0c;万事皆有期待 文章目录 平衡二叉树二叉树的所有路径…

周总结【java项目】

项目进度&#xff1a; 学习了JavaFX&#xff0c;下载了sceneBuilder辅助工具构建窗口&#xff08;目前建立了登陆&#xff0c;注册&#xff0c;忘记密码的界面&#xff09;&#xff0c;然后是学习了MySQL的连接&#xff0c;现在的项目是刚连上数据库&#xff1b; 下一步&…

多线程锁-synchronized字节码分析

从字节码角度分析synchronized实现 javap -c(v附加信息) ***.class 文件反编译 synchronized同步代码块 >>>实现使用的是monitorenter和monitorexit指令 synchronized普通同步方法 >>>调用指令将会检查方法的ACC_SYNCHRONIZED访问标志是否被设置&#xf…

【项目】5.1阻塞和非阻塞、同步和异步 5.2Unix、Linux上的五种IO模型

5.1阻塞和非阻塞、同步和异步&#xff08;网络IO&#xff09; 典型的一次IO的两个阶段是什么&#xff1f;数据就绪和数据读写 数据就绪&#xff1a;根据IO操作的就绪状态 阻塞非阻塞 数据读写&#xff1a;根据应用程序和内核的交互方式 同步异步 陈硕&#xff1a;在处理IO的…

华为云API自然语言处理的魅力—AI情感分析、文本分析

云服务、API、SDK&#xff0c;调试&#xff0c;查看&#xff0c;我都行 阅读短文您可以学习到&#xff1a;人工智能AI自言语言的情感分析、文本分词、文本翻译 1 IntelliJ IDEA 之API插件介绍 API插件支持 VS Code IDE、IntelliJ IDEA等平台、以及华为云自研 CodeArts IDE&a…

Mac os 点击桌面 出现黑边框 解决

1、桌面黑框效果 2、解决&#xff1a;设置为 仅在台前调度中

水波纹文字效果动画

效果展示 CSS 知识点 text-shadow 属性绘制立体文字clip-path 属性来绘制水波纹 工具网站 CSS clip-path maker 效果编辑器 页面整体结构实现 使用多个 H2 标签来实现水波纹的效果实现&#xff0c;然后使用clip-path结合动画属性一起来进行波浪的起伏动画实现。 <div …

CSS基础语法之盒子模型

目录 一、 选择器 1.1 结构伪类选择器 1.1.1基本使用 1.1.2 :nth-child(公式) 1.2 伪元素选择器 二、 PxCook 三、盒子模型 3.1 盒子模型-组成 3.2 边框线 3.2.1四个方向 3.2.2 单方向边框线 3.3 内边距 3.4 尺寸计算 3.5 外边距+版心居中 3.6 清除默认样式 3.7…