处理大并发之二 对epoll的理解,epoll客户端服务端代码

http://blog.csdn.net/wzjking0929/article/details/51838370

序言:

该博客是一系列的博客,首先从最基础的epoll说起,然后研究libevent源码及使用方法,最后研究nginx和node.js,关于select,poll这里不做说明,只说明其相对于epoll的不足,其实select和poll我也没用过,因为我选择了epoll。

说起epoll,做过大并发的估计都不陌生,之前做了个STB的工具,用的就是epoll处理并发,测试1.5W的并发(非简单的发消息,包括多个服务端和客户端的处理)是通过的,epoll的功能强大,让我有一定的体会,在nginx和node.js中利用的就是libevent,而libevent使用的就是epoll,如果系统不支持epoll,会选择最佳的方式(select,poll或kqueue中的一种)。

基础资料:

epoll名词解释:

看下百科名片是怎么解释epoll的吧,(http://baike.baidu.com/view/1385104.htm)epoll是Linux内核为处理大批量句柄而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著减少程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。

关于具体的说明可以参考网上资料,我这里罗列下epoll相对于其他多路复用机制(select,poll)的优点吧:

epoll优点:

1. 支持一个进程打开大数目的socket描述符。

2. IO效率不随FD数目增加而线性下降,传统的select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。

3. 使用mmap加速内核与用户空间的消息传递。无论是select,poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存拷贝就很重要,在这点上,epoll是通过内核于用户空间mmap同一块内存实现的。

select和poll的缺点:

1. 每次调用时要重复地从用户态读入参数。

2. 每次调用时要重复地扫描文件描述符。

3. 每次在调用开始时,要把当前进程放入各个文件描述符的等待队列。在调用结束后,又把进程从各个等待队列中删除。

分析libevent的时候,发现一个博客介绍epoll的不错,网址是:http://blog.csdn.net/sparkliang/article/details/4770655

epoll用的的数据结构和函数

数据结构 

typedef union epoll_data { 
                void *ptr; 
                int fd; 
                __uint32_t u32; 
                __uint64_t u64; 
        } epoll_data_t; 

        struct epoll_event { 
                __uint32_t events;      /* epoll events */ 
                epoll_data_t data;      /* user data variable */ 
        }; 

结构体epoll_event 被用于注册所感兴趣的事件和回传所发生待处理的事件. 
其中epoll_data 联合体用来保存触发事件的某个文件描述符相关的数据. 
例如一个client连接到服务器,服务器通过调用accept函数可以得到于这个client对应的socket文件描述符,可以把这文件描述符赋给epoll_data的fd字段以便后面的读写操作在这个文件描述符上进行。epoll_event 结构体的events字段是表示感兴趣的事件和被触发的事件可能的取值为: 
EPOLLIN :表示对应的文件描述符可以读; 
EPOLLOUT:表示对应的文件描述符可以写; 
EPOLLPRI:表示对应的文件描述符有紧急的数据可读 
EPOLLERR:表示对应的文件描述符发生错误; 
EPOLLHUP:表示对应的文件描述符被挂断; 
EPOLLET:表示对应的文件描述符有事件发生; 

ET和LT模式

LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。

ET (edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once),不过在TCP协议中,ET模式的加速效用仍需要更多的benchmark确认。

ET和LT的区别在于LT事件不会丢弃,而是只要读buffer里面有数据可以让用户读,则不断的通知你。而ET则只在事件发生之时通知。可以简单理解为LT是水平触发,而ET则为边缘触发。
ET模式仅当状态发生变化的时候才获得通知,这里所谓的状态的变化并不包括缓冲区中还有未处理的数据,也就是说,如果要采用ET模式,需要一直read/write直到出错为止,很多人反映为什么采用ET模式只接收了一部分数据就再也得不到通知了,大多因为这样;LT模式是只要有数据没有处理就会一直通知下去的.

函数


[cpp] view plain copy
  1. 1. int epoll_create(int size);  
  2. 2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);  
  3. 3. int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);  


详解:

1. int epoll_create(int size);

创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。第一个参数是epoll_create()的返回值,第二个参数表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fdepfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd
第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事

3. int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);

等待事件的产生,参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

资料总结这么多,其实都是一些理论,关键还在于代码,下面附上我使用epoll开发的服务端和客户端,该代码我会上传到我的资源里,可以免费下载。链接:

如果代码有bug,可以告诉我。

程序demo

首先对服务端和客户端做下说明:

我想实现的是客户端和服务端并发的程序,客户端通过配置并发数,说明有多少个用户去连接服务端。

客户端会发送消息:"Client: i send message Hello Server!”,其中i表示哪一个客户端;收到消息:"Recv Server Msg Content:%s\n"。

例如:

发送:Client: 1 send message "Hello Server!"

接收:Recv Derver Msg Content:Hello, client fd: 6

服务端收到后给客户端回复消息:"Hello, client fd: i",其中i表示服务端接收的fd,用户区别是哪一个客户端。接收客户端消息:"Terminal Received Msg Content:%s\n"

