本章代码Gitee地址:EpollServer
文章目录
- 1. epoll接口
- 1.1 epoll_create
- 1.2 epoll_wait
- 1.3 epoll_ctl
- 2. epoll原理
- 3. epoll_server
- 4. epoll两种工作模式
1. epoll接口
1.1 epoll_create
#include <sys/epoll.h>
int epoll_create(int size);
参数int size
理论上可以随便写(已废弃)
返回值:
- 成功返回一个文件描述符
- 失败返回
-1
1.2 epoll_wait
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
参数:
-
int epfd
:epoll_create
的返回值 -
struct epoll_event *events, int maxevents
:用户及缓冲区,返回已经就绪的文件描述符和事件typedef union epoll_data {void *ptr;int fd;uint32_t u32;uint64_t u64; } epoll_data_t;struct epoll_event {uint32_t events; //位图传递epoll_data_t data; // };
-
int timeout
:超时时间,单位是毫秒,0
为非阻塞,-1
为阻塞式
返回值:已经就绪的文件描述符的个数
1.3 epoll_ctl
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数:
-
int epfd
:epoll_create
的返回值 -
int op
:EPOLL_CTL_ADD //增添 EPOLL_CTL_MOD //修改 EPOLL_CTL_DEL //删除
-
int fd, struct epoll_event *event
:哪个文件描述符上的哪个事件
select
和poll
都是用数组维护的,需要用户进行管理
2. epoll原理
网卡是外设,当硬件就绪之后,会以硬件中断的方式来告诉操作系统,将网卡的数据读到网卡驱动上,而操作系统读数据是从文件缓冲区读取数据。
所以为了支持epoll
,操作系统支持三种机制:
-
内核会维护一颗红黑树,红黑树节点里面包含:
struct rb_node {int fd; //内核要关系的文件描述符uint32_t event; //要关系的事件 位图形式//... }
-
此外还会维护一个就绪队列,一旦红黑树上有节点就绪,此时就会将该节点链入到队列当中
struct list_node
{int fd; //已就绪的文件描述符uint32_t event; //已就绪的事件//...
}
-
操作系统的底层网卡,是允许操作系统注册一些回调机制。
操作系统内部提供一个回调函数,网卡以中断的方式将数据搬到了网卡驱动层,驱动层当中有数据就绪了,那么数据链路层就会自动调用对应的回调函数。
这个回调函数要做的就是:- 向上交付
- 数据到来解包交到
tcp
接收队列 - 查找
rb_tree->fd
- 构建就绪节点,插入就绪队列
以上三套机制,就叫做epoll
模型
Linux一切接文件,strcut file
指针指向这个epoll
模型,然后将struct file
对象添加到进程文件描述符表里面,所以epoll
的返回值是一个文件描述符。
epoll
优势:
-
检测就绪时间复杂度为O(1),判断队列是否为空
获取就绪队列时间复杂度O(n)
-
fd
、event
没有上限,所以的文件描述符和关系的事件都是由红黑树管理的,这颗红黑树多大,操作系统决定
如何看待这颗红黑树?
select
和poll
都需要辅助数组,数组用户维护,而这颗红黑树就相当于之前我们自己维护的数组,只不过在epoll
里面是由系统管理
epoll_wait
返回值表示有多少事件就绪,将就绪的节点一个一个弹出,依次放入数组,就绪事件是连续的
3. epoll_server
#include<iostream>
#include<memory>
#include<string>
#include"Socket.hpp"
#include"Log.hpp"
#include"Epoller.hpp"
#include"Nocopy.hpp"uint32_t EVENT_IN = (EPOLLIN);
uint32_t EVENT_OUT = (EPOLLOUT);class EpollServer : public Nocopy
{static const int defaultnum = 64; //默认一次性最多获取64个事件
public:EpollServer(uint16_t port):_port(port),_listensock_ptr(new MySocket()),_epoller_ptr(new Epoller()){}void Init(){//创建套接字_listensock_ptr->Socket();//绑定套接字_listensock_ptr->Bind(_port);//监听套接字_listensock_ptr->Listen();log(Info, "create listen socket success: %d", _listensock_ptr->Getfd());}void Accepter(){// 获取新链接std::string clientip;uint16_t clientport;int sock = _listensock_ptr->Accept(&clientip, &clientport);if (sock > 0){// 不能直接读取,获取连接不代表发送了数据// 让epoll去关心_epoller_ptr->EpollerCtl(EPOLL_CTL_ADD, sock, EVENT_IN);log(Info, "get a new link, clientip: %s, clientport: %d", clientip.c_str(), clientport);}}void Recver(int fd){// 读事件就绪char buffer[1024];ssize_t n = read(fd, buffer, sizeof(buffer) - 1); //BUGif (n > 0){buffer[n] = 0;std::cout << "get a message: " << buffer << std::endl;//返回std::string echo_str = "server echo $";echo_str += buffer;write(fd, echo_str.c_str(), echo_str.size());}else if (n == 0){log(Info, "client quit, me too, close fd:%d", fd);//从epoll当中移除 删除红黑树节点_epoller_ptr->EpollerCtl(EPOLL_CTL_DEL, fd, 0);close(fd); //细节 先移除再关闭}else{log(Warning, " read error, close fd:%d", fd);}}void Dispatcher(struct epoll_event revs[], int num){for(int i = 0; i < num; i++){uint32_t events = revs[i].events;int fd = revs[i].data.fd;if(events & EVENT_IN){//读事件就绪if(fd == _listensock_ptr->Getfd()){Accepter();}else{//其他事件就绪Recver(fd);}}else if(events & EVENT_OUT){//写事件就绪}}}void Start(){//listensock套接字添加进epoll当中//listensock和它关心的事件 本质上添加到内核epoll模型的rb_tree里面_epoller_ptr->EpollerCtl(EPOLL_CTL_ADD, _listensock_ptr->Getfd(), EVENT_IN); //关心读事件struct epoll_event revs[defaultnum]; //存放就绪的事件for(; ;){//epoll只负责等待int n = _epoller_ptr->EpollerWait(revs, defaultnum);if(n > 0){//有事件就绪log(Debug, "event happend, fd is : %d", revs[0].data.fd);//提取就绪事件 epoll_wait返回值会返回就绪的事件数量//如果数量大于定义的大小, 下次再捞Dispatcher(revs, n); }else if(n == 0){log(Info, "time out...");}else{log(Error, "epoll_wait error");}}}~EpollServer(){_listensock_ptr->Close();}
private:std::shared_ptr<MySocket> _listensock_ptr;std::shared_ptr<Epoller> _epoller_ptr;//MySocket _listensock;uint16_t _port;//Epoller _epoller;
};
4. epoll两种工作模式
LT
模式:
epoll
默认工作模式是LT(Level Triggered水平触发)
模式
当事件到来时,如果上层一直不取走,底层会一直通知
select
和poll
采用的也是LT
模式
EL
模式:
EL(Edge Triggered边缘触发)
模式是当数据变化的时候,才会通知一次
数据从无到有,从少到多
打个比方:
快递员A(
LT模式
)送快递的时候,如果客户一直不取,他就一直打电话,说你的快递到了,签收一下;快递员B(
ET模式
)送快递的时候,只通知一次,然后就放在驿站了;如果之后又有快递到了,则又通知一次;快递员A在一个小时只能,可能只能通知到几个客户;而快递员B在一个小时之内可以通知多个客户
ET
不止通知效率高于LT
,IO
效率也高于LT
由于
ET
只通知一次,所以就倒逼上层,每次都要把本轮数据全部取走
如何知道本轮数据全部取完?
比如说,我们有550g的大米,每天要吃100g,前5天正常,到第6天的时候,原本是要吃50g大米的,可是只能吃50g了,这就说明大米没有了
也就是说当需要读取的目标数据大于实际读取的数据的时候,就表明数据已经全部取走。
这就需要我们循环读取数据,直到读取出错为止,可是fd
是默认是阻塞的,所以在ET
模式下,所有的fd
要设置成非阻塞Non_block
,如果不设置,程序会一直阻塞住每次都能取走全部的数据,接收缓冲区就有空间了,这样
tcp
就能给对方通知更大的窗口,然后对方就可以给我们发送更多的数据
ET
是否一定比LT
高效?将
LT
所有的fd
设置成non_block
非阻塞,然后循环读取,这就个ET
类似了
这里所谓的通知一次和每次通知,本质上其实是向就绪队列添加一次还是每次都添加