TcpServer服务器管理模块(模块十)

目录

类功能

类定义

类实现

编译测试

server.cc

gdb测试断点

忽略SIGPIPE信号


类功能

类定义

// TcpServer服务器管理模块(即全部模块的整合)
class TcpServer
{
private:uint64_t _next_id;                                  // 这是一个自动增长的连接IDint _port;int _timeout;                                       // 这是非活跃连接的统计时间--多长时间无通信就是非活跃连接bool _enable_inactive_release;                      // 是否启动非活跃连接超时销毁的判断标志EventLoop _baseloop;                                // 这是主线程的EventLoop对象,负责监听事件的处理Acceptor _acceptor;                                 // 这是监听套接字的管理对象LoopThreadPool _pool;                               // 这是从属EventLoop线程池std::unordered_map<uint64_t, PtrConnection> _conns; // 保管所有连接对应的share_ptr对象,这里面的对象被删除,就意味这某一个连接被删除using ConnectedCallback = std::function<void(const PtrConnection &)>;using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection &)>;using AnyEventCallback = std::function<void(const PtrConnection &)>;using Functor = std::function<void()>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;
private:void NewConnection(int fd); // 为新连接构造一个Connection进行管理void RemoveConnection(); // 从管理Connection的_conns移除连接信息
public:TcpServer();void SetThreadCount(int count);void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }void EnableInactiveRelease(int timeout);void RunAfter(const Functor &task, int delay);  // 用于添加一个定时任务void Start();
};

类实现

// TcpServer服务器管理模块(即全部模块的整合)
class TcpServer
{
private:uint64_t _next_id; // 这是一个自动增长的连接IDint _port;int _timeout;                                       // 这是非活跃连接的统计时间--多长时间无通信就是非活跃连接bool _enable_inactive_release;                      // 是否启动非活跃连接超时销毁的判断标志EventLoop _baseloop;                                // 这是主线程的EventLoop对象,负责监听事件的处理Acceptor _acceptor;                                 // 这是监听套接字的管理对象LoopThreadPool _pool;                               // 这是从属EventLoop线程池std::unordered_map<uint64_t, PtrConnection> _conns; // 保管所有连接对应的share_ptr对象,这里面的对象被删除,就意味这某一个连接被删除using ConnectedCallback = std::function<void(const PtrConnection &)>;using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection &)>;using AnyEventCallback = std::function<void(const PtrConnection &)>;using Functor = std::function<void()>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;private:void RunAfterInLoop(const Functor &task, int delay){_next_id++;_baseloop.TimerAdd(_next_id, delay, task);}// 为新连接构造一个Connection进行管理void NewConnection(int fd){_next_id++;PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));conn->SetMessageCallback(_message_callback);conn->SetClosedCallback(_closed_callback);conn->SetConnectedCallback(_connected_callback);conn->SetAnyEventCallback(_event_callback);conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));if (_enable_inactive_release)conn->EnableInactiveRelease(10); // 启动非活跃超时销毁conn->Established();                 // 就绪初始化_conns.insert(std::make_pair(_next_id, conn));}void RemoveConnectionInLoop(const PtrConnection &conn){int id = conn->Id();auto it = _conns.find(id);if (it != _conns.end()){_conns.erase(it);}}// 从管理Connection的_conns移除连接信息void RemoveConnection(const PtrConnection &conn){_baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn));}public:TcpServer(int port) : _port(port),_next_id(0),_enable_inactive_release(false),_acceptor(&_baseloop, port),_pool(&_baseloop){_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));_acceptor.Listen(); // 将监听套接字挂到baseloop上}void SetThreadCount(int count) { return _pool.SetThreadCount(count); }void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }void EnableInactiveRelease(int timeout) { _timeout = timeout, _enable_inactive_release = true; }// 用于添加一个定时任务void RunAfter(const Functor &task, int delay){_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));}void Start(){_pool.Create(); // 创建线程池中的从属线程_baseloop.Start();}
};

编译测试

为了便于测试整合,创建了一个新的文件server.cc

server.cc

#include "../source/server.hpp"void OnConnected(const PtrConnection &conn)
{DBG_LOG("NEW CONNECTION:%p", conn.get());
}
void OnClosed(const PtrConnection &conn)
{DBG_LOG("CLOSE CONNECTION:%p", conn.get());
}
void OnMessage(const PtrConnection &conn, Buffer *buf)
{DBG_LOG("%s", buf->ReadPosition());buf->MoveReadOffset(buf->ReadAbleSize());std::string str = "Hello World";conn->Send(str.c_str(), str.size());conn->Shutdown(); // 调用关闭接口
}int main()
{TcpServer server(8500);server.SetThreadCount(2);server.EnableInactiveRelease(10);server.SetClosedCallback(OnClosed);server.SetConnectedCallback(OnConnected);server.SetMessageCallback(OnMessage);server.Start();return 0;
}

服务端

客户端

符合预期

gdb测试断点

在测试的过程中,出现了一些小问题,通过断点进行处理

忽略SIGPIPE信号

前几天测试发生的问题,可以依靠创建下面一个类来忽略该信号的触发

忽略SIGPIPE信号,当连接断开的时候,如果我们继续向对端send发送信息,就会触发异常,即SIGPIPE异常,这个就是导致客户端异常退出的原因

