epoll

开发高性能网络程序时,windows开发者们言必称iocplinux开发者们则言必称epoll。大家都明白epoll是一种IO多路复用技术,可以非常高效的处理数以百万计的socket句柄,比起以前的selectpoll效率高大发了。我们用起epoll来都感觉挺爽,确实快,那么,它到底为什么可以高速处理这么多并发连接呢?

先简单回顾下如何使用C库封装的3epoll系统调用吧。

[cpp]view plaincopy

 

1        int epoll_create(int size); 

2        int epoll_ctl(int epfd, int op, int fd, struct epoll_event*event); 

3        int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout); 


      使用起来很清晰,首先要调用epoll_create建立一个epoll对象。参数size是内核保证能够正确处理的最大句柄数,多于这个最大数时内核可不保证效果。

epoll_ctl可以操作上面建立的epoll,例如,将刚建立的socket加入到epoll中让其监控,或者把 epoll正在监控的某个socket句柄移出epoll,不再监控它等等。

epoll_wait在调用时,在给定的timeout时间内,当在监控的所有句柄中有事件发生时,就返回用户态的进程。

      从上面的调用方式就可以看到epollselect/poll的优越之处:因为后者每次调用时都要传递你所要监控的所有socketselect/poll系统调用,这意味着需要将用户态的socket列表copy到内核态,如果以万计的句柄会导致每次都要copy几十几百KB的内存到内核态,非常低效。而我们调用epoll_wait时就相当于以往调用select/poll,但是这时却不用传递socket句柄给内核,因为内核已经在epoll_ctl中拿到了要监控的句柄列表。

      所以,实际上在你调用epoll_create后,内核就已经在内核态开始准备帮你存储要监控的句柄了,每次调用epoll_ctl只是在往内核的数据结构里塞入新的socket句柄。

      在内核里,一切皆文件。所以,epoll向内核注册了一个文件系统,用于存储上述的被监控socket。当你调用epoll_create时,就会在这个虚拟的epoll文件系统里创建一个file结点。当然这个file不是普通文件,它只服务于epoll

      epoll在被内核初始化时(操作系统启动),同时会开辟出epoll自己的内核高速cache区,用于安置每一个我们想监控的socket,这些socket会以红黑树的形式保存在内核cache里,以支持快速的查找、插入、删除。这个内核高速cache区,就是建立连续的物理内存页,然后在之上建立slab层,简单的说,就是物理上分配好你想要的size的内存对象,每次使用时都是使用空闲的已分配好的对象。

[cpp]view plaincopy

 

4        staticint __init eventpoll_init(void

5        

6            ...... 

7          

8            /* Allocates slabcache used to allocate "struct epitem" items */ 

9            epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem), 

10                  0,SLAB_HWCACHE_ALIGN|EPI_SLAB_DEBUG|SLAB_PANIC, 

11                  NULL, NULL); 

12        

13          /* Allocates slab cache used to allocate"struct eppoll_entry" */ 

14          pwq_cache = kmem_cache_create("eventpoll_pwq"

15                  sizeof(struct eppoll_entry),0, 

16                  EPI_SLAB_DEBUG|SLAB_PANIC, NULL,NULL); 

17        