例如:

发送:Hello, client fd: 6

接收:Terminal Received Msg Content:Client: 1 send message "Hello Server!"

备注:这里在接收到消息后,直接打印出消息,如果需要对消息进行处理(如果消息处理比较占用时间,不能立即返回,可以将该消息放入一个队列中,然后开启一个线程从队列中取消息进行处理,这样的话不会因为消息处理而阻塞epoll)。libenent好像对这种有2中处理方式,一个就是回调,要求回调函数,不占用太多的时间,基本能立即返回,另一种好像也是一个队列实现的,这个还需要研究。

服务端代码说明:

服务端在绑定监听后,开启了一个线程,用于负责接收客户端连接,加入到epoll中,这样只要accept到客户端的连接,就将其add EPOLLIN到epoll中,然后进入循环调用epoll_wait,监听到读事件,接收数据,并将事件修改为EPOLLOUT;反之监听到写事件,发送数据,并将事件修改为EPOLLIN。

 

[cpp] view plain copy
  1. cepollserver.h  
  2. #ifndef  C_EPOLL_SERVER_H  
  3. #define  C_EPOLL_SERVER_H  
  4.   
  5. #include <sys/epoll.h>  
  6. #include <sys/socket.h>  
  7. #include <netinet/in.h>  
  8. #include <fcntl.h>  
  9. #include <arpa/inet.h>  
  10. #include <stdio.h>  
  11. #include <stdlib.h>  
  12. #include <iostream>  
  13. #include <pthread.h>  
  14.   
  15. #define _MAX_SOCKFD_COUNT 65535  
  16.   
  17. class CEpollServer  
  18. {  
  19.         public:  
  20.                 CEpollServer();  
  21.                 ~CEpollServer();  
  22.   
  23.                 bool InitServer(const char* chIp, int iPort);  
  24.                 void Listen();  
  25.                 static void ListenThread( void* lpVoid );  
  26.                 void Run();  
  27.   
  28.         private:  
  29.                 int        m_iEpollFd;  
  30.                 int        m_isock;  
  31.                 pthread_t       m_ListenThreadId;// 监听线程句柄  
  32.   
  33. };  
  34.   
  35. #endif  


cepollserver.cpp