// 忽略SIGPIPE信号,当连接断开的时候,如果我们继续向对端send发送信息,就会触发异常,即SIGPIPE异常,这个就是导致客户端异常退出的原因
class NetWork{public:NetWork(){DBG_LOG("SIGPIPE INIT");signal(SIGPIPE, SIG_IGN); // 忽视SIGPIPE异常,这个会导致进程退出}
};
static NetWork nw;  // 这个是为了执行里面的构造函数

服务器主体源码

因为写到这里已经算是到了一定程度了,也就是说服务器的部分已经基本完成,后续会以次为基础,创建一个回显服务器,这里就直接贴代码了,不要嫌长

#ifndef __M_SERVER_H__  
#define __M_SERVER_H__
#include <iostream>
#include <vector>
#include <cstdint>
#include <cassert>
#include <ctime>
#include <cstring>
#include <string>
#include <unistd.h>
#include <typeinfo>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <fcntl.h>
#include <functional>
#include <signal.h>
#include <unordered_map>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/timerfd.h>
#include <sys/socket.h>
#include <sys/types.h>#define INF 0
#define DBG 1
#define ERR 2
#define LOG_LEVEL DBG#define LOG(level, format, ...)                                                                                        \do                                                                                                                 \{                                                                                                                  \if (level < LOG_LEVEL)                                                                                         \break;                                                                                                     \time_t t = time(NULL);                                                                                         \struct tm *ltm = localtime(&t);                                                                                \char tmp[32] = {0};                                                                                            \strftime(tmp, 31, "%H:%M:%S", ltm);                                                                            \fprintf(stdout, "[%p %s %s:%d] " format "\n", (void *)pthread_self(), tmp, __FILE__, __LINE__, ##__VA_ARGS__); \} while (0)#define INF_LOG(format, ...) LOG(INF, format, ##__VA_ARGS__)
#define DBG_LOG(format, ...) LOG(DBG, format, ##__VA_ARGS__)
#define ERR_LOG(format, ...) LOG(ERR, format, ##__VA_ARGS__)// 缓冲区类
#define BUFFER_DEFAULT_SIZE 1024 // Buffer 默认起始大小
class Buffer
{
private:std::vector<char> _buffer; // 使用vector进行内存空间管理uint64_t _reader_idx;      // 读偏移uint64_t _writer_idx;      // 写偏移
public:Buffer() : _reader_idx(0), _writer_idx(0), _buffer(BUFFER_DEFAULT_SIZE) {}char *Begin() { return &*_buffer.begin(); }// 获取当前写入起始地址char *WritePosition() { return Begin() + _writer_idx; }// 获取当前读取起始地址char *ReadPosition() { return Begin() + _reader_idx; }// 获取缓冲区末尾空闲空间大小--写偏移之后的空闲空间, 总体空间大小减去写偏移uint64_t TailIdleSize() { return _buffer.size() - _writer_idx; }// 获取缓冲区起始空闲空间大小--读偏移之前的空闲空间uint64_t HeadIdleSize() { return _reader_idx; }// 获取可读数据大小 = 写偏移 - 读偏移uint16_t ReadAbleSize() { return _writer_idx - _reader_idx; };// 将读偏移向后移动void MoveReadOffset(uint64_t len){if (len == 0)return;// 向后移动的大小, 必须小于可读数据大小assert(len <= ReadAbleSize());_reader_idx += len;}// 将写偏移向后移动void MoveWriteOffset(uint64_t len){// 向后移动的大小,必须小于当前后边的空闲空间大小assert(len <= TailIdleSize());_writer_idx += len;}// 确保可写空间足够(整体空闲空间够了就移动数据,否则就扩容)void EnsureWriteSpace(uint64_t len){// 如果末尾空闲空间大小足够,直接返回if (TailIdleSize() >= len){return;}// 末尾空闲空间不够,则判断加上起始位置的空闲空间大小是否足够,够了就将数据移动到起始位置if (len <= TailIdleSize() + HeadIdleSize()){// 将数据移动到起始位置uint64_t rsz = ReadAbleSize();                            // 把当前数据大小先保存起来std::copy(ReadPosition(), ReadPosition() + rsz, Begin()); // 把可读数据拷贝到起始位置_reader_idx = 0;                                          // 将读偏移归0_writer_idx = rsz;                                        // 将写位置置为可读数据大小, 因为当前的可读数据大小就是写偏移量}else{// 总体空间不够,则需要扩容,不移动数据,直接给写偏移之后扩容足够空间即可_buffer.resize(_writer_idx + len);}}// 写入数据void Write(const void *data, uint64_t len){// 1.保证有足够空间, 2.拷贝数据进去EnsureWriteSpace(len);const char *d = (const char *)data;std::copy(d, d + len, WritePosition());}void WriteAndPush(const void *data, uint64_t len){Write(data, len);MoveWriteOffset(len);}void WriteString(const std::string &data){return Write(data.c_str(), data.size());}void WriteStringAndPush(const std::string &data){WriteString(data);MoveWriteOffset(data.size());}void WriteBuffer(Buffer &data){return Write(data.ReadPosition(), data.ReadAbleSize());}void WriteBufferAndPush(Buffer &data){WriteBuffer(data);MoveWriteOffset(data.ReadAbleSize());}// 读取数据void Read(void *buf, uint64_t len){// 要求获取的数据大小必须小于可读数据大小assert(len <= ReadAbleSize());std::copy(ReadPosition(), ReadPosition() + len, (char *)buf);}void ReadAndPop(void *buf, uint64_t len){Read(buf, len);MoveReadOffset(len);}std::string ReadAsString(uint64_t len){// 要求获取的数据大小必须小于可读数据大小assert(len <= ReadAbleSize());std::string str;str.resize(len);Read(&str[0], len); // 这里不直接用str.c_str()的原因是,这个的返回值是const类型return str;}std::string ReadAsStringAndPop(uint64_t len){assert(len <= ReadAbleSize());std::string str = ReadAsString(len);MoveReadOffset(len);return str;}char *FindCRLF(){char *res = (char *)memchr(ReadPosition(), '\n', ReadAbleSize());return res;}// 这种情况针对的是,通常获取一行数据std::string GetLine(){char *pos = FindCRLF();if (pos == NULL)return "";// +1 是为了把换行字符也取出来return ReadAsString(pos - ReadPosition() + 1);}std::string GetLineAndPop(){std::string str = GetLine();MoveReadOffset(str.size());return str;}// 清空缓冲区void Clear(){// 只需要将偏移量归0即可_reader_idx = 0;_writer_idx = 0;}
};// 套接字类
#define MAX_LISTEN 1024
class Socket
{
private:int _sockfd;public:Socket() : _sockfd(-1) {}Socket(int fd) : _sockfd(fd) {}~Socket() { Close(); };int Fd() { return _sockfd; }// 创建套接字bool Create(){// int socket(int domain, int type, int protocol)_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);if (_sockfd < 0){ERR_LOG("CREATE SOCKET FAILED!");return false;}return true;}// 绑定地址信息bool Bind(const std::string &ip, uint16_t port){struct sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(struct sockaddr_in);// int bind(int sockfd, struct sockaddr* addr, socklen_t len)int ret = bind(_sockfd, (struct sockaddr *)&addr, len);if (ret < 0){ERR_LOG("BIND ADDRESS FAILED!");return false;}return true;}// 开始监听bool Listen(int backlog = MAX_LISTEN){// int listen(int backlog)int ret = listen(_sockfd, backlog);if (ret < 0){ERR_LOG("SOCKET LISTEN FAILED!");return false;}return true;}// 向服务器发起连接bool Connect(const std::string &ip, uint16_t port){struct sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(struct sockaddr_in);// int connect(int sockfd, struct sockaddr* addr, socklen_t len)int ret = connect(_sockfd, (struct sockaddr *)&addr, len);if (ret < 0){ERR_LOG("CONNECT SERVER FAILED!");return false;}return true;}// 获取新连接int Accept(){// int accept(int sockfd, struct sockaddr *addr, socklen_t *len);int newfd = accept(_sockfd, NULL, NULL);if (newfd < 0){ERR_LOG("SOCKET ACCEPT FAILED!");return -1;}return newfd;}// 接收数据ssize_t Recv(void *buf, size_t len, int flag = 0) // 0 阻塞{// ssize_t recv(int sockfd, void *buf, size_t len, int flag)ssize_t ret = recv(_sockfd, buf, len, flag);if (ret <= 0){// EAGAIN 当前的接收缓冲区中没用数据了,在非阻塞的情况下才有这个错误// EINTR 表示当前socket的阻塞等待,被信号打断了if (errno == EAGAIN || errno == EINTR){return 0; // 表示这次没用接收到数据}ERR_LOG("SOCKET RECV FAILED!");return -1;}return ret; // 实际接收的数据长度}ssize_t NonBlockRecv(void *buf, size_t len){return Recv(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞}// 发送数据ssize_t Send(const void *buf, size_t len, int flag = 0){// ssize_t send(int sockfd, void *data, size_t len, int flag)ssize_t ret = send(_sockfd, buf, len, flag);if (ret < 0){if (errno == EAGAIN || errno == EINTR){return 0;}ERR_LOG("SOCKET SEND FAILED!!");return -1;}return ret; // 实际发送的数据长度}ssize_t NonBlockSend(void *buf, size_t len){if (len == 0)return 0;return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞}// 关闭套接字void Close(){if (_sockfd != -1){close(_sockfd);_sockfd = -1;}}// 创建一个服务器连接bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) // 接收全部{// 1.创建套接字 2.绑定地址 3.开始监听 4.设置非阻塞 5.启动地址重用if (Create() == false)return false;if (block_flag)NonBlock(); // 默认阻塞if (Bind(ip, port) == false)return false;if (Listen() == false)return false;ReuseAddress();return true;}// 创建一个客户端连接bool CreateClient(uint16_t port, const std::string &ip){// 1.创建套接字 2.指向连接服务器if (Create() == false)return false;if (Connect(ip, port) == false)return false;return true;}// 设置套接字选项 -- 开启地址端口重用void ReuseAddress(){// int setsockopt(int fd, int level, int optname, void *val, int vallen)int val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(int)); // 地址val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(int)); // 端口号}// 设置套接字阻塞属性 -- 设置为非阻塞void NonBlock(){// int fcntl(int fd, int cmd, .../*arg*/)int flag = fcntl(_sockfd, F_GETFL, 0);fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);}
};class Poller; // 整合测试1:声明
class EventLoop;
// Channel类
class Channel
{
private:int _fd;EventLoop *_loop;uint32_t _events;  // 当前需要监控的事件uint32_t _revents; // 当前连接触发的事件using EventCallback = std::function<void()>;EventCallback _read_callback;  // 可读事件被触发的回调函数EventCallback _write_callback; // 可写事件被触发的回调函数EventCallback _error_callback; // 错误事件被触发的回调函数EventCallback _close_callback; // 连接断开事件被触发的回调函数EventCallback _event_callback; // 任意事件被触发的回调函数
public:Channel(EventLoop *loop, int fd) : _fd(fd), _events(0), _revents(0), _loop(loop) {}int Fd() { return _fd; }uint32_t Events() { return _events; } // 获取想要监控的事件void SetREvents(uint32_t events) { _revents = events; }void SetReadCallback(const EventCallback &cb) { _read_callback = cb; } // 设置实际就绪的事件void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }void SetEventCallback(const EventCallback &cb) { _event_callback = cb; }// 当前是否监控了可读bool ReadAble() { return (_events & EPOLLIN); }// 当前是否监控了可写bool WriteAble() { return (_events & EPOLLOUT); }// 启动读事件监控void EnableRead(){_events |= EPOLLIN;Update();}// 启动写事件监控void EnableWrite(){_events |= EPOLLOUT;Update();}// 关闭读事件监控void DisableRead(){_events &= ~EPOLLIN;Update();}// 关闭写事件监控void DisableWrite(){_events &= ~EPOLLOUT;Update();}// 关闭所有事件监控void DisableAll(){_events = 0;Update();}// 移除监控void Remove(); // 声明和实现要分离,因为实现的时候是不知道里面有什么函数成员的void Update(); // 这两个特殊,所以把实现放在Poller类的下面进行实现// 事件处理,一旦触发了事件,就调用这个函数,自己触发了什么事件如何处理自己决定void HandleEvent(){// 第二参数,对方关闭连接,第三参数,带外数据if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)){if (_event_callback) // 不管任何事件,都调用的回调函数_event_callback();if (_read_callback)_read_callback();}// 有可能会释放连接的操作事件,一次只处理一个if (_revents & EPOLLOUT){if (_event_callback)_event_callback(); // 放到事件处理完毕后调用,刷新活跃度if (_write_callback)_write_callback();}else if (_revents & EPOLLERR){if (_event_callback)_event_callback();if (_error_callback)_error_callback();}else if (_revents & EPOLLHUP){if (_event_callback)_event_callback();if (_close_callback)_close_callback();}}
};// Poller描述符监控类
#define MAX_EPOLLEVENTS 1024
class Poller
{
private:int _epfd;struct epoll_event _evs[MAX_EPOLLEVENTS];std::unordered_map<int, Channel *> _channels;private:// 对epoll的直接操作void Update(Channel *channel, int op){// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev)int fd = channel->Fd();struct epoll_event ev;ev.data.fd = fd;ev.events = channel->Events();int ret = epoll_ctl(_epfd, op, fd, &ev);if (ret < 0){ERR_LOG("EPOLLCTL FAILED!");}return;}// 判断一个Channel 是否已经添加了事件监控bool HasChannel(Channel *channel){auto it = _channels.find(channel->Fd());if (it == _channels.end()){return false;}return true;}public:Poller(){_epfd = epoll_create(MAX_EPOLLEVENTS); // 这个值大于0就行了,无用处if (_epfd < 0){ERR_LOG("EPOLL CREATE FAILED!");abort(); // 退出程序}}// 添加或修改监控事件void UpdateEvent(Channel *channel){bool ret = HasChannel(channel);if (ret == false){// 不存在则添加_channels.insert(std::make_pair(channel->Fd(), channel));return Update(channel, EPOLL_CTL_ADD);}return Update(channel, EPOLL_CTL_MOD);}// 移除监控void RemoveEvent(Channel *channel){auto it = _channels.find(channel->Fd());if (it != _channels.end()){_channels.erase(it);}Update(channel, EPOLL_CTL_DEL);}// 开始监控, 返回活跃连接void Poll(std::vector<Channel *> *active){// int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout);int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1); // -1阻塞监控if (nfds < 0){if (errno == EINTR) // 信号打断{return;}ERR_LOG("EPOLL WAIT ERROR:%s\n", strerror(errno));abort();}for (int i = 0; i < nfds; i++) // 添加活跃信息{auto it = _channels.find(_evs[i].data.fd); // 没找到就说明不在我们的管理之下,这是不正常的assert(it != _channels.end());it->second->SetREvents(_evs[i].events); // 设置实际就绪的事件active->push_back(it->second);}return;}
};// timerwheel时间轮定时器类
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask
{
private:uint64_t _id;         // 定时器任务对象uint32_t _timeout;    // 定时任务的超时时间bool _canceled;       // false-表示没有被取消,true-表示被取消TaskFunc _task_cb;    // 定时器要执行的定时任务ReleaseFunc _release; // 用于删除TimerWheel中保存的定时器对象信息
public:TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb) : _id(id), _timeout(delay), _task_cb(cb), _canceled(false) {}~TimerTask(){if (_canceled == false)_task_cb();_release();}void Cancel() { _canceled = true; }void SetRelease(const ReleaseFunc &cb) { _release = cb; }uint32_t DelayTime() { return _timeout; } // 返回时间
};class TimerWheel
{
private:using WeakTask = std::weak_ptr<TimerTask>;using PtrTask = std::shared_ptr<TimerTask>;int _tick;     // 当前的的秒针,走到哪里哪里就释放执行int _capacity; // 表盘最大数量 -- 其实就是最大延迟时间std::vector<std::vector<PtrTask>> _wheel;// 用weak_ptr来构造出新的shared_ptr用来计数,不过后续要记得释放std::unordered_map<uint64_t, WeakTask> _timers;EventLoop *_loop;int _timerfd; // 定时器描述符 -- 可读事件回调就是读取计数器,执行定时任务std::unique_ptr<Channel> _timer_channel;private:void RemoveTimer(uint64_t id){auto it = _timers.find(id);if (it != _timers.end()){_timers.erase(it);}}static int CreateTimerfd(){// int timerfd_create(int clockid, int flags);int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);if (timerfd < 0){ERR_LOG("TIMERFD CREATE FAILED!");abort();}// int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec);struct itimerspec itime;itime.it_value.tv_sec = 1;                 // 设置 秒钟itime.it_value.tv_nsec = 0;                // 设置 纳秒 第一次超时时间为1s后itime.it_interval.tv_sec = 1;              // 同上itime.it_interval.tv_nsec = 0;             // 第一次超时后,每隔超时的间隔时timerfd_settime(timerfd, 0, &itime, NULL); // 0代表阻塞式return timerfd;}void ReadTimefd(){uint64_t times;int ret = read(_timerfd, &times, 8);if (ret < 0){ERR_LOG("READ TIMERFD FAILED!");abort();}return;}// 这个函数应该每秒钟被执行一次,相当于秒钟向后走了一步void RunTimerTask(){_tick = (_tick + 1) % _capacity;_wheel[_tick].clear(); // 清空指定位置的数组,就会把数组中保存的所有管理定时器对象的shared_ptr释放掉.从而执行函数}void OnTime(){ReadTimefd();RunTimerTask();}void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb) // 添加定时任务{PtrTask pt(new TimerTask(id, delay, cb));                      // 实例化定时任务对象pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id)); // 第0个位置是隐藏的this指针。再把任务id绑定进去int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);_timers[id] = WeakTask(pt);}// 刷新/延迟定时任务void TimerRefreshInLoop(uint64_t id){// 通过保存的定时器对象的weak_ptr构造一个shared_ptr出来, 添加到轮子中auto it = _timers.find(id);if (it == _timers.end()){return; // 没找到定时任务, 没法刷新,没法延迟}PtrTask pt = it->second.lock(); // lock获取weak_ptr管理的对象对应的shared_ptrint delay = pt->DelayTime();    // 获取到了初始的延迟时间int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);}void TimerCancelInLoop(uint64_t id){auto it = _timers.find(id);if (it == _timers.end()){return; // 没找到定时任务, 没法刷新,没法延迟}PtrTask pt = it->second.lock(); // 当还没有过期才进行取消if (pt)pt->Cancel();}public:TimerWheel(EventLoop *loop) : _capacity(60), _tick(0), _wheel(_capacity), _loop(loop),_timerfd(CreateTimerfd()), _timer_channel(new Channel(_loop, _timerfd)){_timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));_timer_channel->EnableRead(); // 启动读事件监控}/*定时器中有个_timers成员,定时器信息的操作有可能在多线程中进行,因此需要考虑线程安全问题*//*如果不想加锁,那就把对定期的所有操作,都放在一个线程中进行*/void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb);// 刷新/延迟定时任务void TimerRefresh(uint64_t id);void TimerCancel(uint64_t id);/*这个接口存在线程安全问题--这个接口实际上不能被外界使用者调用,只能在模块内,对应的EventLoop线程内执行*/bool HasTimer(uint64_t id){auto it = _timers.find(id);if (it == _timers.end()){return false; // 没找到定时任务, 没法刷新,没法延迟}return true;}
};// EventLoop事件监控处理类
class EventLoop
{
private:using Functor = std::function<void()>;std::thread::id _thread_id;              // 线程IDint _event_fd;                           // eventfd唤醒IO事件监控有可能导致的阻塞std::unique_ptr<Channel> _event_channel; // 智能指针Poller _poller;                          // 进行所有描述符的事件监控std::vector<Functor> _tasks;             // 任务池std::mutex _mutex;                       // 实现任务池操作的线程安全TimerWheel _timer_wheel;                 // 定时器模块
public://  执行任务池中的所有任务void RunAllTask(){std::vector<Functor> functor;{std::unique_lock<std::mutex> _lock(_mutex);_tasks.swap(functor);}for (auto &f : functor){f();}return;}static int CreateEventFd(){int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if (efd < 0){ERR_LOG("CREATE EVENTFD FAILED!!");abort(); // 让程序异常退出}return efd;}void ReadEventfd(){uint64_t res = 0;int ret = read(_event_fd, &res, sizeof(res));if (ret < 0){// EINTR -- 被信号打断, EAGAIN -- 表示无数据可读if (errno == EINTR || EAGAIN){return;}ERR_LOG("READ EVENTFD FAILED!");abort();}return;}void WeakUpEventFd(){uint64_t val = 1;int ret = write(_event_fd, &val, sizeof(val));if (ret < 0){if (errno == EINTR){return;}ERR_LOG("READ EVENTFD FAILED!");abort();}return;}public:EventLoop() : _thread_id(std::this_thread::get_id()),_event_fd(CreateEventFd()),_event_channel(new Channel(this, _event_fd)),_timer_wheel(this){// 给eventfd添加可读事件回调函数,读取eventfd事件通知次数_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));// 启动eventfd的读事件监控_event_channel->EnableRead();}// 三步走--事件监控-》就绪事件处理-》执行任务void Start(){while (1){// 1.事件监控std::vector<Channel *> actives;_poller.Poll(&actives);// 2.事件处理for (auto &channel : actives){channel->HandleEvent();}// 3.执行任务RunAllTask();}}// 用于判断当前线程是否是EventLoop对应的线程bool IsInLoop(){return (_thread_id == std::this_thread::get_id());}void AssertInLoop(){assert(_thread_id == std::this_thread::get_id());}// 判断将要执行的任务是否处于当前线程中,如果是则执行,否则压入队列void RunInLoop(const Functor &cb){if (IsInLoop()){return cb();}return QueueInLoop(cb);}// 将操作压入任务池void QueueInLoop(const Functor &cb){{std::unique_lock<std::mutex> _lock(_mutex);_tasks.push_back(cb);}// 唤醒有可能因为没有事件就绪,而导致的epoll阻塞// 其实就是给eventfd写入一个数据,eventfd就会触发可读事件WeakUpEventFd();}// 添加/修改描述符的事件监控void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); }// 移除描述符的监控void RemoveEvent(Channel *channel) { return _poller.RemoveEvent(channel); }void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { return _timer_wheel.TimerAdd(id, delay, cb); }void TimerRefresh(uint64_t id) { return _timer_wheel.TimerRefresh(id); }void TimerCancel(uint64_t id) { return _timer_wheel.TimerCancel(id); }bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); }
};// EventLoop实例化管理类
class LoopThread
{
private:/* 用于实现_loop获取的同步关系,避免线程创建了,但是_loop还没有实例化之前去获取_loop*/std::mutex _mutex;             // 互斥锁std::condition_variable _cond; // 条件变量EventLoop *_loop;              // EventLoop指针变量,这个对象需要在线程内实例化std::thread _thread;           // EventLoop对应的线程
private:/*实例化EventLoop对象,唤醒_cond上有可能阻塞的线程,并且开始运行EventLoop模块的功能*/void ThreadEntry(){EventLoop loop;{std::unique_lock<std::mutex> lock(_mutex);_loop = &loop;_cond.notify_all();}loop.Start();}public:/*创建线程,设定线程入口函数*/LoopThread() : _loop(NULL), _thread(std::thread(&LoopThread::ThreadEntry, this)) {}/*返回当前线程关联的EventLoop对象指针*/EventLoop *GetLoop(){EventLoop *loop = NULL;{std::unique_lock<std::mutex> lock(_mutex); // 加锁_cond.wait(lock, [&](){ return _loop != NULL; }); // loop为NULL就一直阻塞loop = _loop;}return loop;}
};// LoopThreadPool管理LoopThread创建主从线程类
class LoopThreadPool
{
private:int _thread_count;int _next_idx;EventLoop *_baseloop;std::vector<LoopThread *> _threads;std::vector<EventLoop *> _loops;public:LoopThreadPool(EventLoop *baseloop) : _thread_count(0), _next_idx(0), _baseloop(baseloop) {}void SetThreadCount(int count) { _thread_count = count; }void Create(){if (_thread_count > 0){_threads.resize(_thread_count); // 里面存放的是指针不是对象,所以可以直接扩大_loops.resize(_thread_count);for (int i = 0; i < _thread_count; i++){_threads[i] = new LoopThread();     // 构造无参数_loops[i] = _threads[i]->GetLoop(); // 在上一句构造线程还没有创建完成会一直阻塞,因此不用担心在创建期间就分配连接}}}EventLoop *NextLoop(){if (_thread_count == 0)return _baseloop; // 没有从线程就直接返回主线程_next_idx = (_next_idx + 1) % _thread_count;return _loops[_next_idx];}
};class Any
{
private:class holder{public:virtual ~holder() {}virtual const std::type_info &type() = 0;virtual holder *clone() = 0;};template <class T>class placeholder : public holder{public:placeholder(const T &val) : _val(val) {}// 获取子类对象保存的数据类型virtual const std::type_info &type() { return typeid(T); }// 针对当前的对象自身,克隆出一个新的子类对象virtual holder *clone() { return new placeholder(_val); }// 析构用数据自身的就行了public:T _val;};holder *_content;public:Any() : _content(nullptr) {}template <class T>Any(const T &val) : _content(new placeholder<T>(val)) {}Any(const Any &other) : _content(other._content ? other._content->clone() : nullptr) {}~Any() { delete _content; }Any &swap(Any &other){std::swap(_content, other._content);return *this;}// 返回子类对象保存的数据的指针template <class T>T *get(){// 想要获取的数据类型,必须和保存的数据类型一致assert(typeid(T) == _content->type());return &((placeholder<T> *)_content)->_val;}// 赋值运算符的重载函数template <class T>Any &operator=(const T &val){// 为val构造一个临时的通用容器,然后与当前容器自身进行指针交换,临时对象释放的时候,原先保存的数据也就被释放了Any(val).swap(*this);return *this;}Any &operator=(const Any &other){Any(other).swap(*this);return *this;}
};// 连接管理类
class Connection;
// DISCONNECTED -- 连接关闭状态  CONNECTING -- 连接建立成功-待处理状态
// CONNECTED -- 连接建立完成,各种设置已完成,可以通信状态    DISCONNECTING -- 待关闭状态
typedef enum
{DISCONNECTED,CONNECTING,CONNECTED,DISCONNECTING
} ConnStatu;
using PtrConnection = std::shared_ptr<Connection>;
class Connection : public std::enable_shared_from_this<Connection>
{
private:uint64_t _conn_id; // 连接的唯一ID,便于连接的管理和查找// uint64_t _timer_id; // 定时器ID,必须是唯一的,这块是为了简化操作使用conn_id作为定时器int _sockfd;                   // 连接关联的文件描述符bool _enable_inactive_release; // 连接是否启动非活跃的判断标志,默认为falseEventLoop *_loop;              // 连接所关联的一个EventLoopConnStatu _statu;              // 连接状态Socket _socket;                // 套接字操作管理Channel _channel;              // 连接的事件管理Buffer _in_buffer;             // 输入缓冲区--存放从socket中读取到的数据Buffer _out_buffer;            // 输出缓冲区--存放要发送给对端的数据Any _context;                  // 请求的接收处理上下文/*这四个回调函数,是让服务器模块来设置的(其实服务器模块的处理回调也是组件使用者设置的)*//*换句话来说,这几个回调都是组件使用者使用的*/using ConnectedCallback = std::function<void(const PtrConnection &)>;using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection &)>;using AnyEventCallback = std::function<void(const PtrConnection &)>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;/*组件内的连接关闭回调--组件内设置的,因为服务器组件内会把所有的连接管理起来,一旦某个连接要关闭*//*就应该从管理的地方移除掉自己的信息*/ClosedCallback _server_closed_callback;private:/*五个channel的事件回调函数*/// 描述符可读事件触发后调用的函数,接收socket数据放到接收缓冲区中,然后调用_message_callbackvoid HandleRead(){// 1.接收socket的数据,放到缓冲区char buf[65536];ssize_t ret = _socket.NonBlockRecv(buf, 65535);if (ret < 0){// 出错了,不能直接关闭连接return ShutdownInLoop();}// 这里的等于0表示的是没有读取到数据,而并不是连接断开了,连接断开返回的是-1// 将数据放入输入缓冲区,写入之后顺便将写偏移向后移动_in_buffer.WriteAndPush(buf, ret);// 2.调用message_callback进行业务处理if (_in_buffer.ReadAbleSize() > 0){// shard_from_this--从当前对象自身获取自身的shared_ptr管理对象return _message_callback(shared_from_this(), &_in_buffer);}}// 描述符可写事件触发后调用的函数,将发送缓冲区中的数据进行发送void HandleWrite(){// _out_buffer中保存的就是要发送的数据ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());if (ret < 0){// 发送错误就应该关闭连接了if (_in_buffer.ReadAbleSize() > 0){_message_callback(shared_from_this(), &_in_buffer);}}_out_buffer.MoveReadOffset(ret); // 千万不要忘了,将读偏移向后移动if (_out_buffer.ReadAbleSize() == 0){_channel.DisableWrite(); // 没有数据待发送,关闭写事件监控// 如果当前是连接待关闭状态,则有数据,发送完数据释放连接,没有数据则直接释放if (_statu == DISCONNECTING){return ReleaseInLoop(); // 这时候就是实际的关闭释放操作了}}return;}// 描述符触发挂断事件void HandleClose(){/*一旦连接挂断了,套接字就什么都干不了了,因此有数据待处理就处理一下,完毕关闭连接*/if (_in_buffer.ReadAbleSize() > 0){_message_callback(shared_from_this(), &_in_buffer);}return ReleaseInLoop();}// 描述符触发出错事件void HandleError(){return HandleClose();}// 描述符触发任意事件: 1.刷新连接活跃度--延迟定时销毁任务 2.调用组件使用者的任意事件回调void HandleEvent(){if (_enable_inactive_release == true){_loop->TimerRefresh(_conn_id);}if (_event_callback){_event_callback(shared_from_this());}}// 连接获取之后,所处的状态要进行各种设置(给channel设置事件回调,启动读监控,调用回调函数)void EstablishedInLoop(){// 1.修改连接状态   2.启动读事件监控    3.调用回调函数assert(_statu == CONNECTING); // 当前状态必须一定是上层的半连接状态_statu = CONNECTED;           // 当前函数执行完毕,则连接进入已完成连接状态// 一旦启动读事件监控就有可能会立即触发读事件,如果这时候启动了非活跃连接销毁_channel.EnableRead();if (_connected_callback)_connected_callback(shared_from_this());}// 这个接口才是实际的释放接口void ReleaseInLoop(){// 1.修改连接状态,将其置为DISCONNECTED_statu = DISCONNECTED;// 2.移除连接的事件监控_channel.Remove();// 3.关闭描述符_socket.Close();// 4.如果当前定时器队列中还有定时销毁任务,则取消任务if (_loop->HasTimer(_conn_id))CancelInactiveReleaseInLoop();// 5.调用关闭回调函数,避免先移除服务器管理的连接信息导致Connection被释放,再去处理会出错,因此先调用用户的回调函数if (_closed_callback)_closed_callback(shared_from_this());// 移除服务器内部管理的连接信息if (_server_closed_callback)_server_closed_callback(shared_from_this());}// 这个并不是实际的发送接口,而只是把数据放到了发送缓冲区,启动了可写事件监控void SendInLoop(Buffer &buf){if (_statu == DISCONNECTED)return;_out_buffer.WriteBufferAndPush(buf); // 可以在这个函数后面加上const表示不修改thisif (_channel.WriteAble() == false){_channel.EnableWrite();}}// 这个关闭操作并非实际的连接释放操作,需要判断还有没有数据待处理,待发送void ShutdownInLoop(){_statu = DISCONNECTING; // 设置连接为半关闭状态if (_in_buffer.ReadAbleSize() > 0){if (_message_callback)_message_callback(shared_from_this(), &_in_buffer);}// 要么就是写入数据的时候出错关闭,要么就是没有待发送数据,直接关闭if (_out_buffer.ReadAbleSize() > 0){if (_channel.WriteAble() == false){_channel.EnableWrite();}}if (_out_buffer.ReadAbleSize() == 0){ReleaseInLoop();}}// 启动非活跃连接超时释放规则void EnableInactiveReleaseInLoop(int sec){// 1.将判断标志 _enable_inactive_release 置为true_enable_inactive_release = true;// 2.如果当前定时销毁任务已经存在,那就刷新一下延迟即可if (_loop->HasTimer(_conn_id)){return _loop->TimerRefresh(_conn_id);}// 3.如果不存在定时销毁任务,则新增_loop->TimerAdd(_conn_id, sec, std::bind(&Connection::ReleaseInLoop, this));}void CancelInactiveReleaseInLoop(){_enable_inactive_release = false;if (_loop->HasTimer(_conn_id)){_loop->TimerCancel(_conn_id);}}void UpgradeInLoop(const Any &context,const ConnectedCallback &conn,const MessageCallback &msg,const ClosedCallback &closed,const AnyEventCallback &event){_context = context;_connected_callback = conn;_message_callback = msg;_closed_callback = closed;_event_callback = event;}public:Connection(EventLoop *loop, uint64_t conn_id, int sockfd) : _conn_id(conn_id), _sockfd(sockfd),_enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd),_channel(loop, _sockfd){_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));_channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));}~Connection() { DBG_LOG("RELEASE CONNECTION:%p", this); }// 获取管理的文件描述符int Fd() { return _sockfd; }// 获取连接IDint Id() { return _conn_id; }// 是否处于CONNECTED状态bool Connected() { return (_statu == CONNECTED); }// 设置上下文--连接建立完成时进行调用void SetContext(const Any &context) { _context = context; }// 获取上下文,返回的是指针Any *GetContext() { return &_context; }void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }void SetSrvClosedCallback(const ClosedCallback &cb) { _server_closed_callback = cb; }// 连接建立就绪后,进行channel回调设置,启动读监控,调用_connect_callbackvoid Established(){_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));}// 发送数据,将数据发送到发送缓冲区,启动写事件监控void Send(const char *data, size_t len){// 外界传入的data,可能是个临时空间,我们现在只是把发送操作压入了任务池,有可能并没有被执行// 因此有可能执行的时候,data指向的空间有可能已经被释放了Buffer buf;buf.WriteAndPush(data, len);_loop->RunInLoop(std::bind(&Connection::SendInLoop, this, buf));}// 提供给组件使用者的关闭接口--并不实际关闭,需要判断有没有数据待处理void Shutdown(){_loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this));}// 启动非活跃销毁,并定义多长时间无通信就是非活跃,添加定时任务void EnableInactiveRelease(int sec){_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));}// 取消非活跃销毁void CancelInactiveRelease(){_loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this));}// 切换协议--重置上下文以及阶段性处理函数--而是这个接口必须在EventLoop线程中立即执行// 防备新的事件触发后,处理的时候,切换任务还没有被执行--会导致数据使用原协议处理了void Upgrade(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg,const ClosedCallback &closed, const AnyEventCallback &event){_loop->AssertInLoop();_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event));}
};// 监听套接字管理类
class Acceptor
{
private:Socket _socket;   // 用于创建监听套接字EventLoop *_loop; // 用于对监听套接字进行事件监控Channel _channel; // 用于对监控套接字进行事件管理using AcceptCallback = std::function<void(int)>;AcceptCallback _accept_callback;private:/*监听套接字的读事件回调处理函数 -- 获取新连接,调用_accept_callback函数进行新连接管理*/void HandleRead(){int newfd = _socket.Accept();if (newfd < 0){return;}if (_accept_callback)_accept_callback(newfd);}int CreateServer(int port){bool ret = _socket.CreateServer(port);assert(ret == true);return _socket.Fd();}public:/* 不能将启动读监控,放到构造函数中,必须在设置回调函数后,再去启动*//* 否则有可能造成启动监控后,立即有事件,处理的时候回调函数还没有设置:新连接得不到处理,且资源泄漏*/Acceptor(EventLoop *loop, int port) : _socket(CreateServer(port)), _loop(loop),_channel(loop, _socket.Fd()){_channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));}void SetAcceptCallback(const AcceptCallback &cb) { _accept_callback = cb; }void Listen() { _channel.EnableRead(); }
};// TcpServer服务器管理模块(即全部模块的整合)
class TcpServer
{
private:uint64_t _next_id; // 这是一个自动增长的连接IDint _port;int _timeout;                                       // 这是非活跃连接的统计时间--多长时间无通信就是非活跃连接bool _enable_inactive_release;                      // 是否启动非活跃连接超时销毁的判断标志EventLoop _baseloop;                                // 这是主线程的EventLoop对象,负责监听事件的处理Acceptor _acceptor;                                 // 这是监听套接字的管理对象LoopThreadPool _pool;                               // 这是从属EventLoop线程池std::unordered_map<uint64_t, PtrConnection> _conns; // 保管所有连接对应的share_ptr对象,这里面的对象被删除,就意味这某一个连接被删除using ConnectedCallback = std::function<void(const PtrConnection &)>;using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection &)>;using AnyEventCallback = std::function<void(const PtrConnection &)>;using Functor = std::function<void()>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;private:void RunAfterInLoop(const Functor &task, int delay){_next_id++;_baseloop.TimerAdd(_next_id, delay, task);}// 为新连接构造一个Connection进行管理void NewConnection(int fd){_next_id++;PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));conn->SetMessageCallback(_message_callback);conn->SetClosedCallback(_closed_callback);conn->SetConnectedCallback(_connected_callback);conn->SetAnyEventCallback(_event_callback);conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));if (_enable_inactive_release)conn->EnableInactiveRelease(10); // 启动非活跃超时销毁conn->Established();                 // 就绪初始化_conns.insert(std::make_pair(_next_id, conn));}void RemoveConnectionInLoop(const PtrConnection &conn){int id = conn->Id();auto it = _conns.find(id);if (it != _conns.end()){_conns.erase(it);}}// 从管理Connection的_conns移除连接信息void RemoveConnection(const PtrConnection &conn){_baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn));}public:TcpServer(int port) : _port(port),_next_id(0),_enable_inactive_release(false),_acceptor(&_baseloop, port),_pool(&_baseloop){_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));_acceptor.Listen(); // 将监听套接字挂到baseloop上}void SetThreadCount(int count) { return _pool.SetThreadCount(count); }void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }void EnableInactiveRelease(int timeout) { _timeout = timeout, _enable_inactive_release = true; }// 用于添加一个定时任务void RunAfter(const Functor &task, int delay){_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));}void Start(){_pool.Create(); // 创建线程池中的从属线程_baseloop.Start();}
};// 移除监控
void Channel::Remove() { return _loop->RemoveEvent(this); }
void Channel::Update() { return _loop->UpdateEvent(this); }
void TimerWheel::TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb)
{_loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, cb));
}
void TimerWheel::TimerRefresh(uint64_t id)
{_loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));
}
void TimerWheel::TimerCancel(uint64_t id)
{_loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id));
}// 忽略SIGPIPE信号,当连接断开的时候,如果我们继续向对端send发送信息,就会触发异常,即SIGPIPE异常,这个就是导致客户端异常退出的原因
class NetWork{public:NetWork(){DBG_LOG("SIGPIPE INIT");signal(SIGPIPE, SIG_IGN); // 忽视SIGPIPE异常,这个会导致进程退出}
};
static NetWork nw;  // 这个是为了执行里面的构造函数
// 预编译是为了防止头文件重复包含
#endif

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/711914.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux学习-C语言-运算符

目录 算术运算符&#xff1a; - * /:不能除0 %:不能对浮点数操作 &#xff1a;自增与运算符 i&#xff1a;先用再加 i:先加再用 --&#xff1a;自减运算符 常量&#xff0c;表达式不可以&#xff0c;--&#xff0c;变量可以 赋值运算符 三目运算符 逗号表达式 size…

alpine创建lnmp环境alpine安装nginx+php5.6+mysql

前言 制作lnmp环境&#xff0c;你可以在alpine基础镜像中安装相关的服务&#xff0c;也可以直接使用Dockerfile创建自己需要的环境镜像。 注意&#xff1a;提前确认自己的alpine版本&#xff0c;本次创建基于alpine3.6进行创建&#xff0c;官方在一些版本中删除了php5 1、拉取…

JS正则02——js正则表达式中常用的方法、常见修饰符的使用详解以及各种方法使用情况示例

JS正则02——js正则表达式中常用的方法、常见修饰符的使用详解以及各种方法使用情况示例 1. 前言1.1 简介1.2 js正则特殊字符即使用示例 2. 创建正则表达式的方式2.1 两种创建正则表达式的方式2.2 关于修饰符 3. 正则表达式中常用的方法3.1 test() 方法——正则表达式对象的方法…

Vue之监测数据的原理(对象)

大家有没有想过&#xff0c;为什么vue可以监测到数据发生改变&#xff1f;其实底层借助了Object.defineProperty&#xff0c;底层有一个Observer的构造函数 让我为大家简单的介绍一下吧&#xff01; 我用对象为大家演示一下 const vm new Vue({el: "#app",data: {ob…

文献速递:帕金森的疾病分享--多模态机器学习预测帕金森病

文献速递&#xff1a;帕金森的疾病分享–多模态机器学习预测帕金森病 Title 题目 Multi-modality machine learning predicting Parkinson’s disease 多模态机器学习预测帕金森病 01 文献速递介绍 对于渐进性神经退行性疾病&#xff0c;早期和准确的诊断是有效开发和使…

【精品】集合list去重

示例一&#xff1a;对于简单类型&#xff0c;比如String public static void main(String[] args) {List<String> list new ArrayList< >();list.add("aaa");list.add("bbb");list.add("bbb");list.add("ccc");list.add(…

网络工程师必备的网络端口大全(建议收藏)

端口是一种数字标识&#xff0c;用于在计算机网络中进行通信&#xff0c;你完全可以把端口简单的理解为是计算机和外界通讯交流的出口。但在网络技术中&#xff0c;端口一般有两种含义&#xff1a; &#xff08;1&#xff09;硬件设备中的端口 如交换机、路由器中用于链接其他…

“金三银四”招聘季,大厂争招鸿蒙人才

在金三银四的招聘季中&#xff0c;各大知名互联网企业纷纷加入了对鸿蒙人才的争夺战。近日&#xff0c;包括淘宝、京东、得物等在内的知名APP均宣布启动鸿蒙星河版原生应用开发计划。这一举措不仅彰显了鸿蒙生态系统的迅猛发展&#xff0c;还催生了人才市场的繁荣景象。据数据显…

遥感影像处理(ENVI+ChatGPT+python+ GEE)处理高光谱及多光谱遥感数据

遥感技术主要通过卫星和飞机从远处观察和测量我们的环境&#xff0c;是理解和监测地球物理、化学和生物系统的基石。ChatGPT是由OpenAI开发的最先进的语言模型&#xff0c;在理解和生成人类语言方面表现出了非凡的能力。本文重点介绍ChatGPT在遥感中的应用&#xff0c;人工智能…

Vue——携带参数跳转路由

Vue学习之——跳转路由 前情回顾 当我们进行点击修改时&#xff0c;会进行跳转到修改页面&#xff0c;为了完成回显数据&#xff08;根据对应id查找&#xff09;&#xff0c;我们需要携带对应选择中的id跳转到修改页面&#xff0c;让其进行查找回显 学习useRoute和useRoute…

webstorm2023.3.4安装与破解

WebStorm安装步骤 打开JetBrains官方网站&#xff08;https://www.jetbrains.com/webstorm/&#xff09; 运行.exe 选择安装路径 第一个意思是是否创建桌面快捷方式&#xff0c;可根据需要选择&#xff1b;第二个.js .css .html勾选后之后js css html文件默认会用webstor…

AI Agent

目录 一、什么是Agent 二、什么是MetaGPT【多智能体框架介绍】 三、MetaGPT的背景 一、什么是Agent 智能体 LLM观察思考行动记忆 Agent&#xff08;智能体&#xff09; 一个设置了一些目标或任务&#xff0c;可以迭代运行的大型语言模型。这与大型语言模型&#xff08;LLM&am…

985机械研一转码,java还是c++?

985机械研一转码&#xff0c;java还是c&#xff1f; 在开始前我分享下我的经历&#xff0c;我刚入行时遇到一个好公司和师父&#xff0c;给了我机会&#xff0c;一年时间从3k薪资涨到18k的&#xff0c; 我师父给了一些 电气工程师学习方法和资料&#xff0c;让我不断提升自己&…

【OpenGL的着色器03】内置变量(gl_Position等)

目录 一、说明 二、着色器的变量 2.1 着色器变量 2.2 着色器内置变量 三、最常见内置变量使用范例 3.1 常见着色器变量 3.2 示例1&#xff1a; gl_PointSize 3.3 示例2&#xff1a;gl_Position 3.4 gl_FragColor 3.5 渲染点片元坐标gl_PointCoord 3.6 gl_PointCoo…

【PyTorch][chapter 20][李宏毅深度学习]【无监督学习][ GAN]【实战】

