Linux学习记录——사십사 高级IO(5)--- Epoll型服务器(2)(Reactor)

文章目录

  • 1、完善Epoll简单服务器
  • 2、打造统一的分开处理的体系
  • 3、epoll工作模式
  • 4、ET模式
  • 5、继续完善,处理写事件
  • 6、引入自定义协议,处理写事件


本篇基于上篇代码继续改进,很长。关于Reactor的说明在后一篇

1、完善Epoll简单服务器

上面的代码在处理读事件时,用的request数组是临时的,如果有数据没读完,那么下次再来到这里,就没有这些数据了。所以得让每一个fd都有自己的缓冲区。建立一个Connection类,然后有一个map结构,让这个类和每个fd建立映射。Start函数改一下,不管超时还是出错,就只处理数据,处理的部分交给HandlerEvent,改名成LoopOnce,也就是说,Start那里还是有循环,每次循环都去执行L函数,L函数用Wait提取一次,然后处理。

    void Start(){//1、将listensock添加到epoll中,要先有epoll模型bool r = epoller_.AddEvent(listensock_.Fd(), EPOLLIN);//只关心读事件assert(r);//可以做别的判断(void)r;struct epoll_event revs_[gnum];int timeout = 1000;while(true){LoopOnce(timeout);}}void Accepter(){std::string clientip;uint16_t clientport;int sock = listensock_.Accept(&clientip, &clientport);if (sock < 0) return ;logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport);// 还不能recv,即使有了连接但也不知道有没有数据// 只有epoll知道具体情况,所以将sock添加到epoll中bool r = epoller_.AddEvent(sock, EPOLLIN);assert(r);(void)r;}void Recver(int fd){char request[1024];ssize_t s = recv(fd, request, sizeof(request) - 1, 0);if (s > 0){request[s - 1] = 0; // 对打印格式request[s - 2] = 0; // 做一下调整std::string response = func_(request);send(fd, response.c_str(), response.size(), 0);}else{if (s == 0)logMessage(Info, "client quit ...");elselogMessage(Warning, "recv error, client quit...");close(fd);// 将文件描述符移除// 在处理异常的时候,fd必须合法才能被处理epoller_.DelEvent(fd);}}void LoopOnce(int timeout){int n = epoller_.Wait(revs_, gnum, timeout);for(int i = 0; i < n; i++){int fd = revs_[i].data.fd;uint32_t events = revs_[i].events;logMessage(Debug, "当前正在处理%d上的%s", fd, (events&EPOLLIN) ? "EPOLLIN" : "OTHER");if(events & EPOLLIN)//判断读事件就绪{if (fd == listensock_.Fd()){// 1、新连接到来Accepter();}else{// 2、读事件Recver(fd);}}}}
class Connection
{
public:Connection(int fd): fd_(fd){}~Connection(){}
public:int fd_;std::string inbuffer_;std::string outbuffer_;
};std::unordered_map<int, Connection*> connections_;

把Start的初始化任务交给InitServer

    void InitServer(){listensock_.Socket();listensock_.Bind(port_);listensock_.Listen();epoller_.Create();logMessage(Debug, "init server success");//为listensock创建对应的connection对象Connection* conn = new Connection(listensock_.Fd());//将listensock和connection对象添加到connections_connections_.insert(std::pair<int, Connection*>(listensock_.Fd(), conn));//将listensock添加到epoll中bool r = epoller_.AddEvent(listensock_.Fd(), EPOLLIN);assert(r);(void)r;}void Start(){struct epoll_event revs_[gnum];int timeout = 1000;while(true){LoopOnce(timeout);}}

同样地,Accepter有添加到epoll的fd也要映射上自己的Connection类,Recver那里就可以也改一下了

    void Accepter(){std::string clientip;uint16_t clientport;int sock = listensock_.Accept(&clientip, &clientport);if (sock < 0) return ;logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport);// 还不能recv,即使有了连接但也不知道有没有数据// 只有epoll知道具体情况,所以将sock添加到epoll中        Connection* conn = new Connection(sock);      connections_.insert(std::pair<int, Connection*>(sock, conn));bool r = epoller_.AddEvent(sock, EPOLLIN);assert(r);(void)r;}void Recver(int fd){char request[1024];ssize_t s = recv(fd, request, sizeof(request) - 1, 0);if (s > 0){request[s - 1] = 0; // 对打印格式request[s - 2] = 0; // 做一下调整connections_[fd]->inbuffer_ += request;std::string response = func_(request);send(fd, response.c_str(), response.size(), 0);}else{if (s == 0)logMessage(Info, "client quit ...");elselogMessage(Warning, "recv error, client quit...");close(fd);// 将文件描述符移除// 在处理异常的时候,fd必须合法才能被处理epoller_.DelEvent(fd);}}

所有就绪的fd,不只包含我们关心的fd,都要有Connection类。Accepter那里,得到连接后,获取套接字,不直接读取,因为不知道是否有数据,就交给epoll,不过获取套接字后,每个套接字都需要正确读取自己的报文,所以Connection有了两个buffer。

所有就绪的fd,不仅要有Connection类,还要被epoll管理。但这样的代码并不高效,删除的时候要从epoll里删,还要从connections_里删,且代码也不够简洁。

封装并修改一下形式

class Connection
{
public:Connection(const int& fd, const std::string& clientip, const uint16_t& clientport): fd_(fd), clientip_(clientip), clientport_(clientport){}~Connection(){}
public:int fd_;std::string inbuffer_;std::string outbuffer_;std::string clientip_;uint16_t clientport_;
};//...void InitServer(){listensock_.Socket();listensock_.Bind(port_);listensock_.Listen();epoller_.Create();//为listensock创建对应的connection对象//将listensock和connection对象添加到connections_       //将listensock添加到epoll中AddConnection(listensock_.Fd(), EPOLLIN);logMessage(Debug, "init server success");}void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport){//1、构建connection对象,交给connections_管理Connection* conn = new Connection(fd, ip, port);connections_.insert(std::pair<int, Connection*>(fd, conn));//2、fd和events写到内核中bool r = epoller_.AddEvent(fd, events);assert(r);(void)r;logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port);}void Accepter(){std::string clientip;uint16_t clientport;int sock = listensock_.Accept(&clientip, &clientport);if (sock < 0) return ;logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport);// 还不能recv,即使有了连接但也不知道有没有数据// 只有epoll知道具体情况,所以将sock添加到epoll中        AddConnection(sock, EPOLLIN, clientip, clientport);}void Recver(int fd){char request[1024];ssize_t s = recv(fd, request, sizeof(request) - 1, 0);if (s > 0){request[s - 1] = 0; // 对打印格式request[s - 2] = 0; // 做一下调整connections_[fd]->inbuffer_ += request;std::string response = func_(request);send(fd, response.c_str(), response.size(), 0);}else{if (s == 0)logMessage(Info, "client quit ...");elselogMessage(Warning, "recv error// 在处理异常的时候,fd必须合法才能被处理epoller_.DelEvent(fd);}}

2、打造统一的分开处理的体系

现有的Accepter、Recver都是处理写事件的,LoopOnce那里可以加个读事件的判断,但相关的处理函数要怎么写?为了简便,这里再引入回调函数。

const static int gport = 8888;
class Connection;using func_t = std::function<std::string (std::string)>;
using callback_t = std::function<void(Connection*)>;class Connection
{
public:Connection(const int& fd, const std::string& clientip, const uint16_t& clientport): fd_(fd), clientip_(clientip), clientport_(clientport){}void Register(callback_t recver, callback_t sender, callback_t excepter){recver_ = recver;sender_ = sender;excepter_ = excepter;}~Connection(){}
public://IO信息int fd_;std::string inbuffer_;std::string outbuffer_;//IO处理callback_t recver_;callback_t sender_; callback_t excepter_;//用户信息std::string clientip_;uint16_t clientport_;
};

Register为注册方法,也就是要使用的方法。在AddConnection函数中,要判断一下,是我们关心的和不是我们关心的,都调用注册方法,但传的参数不一样。

    void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport){//2、构建connection对象,交给connections_管理Connection* conn = new Connection(fd, ip, port);if(fd == listensock_.Fd()){conn->Register();}else{conn->Register();}connections_.insert(std::pair<int, Connection*>(fd, conn));//3、fd和events写到内核中bool r = epoller_.AddEvent(fd, events);assert(r);(void)r;logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port);}

Accepter那里,里面有AddConnection函数。当LoopOnce调用Accepter时,这个函数也要用回调函数,这样就是一个类的成员函数要调用另一个类的回调函数。

    void Accepter(Connection* conn){(void) conn;//先闲置不用std::string clientip;uint16_t clientport;int sock = listensock_.Accept(&clientip, &clientport);if (sock < 0) return ;logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport);// 还不能recv,即使有了连接但也不知道有没有数据// 只有epoll知道具体情况,所以将sock添加到epoll中        AddConnection(sock, EPOLLIN, clientip, clientport);}

AddConnection中,Regsiter三个参数都是callback_t类型的,我们可以这样写

        if(fd == listensock_.Fd()){conn->Register(std::bind(&EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);}

这样设置,当我们关心的套接字上有事件就绪时,读方法就绑定Accepter。是其它套接字的话

        else{conn->Register(std::bind(&EpollServer::Recver, this, std::placeholders::_1),std::bind(&EpollServer::Sender, this, std::placeholders::_1),std::bind(&EpollServer::Excepter, this, std::placeholders::_1));}
    void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport){//2、构建connection对象,交给connections_管理Connection* conn = new Connection(fd, ip, port);if(fd == listensock_.Fd()){conn->Register(std::bind(&EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);}else{conn->Register(std::bind(&EpollServer::Recver, this, std::placeholders::_1),std::bind(&EpollServer::Sender, this, std::placeholders::_1),std::bind(&EpollServer::Excepter, this, std::placeholders::_1));}connections_.insert(std::pair<int, Connection*>(fd, conn));//3、fd和events写到内核中bool r = epoller_.AddEvent(fd, events);assert(r);(void)r;logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port);}

Recver和Sender函数要传的参数都是Connection* conn,Sender和Excepter下面再写。这样的设计主要是为了更集中实现功能,代码分明。

在LoopOnce就这样写:

    void LoopOnce(int timeout){int n = epoller_.Wait(revs_, gnum, timeout);for(int i = 0; i < n; i++){int fd = revs_[i].data.fd;uint32_t events = revs_[i].events;logMessage(Debug, "当前正在处理%d上的%s", fd, (events&EPOLLIN) ? "EPOLLIN" : "OTHER");if(events & EPOLLIN) connections_[fd]->recver_(connections_[fd]);if(events & EPOLLOUT) connections_[fd]->sender_(connections_[fd]);if((events & EPOLLERR) || (events & EPOLLHUP)) connections_[fd]->excepter_(connections_[fd]);}}

这样就形成了一整个体系。写事件,读事件,其它事件都有了处理。当服务器启动后,服务器就监听事件,一旦事件就绪,就会根据不同的事件类型来派发事件到不同的Connection中,由Connection来调用对应的函数来处理。

这时候,Start函数就是事件派发器,可以写为Disptcher()。接下来要写Recver、Sender、Excepter。

3、epoll工作模式

select,poll,epoll三个,一旦有事件就绪,如果上层不取,底层就会一直通知事件就绪,这种模式叫做LT模式,水平触发Level Triggered工作模式。epoll默认LT,另有一个ET模式,边缘触发Edge Triggered工作模式,在数据变化时只通知一次,变化就是从无到有,从有到多。ET倒逼程序员必须一次将本轮数据全部读取完毕,怎样保证读完?可以循环读取,直到某次读取的数量比每次要的量少,比如等于0或者小于这个数,就说明读完了;但因为recv/read是默认阻塞的,所以循环读取可能阻塞住,比如读完几次后刚好全部读完,那么下次读取就阻塞了,所以ET模式下,所有的读取和写入都必须是非阻塞的接口。

LT也可以在非阻塞的情况写入,读取,当然也可以在阻塞模式下工作。但LT也不能代替ET,因为代码无法统一起来,而ET只能是非阻塞,ET倒逼程序员写成它自己的形式。ET通知效率 >= LT,IO效率也是一样。

一次通知就是一次系统调用返回,一次返回必定对应一次调用,ET能有效减少系统调用次数。ET倒逼程序员尽快取走数据的本质是让TCP底层更新出更大的接收窗口,以较大概率地增加对方的滑动窗口的大小,提高发送效率。

ET并非能替代LT,ET适合高IO场景,LT能够读一部分就处理一部分,ET必须得读完才行。epoll接口默认LT。

4、ET模式

ET的设置是一个宏,EPOLLET。在我们的InitServer初始化函数中加上这个宏就行。下面也放了AddConnection的代码。

    void InitServer(){listensock_.Socket();listensock_.Bind(port_);listensock_.Listen();epoller_.Create();//为listensock创建对应的connection对象//将listensock和connection对象添加到connections_       //将listensock添加到epoll中AddConnection(listensock_.Fd(), EPOLLIN | EPOLLET);logMessage(Debug, "init server success");}void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport){//2、构建connection对象,交给connections_管理Connection* conn = new Connection(fd, ip, port);if(fd == listensock_.Fd()){conn->Register(std::bind(EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);}else{conn->Register(std::bind(EpollServer::Recver, this, std::placeholders::_1),std::bind(EpollServer::Sender, this, std::placeholders::_1),std::bind(EpollServer::Excepter, this, std::placeholders::_1));}connections_.insert(std::pair<int, Connection*>(fd, conn));//3、fd和events写到内核中bool r = epoller_.AddEvent(fd, events);assert(r);(void)r;logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port);}

除此之外,监听的套接字也得设置成非阻塞,用fcntl接口。写一个Util.hpp

#pragma once#include <iostream>
#include <unistd.h>
#include <fcntl.h>class Util
{
public:static bool SetNonBlock(int fd){int fl = fcntl(fd, F_GETFL);if(fl < 0) return false;fcntl(fd, F_SETFL, fl | O_NONBLOCK);return true;}
};

在EpollServer.hpp中

    void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport){//1、设置fd非阻塞if(events & EPOLLET) Util::SetNonBlock(fd);//2、构建connection对象,交给connections_管理Connection* conn = new Connection(fd, ip, port);

Accepter里

AddConnection(sock, EPOLLIN | EPOLLET, clientip, clientport); 

5、继续完善,处理写事件

现在的代码,从InitServer,传listensock_.Fd()到AddConnection中,然后会调用Accetper函数,但只能读一个连接,得改成循环的。给Connection类加一个成员uint32_t events,AddConnection函数中在插入connections_数组前给conn->events赋值,Accepter函数中传过来的参数是conn,通过events来判断是否需要循环。

class Connection
{
public:Connection(const int& fd, const std::string& clientip, const uint16_t& clientport): fd_(fd), clientip_(clientip), clientport_(clientport){}void Register(callback_t recver, callback_t sender, callback_t excepter){recver_ = recver;sender_ = sender;excepter_ = excepter;}~Connection(){}
public://IO信息int fd_;std::string inbuffer_;std::string outbuffer_;//IO处理callback_t recver_;callback_t sender_; callback_t excepter_;//用户信息std::string clientip_;uint16_t clientport_;uint32_t events;
};void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport){//1、设置fd非阻塞if(events & EPOLLET) Util::SetNonBlock(fd);//2、构建connection对象,交给connections_管理Connection* conn = new Connection(fd, ip, port);if(fd == listensock_.Fd()){conn->Register(std::bind(EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);}else{conn->Register(std::bind(EpollServer::Recver, this, std::placeholders::_1),std::bind(EpollServer::Sender, this, std::placeholders::_1),std::bind(EpollServer::Excepter, this, std::placeholders::_1));}conn->events = events;connections_.insert(std::pair<int, Connection*>(fd, conn));//3、fd和events写到内核中bool r = epoller_.AddEvent(fd, events);assert(r);(void)r;logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port);}void Accepter(Connection* conn){do{std::string clientip;uint16_t clientport;int sock = listensock_.Accept(&clientip, &clientport);if (sock < 0) return;logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport);// 还不能recv,即使有了连接但也不知道有没有数据// 只有epoll知道具体情况,所以将sock添加到epoll中AddConnection(sock, EPOLLIN | EPOLLET, clientip, clientport);} while (conn->events & EPOLLET);//如果是ET模式就循环,不是就退出}

是我们需要的,就走Accetper方法,不是就走Recver,Sender,Excepter方法。上面Accepter函数中的Accept函数有返回值,这里的处理就是如果出错就返回,不过现在得处理出错,如果不出错才能继续走AddConnection函数。先在Sock.hpp中改一下,加上err参数,err = errno。

    int Accept(std::string* clientip, uint16_t* clientport, int* err){struct sockaddr_in temp;socklen_t len = sizeof(temp);int sock = accept(_sock, (struct sockaddr*)&temp, &len);*err = errno;if(sock < 0){logMessage(Warning, "accept error, code: %d, errstring: %s", errno, strerror(errno));}else{*clientip = inet_ntoa(temp.sin_addr);//这个函数就可以从结构体中拿出ip地址,转换好后返回*clientport = ntohs(temp.sin_port);}return sock;}

Accepter函数

    void Accepter(Connection* conn){do{int err = 0;std::string clientip;uint16_t clientport;int sock = listensock_.Accept(&clientip, &clientport, &err);if (sock < 0){logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport);// 还不能recv,即使有了连接但也不知道有没有数据// 只有epoll知道具体情况,所以将sock添加到epoll中AddConnection(sock, EPOLLIN | EPOLLET, clientip, clientport);}else{if(err == EAGAIN || err == EWOULDBLOCK) break;//读完了,缓冲区满了else if(err == EINTR) continue;//有信号暂时中断,后续还得继续读else//异常,本次获取连接失败,继续读下一个连接{logMessage(Warning, "errstring: %s, errcode: %d", strerror(err), err);continue;}}} while (conn->events & EPOLLET);//如果是ET模式就循环,不是就退出logMessage(Debug, "accepter done ...");}

再完成Recver,Sender,Excepter函数

    void Recver(Connection* conn){//读取完了本轮数据do{char buffer[bsize];//1024ssize_t n = recv(conn->fd_, buffer, sizeof(buffer) - 1, 0);if(n > 0){buffer[n] = 0;conn->inbuffer_ += buffer;//根据基本协议,进行数据分析,边读取边分析}else if(n == 0)//另一端关闭了套接字,要关闭连接{conn->excepter_(conn);//归到异常处理}else{if(errno == EAGAIN || errno == EWOULDBLOCK) break;else if(errno == EINTR) continue;else conn->excepter_(conn);}} while (conn->events & EPOLLET); //根据基本协议,进行数据分析}

分析数据可以全读完再分析,也可以边读边分析,这就需要有协议规定,协议在之前有写过简单的代码。现在有3种情况会调用异常处理函数Excpter,Recver函数读的时候异常, Sender函数发送数据出现异常,LoopOnce里也有。这样情况有些多,代码写起来也不够好。改一下

    void LoopOnce(int timeout){int n = epoller_.Wait(revs_, gnum, timeout);for(int i = 0; i < n; i++){int fd = revs_[i].data.fd;uint32_t events = revs_[i].events;logMessage(Debug, "当前正在处理%d上的%s", fd, (events&EPOLLIN) ? "EPOLLIN" : "OTHER");//下面这句就是把所有异常情况都转化为Recver和Sender去调用异常函数if((events & EPOLLERR) || (events & EPOLLHUP)) events |= (EPOLLIN | EPOLLOUT);//下面这两个也要改一下,要保证连接存在if((events & EPOLLIN) && ConnIsExists(fd))connections_[fd]->recver_(connections_[fd]);if((events & EPOLLOUT) && ConnIsExists(fd)) connections_[fd]->sender_(connections_[fd]);}}bool ConnIsExists(int fd){return connections_.find(fd) != connections_.end();}

6、引入自定义协议,处理写事件

用之前的Util.hpp和Protocol.hpp,有序列化反序列化,所以Makefile里得加上-ljsoncpp

epollserver:Main.ccg++ -o $@ $^ -ljsoncpp -std=c++11
.PHONY:clean
clean:rm -f epollserver

Util.hpp

#pragma once#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include <string>
#include <vector>
#include <cstdlib>
using namespace std;class Util
{
public:static bool SetNonBlock(int fd){int fl = fcntl(fd, F_GETFL);if(fl < 0) return false;fcntl(fd, F_SETFL, fl | O_NONBLOCK);return true;}static bool StringSplit(const std::string& str, const std::string& sep, std::vector<std::string>* result){size_t start = 0;while(start < str.size()){auto pos = str.find(sep, start);if(pos == std::string::npos) break;result->push_back(str.substr(start, pos - start));start = pos + sep.size();}if(start < str.size()) result->push_back(str.substr(start));return true;}static int toInt(const std::string& s){return atoi(s.c_str());}
};

Protocol.hpp

#pragma once#include <iostream>
#include <cstring>
#include <string>
#include <vector>
#include <jsoncpp/json/json.h>
#include <sys/types.h>
#include <sys/socket.h>
#include "Util.hpp"//#define MESELF 1//给网络版本计算机定制协议
namespace protocol_ns
{#define SEP " "#define SEP_LEN strlen(SEP)//不能用sizeof#define HEADER_SEP "\r\n"#define HEADER_SEP_LEN strlen("\r\n")//"长度"\r\n"_x_op_y"\r\n//假设报文是这样的: "7"\r\n"10 + 20"\r\n,这就相当于报头 + 有效载荷//请求/响应 = 报头\r\n有效载荷\r\n,只是请求和响应的有效载荷不同std::string AddHeader(const std::string &str){std::cout << "AddHeader 之前:\n"<< str << std::endl;std::string s = std::to_string(str.size());s += HEADER_SEP;s += str;s += HEADER_SEP;std::cout << "AddHeader 之hou:\n"<< s << std::endl;return s;}std::string RemoveHeader(const std::string& str, int len){std::cout << "RemoveHeader 之前:\n"<< str << std::endl;std::string res = str.substr(str.size() - HEADER_SEP_LEN - len, len);std::cout << "RemoveHeader 之后:\n"<< str << std::endl;return res;}int ReadPackage(int sock, std::string& inbuffer, std::string* package){std::cout << "ReadPackage inbuffer 之前:\n"<< inbuffer << std::endl;//读取 ———— 字符串"7"\r\n"10 + 20"\r\nchar buffer[1024];ssize_t s = recv(sock, buffer, sizeof(buffer - 1), 0);if(s <= 0) return -1;buffer[s] = 0;inbuffer += buffer;//此时inbuffer里就有了这样的字符串: "7"\r\n"10 + 20"\r\n//分析auto pos = inbuffer.find(HEADER_SEP);if(pos == std::string::npos) return 0;//没找到\r\n那么就不是正确的字符串,不动inbuffer里的内容,退出std::string lenStr = inbuffer.substr(0, pos);//获取头部字符串7int len = Util::toInt(lenStr);//得到了长度7,也就是有效载荷长度int targetPackagelen = lenStr.size() + len + 2 * HEADER_SEP_LEN;//接收到的有报文的字符串长度就是这个if(inbuffer.size() < targetPackagelen) return 0;//提取报文有效载荷*package = inbuffer.substr(0, targetPackagelen);//package保存了"7"\r\n"10 + 20"\r\n,去掉其它符号的工作交给RemoveHeaderinbuffer.erase(0, targetPackagelen);//只有到这里才改变inbuffer里的内容,从inbuffer里直接移除整个报文std::cout << "ReadPackage inbuffer 之后:\n"<< inbuffer << std::endl;return len;//len就是有效载荷的长度}class Request{public:Request() {}//为无参构造而准备的,这样就是一个无参一个有参Request(int x, int y, char op) : _x(x), _y(y), _op(op){}bool Serialize(std::string* outstr)//序列化:结构体转字符串{*outstr = "";
#ifdef MYSELFstd::string x_string = std::to_string(_x);std::string y_string = std::to_string(_y);// 手动序列化*outstr = x_string + SEP + _op + SEP + y_string;
#elseJson::Value root;//Value是一个万能对象,接受任何一个kv类型root["x"] = _x;root["y"] = _y;//所有放进去的会自动转为string类型root["op"] = _op;//Json::FastWriter writer;//FastWriter用来序列化,把结构化的数据转为字符串类型Json::StyledWriter writer;*outstr = writer.write(root);
#endifreturn true;}bool Deserialize(const std::string& instr)//反序列化:字符串转结构体{
#ifdef MYSELFstd::vector<std::string> result;Util::StringSplit(instr, SEP, &result);if (result.size() != 3)return false;_x = Util::toInt(result[0]);_y = Util::toInt(result[2]);if (result[1].size() == 1)return false;   // 协议规定_op = result[1][0]; // 因为是字符,所以只要一个符号即可std::cout << "_x: \n"<< _x << "_y: \n"<< _y << "_op: " << _op << std::endl;
#elseJson::Value root;Json::Reader reader;//Reader用来反序列化reader.parse(instr, root);_x = root["x"].asInt();//拿到的是字符串,要转成int类型_y = root["y"].asInt();//_op虽然是char,但它在计算机里就是整数,序列化时放进root的就是整数类型,反序列化时转成int类型,然后编译器会根据char类型自动解释成char类型_op = root["op"].asInt();
#endifreturn true;}~Request() {}public:int _x;int _y;char _op;};class Response{public:Response() {}Response(int result, int code): _result(result), _code(code){}bool Serialize(std::string* outstr){*outstr = "";
#ifdef MYSELFstd::string res_string = std::to_string(_result);std::string code_string = std::to_string(_code);// 手动序列化*outstr = res_string + SEP + code_string;
#elseJson::Value root;root["result"] = _result;root["code"] = _code;//Json::FastWriter writer;Json::StyledWriter writer;*outstr = writer.write(root);
#endifreturn true;}bool Deserialize(const std::string& instr){
#ifdef MYSELFstd::vector<std::string> result;Util::StringSplit(instr, SEP, &result);if (result.size() != 2)return false;_result = Util::toInt(result[0]);_code = Util::toInt(result[1]);std::cout << "_result: \n"<< _result << "_code: " << _code << std::endl;
#elseJson::Value root;Json::Reader reader;reader.parse(instr, root);_result = root["result"].asInt();_code = root["code"].asInt();
#endifreturn true;}~Response() {}public:int _result;int _code;//0表示计算成功,剩余的数字就是各种非法操作的错误码};
}

ReadPackage改一下,之前是接收并分析,现在只做分析

    int ParsePackage(std::string& inbuffer, std::string* package){std::cout << "ReadPackage inbuffer 之前:\n"<< inbuffer << std::endl;//分析auto pos = inbuffer.find(HEADER_SEP);if(pos == std::string::npos) return 0;//没找到\r\n那么就不是正确的字符串,不动inbuffer里的内容,退出std::string lenStr = inbuffer.substr(0, pos);//获取头部字符串7int len = Util::toInt(lenStr);//得到了长度7,也就是有效载荷长度int targetPackagelen = lenStr.size() + len + 2 * HEADER_SEP_LEN;//接收到的有报文的字符串长度就是这个if(inbuffer.size() < targetPackagelen) return 0;//提取报文有效载荷*package = inbuffer.substr(0, targetPackagelen);//package保存了"7"\r\n"10 + 20"\r\n,去掉其它符号的工作交给RemoveHeaderinbuffer.erase(0, targetPackagelen);//只有到这里才改变inbuffer里的内容,从inbuffer里直接移除整个报文std::cout << "ReadPackage inbuffer 之后:\n"<< inbuffer << std::endl;return len;//len就是有效载荷的长度}

继续写EpollServer.hpp中的Recver和Sender函数

    void Recver(Connection* conn){//读取完了本轮数据do{char buffer[bsize];//1024ssize_t n = recv(conn->fd_, buffer, sizeof(buffer) - 1, 0);if(n > 0){buffer[n] = 0;conn->inbuffer_ += buffer;//根据基本协议,进行数据分析,边读取边分析;std::string requestStr;int n = ParsePackage(conn->inbuffer_, &requestStr);//看ParsePackage//n为0表示没有不合理字符串或者inbuffer剩下的不够规定的长度,不用判断if(n > 0)//保证读到了完整的请求{//回调函数在Main.cc中//这边先反序列化,再交给回调函数//上面改成using func_t = std::function<void(const Request&)>;requestStr = RemoveHeader(requestStr, n);Request req;req.Deserialize(requestStr);func_(req);//交给回调函数处理}}else if(n == 0)//另一端关闭了套接字,要关闭连接{conn->excepter_(conn);//归到异常处理}else{if(errno == EAGAIN || errno == EWOULDBLOCK) break;else if(errno == EINTR) continue;else conn->excepter_(conn);}} while (conn->events & EPOLLET); }

更改在n > 0的判断后

Main.cc

#include "EpollServer.hpp"
#include <memory>//用之前网络计算器的计算函数
Response CalculateHelper(const Request& req)
{Response resp(0, 0);switch(req._op){case '+':resp._result = req._x + req._y;break;case '-':resp._result = req._x - req._y;break;case '*':resp._result = req._x * req._y;break;case '/':if(req._y == 0) resp._code = 1;else resp._result = req._x / req._y;break;case '%':if(req._y == 0) resp._code = 2;else resp._result = req._x / req._y;break; default:resp._code = 3;break; }return resp;
}void Calculate(const Request& req)
{Response resp = CalculateHelper(req);//序列化,当然这里放到EpollServer.hpp更好std::string sendStr;resp.Serialize(&sendStr);sendStr = AddHeader(sendStr);//序列化后发送出去}int main()
{std::unique_ptr<EpollServer> svr(new EpollServer(Calculate));svr->InitServer();svr->Disptcher();return 0;
}

epoll中关于fd的读取,一般要常设置,也就是一直要让epoll关心;关于fd的写入,则是按需设置,不能常设置,只有需要发送的时候才设置。发送的对象就是Connection中的outbuffer。

//EpollServer.hpp
using func_t = std::function<void(Connection *, const Request&)>;
//...
func_(conn, req);//Recver函数中
//Main.cc
void Calculate(Connection* conn, const Request& req)
{Response resp = CalculateHelper(req);//序列化,当然这里放到EpollServer.hpp更好std::string sendStr;resp.Serialize(&sendStr);//序列化后发送出去conn->outbuffer_ += sendStr;//开启对写事件的关心
}

加上写事件是要对内核做操作,在EpollServer.hpp的EpollServer类中中再添加一个函数专门做这个事,不过Main.cc中传过来的参数只有Connection类的,所有这个类还得添加一个成员来调用这个函数

class EpollServer;//...
EpollServer* R;//AddConnection里conn->events = events后
conn->R = this;//函数属于EpollServer类,this就是这个类//...
void Calculate(Connection* conn, const Request& req)
{Response resp = CalculateHelper(req);//序列化,当然这里放到EpollServer.hpp更好std::string sendStr;resp.Serialize(&sendStr);//序列化后发送出去conn->outbuffer_ += sendStr;//开启对写事件的关心conn->R->EnableReadWrite(conn, true, true);
}

开启后,Epoll底层会调用Sender函数来发送。

    void Sender(Connection* conn){do{ssize_t n = send(conn->fd_, conn->outbuffer_.c_str(), conn->outbuffer_.size(), 0);//体现按需思路if(n > 0)//发送成功,发送了局部或全部{conn->outbuffer_.erase(0, n);if(conn->outbuffer_.empty())//把数据发完了{EnableReadWrite(conn, true, false);//去掉对写事件的关心break;}else//没发完,也就是发送了局部{EnableReadWrite(conn, true, true);//继续}}else{//和Accepter里的一样的解释if(errno == EAGAIN || errno == EWOULDBLOCK) break;else if(errno == EINTR) continue;else{conn->excepter_(conn);break;}}} while (conn->events & EPOLLET);//ET模式就一直循环,不是就退出}

通常初次设置对写事件的关心,发送缓冲区是空的会,因此立马触发一次对应的fd的就绪,此时epoll底层会自动调用回调函数,从而使用Sender函数。

实现EnableReadWrite函数

    bool EnableReadWrite(Connection* conn, bool readable, bool writeable){conn->events = ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);//修改Epoll.hpp中的AddEvent函数为AddModEvent,传入一个op,用来实现Add和Mod两个功能return epoller_.AddModEvent(conn->fd_, conn->events, EPOLL_CTL_MOD);}bool AddModEvent(int fd, uint32_t events, int op){struct epoll_event ev;ev.events = events;ev.data.fd = fd;//属于用户的数据,epoll底层不对该数据做任何修改,为了给未来就绪返回int n = epoll_ctl(epfd_, op, fd, &ev);if(n < 0){logMessage(Fatal, "epoll_ctl error, code: %d, errstring: %s", errno, strerror(errno));return false;}return true;}

这样就完成了服务器的拉取工作,也就是有数据来了,可以返回结果。

现在这样的代码是没问题的,但也有些复杂,我们希望暴露在外面的更少,所有工作都在底层完成,上层不需要关心,只调用接口就好。

本篇gitee

下一篇继续写。

结束。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/640886.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

216. 组合总和 III - 力扣(LeetCode)

题目描述 找出所有相加之和为 n 的 k 个数的组合&#xff0c;且满足下列条件&#xff1a; 只使用数字1到9每个数字 最多使用一次 返回 所有可能的有效组合的列表 。该列表不能包含相同的组合两次&#xff0c;组合可以以任何顺序返回。 输入示例 k 3, n 7输出示例 [[1,2,…

深入理解Kubernetes探针和.NET服务健康检查机制

前言 随着越来越多的软件采用云原生和微服务架构&#xff0c;我们面临着更多的技术挑战&#xff0c;比如&#xff1a; Kubernetes如何在容器服务异常终止、死锁等情况下&#xff0c;发现并自动重启服务&#xff1b;当服务依赖的关键服务&#xff08;例如数据库&#xff0c;Red…

【2024-01-22】某极验3流程分析-滑块验证码

声明:该专栏涉及的所有案例均为学习使用,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关!如有侵权,请私信联系本人删帖! 文章目录 一、前言二、抓包流程分析1.刷新页面2.点击按钮进行验证3.滑动验证码三、图片还原四、w值①u值②l值③h值④l中的o值aa参…

Dockerfile-xxxx

1、Dockerfile-server FROM openjdk:8-jdk-alpine WORKDIR /app COPY . . CMD java -Xms1536M -Xmx1536M -XX:UseG1GC -jar -Dlog4j2.formatMsgNoLookupstrue -Dloader.pathresources,lib -Duser.timezoneGMT-05 /app/server-main-1.0.0.jar 2、Dockerfile-bgd #FROM openjdk…

8. UE5 RPG创建UI(上)

UI是显示角色的一部分属性玩家可以直接查看的界面&#xff0c;通过直观的形式在屏幕上显示角色的各种信息。如何使用一种可扩展&#xff0c;可维护的形式来制作&#xff0c;这不得不说到耳熟能详的MVC架构。 MVC&#xff08;Model-View-Controller&#xff09;是一种常见的软件…

如何在Linux上部署Nexus私服

如何在Linux上部署Nexus私服 Nexus 是一个强大的仓库管理解决方案&#xff0c;由Sonatype公司开发。它主要用于软件开发中各种依赖包和构件的存储、管理和分发。 1、为什么要部署nexus&#xff1f; 统一管理依赖&#xff1a;在软件开发过程中&#xff0c;项目通常会依赖大量的…

[Linux 杂货铺] —— 权限(文件权限和目录配置)

目录 &#x1f308;前言 &#x1f4c1; 文件的属性 &#x1f4c1; 权限的概念 &#x1f4c2;拥有者和所属组&#xff08;角色&#xff09;&#xff1a; &#x1f4c2;用户&#xff08;具体的人&#xff09;&#xff1a; &#x1f4c1; 权限的管理 &#x1f4c2;1. chmod…

如何让亚马逊,速卖通,美客多店铺排名和流量稳定爬升

一、关键词优化 关键词是亚马逊店铺排名的关键。通过合理的关键词优化&#xff0c;可以提高店铺的曝光率。卖家需要研究消费者的搜索习惯和行为&#xff0c;了解他们使用哪些关键词进行搜索&#xff0c;然后将这些关键词用于商品描述、标题和元数据中。此外&#xff0c;还可以…

免费可用的ChatGPT替代方案

最近ChatGPT又不好访问了&#xff0c;整理了一些可以替代的方案&#xff0c;测了下可以使用&#xff0c;按照自己的喜好选择。 国外可用的GPT Poe - Fast, Helpful AI Chat - 它提供了对GPT-4、gpt-3.5-turbo、Anthropic的Claude以及其他多种机器人的访问权限。HuggingChat -…

Linux - 安装字体库解决乱码问题

文章目录 问题描述步骤资源 问题描述 该安装方法&#xff0c;不区分中文和英文字体 Java在linux上转word文档为pdf&#xff0c; linux的字体缺失&#xff0c;导致了转出的pdf为乱码。 ● Linux将word转为pdf后出现乱码&#xff1f; ● 在linux上将word转为pdf 是乱码 ● 在lin…

第一篇【传奇开心果短博文系列】Python的库OpenCV技术点案例示例:cv2常用功能和方法

传奇开心果短博文系列 短博文系列目录Python的库OpenCV技术点案例示例系列 短博文目录一、前言二、常用功能和方法示例三、归纳总结 短博文系列目录 Python的库OpenCV技术点案例示例系列 短博文目录 一、前言 cv2是Python中常用的第三方库&#xff0c;也称为OpenCV库&#…

在全志H616核桃派上实现USB摄像头的OpenCV颜色检测

在给核桃派开发板用OpenCV读取图像并显示到pyqt5的窗口上并加入颜色检测功能&#xff0c;尝试将图像中所有蓝色的东西都用一个框标记出来。 颜色检测核心api 按照惯例&#xff0c;先要介绍一下opencv中常用的hsv像素格式。颜色还是那个颜色&#xff0c;只是描述颜色用的参数变…

uniapp PDF文件预览/打开

兼容微信小程序/h5/ios/android 在微信小程序环境下需要配置pdf域名 pdfPreview(url){var title 预览uni.showLoading({ title: 加载中, mask: true })//由于android 下webview无法直接打开pdf&#xff0c;需要先下载uni.getSystemInfo({success: res > {console.log(res.p…

【C语言编程之旅 7】刷题篇-函数

第1题 解析 A&#xff1a;错误&#xff0c;一个函数只能返回一个结果 B&#xff1a;正确&#xff0c;将形参存在数组中&#xff0c;修改数组中内容&#xff0c;可以通过数组将修改结果带出去 C&#xff1a;正确&#xff0c;形参如果用指针&#xff0c;最终指向的是外部的实参…

【笔记】stable_baseline 记录输出说明

训练 PPO 代理时的记录器输出示例&#xff1a; ----------------------------------------- | eval/ | | | mean_ep_length | 200 | | mean_reward | -157 | | rollout/ | | |…

SpringBoot3.1.7集成Kafka和Kafka安装

一、背景 我们在很多系统开发都需要用到消息中间件&#xff0c;目前来说Kafka凭借其优秀的性能&#xff0c;使得它的使用率已经是名列前茅了&#xff0c;所以今天我们将它应用到我们的系统 二、版本选择 在使用一个中间件一定要考虑版本的兼容性&#xff0c;否则后面会遇到很…

美易官方:Amer Sports寻求美国IPO大举筹资

随着全球资本市场的活跃&#xff0c;各行各业的公司都在寻找更多的融资途径以支持其业务扩展和市场竞争力的提升。近日&#xff0c;知名体育用品制造商Amer Sports Inc.宣布计划通过美国首次公开募股&#xff08;IPO&#xff09;筹集高达18亿美元的资金&#xff0c;这一消息引发…

[足式机器人]Part2 Dr. CAN学习笔记- 最优控制Optimal Control Ch07-3 线性二次型调节器(LQR)

本文仅供学习使用 本文参考&#xff1a; B站&#xff1a;DR_CAN Dr. CAN学习笔记 - 最优控制Optimal Control Ch07-3 线性二次型调节器&#xff08;LQR&#xff09; 1. 数学推导2. 案例反洗与代码详解 1. 数学推导 2. 案例反洗与代码详解

Arrays.asList和ArrayList.subList,集合添加/修改遇到的问题

Arrays.asList List<Integer> statusList Arrays.asList(1,2); //底层源码返回的ArrayList&#xff0c;并不是java.util包下的 System.out.println(statusList);//[1,2] System.out.println(statusList.contains(1));//true System.out.println(statusList.contains(3)…

Go 的 Http 请求系统指南

文章目录 快速体验请求方法URL参数响应信息BodyStatusCodeHeaderEncoding 图片下载定制请求头复杂的POST请求表单提交提交文件 CookieClient 上设置 Cookie请求上设置 Cookie 重定向和请求历史超时设置总超时连接超时读取超时 请求代理错误处理总结 前几天在 “知乎想法” 谈到…