[cpp] view plain copy
  1. #include "cepollserver.h"  
  2.   
  3. using namespace std;  
  4.   
  5. CEpollServer::CEpollServer()  
  6. {  
  7. }  
  8.   
  9. CEpollServer::~CEpollServer()  
  10. {  
  11.     close(m_isock);  
  12. }  
  13.   
  14. bool CEpollServer::InitServer(const char* pIp, int iPort)  
  15. {  
  16.     m_iEpollFd = epoll_create(_MAX_SOCKFD_COUNT);  
  17.   
  18.     //设置非阻塞模式  
  19.     int opts = O_NONBLOCK;  
  20.     if(fcntl(m_iEpollFd,F_SETFL,opts)<0)  
  21.     {  
  22.         printf("设置非阻塞模式失败!\n");  
  23.         return false;  
  24.     }  
  25.   
  26.     m_isock = socket(AF_INET,SOCK_STREAM,0);  
  27.     if ( 0 > m_isock )  
  28.     {  
  29.         printf("socket error!\n");  
  30.         return false;  
  31.   }  
  32.     
  33.   sockaddr_in listen_addr;  
  34.       listen_addr.sin_family=AF_INET;  
  35.       listen_addr.sin_port=htons ( iPort );  
  36.       listen_addr.sin_addr.s_addr=htonl(INADDR_ANY);  
  37.       listen_addr.sin_addr.s_addr=inet_addr(pIp);  
  38.     
  39.       int ireuseadd_on = 1;//支持端口复用  
  40.       setsockopt(m_isock, SOL_SOCKET, SO_REUSEADDR, &ireuseadd_on, sizeof(ireuseadd_on) );  
  41.     
  42.       if ( bind ( m_isock, ( sockaddr * ) &listen_addr,sizeof ( listen_addr ) ) !=0 )  
  43.       {  
  44.           printf("bind error\n");  
  45.           return false;  
  46.       }  
  47.     
  48.       if ( listen ( m_isock, 20) <0 )  
  49.       {  
  50.           printf("listen error!\n");  
  51.           return false;  
  52.       }  
  53.       else  
  54.       {  
  55.           printf("服务端监听中...\n");  
  56.       }  
  57.     
  58.       // 监听线程,此线程负责接收客户端连接,加入到epoll中  
  59.       if ( pthread_create( &m_ListenThreadId, 0, ( void * ( * ) ( void * ) ) ListenThread, this ) != 0 )  
  60.       {  
  61.           printf("Server 监听线程创建失败!!!");  
  62.           return false;  
  63.       }  
  64.   }  
  65.   // 监听线程  
  66.   void CEpollServer::ListenThread( void* lpVoid )  
  67.   {  
  68.       CEpollServer *pTerminalServer = (CEpollServer*)lpVoid;  
  69.       sockaddr_in remote_addr;  
  70.       int len = sizeof (remote_addr);  
  71.       while ( true )  
  72.       {  
  73.           int client_socket = accept (pTerminalServer->m_isock, ( sockaddr * ) &remote_addr,(socklen_t*)&len );  
  74.           if ( client_socket < 0 )  
  75.           {  
  76.               printf("Server Accept失败!, client_socket: %d\n", client_socket);  
  77.               continue;  
  78.           }  
  79.           else  
  80.           {  
  81.               struct epoll_event    ev;  
  82.               ev.events = EPOLLIN | EPOLLERR | EPOLLHUP;  
  83.               ev.data.fd = client_socket;     //记录socket句柄  
  84.               epoll_ctl(pTerminalServer->m_iEpollFd, EPOLL_CTL_ADD, client_socket, &ev);  
  85.           }  
  86.       }  
  87.   }  
  88.     
  89.   void CEpollServer::Run()  
  90.   {  
  91.       while ( true )  
  92.       {  
  93.           struct epoll_event    events[_MAX_SOCKFD_COUNT];  
  94.           int nfds = epoll_wait( m_iEpollFd, events,  _MAX_SOCKFD_COUNT, -1 );  
  95.           for (int i = 0; i < nfds; i++)  
  96.           {  
  97.               int client_socket = events[i].data.fd;  
  98.               char buffer[1024];//每次收发的字节数小于1024字节  
  99.               memset(buffer, 0, 1024);  
  100.               if (events[i].events & EPOLLIN)//监听到读事件,接收数据  
  101.               {  
  102.                   int rev_size = recv(events[i].data.fd,buffer, 1024,0);  
  103.                   if( rev_size <= 0 )  
  104.                   {  
  105.                       cout << "recv error: recv size: " << rev_size << endl;  
  106.                       struct epoll_event event_del;  
  107.                       event_del.data.fd = events[i].data.fd;  
  108.                       event_del.events = 0;  
  109.                       epoll_ctl(m_iEpollFd, EPOLL_CTL_DEL, event_del.data.fd, &event_del);  
  110.                   }  
  111.                   else  
  112.                   {  
  113.                       printf("Terminal Received Msg Content:%s\n",buffer);  
  114.                       struct epoll_event    ev;  
  115.                       ev.events = EPOLLOUT | EPOLLERR | EPOLLHUP;  
  116.                       ev.data.fd = client_socket;     //记录socket句柄  
  117.                       epoll_ctl(m_iEpollFd, EPOLL_CTL_MOD, client_socket, &ev);  
  118.                   }  
  119.               }  
  120.   else if(events[i].events & EPOLLOUT)//监听到写事件,发送数据  
  121.               {  
  122.                   char sendbuff[1024];  
  123.                   sprintf(sendbuff, "Hello, client fd: %d\n", client_socket);  
  124.                   int sendsize = send(client_socket, sendbuff, strlen(sendbuff)+1, MSG_NOSIGNAL);  
  125.                   if(sendsize <= 0)  
  126.                   {  
  127.                       struct epoll_event event_del;  
  128.                       event_del.data.fd = events[i].data.fd;  
  129.                       event_del.events = 0;  
  130.                       epoll_ctl(m_iEpollFd, EPOLL_CTL_DEL, event_del.data.fd, &event_del);  
  131.                   }  
  132.                   else  
  133.                   {  
  134.                       printf("Server reply msg ok! buffer: %s\n", sendbuff);  
  135.                       struct epoll_event    ev;  
  136.                       ev.events = EPOLLIN | EPOLLERR | EPOLLHUP;  
  137.                       ev.data.fd = client_socket;     //记录socket句柄  
  138.                       epoll_ctl(m_iEpollFd, EPOLL_CTL_MOD, client_socket, &ev);  
  139.                   }  
  140.               }  
  141.               else  
  142.               {  
  143.                   cout << "EPOLL ERROR\n" <<endl;  
  144.                   epoll_ctl(m_iEpollFd, EPOLL_CTL_DEL, events[i].data.fd, &events[i]);  
  145.               }  
  146.           }  
  147.       }  
  148.   }  


main.cpp

[cpp] view plain copy
  1. #include <iostream>  
  2. #include "cepollserver.h"  
  3.   
  4. using namespace std;  
  5.   
  6. int main()  
  7. {  
  8.         CEpollServer  theApp;  
  9.         theApp.InitServer("127.0.0.1", 8000);  
  10.         theApp.Run();  
  11.   
  12.         return 0;  
  13. }  


客户端代码:

说明:测试是两个并发进行测试,每一个客户端都是一个长连接。代码中在连接服务器(ConnectToServer)时将用户IDsocketid关联起来。用户IDsocketid是一一对应的关系。

cepollclient.h