前言 本篇主要是结合手写数字例子,结合PyTorch 介绍一下Gan 实战 第一轮训练效果 第20轮训练效果,已经可以生成数字了 68 轮 目录&#xff1a; 谷歌云服务器&#xff08;Google Colab&#xff09; 整体训练流程 Python 代码 一 谷歌云服务器&#xff08;Google Colab&…

Open CASCADE学习|曲线曲面连续性

1、曲线的连续性 曲线的连续性是三维建模、动画设计等领域中非常重要的一个概念&#xff0c;它涉及到曲线在不同点之间的连接方式和光滑程度。下面将详细介绍曲线的连续性&#xff0c;包括C连续性和G连续性。 1.1C连续性&#xff08;参数连续性&#xff09; C连续性是指曲线…

使用MyBatisPlus实现向数据库中存储List类型的数据

使用MyBatisPlus实现向数据库中存储List类型的数据 问题描述 建表时&#xff0c;表中的这五个字段为json类型 但是在入库的时候既不能写入数据&#xff0c;也不能查询出数据。 解决方案&#xff1a; 1.首先明确&#xff0c;数据存入的时候是经过了数据类型转化&#xff0c…

数据之光:探索数据库技术的演进之路

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua&#xff0c;在这里我会分享我的知识和经验。&#x…

喜讯!持安科技CEO何艺获评安全419《2023年度十大优秀创业者》

近日&#xff0c;由网络安全产业资讯媒体安全419主办的《年度策划》2023年度十大优秀创业者正式出炉&#xff0c;零信任办公安全技术创新企业持安科技创始人兼CEO何艺&#xff0c;获评十大优秀创业者。 这是安全419第二届推出该项目的评选活动&#xff0c;安全419编辑老师在多年…

抽象类、模板方法模式

抽象类概述 在Java中abstract是抽象的意思&#xff0c;如果一个类中的某个方法的具体实现不能确定&#xff0c;就可以申明成abstract修饰的抽象方法&#xff08;不能写方法体了&#xff09;&#xff0c;这个类必须用abstract修饰&#xff0c;被称为抽象类。 抽象方法定义&…