多路转接之poll
- 一、关于poll
- 认识poll
- 基于poll实现的服务器的原理
- 二、基于poll实现的服务器
- main.cpp
- pollServer.hpp
- sock.hpp
- Log.hpp
- public.hpp
一、关于poll
认识poll
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd {int fd; /* file descriptor */short events; /* requested events */short revents; /* returned events */
};
poll的返回值ret:
ret<0,poll调用失败。
ret=0,poll超时等待,没有文件描述符的事件就绪。
ret>0,有ret个文件描述符现在事件就绪。
poll的第一个选项fds是一个动态数组,数组中每个元素是struct pollfd的结构体。
poll的第二个选项是动态数组的大小。
poll的最后一个选项timeout:
timeout>0:在timeout毫秒内没有文件描述符的事件就绪,就一直阻塞,直到timeout返回。
timeout=0:select非阻塞。
timeout<0:select阻塞等待。
用户想要内核监视哪个文件描述符的哪个事件,只需要定义struct pollfd结构体,将将文件描述符和事件传给结构体中的fd和events,再将结构体加入fds数组中,让poll监视数组中的文件描述符事件。
内核通过数组中struct pollfd结构,告诉用户哪个文件描述符的哪个事件就绪,在结构体中的fd和revents代表,文件描述符fd的revnents事件就绪。
两条命令:
uname -a;查看linux操作系统信息。
ulimit -a;查看进程可以使用的资源。
event和revent的取值:
基于poll实现的服务器的原理
维护一个struct pollfd结构体的数组,存储需要监视的文件描述符的事件。
将数组的第一个结构体来设置成监听套接字的读事件。
进行poll系统调用,用户将想要监视的文件描述符事件告诉操作系统内核。
若有文件描述符事件就绪poll就返回大于0的数。
用户通过遍历struct pollfd结构体数组,找到revent不为0的结构体,对就绪文件描述符事件做处理。
二、基于poll实现的服务器
main.cpp
#include "pollServer.hpp"
#include <memory>
using namespace std;
using namespace poll_ns;
std::string handle(const std::string& request){return request;
}
static void usage(string proc){cerr<<proc<<" need a port!!!!"<<endl;
}
int main(int argc,char* argv[]){if(argc!=2){usage(argv[0]);exit(1);}uint16_t port=atoi(argv[1]);//实现一个基于poll的只处理读事件的Serverunique_ptr<pollServer> svr(new pollServer(handle,port));svr->init();svr->start();return 0;
}
pollServer.hpp
#pragma once
#include <iostream>
#include "sock.hpp"
#include "public.hpp"
#include <errno.h>
#include <cstring>
#include <sys/select.h>
#include <functional>
#include <poll.h>
using namespace std;
namespace poll_ns{class pollServer{private:static const uint16_t defaultPort=8080;static const int num=2048;static const int defaultfd=-1;using func_t=std::function<std::string(const std::string)>;public://打印有效文件描述符void print(){cout<<"socket list:"<<endl;for(int i=0;i<num;++i){if(_rfds[i].fd!=defaultfd)cout<<_rfds[i].fd<<" ";}cout<<endl;}pollServer(func_t func,uint16_t port=defaultPort):_func(func),_port(port),_listenSockfd(-1),_rfds(nullptr){}void init(){_listenSockfd=Sock::Socket();Sock::Bind(_listenSockfd,_port);Sock::Listen(_listenSockfd);_rfds=new struct pollfd[num];for(int i=0;i<num;++i){resetItem(i);}_rfds[0].fd=_listenSockfd;_rfds[0].events=POLLIN;}//监听套接字的读事件就绪void accepter(){//listenSockfd监听套接字的读事件就绪,也就是监听套接字下有客户端申请新连接string clientIp;uint16_t clientPort;int sockfd=Sock::Accept(_listenSockfd,&clientIp,&clientPort);if(sockfd==-1)return;log(NORMAL,"accept a new link from %s-%d,sockfd=%d",clientIp.c_str(),clientPort,sockfd);int i=0;for(;i<num;++i){if(_rfds[i].fd==defaultfd)break;}if(i==num){log(WARNING,"fdarray is full!!! need close fd");close(sockfd);}else{//将连接套接字文件描述符需要监视的事件加入struct pollfd数组中//下一次poll就可监视连接套接字的状态的了_rfds[i].fd=sockfd;_rfds[i].events=POLLIN;_rfds[i].revents=0;}print();}void resetItem(int i){_rfds[i].fd=defaultfd;_rfds[i].events=0;_rfds[i].revents=0;}//连接套接字的读事件就绪void recver(int pos){char buffer[1024];//不能保证读完一个完整的应用层报文ssize_t n=recv(_rfds[pos].fd,buffer,sizeof(buffer)-1,0);if(n>0){buffer[n]=0;std::cout<<"read a meassge:"<<buffer<<std::endl;}//连接被关闭else if(n==0){close(_rfds[pos].fd);resetItem(pos);std::cout<<"client close sockfd!!!"<<std::endl;}else{close(_rfds[pos].fd);resetItem(pos);std::cerr<<"recv error"<<std::endl;}//buffer反序列化获得request//func处理request,获取response//response反序列化,获得返回信息std::string response= _func(buffer);write(_rfds[pos].fd,response.c_str(),response.size());}//hanlderevents处理读事件void handlerReadevents(){//监听套接字的读事件就绪if(_rfds[0].revents&POLLIN)accepter();for(int i=1;i<num;++i){if(_rfds[i].fd==defaultfd)continue;//连接套接字的读事件就绪if(!(_rfds[i].events&POLLIN))continue;if(_rfds[i].revents&POLLIN)recver(i);//else if(){}//else if(){}}}void start(){//只处理读事件int timeout=-1;while(true){int n=poll(_rfds,num,timeout);switch(n){case 0:cout<<"poll timeout"<<endl;log(NORMAL,"poll timeout");break;case -1:log(ERROR,"poll error,errno=%d,strerror=%s",errno,strerror(errno));break;default://有读事件就绪//找到哪个套接字的读事件就绪了handlerReadevents();break;}sleep(1);}}~pollServer(){if(_listenSockfd>0)close(_listenSockfd);delete[] _rfds;}private:int _listenSockfd;uint16_t _port;struct pollfd* _rfds;func_t _func;};
}
sock.hpp
#pragma oncce
#include <iostream>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <strings.h>
#include <unistd.h>
#include <sys/wait.h>
#include "Log.hpp"
#include "public.hpp"
using namespace std;class Sock{static const int backlog=5;
public://创建套接字文件描述符static int Socket(){int sockfd=socket(AF_INET,SOCK_STREAM,0);if(sockfd==-1){log(ERROR,"create a socket error");exit(SOCKET_ERR);}else log(NORMAL,"create socket success, sockfd=%d",sockfd);//地址复用int opt=1;setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT,&opt,sizeof(opt));return sockfd;}//给套接字绑定端口号static void Bind(int sockfd,uint16_t port){struct sockaddr_in local;socklen_t len=sizeof(local);bzero(&local,len);local.sin_family=AF_INET;local.sin_addr.s_addr=INADDR_ANY;local.sin_port=htons(port);int n=bind(sockfd,(const struct sockaddr*)&local,len);if(n==-1){log(ERROR,"bind sockfd error");exit(BIND_ERR);}else log(NORMAL,"bind sockfd success");}//监听套接字static void Listen(int sockfd){int n=listen(sockfd,backlog);if(n==-1){log(ERROR,"listen sockfd error");exit(BIND_ERR);}else log(NORMAL,"listen sockfd success");}//获取新连接套接字static int Accept(int listenSockfd,string* clientIp,uint16_t* clientPort){struct sockaddr_in client;socklen_t len=sizeof(client);bzero(&client,len);int sockfd=accept(listenSockfd,(struct sockaddr*)&client,&len);if(sockfd==-1)log(ERROR,"accept sockfd error");else {log(NORMAL,"accept a link socket=%d",sockfd);*clientIp=inet_ntoa(client.sin_addr);*clientPort=ntohs(client.sin_port);}return sockfd;}
};
Log.hpp
#pragma once
#include <iostream>
#include <time.h>
#include <stdarg.h>
#include <cstdio>
#define NORMAL 0
#define DEBUG 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
using namespace std;
class LogMessage{
private:string log_path="./log.txt";string err_path="./err.txt";static string getLevel(int level){switch(level){case NORMAL:return "NORMAL";break;case DEBUG:return "DEBUG";break;case WARNING:return "WARNING";break;case ERROR:return "ERROR";break;case FATAL:return "FATAL";break;default:return "OTHER";}}string getTime(){time_t now=time(nullptr);struct tm* t=localtime(&now);int year=t->tm_year+1900;int mon=t->tm_mon+1;int day=t->tm_mday;int hour=t->tm_hour;int min=t->tm_min;int sec=t->tm_sec;char buffer[64];sprintf(buffer,"%d-%02d-%02d %02d:%02d:%02d",year,mon,day,hour,min,sec);return buffer;}
public:void operator()(int level,const char* format,...){string lev=getLevel(level);string time=getTime();va_list list;va_start(list,format);char msg[1024];vsnprintf(msg,sizeof(msg),format,list);FILE* log=fopen(log_path.c_str(),"a+");FILE* err=fopen(err_path.c_str(),"a+");if(log!=nullptr&&err!=nullptr){FILE* cur=nullptr;if(level==NORMAL||level==DEBUG){cur=log;}else cur=err;fprintf(cur,"[%s][%s][%s]\n",lev.c_str(),time.c_str(),msg);fclose(log);fclose(err);} }
};
static LogMessage log;
public.hpp
#pragma once
#include <iostream>
enum{SOCKET_ERR=1,BIND_ERR,LISTEN_ERR,ACCEPT_ERR,USAGE_ERR};