[cpp] view plain copy
  1. #ifndef _DEFINE_EPOLLCLIENT_H_  
  2. #define _DEFINE_EPOLLCLIENT_H_  
  3. #define _MAX_SOCKFD_COUNT 65535  
  4.   
  5. #include<iostream>  
  6. #include <sys/epoll.h>  
  7. #include <sys/socket.h>  
  8. #include <netinet/in.h>  
  9. #include <fcntl.h>  
  10. #include <arpa/inet.h>  
  11. #include <errno.h>  
  12. #include <sys/ioctl.h>  
  13. #include <sys/time.h>  
  14. #include <string>  
  15.   
  16. using namespace std;  
  17.   
  18. /** 
  19.  * @brief 用户状态 
  20.  */  
  21. typedef enum _EPOLL_USER_STATUS_EM  
  22. {  
  23.         FREE = 0,  
  24.         CONNECT_OK = 1,//连接成功  
  25.         SEND_OK = 2,//发送成功  
  26.         RECV_OK = 3,//接收成功  
  27. }EPOLL_USER_STATUS_EM;  
  28.   
  29. /*@brief 
  30.  *@CEpollClient class 用户状态结构体 
  31.  */  
  32. struct UserStatus  
  33. {  
  34.         EPOLL_USER_STATUS_EM iUserStatus;//用户状态  
  35.         int iSockFd;//用户状态关联的socketfd  
  36.         char cSendbuff[1024];//发送的数据内容  
  37.         int iBuffLen;//发送数据内容的长度  
  38.         unsigned int uEpollEvents;//Epoll events  
  39. };  
  40.   
  41. class CEpollClient  
  42. {  
  43.         public:  
  44.   
  45.                 /** 
  46.                  * @brief 
  47.                  * 函数名:CEpollClient 
  48.                  * 描述:构造函数 
  49.                  * @param [in] iUserCount  
  50.                  * @param [in] pIP IP地址 
  51.                  * @param [in] iPort 端口号 
  52.                  * @return 无返回 
  53.                  */  
  54.                 CEpollClient(int iUserCount, const char* pIP, int iPort);  
  55.   
  56. /** 
  57.                  * @brief 
  58.                  * 函数名:CEpollClient 
  59.                  * 描述:析构函数 
  60.                  * @return 无返回 
  61.                  */  
  62.                 ~CEpollClient();  
  63.   
  64.                 /** 
  65.                  * @brief 
  66.                  * 函数名:RunFun 
  67.                  * 描述:对外提供的接口,运行epoll类 
  68.                  * @return 无返回值 
  69.                  */  
  70.                 int RunFun();  
  71.   
  72.         private:  
  73.   
  74.                 /** 
  75.                  * @brief 
  76.                  * 函数名:ConnectToServer 
  77.                  * 描述:连接到服务器 
  78.                  * @param [in] iUserId 用户ID 
  79.                  * @param [in] pServerIp 连接的服务器IP 
  80.                  * @param [in] uServerPort 连接的服务器端口号 
  81.                  * @return 成功返回socketfd,失败返回的socketfd为-1 
  82.                  */  
  83.                 int ConnectToServer(int iUserId,const char *pServerIp,unsigned short uServerPort);  
  84.   
  85. /** 
  86.                  * @brief 
  87.                  * 函数名:SendToServerData 
  88.                  * 描述:给服务器发送用户(iUserId)的数据 
  89.                  * @param [in] iUserId 用户ID 
  90.                  * @return 成功返回发送数据长度 
  91.                  */  
  92.                 int SendToServerData(int iUserId);  
  93.   
  94.                 /** 
  95.                  * @brief 
  96.                  * 函数名:RecvFromServer 
  97.                  * 描述:接收用户回复消息 
  98.                  * @param [in] iUserId 用户ID 
  99.                  * @param [in] pRecvBuff 接收的数据内容 
  100.                  * @param [in] iBuffLen 接收的数据长度 
  101.                  * @return 成功返回接收的数据长度,失败返回长度为-1 
  102.                  */  
  103.                 int RecvFromServer(int iUserid,char *pRecvBuff,int iBuffLen);  
  104.   
  105.                 /** 
  106.                  * @brief 
  107.                  * 函数名:CloseUser 
  108.                  * 描述:关闭用户 
  109.                  * @param [in] iUserId 用户ID 
  110.                  * @return 成功返回true 
  111.                  */  
  112.                 bool CloseUser(int iUserId);  
  113.   
  114. /** 
  115.                  * @brief 
  116.                  * 函数名:DelEpoll 
  117.                  * 描述:删除epoll事件 
  118.                  * @param [in] iSockFd socket FD 
  119.                  * @return 成功返回true 
  120.                  */  
  121.                 bool DelEpoll(int iSockFd);  
  122.         private:  
  123.   
  124.                 int    m_iUserCount;//用户数量;  
  125.                 struct UserStatus *m_pAllUserStatus;//用户状态数组  
  126.                 int    m_iEpollFd;//需要创建epollfd  
  127.                 int    m_iSockFd_UserId[_MAX_SOCKFD_COUNT];//将用户ID和socketid关联起来  
  128.                 int    m_iPort;//端口号  
  129.                 char   m_ip[100];//IP地址  
  130. };  
  131.   
  132. #endif  


