高级IO
阻塞IO
在内核将数据准备好之前,系统调用会一直等待,所有的套接字默认都是阻塞方式。
非阻塞IO
如果内核还未将数据准备好,系统调用仍然会直接返回,并且返回EWOULDBLOCK错误码。
非阻塞IO往往需要程序员循环的方式反复尝试读写文件描述符,这个过程称为轮询,不过这对CPU来说浪费较大,一般只有特定的场景下才使用。
信号驱动IO
内核将数据准备好的时候,使用SIGIO信号通知程序进行IO操作。
IO多路转接
IO多路转接能够同时等待多个文件描述符的就绪状态。
异步IO
由内核在数据拷贝完成时,通知应用程序(信号驱动时告诉应用程序何时可以开始拷贝数据,这里的异步IO是内核已经完成拷贝)。
同步和异步通信
同步和异步关注的是消息通知机制。
- 同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回,但是一旦调用返回,就得到返回值了,即由调用者主动等待这个调用的结果
- 异步则相反,调用在发出之后,调用直接返回,无返回结果,调用者不会立刻得到结果;而在调用发出后,被调用者通过状态、通知来通知调用者,或者通过回调函数处理这个调用
阻塞和非阻塞
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态
- 阻塞调用是指调用结果返回之前,当前线程会被挂起,调用线程只有在得到结果之后才会返回
- 非阻塞调用是指在不能立刻得到结果之前,该调用不会阻塞当前进程
高级IO
任何IO过程中,都包含两个步骤,第一是等待,第二是拷贝。实际应用中,等待消耗的时间往往都远远高于拷贝的时间,让IO更高效,最核心的办法就是让等待的时间尽量少。
非阻塞IO,纪录锁,系统V流机制,I/O多路转接(也叫I/O多路复用),readv和writev函数以及存储映射IO(mmap),这些统称为高级IO.
非阻塞IO
设定非阻塞
- 让IO非阻塞,打开的时候,就可以指定非阻塞接口
- 统一的方式来进行非阻塞设置fcnt(),阻塞和非阻塞都是文件的一种属性
对一个文件描述符进行指定命令式的操作,如果失败返回-1。
轮询方式读取标准输入(非阻塞)
#include <iostream>
#include <unistd.h>
#include <cstring>
#include <fcntl.h>// 设置非阻塞接口
bool SetNonBlock(int fd)
{int fl = fcntl(fd, F_GETFL); // 在底层获取当前fd对应的文件读写标志位,放在flif (fl < 0)return false;fcntl(fd, F_SETFL, fl | O_NONBLOCK); // 设置选项,将fl设置为非阻塞,这个非阻塞属性是新增的return true;
}
int main()
{// 标准输入 0 默认是阻塞的SetNonBlock(0); // 只要设置一次,后续都是非阻塞char buffer[1024];while (true){sleep(1);errno = 0;// 非阻塞的时候,我们是以出错的形式返回,告知上层数据没有就绪// a. 如何甄别是真的出错了// b. 还是数据仅仅没有就绪呢// 数据有的话,正常读取就行ssize_t s = read(0, buffer, sizeof(buffer) - 1); // 出错,不仅仅是错误返回值,errno变量也会被设置,表明出错原因if (s > 0){buffer[s - 1] = 0;std::cout << "echo# " << buffer << " errno[---]: " << errno << " errstring: " << strerror(errno) << std::endl;}else{// 如果失败的errno值是11,就代表其实没错,只不过是底层数据没就绪// std::cout << "read \"error\" " << " errno: " << errno << " errstring: " << strerror(errno) << std::endl;if(errno == EWOULDBLOCK || errno == EAGAIN){std::cout << "当前0号fd数据没有就绪, 请下一次再来试试吧" <<std::endl;continue;}else if(errno == EINTR){std::cout << "当前IO可能被中断, 再试一试吧" << std::endl;continue;}else{//进行差错处理}sleep(1);}}return 0;
}
IO多路转接之select
系统提供select函数来实现多路复用输入/输出模型
- select系统调用是用来让程序监视多个文件描述符的状态变化的
- 程序会停在select这里等待,直到被监视的文件描述符有一个或者多个发生了状态改变
select函数原型
- nfds是需要监视的最大的文件描述符值+1
- 帮用户一次等待多个文件sock
- 当哪些文件sock就绪了,select就要通知用户,对应就绪的sock有哪些,然后,用户再调用
recv/recvfrom/read
等进行数据读取
struct timeval *timeout
的意义:
- select等待多个fd,等待策略可以选择三种
- 阻塞式 nullptr
- 非阻塞式 {0,0}
- 可以设置timeout时间,时间内阻塞,时间到立马返回 {5,0}
- timeout是一个输出型参数,如果等待时间内,有fd就绪,表示距离下一次timeout剩余多长时间。
fd_set 文件描述符集
本质是一个位图结构
以readfds
为例:
-
输入时:用户 -> 内核,比特位的位置:文件描述符值,比特位内容:是否关心
00001010 则表示关心2号比特位和4号比特位的文件描述符的读事件,不关心其他比特位的文件描述符的读事件。
-
输出时:内核 -> 用户,比特位的位置:文件描述符值,比特位内容:是否就绪
0000 1000 则表示,后续用户可以直接读取3号,而不会被阻塞。
编写代码注意事项
-
关于timeout:用户和内核都会修改同一个位图结构,因此这个参数用一次之后,一定需要进行重新设定。
struct timeval timeout = {5, 0};
-
关于nfds:随着获取的sock越来越多,添加到select的sock也会越来越多,此时nfds一定是动态变化,所以要对nfds进行动态计算
-
rfds/writefds/exceptfds:都是输入输出型参数,输入输出不一定一致,因此每一次要对rfds进行重新添加
-
由于2.3点,则:
- 需要有一个第三方数组,用来保存所有的合法fd
#define BITS 8 #define NUM (sizeof(fd_set)*BITS) #define FD_NONE -1 int _fd_array[NUM];//初始化 for(int i = 0; i < NUM; i++) _fd_array[i] = FD_NONE;
- 在while里面:① 遍历数组,更新出最大值 ② 遍历数组,添加所有需要关心的fd到fd_set位图中 ③ 调用select进行事件检测
fd_set rfds; FD_ZERO(&rfds); // 初始化 清空 int maxfd = _listensock; for (int i = 0; i < NUM; i++) {if (_fd_array[i] == FD_NONE)continue;FD_SET(_fd_array[i], &rfds);if (maxfd < _fd_array[i])maxfd = _fd_array[i]; } int n = select(maxfd + 1, &rfds, nullptr, nullptr, &timeout);
- 遍历数组,找到就绪的事件,根据就绪的事件,完成对应动作
void HandlerEvent(const fd_set &rfds){for (int i = 0; i < NUM; i++){if (_fd_array[i] == FD_NONE)continue;if (FD_ISSET(_fd_array[i], &rfds)){// 指定的fd,读事件就绪// 读事件就绪:连接事件到来,acceptif (_fd_array[i] == _listensock) Accepter(); else Recver(i); }}}
server.hpp
#ifndef __SELECT_SVR_H__
#define __SELECT_SVR_H__#include <iostream>
#include <sys/select.h>
#include <sys/time.h>
#include <vector>
#include <string>
#include "Log.hpp"
#include "Sock.hpp"#define BITS 8
#define NUM (sizeof(fd_set) * BITS)
#define FD_NONE -1
using namespace std;
// select 只完成读取,写入和异常不做处理 -- epoll(写完整)
class SelectServer
{
public:SelectServer(const uint16_t &port = 8080) : _port(port){_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);logMessage(DEBUG, "%s", "create base socket success");for (int i = 0; i < NUM; i++)_fd_array[i] = FD_NONE;// 规定_fd_array[0] = _listensock;_fd_array[0] = _listensock;}void Start(){while (true){// struct timeval timeout = {0, 0};// int sock = Sock::Accept(_listensock); //不能直接调用accept// 加入select中,让select等// FD_SET(_listensock, &rfds);DebugPrint();fd_set rfds;FD_ZERO(&rfds); // 初始化 清空int maxfd = _listensock;for (int i = 0; i < NUM; i++){if (_fd_array[i] == FD_NONE)continue;FD_SET(_fd_array[i], &rfds);if (maxfd < _fd_array[i])maxfd = _fd_array[i];}int n = select(maxfd + 1, &rfds, nullptr, nullptr, nullptr);switch (n){case 0:logMessage(DEBUG, "%s", "time out...");break;case -1:logMessage(WARNING, "select error: %d : %s", errno, strerror(errno));default:// 成功的logMessage(DEBUG, "get a new link event...");HandlerEvent(rfds);break;}}}~SelectServer(){if (_listensock >= 0)close(_listensock);}private:void HandlerEvent(const fd_set &rfds){for (int i = 0; i < NUM; i++){if (_fd_array[i] == FD_NONE)continue;if (FD_ISSET(_fd_array[i], &rfds)){// 指定的fd,读事件就绪// 读事件就绪:连接事件到来,acceptif (_fd_array[i] == _listensock) Accepter(); else Recver(i); }}}void Accepter(){string clientip;uint16_t clientport = 0;// listensock上面的读事件就绪了,表示可以读取了// 获取新连接int sock = Sock::Accept(_listensock, &clientip, &clientport);if (sock < 0){logMessage(WARNING, "accept error");return;}logMessage(DEBUG, "get a new line success : [%s:%d] : %d", clientip.c_str(), clientport, sock);int pos = 1;for (; pos < NUM; pos++){if (_fd_array[pos] == FD_NONE)break;}if (pos == NUM){logMessage(WARNING, "%s:%d", "select server already full, close: %d", sock);close(sock);}else{_fd_array[pos] = sock;}}void Recver(int pos){// 读事件就绪:INPUT事件到来,recv,readlogMessage(DEBUG, "messsage in, get IO event: %d", _fd_array[pos]);// 先不考虑阻塞char buffer[1024];int n = recv(_fd_array[pos], buffer, sizeof(buffer)-1, 0);if(n > 0){buffer[n] = 0;logMessage(DEBUG, "client[%d]# %s", _fd_array[pos], buffer);}else if(n == 0){logMessage(DEBUG, "cilent[%d] quit, me too...", _fd_array[pos]);// 1. 不让select关心当前的fd了close(_fd_array[pos]);_fd_array[pos] = FD_NONE;}else{logMessage(WARNING, "%d sock recv error, %d : %s", _fd_array[pos], errno, strerror(errno));// 1. 不让select关心当前的fd了close(_fd_array[pos]);_fd_array[pos] = FD_NONE;}}void DebugPrint(){cout << "_fd_array[]: ";for (int i = 0; i < NUM; i++){if (_fd_array[i] == FD_NONE)continue;cout << _fd_array[i] << " ";}cout << endl;}private:uint16_t _port;int _listensock;int _fd_array[NUM];
};#endif
socket就绪条件
读就绪
- socket内核中,接收缓冲区中的字节数,大于等于低水平标记SO_RCVLOWAT,此时可以无阻塞的读该文件描述符,并且返回值大于0
- socket TCP通信中,对端关闭连接,此时对该socket读,则返回0
- 监听的socket上有新的连接请求
- socket上有未处理的错误
写就绪
- socket内核中,发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小),大于等于低水平标记SO_SNDLOWAT,此时可以无阻塞的写,并且返回值大于0
- socket的写操作被关闭(close或者shutdown),对一个写操作被关闭的socket进行写操作,会触发SIGPIPE信号
- socket使用非阻塞connect连接成功或失败之后
- socket上有未读取的错误
select优缺点
通过编写代码可知select的优缺点:
优点:
- 相比多进程多线程效率高
- 应用场景:有大量连接,但是只有少量活跃时,省资源
缺点:
- 为维护第三方数组,select服务器会充满大量的遍历
- 每一次都要对select输出参数进行重新设定
- 能够同时管理的fd的个数是有上限的,取决于
sizeof(fd_set)
的值,字节长度乘以8比特则为支持的最大文件描述数 - 因为几乎每一个参数都是输入输出的,select一定会频繁的进行用户到内核,内核到用户的参数数据拷贝
- 编码比较复杂
IO多路转接之poll
poll
在select上的改进:
- 将输入输出参数进行了分离
- 解决了select管理上限问题
- 位图结构变为结构体结构
struct pollfd
//pollServer.hpp#ifndef __POLL_SVR_H__
#define __POLL_SVR_H__#include <iostream>
#include <sys/select.h>
#include <sys/time.h>
#include <vector>
#include <string>
#include <poll.h>
#include "Log.hpp"
#include "Sock.hpp"#define FD_NONE -1
using namespace std;
// select 只完成读取,写入和异常不做处理 -- epoll(写完整)
class PollServer
{public:static const int nfds = 100;
public:PollServer(const uint16_t &port = 8080) : _port(port), _nfds(nfds){_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);logMessage(DEBUG, "%s", "create base socket success");_fds = new struct pollfd[_nfds];for(int i = 0; i < _nfds; i++){_fds[i].fd = FD_NONE;_fds[i].events = _fds[i].revents = 0; }_fds[0].fd = _listensock;_fds[0].events = POLLIN;_timeout = 1000;}void Start(){while (true){int n = poll(_fds, _nfds, _timeout);switch (n){case 0:logMessage(DEBUG, "%s", "time out...");break;case -1:logMessage(WARNING, "select error: %d : %s", errno, strerror(errno));default:// 成功的HandlerEvent();break;}}}~PollServer(){if (_listensock >= 0)close(_listensock);if (_fds) delete [] _fds;}private:void HandlerEvent(){for (int i = 0; i < _nfds; i++){if (_fds[i].fd == FD_NONE)continue;if (_fds[i].revents & POLLIN){// 指定的fd,读事件就绪// 读事件就绪:连接事件到来,acceptif (_fds[i].fd == _listensock) Accepter(); else Recver(i); }}}void Accepter(){string clientip;uint16_t clientport = 0;// listensock上面的读事件就绪了,表示可以读取了// 获取新连接int sock = Sock::Accept(_listensock, &clientip, &clientport);if (sock < 0){logMessage(WARNING, "accept error");return;}logMessage(DEBUG, "get a new line success : [%s:%d] : %d", clientip.c_str(), clientport, sock);int pos = 1;for (; pos < _nfds; pos++){if (_fds[pos].fd == FD_NONE)break;}if (pos == _nfds){// 对struct pollfd进行自动扩容logMessage(WARNING, "%s:%d", "select server already full, close: %d", sock);close(sock);}else{_fds[pos].fd = sock;_fds[pos].events = POLLIN;}}void Recver(int pos){// 读事件就绪:INPUT事件到来,recv,readlogMessage(DEBUG, "messsage in, get IO event: %d", _fds[pos]);// 先不考虑阻塞char buffer[1024];int n = recv(_fds[pos].fd, buffer, sizeof(buffer)-1, 0);if(n > 0){buffer[n] = 0;logMessage(DEBUG, "client[%d]# %s", _fds[pos].fd, buffer);}else if(n == 0){logMessage(DEBUG, "cilent[%d] quit, me too...", _fds[pos].fd);// 1. 不让select关心当前的fd了close(_fds[pos].fd);_fds[pos].fd = FD_NONE;_fds[pos].events = 0;}else{logMessage(WARNING, "%d sock recv error, %d : %s", _fds[pos].fd, errno, strerror(errno));// 1. 不让select关心当前的fd了close(_fds[pos].fd);_fds[pos].fd = FD_NONE;_fds[pos].events = 0;}}void DebugPrint(){cout << "_fd_array[]: ";for (int i = 0; i < _nfds; i++){if (_fds[i].fd == FD_NONE)continue;cout << _fds[i].fd << " ";}cout << endl;}private:uint16_t _port;int _listensock;struct pollfd *_fds;int _nfds;int _timeout;
};#endif
poll的优缺点
优点:
- 效率高
- 有大量的连接,但是只有少量的是活跃的,节省资源
- pollfd结构包含了要监视的event和发生的event,不再使用select”参数-值“传递的方式,接口使用比select更方便
- 输入输出参数分离,不需要大量的重置
- poll参数级别,对于可管理的fd的数量无上限
缺点:
- poll依旧需要不少的遍历,在用户层检测时间就绪,与内核检测fd就绪,都是一样
- 每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中
- poll代码编写也比较复杂(比select容易 )
IO多路转接之epoll
1 相关接口
1.1 epoll_create
创建一个epoll句柄,用完之后必须调用close()关闭
1.2 epoll_ctl 事件注册函数
对创建的epoll模型进行相关操作
不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是先在这里注册要监听的事件类型
op有三个取值:
- EPOLL_CTL_ADD 增新的fd
- EPOLL_CTL_DEL 删fd
- EPOLL_CTL_MOD 修改已经注册的fd的监听事件
struct epoll_event的结构:
其中events可以是以下几个宏的集合:
- EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
- EPOLLOUT : 表示对应的文件描述符可以写;
- EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
- EPOLLERR : 表示对应的文件描述符发生错误;
- EPOLLHUP : 表示对应的文件描述符被挂断;
- EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的.
- EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里.
1.3 epoll_wait
在特定的epoll当中获取已经就绪的事件,
关于epoll_wait的返回值:
设epoll_wait的返回值为n,每一次返回的n为就绪的事件的数量,并且存储在*events中,从0下标开始,按顺序存储。因此每次读 0 ~ (n-1) 即可读完就绪的事件,不会造成其他的浪费。
2 epoll优点
-
接口使用方便,虽然拆分成了三个函数,但是反而使用起来更方便高效,不需要每次循环都设置关注的文件描述符,也做到了输入输出参数分离
-
数据拷贝轻量,只在合适的时候调用
EPOLL_CTL_ADD
将文件描述符结构拷贝到内核中,这个操作并不频繁(而select/poll)是每次循环都有进行拷贝 -
事件回调机制:避免使用遍历,而是使用回调函数的方式,将就绪的文件描述符结构加入到就绪队列中,epoll_wait返回直接访问就绪队列就知道哪些文件描述符就绪,这个操作时间复杂度O(1),即时文件描述符数目很多,效率也不会受到影响
-
可监听文件描述符数目无上限
3 工作原理
OS如何知道,网卡里面有数据,或者键盘有用户输入?以从网卡读入为例,下面是模拟示意图
用户告诉内核需要维护的fd已经相关事件,内核会维护一个红黑树用来存储。同时,会创造一个就绪队列。
在系统中,每一个epoll模型对应的都是一个eventpoll结构。
注:
- 此处的红黑树的key值是描述符
- 用户只需要设置关系,获取结果即可,不用关心任何对fd与event的管理细节
- 底层要有fd就绪,OS会自动建立节点,连入到就绪队列中,上层只需要不断地从就绪队列中将数据拿走,就完成了获取就绪事件的任务(生产者与消费者模型)。
- 就绪队列的本质是一个共享资源,epoll已经保证所有的epoll接口都是线程安全的。
4 epoll服务器的封装
由于epoll中关心的文件描述符都必须是合法的文件描述符,因此当客户端断开连接时,首先应该在epoll中移除对该sock文件描述符的关心,再close该文件描述符,否则会出错。
为了保证每一回合的正确读取,每一个socket都要有自己的缓冲区。
5 epoll工作方式
epoll有两种工作方式,水平触发(LT)和边缘触发(ET)。select,poll,epoll的默认模式都是LT模式。
1. LT(Level Triggered,水平触发)
工作模式 在LT模式下,当epoll_wait()检测到描述符上有事件发生时,会重复通知应用程序,直到应用程序处理完所有的事件并将相应的文件描述符设置为非阻塞状态后,epoll_wait()才会返回。这意味着,如果有一个文件描述符上有多个事件发生,但应用程序没有一次性处理完所有的事件,那么epoll_wait()将继续通知应用程序该文件描述符上尚未处理的事件。
LT模式适用于处理普通的I/O事件,即不需要立即响应的事件。使用LT模式,应用程序可以在任何时候处理事件,而不必担心错过任何事件。
2. ET(Edge Triggered,边缘触发)工作模式
在ET模式下,当epoll_wait()检测到描述符上有事件发生时,只会通知应用程序一次,直到应用程序处理完该描述符上所有待处理事件之后,才会再次通知应用程序有新的事件发生。在ET模式下,应用程序需要立即响应事件,否则将会错过事件。
ET模式适用于处理高速、高流量的I/O事件,即需要尽可能快地响应事件。使用ET模式,应用程序可以尽可能地多次处理事件,从而提高效率。在ET模式下,应用程序需要对每个文件描述符上的所有事件进行处理,否则将会错过事件。
ET模式更高效:
- 更少的返回次数
- ET模式会倒逼程序员尽快将接收缓冲区中的数据全部取走,应用层尽快的取走了缓冲区的数据,那么在单位时间,该模式下的工作的服务器,就可以在一定程度上,给发送发送一个更大的接收窗口,所以发送方就可以有更大的滑动窗口,一次向接收方发送更多的数据,提高IO吞吐。
select、poll、epoll优缺点总结
select
优点:
- 跨平台支持:select是传统的I/O多路复用机制,几乎在所有操作系统上都有支持。
- 简单易用:使用select相对简单,只需使用一个文件描述符集合进行监听,并由内核通知就绪的文件描述符。
- 支持小规模连接:适用于连接数较少的情况,例如几十到几百个。
缺点:
- 文件描述符限制:select使用的文件描述符集合是一个位图,其大小被限制为FD_SETSIZE,默认一般较小。
- 效率低下:每次调用select,都需要将所有待监听的文件描述符从用户空间拷贝到内核空间,造成性能损耗。
- 遍历开销大:当就绪事件较少时,select需要遍历整个文件描述符集合,效率较低。
poll
优点:
- 跨平台支持:与select一样,poll也是具有跨平台特性的多路复用机制。
- 无文件描述符限制:poll没有select的文件描述符数量限制。
- 简单易用:使用一个pollfd数组进行监听,通过判断revents字段来确定就绪事件。
缺点:
- 效率问题:poll仍需要将所有待监听的文件描述符从用户空间拷贝到内核空间,造成性能损耗。
- 遍历开销大:当就绪事件较少时,poll需要遍历整个pollfd数组,效率较低。
epoll
优点:
- 高性能:epoll利用了内核的事件通知机制,只返回就绪的文件描述符,避免了遍历整个文件描述符集合的开销。
- 处理大规模连接:适用于连接数较大的情况,例如成千上万个连接。
- 零拷贝:epoll支持操作系统的零拷贝技术,减少了数据拷贝的次数,提高了性能。
缺点:
- Linux特定:epoll是Linux下的特有机制,不具备跨平台能力。
- 编程复杂:相对于select和poll,使用epoll需要更加复杂的编程方式。
- 内存占用:epoll需要维护一个用于存储事件的数据结构(epoll_event数组),占用一定的内存空间。
综上所述,epoll相较于select和poll,在性能和扩展性方面具有明显优势,但在跨平台和编程复杂性方面存在一些限制。因此,在选择使用哪种多路复用机制时,需要根据具体的应用场景和需求进行权衡和选择。