Linux学习之高级IO

之前的内容我们基本掌握了基础IO,如套接字,文件描述符,重定向,缓冲区等知识都是文的基本认识,而高级IO则是指更加高效的IO。

对于应用层,在读写的时候,本质就是把数据写给OS,若一方为空,就会进行阻塞式等待,读写的本质也就是数据的拷贝。

所以高效的 IO就需要在多线程的情况下,单位时间的的拷贝的数据更多,等待的比重越小。

目录

五种IO模型

非阻塞式IO

多路复用(多路转接)

IO多路转接之select

初识select

 select函数原型

IO多路转接之epoll

1.认识有关epoll的接口

2.epoll原理

3.编写epoll

4.epoll的工作模式

注意:

读写的改进 reactor


五种IO模型

 当前有五种IO模型:

我们以钓鱼佬钓鱼为例:(鱼竿:文件描述符) (上钩:读写就绪)(鱼:数据)

1.钓鱼佬张三,在完成打窝,之后选择好地点就开始钓鱼,在等待的过程中,雷打不动,任何人都干扰不到他,他死死的盯着鱼漂,一有动静,钩被咬了就提竿上鱼。这种等待的方式我们称为阻塞式IO,这也是大部分常用的读写接口的方式

2.钓鱼佬李四是一个资深的钓鱼佬,在完成准备工作时,李四就先观察了鱼漂,发现没动静,就拿起了手机刷视频,看一会儿之后再检查鱼漂发现上钩了,于是提杆上鱼。李四每隔一段时间进行检查,如果满足条件就读写,反之就干自己的事。这种方式称为非阻塞式IO--非阻塞轮询

3.钓鱼佬王五,拥有较为先进的鱼竿,完成准备工作后,直接就把鱼竿插入地上,一心一意的干其它事,鱼竿上有报警装置,一旦上鱼,立刻发送警报告知王五,所以王五只需要看有没有报警信号。这种方式被称为信号驱动式IO

4.赵六,身家万贯的热爱钓鱼的钓鱼佬,秉持着杆多鱼多的准则,直接准备了一卡车鱼竿,依次完成准备工作,入水后准备开钓,由于鱼竿数量多,赵六在岸上来回检查看哪只鱼竿上货了。这种方式称为多路复用(多路转接)。

5.田七,以吃鱼为爱好的世界五百强上市公司CEO,司机小王拉着田七来钓鱼,刚准备钓鱼,由于公司业务繁忙,对钓鱼不是很感冒,田七又回到了公司处理,且告知小王车上装备齐全,你在这里钓鱼,掉满了之后再给我打电话,我来接你。对于田七来说,这种方式称为异步IO

首先我们来看看阻塞式与非阻塞式 IO:

两者在效率上基本一样(有人觉得非阻塞式效率会高一点,这指的是非阻塞可以去干别的事,但是对于IO效率(等+拷贝)是一样的),区别是等的方式不同,一个一直等,一个间隔等一会儿。

再看同步与异步:

同步:参与等或者参数与了拷贝,都可以是同步IO。异步:两者都没参与,只拿数据。

那么综上:哪一种效率更高呢?

实际上就是田七,多路复用IO。我们学习的重点也就是多路复用.

非阻塞式IO