cepollclient.cpp

[cpp] view plain copy
  1. #include "cepollclient.h"  
  2.   
  3. CEpollClient::CEpollClient(int iUserCount, const char* pIP, int iPort)  
  4. {  
  5.     strcpy(m_ip, pIP);  
  6.     m_iPort = iPort;  
  7.     m_iUserCount = iUserCount;  
  8.     m_iEpollFd = epoll_create(_MAX_SOCKFD_COUNT);  
  9.     m_pAllUserStatus = (struct UserStatus*)malloc(iUserCount*sizeof(struct UserStatus));  
  10.     for(int iuserid=0; iuserid<iUserCount ; iuserid++)  
  11.     {  
  12.         m_pAllUserStatus[iuserid].iUserStatus = FREE;  
  13.         sprintf(m_pAllUserStatus[iuserid].cSendbuff, "Client: %d send message \"Hello Server!\"\r\n", iuserid);  
  14.         m_pAllUserStatus[iuserid].iBuffLen = strlen(m_pAllUserStatus[iuserid].cSendbuff) + 1;  
  15.         m_pAllUserStatus[iuserid].iSockFd = -1;  
  16.     }  
  17.     memset(m_iSockFd_UserId, 0xFF, sizeof(m_iSockFd_UserId));  
  18. }  
  19.   
  20. CEpollClient::~CEpollClient()  
  21. {  
  22.     free(m_pAllUserStatus);  
  23. }  
  24. int CEpollClient::ConnectToServer(int iUserId,const char *pServerIp,unsigned short uServerPort)  
  25. {  
  26.     if( (m_pAllUserStatus[iUserId].iSockFd = socket(AF_INET,SOCK_STREAM,0) ) < 0 )  
  27.     {  
  28.         cout <<"[CEpollClient error]: init socket fail, reason is:"<<strerror(errno)<<",errno is:"<<errno<<endl;  
  29.         m_pAllUserStatus[iUserId].iSockFd = -1;  
  30.         return  m_pAllUserStatus[iUserId].iSockFd;  
  31.     }  
  32.   
  33.     struct sockaddr_in addr;  
  34.     bzero(&addr, sizeof(addr));  
  35.     addr.sin_family = AF_INET;  
  36.     addr.sin_port = htons(uServerPort);  
  37.     addr.sin_addr.s_addr = inet_addr(pServerIp);  
  38.   
  39.     int ireuseadd_on = 1;//支持端口复用  
  40.     setsockopt(m_pAllUserStatus[iUserId].iSockFd, SOL_SOCKET, SO_REUSEADDR, &ireuseadd_on, sizeof(ireuseadd_on));  
  41.   
  42.     unsigned long ul = 1;  
  43.     ioctl(m_pAllUserStatus[iUserId].iSockFd, FIONBIO, &ul); //设置为非阻塞模式  
  44.   
  45.     connect(m_pAllUserStatus[iUserId].iSockFd, (const sockaddr*)&addr, sizeof(addr));  
  46.     m_pAllUserStatus[iUserId].iUserStatus = CONNECT_OK;  
  47.     m_pAllUserStatus[iUserId].iSockFd = m_pAllUserStatus[iUserId].iSockFd;  
  48.   
  49.     return m_pAllUserStatus[iUserId].iSockFd;  
  50. }  
  51. int CEpollClient::SendToServerData(int iUserId)  
  52. {  
  53.     sleep(1);//此处控制发送频率,避免狂打日志,正常使用中需要去掉  
  54.     int isendsize = -1;  
  55.     if( CONNECT_OK == m_pAllUserStatus[iUserId].iUserStatus || RECV_OK == m_pAllUserStatus[iUserId].iUserStatus)  
  56.     {  
  57.         isendsize = send(m_pAllUserStatus[iUserId].iSockFd, m_pAllUserStatus[iUserId].cSendbuff, m_pAllUserStatus[iUserId  
  58. ].iBuffLen, MSG_NOSIGNAL);  
  59.         if(isendsize < 0)  
  60.         {  
  61.             cout <<"[CEpollClient error]: SendToServerData, send fail, reason is:"<<strerror(errno)<<",errno is:"<<errno<  
  62. <endl;  
  63.         }  
  64.         else  
  65.         {  
  66.             printf("[CEpollClient info]: iUserId: %d Send Msg Content:%s\n", iUserId, m_pAllUserStatus[iUserId].cSendbuff  
  67. );  
  68.             m_pAllUserStatus[iUserId].iUserStatus = SEND_OK;  
  69.         }  
  70.     }  
  71.     return isendsize;  
  72. }  
  73. int CEpollClient::RecvFromServer(int iUserId,char *pRecvBuff,int iBuffLen)  
  74. {  
  75.     int irecvsize = -1;  
  76.     if(SEND_OK == m_pAllUserStatus[iUserId].iUserStatus)  
  77.     {  
  78.         irecvsize = recv(m_pAllUserStatus[iUserId].iSockFd, pRecvBuff, iBuffLen, 0);  
  79.         if(0 > irecvsize)  
  80.         {  
  81.             cout <<"[CEpollClient error]: iUserId: " << iUserId << "RecvFromServer, recv fail, reason is:"<<strerror(errn  
  82. o)<<",errno is:"<<errno<<endl;  
  83.         }  
  84.         else if(0 == irecvsize)  
  85.         {  
  86.             cout <<"[warning:] iUserId: "<< iUserId << "RecvFromServer, STB收到数据为0,表示对方断开连接,irecvsize:"<<ire  
  87. cvsize<<",iSockFd:"<< m_pAllUserStatus[iUserId].iSockFd << endl;  
  88.         }  
  89.         else  
  90.         {  
  91.             printf("Recv Server Msg Content:%s\n", pRecvBuff);  
  92.             m_pAllUserStatus[iUserId].iUserStatus = RECV_OK;  
  93.         }  
  94.     }  
  95.     return irecvsize;  
  96. }  
  97.   
  98. bool CEpollClient::CloseUser(int iUserId)  
  99. {  
  100.     close(m_pAllUserStatus[iUserId].iSockFd);  
  101.     m_pAllUserStatus[iUserId].iUserStatus = FREE;  
  102.     m_pAllUserStatus[iUserId].iSockFd = -1;  
  103.     return true;  
  104. }  
  105.       
  106. int CEpollClient::RunFun()  
  107. {  
  108.     int isocketfd = -1;  
  109.     for(int iuserid=0; iuserid<m_iUserCount; iuserid++)  
  110.     {  
  111.         struct epoll_event event;  
  112.         isocketfd = ConnectToServer(iuserid, m_ip, m_iPort);  
  113.         if(isocketfd < 0)  
  114.             cout <<"[CEpollClient error]: RunFun, connect fail" <<endl;  
  115.         m_iSockFd_UserId[isocketfd] = iuserid;//将用户ID和socketid关联起来  
  116.   
  117.         event.data.fd = isocketfd;  
  118.         event.events = EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP;  
  119.   
  120.         m_pAllUserStatus[iuserid].uEpollEvents = event.events;  
  121.         epoll_ctl(m_iEpollFd, EPOLL_CTL_ADD, event.data.fd, &event);  
  122.   }  
  123.   while(1)  
  124.       {  
  125.           struct epoll_event events[_MAX_SOCKFD_COUNT];  
  126.           char buffer[1024];  
  127.           memset(buffer,0,1024);  
  128.           int nfds = epoll_wait(m_iEpollFd, events, _MAX_SOCKFD_COUNT, 100 );//等待epoll事件的产生  
  129.           for (int ifd=0; ifd<nfds; ifd++)//处理所发生的所有事件  
  130.           {  
  131.               struct epoll_event event_nfds;  
  132.               int iclientsockfd = events[ifd].data.fd;  
  133.               cout << "events[ifd].data.fd: " << events[ifd].data.fd << endl;  
  134.               int iuserid = m_iSockFd_UserId[iclientsockfd];//根据socketfd得到用户ID  
  135.               if( events[ifd].events & EPOLLOUT )  
  136.               {  
  137.                   int iret = SendToServerData(iuserid);  
  138.                   if( 0 < iret )  
  139.                   {  
  140.                       event_nfds.events = EPOLLIN|EPOLLERR|EPOLLHUP;  
  141.                       event_nfds.data.fd = iclientsockfd;  
  142.                       epoll_ctl(m_iEpollFd, EPOLL_CTL_MOD, event_nfds.data.fd, &event_nfds);  
  143.                   }  
  144.                   else  
  145.                   {  
  146.                       cout <<"[CEpollClient error:] EpollWait, SendToServerData fail, send iret:"<<iret<<",iuserid:"<<iuser  
  147.   id<<",fd:"<<events[ifd].data.fd<<endl;  
  148.                       DelEpoll(events[ifd].data.fd);  
  149.                       CloseUser(iuserid);  
  150.                   }  
  151.               }  
  152.   else if( events[ifd].events & EPOLLIN )//监听到读事件,接收数据  
  153.               {  
  154.                   int ilen = RecvFromServer(iuserid, buffer, 1024);  
  155.                   if(0 > ilen)  
  156.                   {  
  157.                       cout <<"[CEpollClient error]: RunFun, recv fail, reason is:"<<strerror(errno)<<",errno is:"<<errno<<e  
  158.   ndl;  
  159.                       DelEpoll(events[ifd].data.fd);  
  160.                       CloseUser(iuserid);  
  161.                   }  
  162.                   else if(0 == ilen)  
  163.                   {  
  164.                       cout <<"[CEpollClient warning:] server disconnect,ilen:"<<ilen<<",iuserid:"<<iuserid<<",fd:"<<events[  
  165.   ifd].data.fd<<endl;  
  166.                       DelEpoll(events[ifd].data.fd);  
  167.                       CloseUser(iuserid);  
  168.                   }  
  169.                   else  
  170.                   {  
  171.                       m_iSockFd_UserId[iclientsockfd] = iuserid;//将socketfd和用户ID关联起来  
  172.                       event_nfds.data.fd = iclientsockfd;  
  173.                       event_nfds.events = EPOLLOUT|EPOLLERR|EPOLLHUP;  
  174.                       epoll_ctl(m_iEpollFd, EPOLL_CTL_MOD, event_nfds.data.fd, &event_nfds);  
  175.                   }  
  176.               }  
  177.               else  
  178.               {  
  179.                   cout <<"[CEpollClient error:] other epoll error"<<endl;  
  180.                   DelEpoll(events[ifd].data.fd);  
  181.                   CloseUser(iuserid);  
  182.               }  
  183.           }  
  184.   }  
  185.   }  
  186.     
  187.   bool CEpollClient::DelEpoll(int iSockFd)  
  188.   {  
  189.       bool bret = false;  
  190.       struct epoll_event event_del;  
  191.       if(0 < iSockFd)  
  192.       {  
  193.           event_del.data.fd = iSockFd;  
  194.           event_del.events = 0;  
  195.           if( 0 == epoll_ctl(m_iEpollFd, EPOLL_CTL_DEL, event_del.data.fd, &event_del) )  
  196.           {  
  197.               bret = true;  
  198.           }  
  199.           else  
  200.           {  
  201.               cout <<"[SimulateStb error:] DelEpoll,epoll_ctl error,iSockFd:"<<iSockFd<<endl;  
  202.           }  
  203.           m_iSockFd_UserId[iSockFd] = -1;  
  204.       }  
  205.       else  
  206.       {  
  207.           bret = true;  
  208.     
  209.       }  
  210.       return bret;  
  211.   }  