18       ... ... 

 

      epoll的高效就在于,当我们调用epoll_ctl往里塞入百万个句柄时,epoll_wait仍然可以飞快的返回,并有效的将发生事件的句柄给我们用户。这是由于我们在调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个红黑树用于存储以后epoll_ctl传来的socket外,还会再建立一个list链表,用于存储准备就绪的事件,当epoll_wait调用时,仅仅观察这个list链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。

      而且,通常情况下即使我们要监控百万计的句柄,大多一次也只返回很少量的准备就绪句柄而已,所以,epoll_wait仅需要从内核态copy少量的句柄到用户态而已,如何能不高效?!

      那么,这个准备就绪list链表是怎么维护的呢?当我们执行epoll_ctl时,除了把socket放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪list链表里。所以,当一个socket上有数据到了,内核在把网卡上的数据copy到内核中后就来把socket插入到准备就绪链表里了。

      如此,一颗红黑树,一张准备就绪句柄链表,少量的内核cache,就帮我们解决了大并发下的socket处理问题。执行epoll_create时,创建了红黑树和就绪链表,执行epoll_ctl时,如果增加socket句柄,则检查在红黑树中是否存在,存在立即返回,不存在则添加到树干上,然后向内核注册回调函数,用于当中断事件来临时向准备就绪链表中插入数据。执行epoll_wait时立刻返回准备就绪链表里的数据即可。

      最后看看epoll独有的两种模式LTET。无论是LTET模式,都适用于以上所说的流程。区别是,LT模式下,只要一个句柄上的事件一次没有处理完,会在以后调用epoll_wait时次次返回这个句柄,而ET模式仅在第一次返回。

      这件事怎么做到的呢?当一个socket句柄上有事件时,内核会把该句柄插入上面所说的准备就绪list链表,这时我们调用epoll_wait,会把准备就绪的socket拷贝到用户态内存,然后清空准备就绪list链表,最后,epoll_wait干了件事,就是检查这些socket,如果不是ET模式(就是LT模式的句柄了),并且这些socket上确实有未处理的事件时,又把该句柄放回到刚刚清空的准备就绪链表了。所以,非ET的句柄,只要它上面还有事件,epoll_wait每次都会返回。而ET模式的句柄,除非有新中断到,即使socket上的事件没有处理完,也是不会次次从epoll_wait返回的

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10

int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}
//添加fd
void addfd( int epollfd, int fd, bool enable_et )
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN;
    //是否开启et模式
    if( enable_et )
    {
        event.events |= EPOLLET;
    }
    epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );
}
//LT模式的工作原理
void lt( epoll_event* events, int number, int epollfd, int listenfd )
{
    char buf[ BUFFER_SIZE ];
    for ( int i = 0; i < number; i++ )
    {
        int sockfd = events[i].data.fd;
        //处理用户注册事件
        if ( sockfd == listenfd )
        {
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof( client_address );
            int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
            addfd( epollfd, connfd, false );
        }//可读事件
        else if ( events[i].events & EPOLLIN )
        {
            //只要socket读缓存中还有未读出的数据,这段代码就被触发
            printf( "event trigger once\n" );
            memset( buf, '\0', BUFFER_SIZE );
            int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
            if( ret <= 0 )
            {
                close( sockfd );//表明当前套接字已经关闭
                continue;
            }
            printf( "get %d bytes of content: %s\n", ret, buf );
        }
        else
        {
            printf( "something else happened \n" );
        }
    }
}
//ET模式
void et( epoll_event* events, int number, int epollfd, int listenfd )
{
    char buf[ BUFFER_SIZE ];
    for ( int i = 0; i < number; i++ )
    {
        int sockfd = events[i].data.fd;
        if ( sockfd == listenfd )
        {
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof( client_address );
            int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
            addfd( epollfd, connfd, true );
        }
        else if ( events[i].events & EPOLLIN )
        {
            printf( "event trigger once\n" );
            //这段代码不会重复触发,因此需要我们循环读取数据,以确保socket读缓存中的所有数据全部读出
            while( 1 )
            {
                memset( buf, '\0', BUFFER_SIZE );
                int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
                if( ret < 0 )
                {
                    //对于非阻塞IO,条件成立表明数据全部读取完毕,此后。epoll就能再次触发
                    //sockfd上的epollin事件,以驱动下一次读事件
                    if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )
                    {
                        printf( "read later\n" );
                        break;
                    }
                    close( sockfd );
                    break;
                }
                else if( ret == 0 )
                {
                    close( sockfd );
                }
                else
                {
                    printf( "get %d bytes of content: %s\n", ret, buf );
                }
            }
        }
        else
        {
            printf( "something else happened \n" );
        }
    }
}

