将fd设置成非阻塞
/// Puts the file descriptor `fd` into non-blocking mode.
///
/// The current file status flags are read with F_GETFL and written back with
/// O_NONBLOCK added, so every flag that was already set is preserved.
/// If either fcntl call fails, the reason (strerror(errno)) is printed to
/// std::cerr and the function returns; nothing is reported to the caller.
///
/// @param fd  descriptor whose status flags are modified.
void setNonBlock(int fd)
{
    const int flags = fcntl(fd, F_GETFL);
    if (flags < 0)
    {
        std::cerr << "错误" << strerror(errno) << std::endl;
        return;
    }

    // F_SETFL can fail too; report it instead of silently leaving the
    // descriptor in blocking mode.
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
    {
        std::cerr << "错误" << strerror(errno) << std::endl;
    }
}
select
1.select系统调用是用来让我们的程序监视多个文件描述符的状态变化的;
2.程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变;
select函数原型
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
参数解释
参数nfds是需要监视的最大的文件描述符值+1;
readfds,writefds,exceptfds分别对应于需要检测的可读文件描述符的集合、可写文件描述符的集合及异常文件描述符的集合;
参数timeout为结构timeval,用来设置select()的等待时间
参数timeout取值:
NULL:则表示select()没有timeout,select将一直被阻塞,直到某个文件描述符上发生了事件;
0(即timeval结构中的tv_sec和tv_usec都为0,而不是传入空指针):仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生。
特定的时间值:如果在指定的时间段里没有事件发生,select将超时返回。
关于fd_set结构
其实fd_set这个结构就是一个整数数组, 更严格的说, 是一个 "位图". 使用位图中对应的位来表示要监视的文件描述符.提供了一组操作fd_set的接口, 来比较方便的操作位图
void FD_CLR(int fd, fd_set *set); // 用来清除描述词组set中相关fd 的位
int FD_ISSET(int fd, fd_set *set); // 用来测试描述词组set中相关fd 的位是否为真
void FD_SET(int fd, fd_set *set); // 用来设置描述词组set中相关fd的位
void FD_ZERO(fd_set *set); // 用来清除描述词组set的全部位
理解select执行过程
(1) 执行fd_set set; FD_ZERO(&set);则set用位表示是0000,0000。
(2) 若fd=5,执行FD_SET(fd,&set);后set变为0010,0000(位从第0位开始计数,fd=5对应第5位,该位置为1)
(3) 若再加入fd=2,fd=1,则set变为0010,0110
(4) 执行select(6,&set,0,0,0)阻塞等待
(5) 若fd=1,fd=2上都发生可读事件,则select返回,此时set变为0000,0110。
注意:没有事件发生的fd=5被清空。
select缺点
1.每次调用select, 都需要手动设置fd集合, 从接口使用角度来说也非常不便.
2.每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
3.同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
4.select支持的文件描述符数量太小.
selectServer代码
#pragma once#include "sock.hpp"
#include <algorithm>
#include <functional>
/// Returns the larger of `a` and `b`; when they compare equal, `b` is returned.
///
/// Marked `inline` because the definition sits in a header guarded only by
/// `#pragma once`: without it, every translation unit that includes the header
/// emits its own external definition and the program fails to link.
inline int max(int a, int b)
{
    return a > b ? a : b;
}
namespace select_cbr
{static const int default_port = 8080;static const int fd_num = sizeof(fd_set) * 8;static const int defaultfd = -1;using func = std::function<std::string(const std::string &)>;class SelectServer{public:SelectServer(func funct, int port = default_port) : _func(funct), _port(port), _listensock(-1), _fdarray(nullptr){}void initServer(){_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);_fdarray = new int[fd_num];for (int i = 0; i < fd_num; i++)_fdarray[i] = defaultfd;_fdarray[0] = _listensock;}void Print(){std::cout << "fd list: ";logMessage(DEBUG, "fd[0]==%d", _fdarray[0]);for (int i = 0; i < fd_num; i++){if (_fdarray[i] != defaultfd)std::cout << _fdarray[i] << " ";}std::cout << std::endl;}void Accepter(int listensock){std::string clientIp;uint16_t clientPort;int sock = Sock::Accept(listensock, &clientIp, &clientPort);if (sock < 0)return;logMessage(NORMAL, "accept success [%s:%d]", clientIp.c_str(), clientPort);int i = 0;for (; i < fd_num; i++){if (_fdarray[i] == defaultfd)break;}if (i == fd_num){logMessage(WARNING, "server if full, please wait");close(sock);}else{_fdarray[i] = sock;}Print();}void Recver(int sock, int pos){logMessage(DEBUG, "in recver");char buffer[1024];ssize_t s = recv(sock, buffer, sizeof(buffer) - 1, 0);logMessage(DEBUG, "s===%d", s);if (s > 0){buffer[s] = 0;logMessage(NORMAL, "client# %s", buffer);}else if (s == 0){close(sock);_fdarray[pos] = defaultfd;logMessage(NORMAL, "client quit");return;}else{close(sock);_fdarray[pos] = defaultfd;logMessage(ERROR, "client quit: %s", strerror(errno));return;}std::string response = _func(buffer);write(sock, response.c_str(), response.size());logMessage(DEBUG, "out recver");}void HanderEvent(fd_set &rfds){for (int i = 0; i < fd_num; i++){if (_fdarray[i] == defaultfd)continue;if (FD_ISSET(_fdarray[i], &rfds) && _fdarray[i] == _listensock){logMessage(DEBUG, "1111");Accepter(_listensock);}else if (FD_ISSET(_fdarray[i], &rfds)){logMessage(DEBUG, "2222");Recver(_fdarray[i], 
i);}}}void start(){int cnt = 1;while (1){fd_set rfds;FD_ZERO(&rfds);int maxfd = 0;for (int i = 0; i < fd_num; i++){if (_fdarray[i] == defaultfd)continue;FD_SET(_fdarray[i], &rfds);maxfd = max(maxfd, _fdarray[i]);}logMessage(DEBUG, "maxfd===%d", maxfd);// struct timeval timeout = {0, 0};int n = select(maxfd + 1, &rfds, nullptr, nullptr, nullptr);logMessage(DEBUG, "cnt==%d", cnt);cnt++;if (n == -1){logMessage(WARNING, "select error,code:%d,err string:%s", errno, strerror(errno));}else if (n == 0){logMessage(NORMAL, "timeout...");}else{logMessage(NORMAL, "get a new link....");HanderEvent(rfds);}sleep(1);}}~SelectServer(){if (_listensock < 0)close(_listensock);if (_fdarray)delete[] _fdarray;}private:int _port;int _listensock;int *_fdarray;func _func;};
}
poll
poll函数接口
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
pollfd结构
// One element of the array handed to poll().
struct pollfd
{
    int fd;        // file descriptor
    short events;  // requested events
    short revents; // returned events
};
参数说明
fds是一个poll函数监听的结构列表. 每一个元素中, 包含了三部分内容: 文件描述符, 监听的事件集合, 返回的事件集合.
nfds表示fds数组的长度.
timeout表示poll函数的超时时间, 单位是毫秒(ms).
pollServer代码
#pragma once#include "sock.hpp"
#include <algorithm>
#include <functional>
#include <poll.h>
/// Returns the greater of the two arguments (`b` when both are equal).
///
/// `inline` is required here: the function is defined in a header, and a
/// plain external definition would be emitted once per including translation
/// unit, violating the one-definition rule at link time.
inline int max(int a, int b)
{
    return a > b ? a : b;
}
namespace poll_cbr
{static const int default_port = 8080;static const int defaultfd = -1;static const int fd_num = 2048;using func = std::function<std::string(const std::string &)>;class PollServer{public:PollServer(func funct, int port = default_port) : _func(funct), _port(port), _listensock(-1), _rfds(nullptr){}void ResetItem(int pos){_rfds[pos].fd = defaultfd;_rfds[pos].events = 0;_rfds[pos].revents = 0;}void initServer(){_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);_rfds = new struct pollfd[fd_num];for (int i = 0; i < fd_num; i++){ResetItem(i);}_rfds[0].fd = _listensock;_rfds[0].events = POLLIN;}void Print(){std::cout << "fd list: ";logMessage(DEBUG, "fd[0]==%d", _rfds[0].fd);for (int i = 0; i < fd_num; i++){if (_rfds[i].fd != defaultfd)std::cout << _rfds[i].fd << " ";}std::cout << std::endl;}void Accepter(){std::string clientIp;uint16_t clientPort;int sock = Sock::Accept(_listensock, &clientIp, &clientPort);if (sock < 0)return;logMessage(NORMAL, "accept success [%s:%d]", clientIp.c_str(), clientPort);int i = 0;for (; i < fd_num; i++){if (_rfds[i].fd == defaultfd)break;}if (i == fd_num){logMessage(WARNING, "server if full, please wait");close(sock);}else{_rfds[i].fd = sock;_rfds[i].events = POLLIN;_rfds[i].revents = 0;}Print();}void Recver(int pos){logMessage(DEBUG, "in recver");char buffer[1024];ssize_t s = recv(_rfds[pos].fd, buffer, sizeof(buffer) - 1, 0);logMessage(DEBUG, "s===%d", s);if (s > 0){buffer[s] = 0;logMessage(NORMAL, "client# %s", buffer);}else if (s == 0){close(_rfds[pos].fd);ResetItem(pos);logMessage(NORMAL, "client quit");return;}else{close(_rfds[pos].fd);ResetItem(pos);logMessage(ERROR, "client quit: %s", strerror(errno));return;}std::string response = _func(buffer);write(_rfds[pos].fd, response.c_str(), response.size());logMessage(DEBUG, "out recver");}void HanderEvent(){for (int i = 0; i < fd_num; i++){if (_rfds[i].fd == defaultfd)continue;if (!(_rfds[i].events & POLLIN))continue;if (_rfds[i].fd == _listensock && 
(_rfds[i].revents & POLLIN))Accepter();else if (_rfds[i].revents & POLLIN)Recver(i);}}void start(){int timeout = -1;while (1){int n = poll(_rfds, fd_num, timeout);if (n == -1){logMessage(WARNING, "poll error,code:%d,err string:%s", errno, strerror(errno));}else if (n == 0){logMessage(NORMAL, "timeout...");}else{logMessage(NORMAL, "get a new link....");HanderEvent();}sleep(1);}}~PollServer(){if (_listensock < 0)close(_listensock);if (_rfds)delete[] _rfds;}private:int _port;int _listensock;struct pollfd *_rfds;func _func;};
}
epoll
epoll的相关系统调用
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
epollServer
#pragma once#include <iostream>
#include <string>
#include <cstring>
#include <functional>
#include <sys/epoll.h>
#include "err.hpp"
#include "log.hpp"
#include "sock.hpp"namespace epoll_cbr
{static const int defaultport = 8888;static const int size = 128;static const int defaultvalue = -1;static const int defalultnum = 64;using func_t = std::function<std::string(const std::string &)>;class EpollServer{public:EpollServer(func_t f, uint16_t port = defaultport, int num = defalultnum): func_(f), _num(num), _revs(nullptr), _port(port), _listensock(defaultvalue), _epfd(defaultvalue){}void initServer(){// 1. 创建socket_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);// 2. 创建epoll模型_epfd = epoll_create(size);if (_epfd < 0){logMessage(FATAL, "epoll create error: %s", strerror(errno));exit(EPOLL_CREATE_ERR);}// 3. 添加listensock到epoll中struct epoll_event ev;ev.events = EPOLLIN | EPOLLET;ev.data.fd = _listensock;epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock, &ev);// 4. 申请就绪事件的空间_revs = new struct epoll_event[_num];logMessage(NORMAL, "init server success");}void HandlerEvent(int readyNum){logMessage(DEBUG, "HandlerEvent in");for (int i = 0; i < readyNum; i++){uint32_t events = _revs[i].events;int sock = _revs[i].data.fd;if (sock == _listensock && (events & EPOLLIN)){//_listensock读事件就绪, 获取新连接std::string clientip;uint16_t clientport;int fd = Sock::Accept(sock, &clientip, &clientport);if (fd < 0){logMessage(WARNING, "accept error");continue;}// 获取fd成功,放入epollstruct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = fd;epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);}else if (events & EPOLLIN){// 普通的读事件就绪char buffer[1024];int n = recv(sock, buffer, sizeof(buffer), 0);logMessage(DEBUG, "n==%d", n);if (n > 0){buffer[n] = 0;logMessage(DEBUG, "client# %s", buffer);std::string response = func_(buffer);send(sock, response.c_str(), response.size(), 0);}else if (n == 0){// 先从epoll移除,再close fdepoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);close(sock);logMessage(NORMAL, "client quit");}else{// 先从epoll移除,再close fdepoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);close(sock);logMessage(ERROR, "recv error, code: %d, errstring: %s", errno, 
strerror(errno));}}else{}}logMessage(DEBUG, "HandlerEvent out");}void start(){int timeout = -1;while (1){int n = epoll_wait(_epfd, _revs, _num, timeout);switch (n){case 0:logMessage(NORMAL, "timeout ...");break;case -1:logMessage(WARNING, "epoll_wait failed, code: %d, errstring: %s", errno, strerror(errno));break;default:logMessage(NORMAL, "have event ready");HandlerEvent(n);break;}}}~EpollServer(){if (_listensock != defaultvalue)close(_listensock);if (_epfd != defaultvalue)close(_epfd);if (_revs)delete[] _revs;}private:uint16_t _port;int _listensock;int _epfd;struct epoll_event *_revs;int _num;func_t func_;};
}