main.cpp

[cpp] view plain copy
  1. #include "cepollclient.h"  
  2.   
  3. int main(int argc, char *argv[])  
  4. {  
  5.         CEpollClient *pCEpollClient = new CEpollClient(2, "127.0.0.1", 8000);  
  6.         if(NULL == pCEpollClient)  
  7.         {  
  8.                 cout<<"[epollclient error]:main init"<<"Init CEpollClient fail"<<endl;  
  9.         }  
  10.   
  11.         pCEpollClient->RunFun();  
  12.   
  13.         if(NULL != pCEpollClient)  
  14.         {  
  15.                 delete pCEpollClient;  
  16.                 pCEpollClient = NULL;  
  17.         }  
  18.   
  19.         return 0;  
  20. }  


运行结果:

客户端:

服务端:


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/383826.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++基类指针指向派生类(指针)

我们常用基类指针指向派生类对象来实现多态性。 私有继承不允许基类指针指向派生类 基类指针只能访问到基类中含有的公有成员。 当用基类指针指向派生类对象在动态分配堆上内存的时候&#xff0c;析构函数必须是虚函数! 成员如果是数据成员的话访问的是基类的版本&#xff…

一个简单的linux线程池

http://blog.csdn.net/wzjking0929/article/details/20312675 线程池&#xff1a;简单地说&#xff0c;线程池 就是预先创建好一批线程&#xff0c;方便、快速地处理收到的业务。比起传统的到来一个任务&#xff0c;即时创建一个线程来处理&#xff0c;节省了线程的创建和回收的…