int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );

    int ret = 0;
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );

    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );

    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );

    ret = listen( listenfd, 5 );
    assert( ret != -1 );

    epoll_event events[ MAX_EVENT_NUMBER ];
    int epollfd = epoll_create( 5 );
    assert( epollfd != -1 );
    addfd( epollfd, listenfd, true );

    while( 1 )
    {
        int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( ret < 0 )
        {
            printf( "epoll failure\n" );
            break;
        }
        //测试不同模式
        lt( events, ret, epollfd, listenfd );
        et( events, ret, epollfd, listenfd );
    }

    close( listenfd );
    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/385260.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

剑指offer目录

序号题目1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21

基于升序链表的定时器

#ifndef LST_TIMER#define LST_TIMER#include <time.h>#define BUFFER_SIZE 64class util_timer;//用户数据结构&#xff1a;客户端地址、客户端的socket、socket文件描述符、读缓存和定时器struct client_data{sockaddr_in address;int sockfd;char buf[ BUFFER_SIZE ];…

SIGCHLD信号使用和注意事项

1.SIGCHLD简介 SIGCHILD是指在一个进程终止或者停止时&#xff0c;将SIGCHILD信号发送给其父进程&#xff0c;按照系统默认将忽略此信号&#xff0c;如果父进程希望被告知其子系统的这种状态&#xff0c;则应捕捉此信号。注意&#xff1a;SIGCLD信号与其长得非常相似。SIGCLD是…