fcntl
一个文件描述符 , 默认都是阻塞 IO,通过函数fcntl可以修改文件描述符的属性。
函数原型如下
#include <unistd.h>
#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */ );
传入的 cmd 的值不同 , 后面追加的参数也不相同 .
fcntl 函数有 5 种功能 :
复制一个现有的描述符( cmd=F_DUPFD .
获得 / 设置文件描述符标记 (cmd=F_GETFD F_SETFD).
获得 / 设置文件状态标记 (cmd=F_GETFL F_SETFL).
获得 / 设置异步 I/O 所有权 (cmd=F_GETOWN F_SETOWN).
获得 / 设置记录锁 (cmd=F_GETLK,F_SETLK F_SETLKW).
我们此处只是用第三种功能 , 获取 / 设置文件状态标记 , 就可以将一个文件描述符设置为非阻塞
实现函数 SetNoBlock
基于 fcntl, 我们实现一个 SetNoBlock 函数 , 将文件描述符设置为非阻塞 .
void SetNoBlock(int fd) { int fl = fcntl(fd, F_GETFL); if (fl < 0) { perror("fcntl");return; }fcntl(fd, F_SETFL, fl | O_NONBLOCK); 
}
使用 F_GETFL 将当前的文件描述符的属性取出来 ( 这是一个位图 ).
然后再使用 F_SETFL 将文件描述符设置回去 . 设置回去的同时 , 加上一个 O_NONBLOCK 参数

了解到这两个主要的接口,我们就可以简单的实现一下非阻塞:

#include<iostream>
#include<string.h>
#include<unistd.h>
#include<fcntl.h>
#include<cstdio>
using namespace std;//默认阻塞,我们设置为非阻塞
void setNonBlock(int fd)
{int f1=fcntl(fd,F_GETFL);if(f1<0){cerr<<"get failed"<<endl;}//设置标记位,实际上是在原基础上新增一个标记位fcntl(fd,F_SETFL,f1 | O_NONBLOCK);//以非阻塞式打开cout<<"set O_NONBLOCK succed"<<endl;
}int main()
{char buffer[1024];//缓冲区while(true){printf("please enter#");fflush(stdout);setNonBlock(0);sleep(1);ssize_t size=read(0,buffer,sizeof(buffer)-1);//从标准输入中读取if(size>0){buffer[size-1]=0;cout<<"读取到数据 :"<<buffer<<endl;}else if(size==0){cout<<"read done"<<endl;break;}else{cerr<<errno<<":"<<strerror(errno)<<endl;}}return 0;
}

一般阻塞式,系统就会等待着我们输入数据后,才进行下一步工作(这里是打印输入的字符)。

我们设置为非阻塞式有几点需要注意:

1.设置为非阻塞,系统不会等待着我们输出,才打印,而是以读取到数据<0打印错误信息。

2,出错非两种情况:第一个就是fd真的异常 ,第二个就是底层还没就绪,你就返回了。(这里是第二种)。

3.非阻塞也能获取到数据并打印,不过给你就绪的时间非常短。

对于第二点,如何区分是还没就绪还是fd异常,可以通过打印错误码而知晓。

多路复用(多路转接)

实际上对于阻塞式,非阻塞式,非阻塞式轮询这些等待和拷贝一个函数就会搞定了,但是对于多路转接,因为要提高IO效率,因此等待与拷贝是分开的的,多路复用的本质就是非阻塞轮询,因此我们需要将非阻塞和轮询逐一实现。

IO多路转接之select

初识select

系统提供 select 函数来实现多路复用输入 / 输出模型 .
select 系统调用是用来让我们的程序监视多个文件描述符的状态变化的 ;
程序会停在 select 这里等待,直到被监视的文件描述符有一个或多个发生了状态改变

 select函数原型

 select的函数原型如下: #include <sys/select.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

其中有三个参数类型为fd_set,位图类型,为输入输出型参数。

为输入型参数时:用户告诉内核,我给一些fd,你帮我注意观察一些fd的读事件,如果读事件就绪,你就告诉我

为输出型参数时:内核告诉用户,你给我传来的fd,我已经帮你知晓了有哪些已经就绪(进行位图修改),你可以读了。

至于为什么用位图,因为位图本质就是设置某个数的第几位为0/1,举例:如果此时内核发现0号和1号文件被描述符已经就绪,就会把第0位和第1位比特位置为1, (比特位的位置代表对应文件描述符,从右向左)。比特位的内容就可以表示内核是否要关心。

作为输入型位图:

比特位的位置代表文件描述符

比特位的内容代表是否内核需要关心

 作为输出型位图:

比特位的位置还是文件描述符

比特位的内容代表用户关心的fd的读事件就绪了

于是就提供了一系列位图的接口用来修改为位图。

  其中timeout为一个结构体,里面有两个成员:一个代表秒,一个代表微秒。、

如果设置了timeout,在该时间内如果有已经就绪的,立即返回,之后的timeout表示剩下的时间。

如果没设置,只要有一个就绪了,就返回。

select的返回值:

  1. 返回值

    • 正常情况下,返回已准备好的文件描述符的个数。
    • 经过超时时长后仍无设备准备好,返回值为 0。
    • 如果 select 被某个信号中断,返回 -1 并设置 errno 为 EINTR
    • 如果出错,返回 -1 并设置相应的 errno

对于服务端来说,使用多路复用可以提高IO效率,当没有用访问服务端使,此时select的个数为零,但一有客户端访问,就会有对应的文件描述符(接收缓冲区)被创建,并且添加进行状态检测。

select.hpp

#include "Socket.hpp"
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <iostream>const int max_fd = (sizeof(fd_set)) * 8; // 8个位图最多存储的bit位
static const uint32_t defaultport = 8080;
class SelectServer
{
public:SelectServer(const uint32_t Port = defaultport) : port(Port){// 对辅助数组初始化为-1for (int i = 0; i < 1024; i++){fd_arry[i] = defaultfd;}}~SelectServer(){_listenfd.Close();}bool Init(){_listenfd.Createsockfd();_listenfd.Bind(port);_listenfd.Listen();return true;}void Accept(){// 连接就绪,获取新连接可以通信了std::string clientip;uint16_t clientport;int sockfd = _listenfd.Accept(&clientip, &clientport); // 就绪了,不在阻塞if (sockfd < 0)return;std::cout << "获取连接成功"<< "描述符:" << sockfd << std::endl;// 获取之后不能进行读取,而是设置select,不然还是会阻塞在select中int pos = 1;for (; pos < max_fd; pos++){if (fd_arry[pos] != defaultfd){continue;}else{break;}}if (pos == max_fd){std::cout << "can not handle" << std::endl;close(sockfd); // 无法处理就关掉接收的文件描述符}else{// 找到了空闲位置,此时放到fd_arry里面管理fd_arry[pos] = sockfd;}}void recver(int fd,int i){// 不在位图中代表的是接收后新的fdchar buffer[1024];ssize_t n = read(fd, buffer, sizeof(buffer) - 1);if (n > 0){std::cout << "read meesage:" << buffer << std::endl;}else if(n==0){std::cout<<"client closed" << std::endl;//如果为空,说明要关闭连接了close(fd);fd_arry[i]=defaultfd;}else{std::cout<<"read warning"<<std::endl;close(fd);fd_arry[i]=defaultfd;}}void Disapacher(fd_set fds){// 为了确定就绪的文件描述符。需要遍历整个数组for (int i = 0; i < max_fd; i++){int fd = fd_arry[i];if (fd == defaultfd){continue;}// 先判断文件描述符是否在位图中if (fd == _listenfd.Fd() && FD_ISSET(fd, &fds)) // 判断是否是位图中的,是就代表就绪{Accept();}else{recver(fd,i);}}}void Start(){int listensock = _listenfd.Fd(); // 获取套接字的文件描述符fd_arry[0] = listensock;fd_set rfds;                     // 读的文件描述符位图FD_ZERO(&rfds);                  // 先进行清空struct timeval timeout = {10, 0}; // 设置时间戳,每隔五秒检验一次int fdmax = fd_arry[0];while (true){for (int i = 0; i < max_fd; i++){if (fd_arry[i] == defaultfd){continue;}else{// 如果不为-1,说明该位置有要检测的文件描述符FD_SET(fd_arry[i], &rfds); // 将指定的文件描述符添加进来(交给select进行检测)// 同时找出最大的文件描述符if (fd_arry[i] > fdmax){fdmax = fd_arry[i];std::cout << "max has updated:" << fdmax << std::endl;}}}// 这里不能直接进行accept,accept的本质就是检测并获取listenfd上面的事件,accept只能阻塞等一个// 因为我们要实现多路复用,所以这里是要去等待多个套接字并检测他们的状态,使用selectint n = select(fdmax + 1, &rfds, nullptr, nullptr, &timeout); // 通过内核的select帮我进行检测,刚开始启动只有一个listenfdswitch (n){case 0:std::cout << "time out" << std::endl;break;case -1:std::cout << "select error" << std::endl;default:// 有链接了,如果对链接不处理,select会一直通知你,需要处理并设置位图// 新连接到来我们认为是读事件,Disapacher(rfds);//名为事件派发器break;}}}private:Sock _listenfd;uint32_t port;int fd_arry[max_fd]; // 辅助数组,统计所有描述符
};

 main.cc

#include"SelectSever.hpp"
#include<memory>
#include<iostream>void usehelp(char*s)
{printf("use correct code:%s port[1024]\n",s);
}
int main(int argc,char*argv[])
{std::unique_ptr<SelectServer>  server(new(SelectServer));server->Init();server->Start();return 0;
}

编写select的思路:

1.首先创建,绑定,监听套接字,之后创建文件描述符数组对所有文件描述符管理。

2.插入最开始的listenfd到数组里,遍历数组找出最大的文件描述符,设置位图用来标识就绪的文件描述符,之后select数组最大fd+1文。

2.select之后判断是否有新链接,有新的连接(这里我们默认是读事件)就会有新的文件描述符,对事件进行处理。

3.处理时,数组里的文件描述符分两种,一种使位图中的就绪的fd,一种是就绪后进行accept的新的fd,因此对整个数组进行判断,是位图中的,就行新接受新连接,是accept的进行读事件,反之是-1,continue。

但是select是有很明显的缺陷:

1.等待的fd是有上限的--1024

2.输入输出参数较多,数据拷贝频率高

3.输入输出参数多,每一次都对需要关心的fd重置以达到复用

4.管理数组fd,有太多次进行遍历

为了解决种种情况,我们使用了下一种接口(poll):

poll 函数接口
说明
fds 是一个 poll 函数监听的结构列表 . 每一个元素中 , 包含了三部分内容 : 文件描述符 , 监听的事件集合 , 返 回的事件集合.
nfds 表示 fds 数组的长度 .
timeout 表示 poll 函数的超时时间 , 单位是毫秒 (ms).
返回结果
返回值小于 0, 表示出错 ;
返回值等于 0, 表示 poll 函数等待超时 ;
返回值大于 0, 表示 poll 由于监听的文件描述符就绪而返回

这里我们介绍一下,重点放在select_poll上。

IO多路转接之epoll

epoll的说明是:为了处理大批量句柄而改进的poll,

1.认识有关epoll的接口

epoll_create 创建一个epoll模型

 epoll_wait  等待多长时间进行一次事件检查 返回就绪的fd与event

 返回值位已经准备就绪的个数。对于event的宏表示的就绪时间,类型如下:

EPOLLIN : 表示对应的文件描述符可以读 ( 包括对端 SOCKET 正常关闭 );
EPOLLOUT : 表示对应的文件描述符可以写 ;
EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 ( 这里应该表示有带外数据到来 );
EPOLLERR : 表示对应的文件描述符发生错误 ;
EPOLLHUP : 表示对应的文件描述符被挂断 ;
EPOLLET : EPOLL 设为边缘触发 (Edge Triggered) 模式 , 这是相对于水平触发 (Level Triggered) 来说的 .
EPOLLONESHOT :只监听一次事件 , 当监听完这次事件之后 , 如果还需要继续监听这个 socket 的话 , 需要
再次把这个 socket 加入到 EPOLL 队列里 .

epoll_ctl  作用是对系统的文件描述符的事件增加,删除,修改。即对文件描述符做管理

 第一个参数就是epoll_create的返回值,op代表三个选项(增加,修改,删除),第三个代表那个fd对应的哪个事件。

对于epoll的使用时要使用这三个系统接口的。

2.epoll原理

当某一进程调用 epoll_create 方法时, Linux 内核会创建一个 eventpoll 结构体,这个结构体中有两个成 员与epoll 的使用方式密切相关
每一个 epoll 对象都有一个独立的 eventpoll 结构体,用于存放通过 epoll_ctl 方法向 epoll 对象中添加进来 的事件.
这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来 ( 红黑树的插 入时间效率是lgn ,其中 n 为树的高度 ).
而所有添加到 epoll 中的事件都会与设备 ( 网卡 ) 驱动程序建立回调关系,也就是说,当响应的事件发生时 会调用这个回调方法.
这个回调方法在内核中叫 ep_poll_callback, 它会将发生的事件添加到 rdlist 双链表中 .
epoll 中,对于每一个事件,都会建立一个 epitem结构体

实际上epoll和poll是两个不同的模型,没太多关系,epoll是专门为用户设计的一个底层事件调度的模型。

3.编写epoll

对于select来说,我们使用一个系统调用接口加上文件描述符数组来进行非阻塞式的等待。

那么对于epoll来说就是通过三个系统调用接口,1.创建模型,3.进行非阻塞式等待(时间自己设置),2.使用epoll_ctl管理事件(本质上时使用红黑树的节点绑定事件进行高效的管理)。

每一个事件和她的描述符都被封装在一个结构体epoll_event当中,之后通过一个数组管理这里epol_event。

第一个不太完美的版本的eopp如下:

epoler.hpp

//封装epoll接口
#pragma once
#include <sys/epoll.h>
#include <unistd.h>
#include<iostream>
#include<string.h>
#include"Nocopy.hpp"uint32_t EVENT_IN=(EPOLLIN); //读事件
uint32_t EVENT_OUT=(EPOLLOUT);//写事件//因为我们不想该类被拷贝,所以继承不被拷贝的类
class Epoll :public nocopy
{public:static const int size=128;Epoll():_timeout(3000)     //设置检测时间3s一次,若为零就是非阻塞等待{//创建epoll模型_epfd=epoll_create(size);if(_epfd==-1){std::cout<<"errno: "<<errno<<"error message:"<<strerror(errno)<<std::endl;}else{std::cout<<"create succed ,epfd:"<<_epfd<<std::endl;}}//使用epoll接口进行等待int Epollwait(struct epoll_event events[],int num) //输入参数events表示就序的事件 num表示个数{int ret=epoll_wait(_epfd,events,num,_timeout);//就绪了就从这里拿}int Epollctl(int oper,int sock,uint32_t event){//向指定模型根据oper添加修改删除事件if(oper==EPOLL_CTL_DEL){//这里对删除事件进行单独处理int ret=epoll_ctl(_epfd,oper,sock,nullptr);}else{//注册事件struct epoll_event ev; //创建事件结构体并设置ev.events=event;//设置事件ev.data.fd=sock;//设置fdint ret=epoll_ctl(_epfd,oper,sock,&ev);//进行什么操作呢  本质就是向红黑树新增节点,节点绑定事件if(ret==-1){std::cout<<"errno: "<<errno<<"error message:"<<strerror(errno)<<std::endl;}else{std::cout<<"add succed ,epfd:"<<_epfd<<std::endl;}return ret;}}~Epoll(){if(_epfd==-1){close(_epfd);std::cout<<"close succed ,epfd:"<<_epfd<<std::endl;}}private:int _epfd;int _timeout; 
};

epollserver.hpp

#pragma once
#include"Socket.hpp"
#include"Epoller.hpp"
#include"Nocopy.hpp"
#include<iostream>
#include<memory>
#include<sys/epoll.h>//设置你要关心的时事件const uint16_t defalutport=8080;
class EpollServer :public nocopy
{public:static const int num=64;//每次从就绪队列中取64个就绪事件EpollServer(uint16_t port=defalutport):_port(port),_listensock(new Sock),_epoller(new Epoll){}bool Init(){_listensock->Createsockfd();_listensock->Bind(_port);_listensock->Listen();return true;}void Start(){//将listensock添加到epoll中检测你关心的读写事件,这里就是添加_epoller->Epollctl(EPOLL_CTL_ADD,_listensock->Fd(),EVENT_IN);struct epoll_event evns[num];//num代表最多就绪的个数,若果等待的个数太多,就会分配批次取waitwhile(true){int n=_epoller->Epollwait(evns,num); //返回值代表就绪的个数if(n>0){//在事件插入之后,有事件就绪了std::cout<<"检测对应的fd的事件,fd:"<<evns[0].data.fd<<std::endl; //既然就绪,那就开始处理事件HandlerEvent(evns,n);}else if(n==0){std::cout<<"time out"<<std::endl;}else{std::cout<<"wait error"<<std::endl;}}}void Accepter(){std::string clientip;uint16_t clientport;int sock =_listensock->Accept(&clientip,&clientport);if(sock>0){//获取到新连接再插入到红黑树当中_epoller->Epollctl(EPOLL_CTL_ADD,sock,EVENT_IN);}else{std::cout<<"accept error"<<std::endl;}}void Dispatcher(int fd){char buffer[1024];// 已经获取新连接,开始readint n = read(fd, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;std::cout << "读取到数据:" << buffer << std::endl;//读完之后写回给客户端std::string echo_string="server get message";std::string content=echo_string+std::string(buffer);write(fd,content.c_str(),content.size());}else if(n==0){std::cout << "client close:"<< std::endl;//删除对应的文件描述符 _epoller->Epollctl(EPOLL_CTL_DEL,fd,0); //从红黑树中拿出去,再关闭fdclose(fd);}else{std::cout << "read error!!" << std::endl;_epoller->Epollctl(EPOLL_CTL_DEL,fd,0); //从红黑树中拿出去,再关闭fdclose(fd);}}//这里的事件假设就两种,读 写void HandlerEvent( struct epoll_event evns[],int num){for(int i=0;i<num;i++){uint32_t event=evns[i].events;int fd=evns[i].data.fd; //判断是哪一种事件if(event&EVENT_IN){//判断fd是等待的fd,还是已经获取新连接的fdif(fd==_listensock->Fd()){Accepter();}else{Dispatcher(fd);}}else if(event & EVENT_OUT){//写事件就绪}}}~EpollServer(){_listensock->Close();}private:std::shared_ptr<Sock> _listensock;std::shared_ptr<Epoll> _epoller;uint16_t _port;};

4.epoll的工作模式

epoll的工作模式有利各种:ET 和LT

LT(level triggered)工作模式--水平工作模式

epoll 默认状态下就是 LT 工作模式 .
1.当 epoll 检测到 socket 上事件就绪的时候 , 可以不立刻进行处理 . 或者只处理一部分 .
如上面的例子 , 由于只读了 1K 数据 , 缓冲区中还剩 1K 数据 , 在第二次调用 epoll_wait , epoll_wait 仍然会立刻返回并通知socket 读事件就绪 .
直到缓冲区上所有的数据都被处理完 , epoll_wait 才不 会立刻返回 .
2.支持阻塞读写和非阻塞读写

 通俗点说,如果上层有新的数据,你不拿走,上层就一直通知你让你取数据,只要有就拿走。

ET(Edge Triggered)工作模式  --边缘工作模式

如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式.

epoll 检测到 socket 上事件就绪时 , 必须立刻处理 .
如上面的例子 , 虽然只读了 1K 的数据 , 缓冲区还剩 1K 的数据 , 在第二次调用 epoll_wait 的时候 ,
epoll_wait 不会再返回了 .
也就是说 , ET 模式下 , 文件描述符上的事件就绪后 , 只有一次处理机会 .
ET 的性能比 LT 性能更高 ( epoll_wait 返回的次数少了很多 ). Nginx 默认采用 ET 模式使用 epoll.
只支持非阻塞的读写

通俗点说,只有发生变化(从没链接到有新连接,从一个变多个,从多个变没),就会进行通知取走数据,只有变化,才会拿走。

由于通知的次数少,且通知是有限的,所以一般而言ET的工作效率更高一些,但也因为之中特性,程序员必须每次把数据都取完(循环读取,直到读取出错),否则就会遗漏。

但是读完数据,我们此时的read是阻塞的,我们不能让她阻塞,这会导致服务器挂起(这是不好的),所以我们需要设置读是非阻塞的。直到非阻塞状态下还读完了,就会返回错误码。

且由于我们把缓冲区的数据都把走了,这一位则TCP通信时,窗口就更大了,一次拿去的数据也多了。

注意:

即使如此,一般情况ET比LT高效,但是你是一直比LT高效吗,而且既然你ET都能一次性非阻塞的读取完,我LT不行吗?只是我们一般去使用ET,如果可以,你也可以去使用LT修改。

读写的改进 reactor

所以我们的读写是不合理的,首先没有协议的定制,其次读取并不是非阻塞的。

这里就还是以编写计算器的客户端与服务端为例:

重要的读写部分在TcpServer.hpp中:

TcpServer.hpp

#pragma once
#include<string>
#include<iostream>
#include<memory>
#include<functional>
#include<unordered_map>
#include <arpa/inet.h>
#include"Nocopy.hpp"
#include"Socket.hpp"
#include"Epoller.hpp"
#include"Comen.hpp"
#include"Protocol.hpp"
#include"Calculator.hpp"class Connection;
class TcpServer;
using funct=std::function<void (std::shared_ptr<Connection>)>; //定义了返回值为void,参数为td::shared_ptr<Connection>指针的包装器
const static int buffersize=1024;
const uint16_t defaultport=8080;                                                                                                    
const int num=64; 
class Connection
{public:void SetHandle(funct &recv_cb,funct &send_cb,funct &except_cb){_recver_callback=recv_cb;_send_callback=send_cb;_except_callback=except_cb;}void Appendinbuffer(std::string buffer){_inbuffer+=buffer;//输入缓冲区}void Appendoubuffer(std::string buffer){_outbuffer+=buffer;}const std::string &inbuffer(){return _inbuffer;}Connection(int sock,std::shared_ptr<TcpServer> tcpserver_ptr):_sock(sock),_tcpserver_callback(tcpserver_ptr)//回值指针用来传递TCPserver{}int getsock(){return _sock;}std::string& getinbuf(){return _inbuffer;}std::string& getoutbuf(){return _outbuffer;}~Connection(){}private:int _sock;private:std::string _inbuffer;//输入缓冲区std::string _outbuffer;//输出缓冲区public:funct _recver_callback; //接收回调funct _send_callback;   //发送回调funct _except_callback; //异常回调//tcpserver回值指针std::shared_ptr<TcpServer> _tcpserver_callback;
};class TcpServer
{public:TcpServer(funct OnMessage,uint16_t port=defaultport):_epoll_server(new Epoll),_listensocket(new Sock),_port(port),_quit(true)//回调函数用来设置处理事件{_OnMessage=OnMessage;_listensocket->Createsockfd();_listensocket->Bind(_port);_listensocket->Listen();SetNonBlock(_listensocket->Fd()); //将文件设置为非阻塞等待AddConnection(_listensocket->Fd(),EVENT_IN,std::bind(&TcpServer::Accepter,this,std::placeholders ::_1),nullptr,nullptr);}void Init(){}void Accepter(std::shared_ptr<Connection> connection){//连接到来开始接受while(true){struct sockaddr_in peer;socklen_t len=sizeof(peer);int fd=::accept(connection->getsock(),(sockaddr*)&peer,&len); //获取新链接if(fd>0){uint16_t clientport=ntohs(peer.sin_port);char ipbuffer[10];inet_ntop(AF_INET,&peer.sin_addr,ipbuffer,sizeof(ipbuffer));std::string ip=std::string(ipbuffer);std::cout<<"get new link,ip:"<<ip<<",fd :"<<fd<<",port"<<clientport<<std::endl;//获取的新连接设置非阻塞,现在是ET模式SetNonBlock(fd);//之后再添加到哈希表中,epoll_event中,AddConnection(fd,EVENT_IN,\std::bind(&TcpServer::Recver,this,std::placeholders::_1),   //根据函数地址与参数bind成一个包装器对象std::bind(&TcpServer::Sender,this,std::placeholders::_1),\std::bind(&TcpServer::Exepter,this,std::placeholders::_1));}else{//直到非阻塞被读到没有新的连接了if(errno==EWOULDBLOCK){break;}else if(errno==EINTR){continue;}else{//读出错break;}}}}void AddConnection(int sock,uint32_t event,funct recv_cb,funct send_cb,funct except_cb){// 将listensock添加到epoll中检测你关心的读写事件,这里就是添加 并且设置为边缘工作模式_epoll_server->Epollctl(EPOLL_CTL_ADD, sock, event);//c除此之外,每一个fd都要构建成Connetcion对象,通过connect管理事件的处理std::shared_ptr<Connection> new_connection=std::make_shared<Connection>(sock,std::shared_ptr<TcpServer>(this));new_connection->SetHandle(recv_cb,send_cb,except_cb);//第三步,就是添加到unordered_map_connection_map[sock]=new_connection;}//事件管理,进行事件的处理void Recver(std::shared_ptr<Connection> connection){int sock=connection->getsock();//ET模式读while(true){char buffer[buffersize];memset(buffer,0,sizeof(buffer));ssize_t n=recv(sock,buffer,sizeof(buffer)-1,0); //这里的数据默认为字符流,如果还有二进制,需要换为vector if(n>0){//读取成功connection->Appendinbuffer(buffer);}else if(n==0){//连接关闭std::cout<<"link closed !"<<std::endl;connection->_except_callback(connection);}else{//读取错误if(errno==EWOULDBLOCK){break;}else if(errno==EINTR){continue;}else{std::cout<<"read error!"<<std::endl;connection->_except_callback(connection);break;}}}//读完之后,将数据交给上层_OnMessage(connection); //协议的定制}void Sender(std::shared_ptr<Connection> connection){while(true){std::string &buffer=connection->getoutbuf();int sockfd=connection->getsock();ssize_t n=send(sockfd,buffer.c_str(),buffer.size(),0);if(n>0){//清空缓冲区buffer.erase(0,n);if(buffer.empty())break;}else if(n==0){return ;}else {if(errno==EWOULDBLOCK){break;}else if(errno==EINTR){continue;//信号中断,跳过本次}else {//发失败了,进入异常处理std::cout<<"send error"<<std::endl;connection->_except_callback(connection);return;}}if(!buffer.empty()){//对写事件的关心EnableEvent(connection->getsock(),true,true);}else{//关闭对写事件的处理EnableEvent(connection->getsock(),true,false);}}}void EnableEvent(int fd,bool readable,bool writeable ){uint32_t event=0;event |=((readable ? EPOLLIN:0)|(writeable ? EPOLLOUT : 0)|EPOLLET);_epoll_server->Epollctl(EPOLL_CTL_MOD,fd,event);//修改事件的读写,如果没发完,就继续发,通过修改事件为写}void Exepter(std::shared_ptr<Connection> connection){//处理链接异常或着断开std::cout<<"事件异常,断开连接!"<<std::endl;//首先移除链接//EnableEvent(connection->getsock(),false,false);//读写都设置为False_epoll_server->Epollctl(EPOLL_CTL_DEL,connection->getsock(),0);//移出事件 //关闭异常的文件描述符close(connection->getsock());std::cout<<"已移除connection,已关闭文件描述符:"<<connection->getsock()<<std::endl;//把map中的也删了}bool isConnectionsafe(int fd){auto it=_connection_map.find(fd);if(it==_connection_map.end()){return false;}return true;}void Disptcher(int timeout){//进行等待就绪时间int n=_epoll_server->Epollwait(events,num,timeout);for(int i=0;i<n;i++){uint32_t event=events[i].events;int sock=events[i].data.fd;//进行事件判断,并转化成读写事件if((event & EVENT_IN)&& isConnectionsafe(sock)){//读就绪if(_connection_map[sock]->_recver_callback)//函数存在{_connection_map[sock]->_recver_callback(_connection_map[sock]);//fd对应的connetcion信息,其中调用对应的方法,参数为connection对象}}if((event & EPOLLOUT)&& isConnectionsafe(sock)){//写就绪if(_connection_map[sock]->_recver_callback){_connection_map[sock]->_send_callback(_connection_map[sock]);}}if(event & EPOLLHUP){//读关闭event |=(EPOLLIN & EPOLLOUT);}if(event &EPOLLERR){//错误event |=(EVENT_IN & EVENT_OUT);}}}void Loop(){_quit=false;while (!_quit){Disptcher(3000);//3s}_quit=true;}~TcpServer(){}private:std::shared_ptr<Epoll> _epoll_server;  //epoll接口std::unordered_map<int,std::shared_ptr<Connection>> _connection_map;  //连接池std::shared_ptr<Sock> _listensocket; //套接字接口struct epoll_event events[num];uint32_t _port;bool _quit; //是否退出funct _OnMessage;//上层回调处理信息}; 

剩余代码的所在地:reactor · 但成伟/编程学习 - 码云 - 开源中国 (gitee.com)

主要难点是对事件与链接的管理,事件的执行,读写非阻塞这几点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/8168.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从互联网医院源码到搭建:开发视频问诊小程序的技术解析

如今&#xff0c;视频问诊小程序作为医疗服务的一种新形式&#xff0c;正逐渐受到人们的关注和青睐。今天&#xff0c;小编将为您详解视频问诊小程序的开发流程。 一、背景介绍 互联网医院源码是视频问诊小程序开发的基础&#xff0c;它提供了一套完整的医疗服务系统框架&…

zlib编译后静态库调用时遇到的无法解析的外部符号问题

编译zlib的静态库后引用到项目中使用&#xff0c;发现报下面的链接错误&#xff1a; error LNK2019: 无法解析的外部符号 _zlibVersion error LNK2019: 无法解析的外部符号 _deflateEnd error LNK2019: 无法解析的外部符号 _deflate error LNK2019: 无法解析的外部符号 _deflat…

【Linux 性能详解】CPU性能篇

目录 平均负载&#xff08;Load Average&#xff09; CPU上下文切换 进程上下文切换 线程上下文切换 中断上下文切换 中断 硬中断 软中断 CPU使用率 性能分析工具 平均负载&#xff08;Load Average&#xff09; 平均负载&#xff1f;这个词对很多人来说&#xff0c…

构建第一个ArkTS应用之@AppStorage:应用全局的UI状态存储

AppStorage是应用全局的UI状态存储&#xff0c;是和应用的进程绑定的&#xff0c;由UI框架在应用程序启动时创建&#xff0c;为应用程序UI状态属性提供中央存储。 和AppStorage不同的是&#xff0c;LocalStorage是页面级的&#xff0c;通常应用于页面内的数据共享。而AppStora…

中国护照照片尺寸分辨率要求及居家自拍制作教程

经常出国的小伙伴都知道&#xff0c;护照照片作为出国旅行的重要身份证明文件&#xff0c;其规格和质量要求非常严格。本文将详细介绍中国护照照片的具体要求&#xff0c;并提供一些实用的居家自拍技巧&#xff0c;帮助您轻松拍出符合规定的护照照片&#xff08;手机和相机居家…

革新品质检测,质构科技重塑肉类行业新篇章

革新品质检测&#xff0c;质构科技重塑肉类行业新篇章 在现代社会&#xff0c;消费者对食品安全和品质的要求日益提升&#xff0c;特别是在肉类行业。为了满足这一市场需求&#xff0c;质构科技凭借其精准、高效的优势&#xff0c;正逐渐成为肉类品质检测的新星。今天&#xf…

QT-TCP通信

网上的资料太过于书面化&#xff0c;所以看起来有的让人云里雾里&#xff0c;看不懂C-tcpsockt和S-tcpsocket的关系 所以我稍微画了一下草图帮助大家理解两个套接字之间的关系。字迹有的飘逸勉强看看 下面是代码 服务端&#xff1a; MainWindow::MainWindow(QWidget *parent) …

windows10打印机共享完美解决方案

提到文件共享大家并不陌生,相关的还有打印机共享,这个多见于单位、复印部,在一个区域网里多台电脑共用一台打印机,打印资料非常方便,就包括在家里,我们现在一般都会有多台电脑或设备,通过家庭网络联接,如果共享一台打印机的话也是件便捷的事。 但是随着操作系统的更新…

web前端框架设计第八课-表单控件绑定

web前端框架设计第八课-表单控件绑定 一.预习笔记 1.v-model实现表单数据双向绑定 2.搜索数据的实现 3.全选案例实现1—JQ方法 4.单选案例实现 二.课堂笔记 三.课后回顾 –行动是治愈恐惧的良药&#xff0c;犹豫拖延将不断滋养恐惧

如何阅读:一个已被证实的低投入高回报的学习方法的笔记

系列文章目录 如何有效阅读一本书笔记 如何阅读&#xff1a;一个已被证实的低投入高回报的学习方法 麦肯锡精英高效阅读法笔记 读懂一本书笔记 文章目录 系列文章目录第一章 扫清阅读障碍破解读不快、读不进去的谜题一切为了阅读小学教师让你做&#xff0c;但中学老师阻止你做的…

快速搭建webase-front并且部署合约

PS: 因为我开发时候要用到fisco和webase-front,避免官方文档粘贴, 因此直接整理下面的笔记。开发的时候,好粘贴。1.搭建4节点联盟链 前提 curl 一种命令行工具 apt install -y openssl curl创建操作目录, 下载安装脚本 cd ~ && mkdir -p fisco && cd fisco…

【京东电商API接口】 | 京东某商品销量数据分析可视化

Python当打之年 当打之年&#xff0c;专注于各领域Python技术&#xff0c;量的积累&#xff0c;质的飞跃。后台回复&#xff1a;【可视化项目源码】可获取可视化系列文章源码和数据 本期将利用Python分析「京东商品数据接口」&#xff0c;希望对大家有所帮助&#xff0c;如有疑…

Quartz怎么简单创建一个定时执行的任务

1.安装Quartz包 2.编写Job任务 继承 IJob编辑自定义任务 3.调用job&#xff0c;以指定时间策略执行 定时600s执行一次 StdSchedulerFactory factory new StdSchedulerFactory(); IScheduler scheduler await factory.GetScheduler(); await scheduler.Start();// 定义…

带你快速掌握Spring Task

Spring Task ⭐Spring Task 是Spirng框架提供的任务调度工具&#xff0c;可以按照约定的时间自动执行某个代码逻辑 &#x1f4cc;一款定时任务框架 应用场景 信用卡信息银行贷款信息火车票信息 只要是需要定时处理的场景都可以使用Spring Task 只要有定时&#xff0c;就会有…

用js代码实现贪吃蛇小游戏

js已经学了大部分了&#xff0c;现在就利用我所学的js知识试试做贪吃蛇小游戏吧 以下部分相关图片以及思路笔记均出自渡一陈老师的视频 首先制作简单的静态页面&#xff0c;添加贪吃蛇移动的背景和相关图片&#xff0c;比如开始游戏等等 将各个功能均封装在函数中&#xff0…

react【实用教程】 搭建开发环境(2024版)Vite+React (官方推荐)

以项目名 reactDemo为例 1. 下载脚手架 在目标文件夹中打开命令行 npm create vite2. 安装项目依赖 cd reactDemo npm i若安装失败&#xff0c;则修改下载源重试 npm config set registry https://registry.npmmirror.com3. 启动项目 npm run dev4. 预览项目 浏览器访问 http…

iPhone 数据恢复软件 – 恢复丢失的 iPhone 数据

恢复丢失的 iPhone 数据&#xff0c;奇客数据恢复iPhone版。如今的 iPhone 用户在他们的设备上存储了大量数据&#xff0c;从照片和与亲人的文本对话到商业和医疗信息。其中一些是保密的&#xff1b;其中大部分内容都是非常个人化的&#xff1b;而且大多数一旦丢失就无法替代。…

vmware虚拟机内删除文件后宿主机空间不释放

问题描述 linux下&#xff0c;vmware内虚拟机删除文件&#xff0c;宿主机空间不释放&#xff0c;D盘快满了 解决方法 通过vmware-toolbox进行空间回收 安装 在虚拟机内操作 yum install -y open-vm-tools 清理 在虚拟机内操作 #查看磁盘的挂载点 sudo /usr/bin/vmware…

Agent AI智能体:塑造未来社会的智慧力量

&#x1f525; 个人主页&#xff1a;空白诗 文章目录 &#x1f916; Agent AI智能体&#xff1a;塑造未来社会的智慧力量&#x1f3af; 引言&#x1f331; 智能体的未来角色预览&#x1f4bc; 行业革新者&#x1f31f; 创意合作者&#x1f6e1;️ 公共安全与环保&#x1f680; …

鸿蒙 Next 模拟器 体验

参加华为社区相关Next 的活动&#xff0c;只要申请通过就可以下载模拟器。整个过程稍微慢些&#xff0c;大家可以根据活动相关信息&#xff0c;加入微信群。跟踪催促进度。争取早日体验 next 。 目前模拟器里边还是空空的&#xff0c;没有什么内置 APP &#xff0c;但是足够大…