C++制表符

制表符的转义字符为\t&#xff0c;一般情况下长度为8个空格&#xff0c;这里的8个指的是从上一个字符串的开头开始算&#xff0c;往后数8个&#xff0c;不够的话就补空格。 如果前面的字符串的长度大于等于8个&#xff0c;例如前面字符串的长度为x,那么就会补(8-x%8)个空格 例…

C++派生类含有成员对象构造函数析构函数顺序

参考博客&#xff1a;传送门1 当类中含有对象成员时&#xff1a; 类的构造函数要包含对成员对象的初始化&#xff0c;如果构造函数的成员初始化列表没有包含对成员对象的初始化&#xff0c;系统会自动调用成员对象的无参构造函数。顺序上&#xff1a;先调用成员对象的构造函数…

链表逆序的原理及实例

http://blog.csdn.net/wangqing_12345/article/details/51757294 尾插法建立链表&#xff0c;带头结点设链表节点为typedef struct node {int data;struct node *next;}node_t, *pnode_t;要求将一带链表头List head的单向链表逆序。 分析&#xff1a; 1). 若链表为空或只有一个…

C++关于虚基类、构造函数、析构函数、成员对象的两个程序浅析

预备博客&#xff1a; C虚继承中构造函数和析构函数顺序问题以及原理 C派生类含有成员对象构造函数析构函数顺序 C虚基类成员可见性 程序一如下&#xff1a; #include<iostream> using namespace std; class A { public:A(int a) :x(a) { cout << "A const…

C++小型公司管理系统

项目要求&#xff1a; 编写一个程序实现小型公司的人员信息管理系统。该公司雇员&#xff08;employee&#xff09;包括经理&#xff08;manager&#xff09;&#xff0c;技术人员&#xff08;technician&#xff09;、销售员&#xff08;salesman&#xff09;和销售部经理&…