08-图7 公路村村通 (30 分

现有村落间道路的统计数据表中&#xff0c;列出了有可能建设成标准公路的若干条道路的成本&#xff0c;求使每个村落都有公路连通所需要的最低成本。 输入格式: 输入数据包括城镇数目正整数N&#xff08;≤&#xff09;和候选道路数目M&#xff08;≤&#xff09;&#xff1b;随…

【Leetcode】33. 搜索旋转排序数组

假设按照升序排序的数组在预先未知的某个点上进行了旋转。 ( 例如&#xff0c;数组 [0,1,2,4,5,6,7] 可能变为 [4,5,6,7,0,1,2] )。 搜索一个给定的目标值&#xff0c;如果数组中存在这个目标值&#xff0c;则返回它的索引&#xff0c;否则返回 -1 。 你可以假设数组中不存在重…

08-图9 关键活动 (30 分

假定一个工程项目由一组子任务构成&#xff0c;子任务之间有的可以并行执行&#xff0c;有的必须在完成了其它一些子任务后才能执行。“任务调度”包括一组子任务、以及每个子任务可以执行所依赖的子任务集。 比如完成一个专业的所有课程学习和毕业设计可以看成一个本科生要完成…

【Leetocde | 10 】54. 螺旋矩阵

解题代码&#xff1a; class Solution { public:vector<int> spiralOrder(vector<vector<int>>& matrix) {if (matrix.empty() || matrix[0].empty()) return {};int m matrix.size(), n matrix[0].size();vector<int> res;int up 0, down m …

09-排序1 排序 (25 分)

给定N个&#xff08;长整型范围内的&#xff09;整数&#xff0c;要求输出从小到大排序后的结果。 本题旨在测试各种不同的排序算法在各种数据情况下的表现。各组测试数据特点如下&#xff1a; 数据1&#xff1a;只有1个元素&#xff1b; 数据2&#xff1a;11个不相同的整数…

网络层

1. 简单解释一些ARP协议的工作过程

【Leetocde | 24 】152. 乘积最大子序列

这道题最直接的方法就是用DP来做&#xff0c;而且要用两个dp数组&#xff0c;其中f[i]表示子数组[0, i]范围内并且一定包含nums[i]数字的最大子数组乘积&#xff0c;g[i]表示子数组[0, i]范围内并且一定包含nums[i]数字的最小子数组乘积&#xff0c;初始化时f[0]和g[0]都初始化…

【Leetcode | 1】3. 无重复字符的最长子串

这里我们可以建立一个HashMap&#xff0c;建立每个字符和其最后出现位置之间的映射&#xff0c;然后我们需要定义两个变量res和left&#xff0c;其中res用来记录最长无重复子串的长度&#xff0c;left指向该无重复子串左边的起始位置的前一个&#xff0c;由于是前一个&#xff…

【Leetcode | 】93. 复原IP地址

class Solution { public:vector<string> strs;//用于存放临时的四个段vector<string> result;//存放结果void dfs(string &s, int beginIndex, int step) {if (step 4 && beginIndex s.size()) //搜索成功{string temRec strs[0] "." …

海量数据(一)

1. 有1亿个浮点数&#xff0c;如果找出期中最大的10000个&#xff1f; 最容易想到的方法是将数据全部排序&#xff0c;然后在排序后的集合中进行查找&#xff0c;最快的排序算法的时间复杂度一般为O&#xff08;nlogn&#xff09;&#xff0c;如快速排序。但是在32位的机器上&a…

1018 锤子剪刀布 (20 分)

大家应该都会玩“锤子剪刀布”的游戏&#xff1a;两人同时给出手势&#xff0c;胜负规则如图所示&#xff1a; 现给出两人的交锋记录&#xff0c;请统计双方的胜、平、负次数&#xff0c;并且给出双方分别出什么手势的胜算最大。 输入格式&#xff1a; 输入第 1 行给出正整数 N…

1019 数字黑洞 (20 分)

给定任一个各位数字不完全相同的 4 位正整数&#xff0c;如果我们先把 4 个数字按非递增排序&#xff0c;再按非递减排序&#xff0c;然后用第 1 个数字减第 2 个数字&#xff0c;将得到一个新的数字。一直重复这样做&#xff0c;我们很快会停在有“数字黑洞”之称的 6174&…

61. 旋转链表

给定一个链表&#xff0c;旋转链表&#xff0c;将链表每个节点向右移动 k 个位置&#xff0c;其中 k 是非负数。 示例 1: 输入: 1->2->3->4->5->NULL, k 2 输出: 4->5->1->2->3->NULL 解释: 向右旋转 1 步: 5->1->2->3->4->NULL…

1020 月饼 (25 分)

月饼是中国人在中秋佳节时吃的一种传统食品&#xff0c;不同地区有许多不同风味的月饼。现给定所有种类月饼的库存量、总售价、以及市场的最大需求量&#xff0c;请你计算可以获得的最大收益是多少。 注意&#xff1a;销售时允许取出一部分库存。样例给出的情形是这样的&#x…

2. 二叉树的深度

题目描述 输入一棵二叉树&#xff0c;求该树的深度。从根结点到叶结点依次经过的结点&#xff08;含根、叶结点&#xff09;形成树的一条路径&#xff0c;最长路径的长度为树的深度。 /* struct TreeNode {int val;struct TreeNode *left;struct TreeNode *right;TreeNode(in…

1021 个位数统计 (15 分

给定一个 k 位整数 1 (0, ,, d​k−1​​>0)&#xff0c;请编写程序统计每种不同的个位数字出现的次数。例如&#xff1a;给定 0&#xff0c;则有 2 个 0&#xff0c;3 个 1&#xff0c;和 1 个 3。 输入格式&#xff1a; 每个输入包含 1 个测试用例&#xff0c;即一个不超过…

【牛客网】X游戏

题目&#xff1a;X游戏 题目描述 我们称一个数 X 为好数, 如果它的每位数字逐个地被旋转 180 度后&#xff0c;我们仍可以得到一个有效的&#xff0c;且和 X 不同的数。要求每位数字都要被旋转。 如果一个数的每位数字被旋转以后仍然还是一个数字&#xff0c; 则这个数是有效…