Linux网络编程“惊群”问题总结

http://www.cnblogs.com/Anker/p/7071849.html 1、前言 我从事Linux系统下网络开发将近4年了&#xff0c;经常还是遇到一些问题&#xff0c;只是知其然而不知其所以然&#xff0c;有时候和其他人交流&#xff0c;搞得非常尴尬。如今计算机都是多核了&#xff0c;网络编程框架也…

yfan.qiu linux硬链接与软链接

http://www.cnblogs.com/yfanqiu/archive/2012/06/11/2545556.html Linux 系统中有软链接和硬链接两种特殊的“文件”。 软链接可以看作是Windows中的快捷方式&#xff0c;可以让你快速链接到目标档案或目录。 硬链接则透过文件系统的inode来产生新档名&#xff0c;而不是产生…

Linux C++线程池实例

http://www.cnblogs.com/danxi/p/6636095.html 想做一个多线程服务器测试程序&#xff0c;因此参考了github的一些实例&#xff0c;然后自己动手写了类似的代码来加深理解。 目前了解的线程池实现有2种思路&#xff1a; 第一种&#xff1a; 主进程创建一定数量的线程&#xff0…

Java编写简单的自定义异常类

除了系统中自己带的异常&#xff0c;我们也可以自己写一些简单的异常类来帮助我们处理问题。 所有的异常命名都是以Exception结尾&#xff0c;并且都是Exception的子类。 假设我们要编写一个人类的类&#xff0c;为了判断年龄的输入是否合法&#xff0c;我们编写了一个名为Il…

【Java学习笔记九】多线程

程序&#xff1a;计算机指令的集合&#xff0c;它以文件的形式存储在磁盘上&#xff0c;是应用程序执行的蓝本。 进程&#xff1a;是一个程序在其自身的地址空间中的一次执行活动。进程是资源申请、调度和独立运行的单位&#xff0c;因此&#xff0c;它使用系统中的运行资源。而…

【C++学习笔记四】运算符重载

当调用一个重载函数和重载运算符时&#xff0c;编译器通过把您所使用的参数类型和定义中的参数类型相比较&#xff0c;巨鼎选用最合适的定义。&#xff08;重载决策&#xff09; 重载运算符时带有特殊名称的函数&#xff0c;函数名是由关键字operator和其后要重载的运算符符号…

C++(纯)虚函数重写时访问权限更改问题

我们知道在Java中是自动实现多态的&#xff0c;Java中规定重写的方法的访问权限不能缩小。那么在C中我们实现多态的时候是否可以更改&#xff08;缩小&#xff09;访问权限呢&#xff1f; 经过测试&#xff0c;得到的答案如下&#xff1a;如果用基类指针指向派生类对象实现多态…

C++ — 智能指针的简单实现以及循环引用问题

http://blog.csdn.net/dawn_sf/article/details/70168930 智能指针 ____________________________________________________ 今天我们来看一个高大上的东西&#xff0c;它叫智能指针。 哇这个名字听起来都智能的不得了&#xff0c;其实等你了解它你一定会有一点失望的。。。。因…

C++开发者都应该使用的10个C++11特性

http://blog.jobbole.com/44015/ 感谢冯上&#xff08;治不好你我就不是兽医 &#xff09;的热心翻译。如果其他朋友也有不错的原创或译文&#xff0c;可以尝试推荐给伯乐在线。】 在C11新标准中&#xff0c;语言本身和标准库都增加了很多新内容&#xff0c;本文只涉及了一些皮…

shared_ptr的一些尴尬

http://blog.csdn.net/henan_lujun/article/details/8984543 shared_ptr在boost库中已经有多年了&#xff0c;C11又为其正名&#xff0c;把他引入了STL库&#xff0c;放到了std的下面&#xff0c;可见其颇有用武之地&#xff1b;但是shared_ptr是万能的吗&#xff1f;有没有什…

C++转换构造函数和类型转换函数

参考博客&#xff1a;https://blog.csdn.net/feiyanaffection/article/details/79183340 隐式类型转换 如果不同类型的数据在一起操作的时候编译器会自动进行一个数据类型转换。例如常用的基本数据类型有如下类型转换关系&#xff1a; 转换构造函数 构造函数有且仅有一个参数…

C++析构函数执行顺序

今天发现主程序中有多个对象时析构函数的执行顺序不是对象定义的顺序&#xff0c;而是对象定义顺序反过来。 思考了一下&#xff0c;结合之前继承、成员对象等的析构函数执行的顺序&#xff0c;我觉得析构函数执行的顺序为&#xff1a;构造函数的顺序反过来&#xff0c;可能是…

c++写时拷贝1

http://blog.csdn.net/SuLiJuan66/article/details/48882303 Copy On Write Copy On Write(写时复制)使用了“引用计数”&#xff08;reference counting&#xff09;&#xff0c;会有一个变量用于保存引用的数量。当第一个类构造时&#xff0c;string的构造函数会根据传入的参…