nio框架中的多个Selector结构

随着并发数量的提高,传统nio框架采用一个Selector来支撑大量连接事件的管理和触发已经遇到瓶颈,因此现在各种nio框架的新版本都采用多个Selector并存的结构,由多个Selector均衡地去管理大量连接。这里以Mina和Grizzly的实现为例。

   在Mina 2.0中,Selector的管理是由org.apache.mina.transport.socket.nio.NioProcessor来处理,每个NioProcessor对象保存一个Selector,负责具体的select、wakeup、channel的注册和取消、读写事件的注册和判断、实际的IO读写操作等等,核心代码如下:

 public NioProcessor(Executor executor) {
        super(executor);
        try {
            // Open a new selector
            selector = Selector.open();
        } catch (IOException e) {
            throw new RuntimeIoException("Failed to open a selector.", e);
        }
    }


    protected int select(long timeout) throws Exception {
        return selector.select(timeout);
    }

  
    protected boolean isInterestedInRead(NioSession session) {
        SelectionKey key = session.getSelectionKey();
        return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) != 0;
    }


    protected boolean isInterestedInWrite(NioSession session) {
        SelectionKey key = session.getSelectionKey();
        return key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0;
    }

    protected int read(NioSession session, IoBuffer buf) throws Exception {
        return session.getChannel().read(buf.buf());
    }


    protected int write(NioSession session, IoBuffer buf, int length) throws Exception {
        if (buf.remaining() <= length) {
            return session.getChannel().write(buf.buf());
        } else {
            int oldLimit = buf.limit();
            buf.limit(buf.position() + length);
            try {
                return session.getChannel().write(buf.buf());
            } finally {
                buf.limit(oldLimit);
            }
        }
    }


   这些方法的调用都是通过AbstractPollingIoProcessor来处理,这个类里可以看到一个nio框架的核心逻辑,注册、select、派发,具体因为与本文主题不合,不再展开。NioProcessor的初始化是在NioSocketAcceptor的构造方法中调用的:

public NioSocketAcceptor() {
        super(new DefaultSocketSessionConfig(), NioProcessor.class);
        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
    }


   直接调用了父类AbstractPollingIoAcceptor的构造函数,在其中我们可以看到,默认是启动了一个SimpleIoProcessorPool来包装NioProcessor:

protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
            Class<? extends IoProcessor<T>> processorClass) {
        this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass),
                true);
    }


   这里其实是一个组合模式,SimpleIoProcessorPool和NioProcessor都实现了Processor接口,一个是组合形成的Processor池,而另一个是单独的类。调用的SimpleIoProcessorPool的构造函数是这样:

 private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1; 
    public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType) {
        this(processorType, null, DEFAULT_SIZE);
    }

    可以看到,默认的池大小是cpu个数+1,也就是创建了cpu+1个的Selector对象。它的重载构造函数里是创建了一个数组,启动一个CachedThreadPool来运行NioProcessor,通过反射创建具体的Processor对象,这里就不再列出了。

    Mina当有一个新连接建立的时候,就创建一个NioSocketSession,并且传入上面的SimpleIoProcessorPool,当连接初始化的时候将Session加入SimpleIoProcessorPool:

 protected NioSession accept(IoProcessor<NioSession> processor,
            ServerSocketChannel handle) throws Exception {

        SelectionKey key = handle.keyFor(selector);
        
        if ((key == null) || (!key.isValid()) || (!key.isAcceptable()) ) {
            return null;
        }

        // accept the connection from the client
        SocketChannel ch = handle.accept();
        
        if (ch == null) {
            return null;
        }

        return new NioSocketSession(this, processor, ch);
    }

        
        private void processHandles(Iterator<H> handles) throws Exception {
            while (handles.hasNext()) {
                H handle = handles.next();
                handles.remove();

                // Associates a new created connection to a processor,
                // and get back a session
                T session = accept(processor, handle);
                
                if (session == null) {
                    break;
                }

                initSession(session, null, null);

                // add the session to the SocketIoProcessor
                session.getProcessor().add(session);
            }
        }


    加入的操作是递增一个整型变量并且模数组大小后对应的NioProcessor注册到session里:


    private IoProcessor<T> nextProcessor() {
        checkDisposal();
        return pool[Math.abs(processorDistributor.getAndIncrement()) % pool.length];
    }

    if (p == null) {
            p = nextProcessor();
            IoProcessor<T> oldp =
                (IoProcessor<T>) session.setAttributeIfAbsent(PROCESSOR, p);
            if (oldp != null) {
                p = oldp;
            }
    }



    这样一来,每个连接都关联一个NioProcessor,也就是关联一个Selector对象,避免了所有连接共用一个Selector负载过高导致server响应变慢的后果。但是注意到NioSocketAcceptor也有一个Selector,这个Selector用来干什么的呢?那就是集中处理OP_ACCEPT事件的Selector,主要用于连接的接入,不跟处理读写事件的Selector混在一起,因此Mina的默认open的Selector是cpu+2个。

    看完mina2.0之后,我们来看看Grizzly2.0是怎么处理的,Grizzly还是比较保守,它默认就是启动两个Selector,其中一个专门负责accept,另一个负责连接的IO读写事件的管理。Grizzly 2.0中Selector的管理是通过SelectorRunner类,这个类封装了Selector对象以及核心的分发注册逻辑,你可以将他理解成Mina中的NioProcessor,核心的代码如下:

protected boolean doSelect() {
        selectorHandler = transport.getSelectorHandler();
        selectionKeyHandler = transport.getSelectionKeyHandler();
        strategy = transport.getStrategy();
        
        try {

            if (isResume) {
                // If resume SelectorRunner - finish postponed keys
                isResume = false;
                if (keyReadyOps != 0) {
                    if (!iterateKeyEvents()) return false;
                }
                
                if (!iterateKeys()) return false;
            }

            lastSelectedKeysCount = 0;
            
            selectorHandler.preSelect(this);
            
            readyKeys = selectorHandler.select(this);

            if (stateHolder.getState(false) == State.STOPPING) return false;
            
            lastSelectedKeysCount = readyKeys.size();
            
            if (lastSelectedKeysCount != 0) {
                iterator = readyKeys.iterator();
                if (!iterateKeys()) return false;
            }

            selectorHandler.postSelect(this);
        } catch (ClosedSelectorException e) {
            notifyConnectionException(key,
                    "Selector was unexpectedly closed", e,
                    Severity.TRANSPORT, Level.SEVERE, Level.FINE);
        } catch (Exception e) {
            notifyConnectionException(key,
                    "doSelect exception", e,
                    Severity.UNKNOWN, Level.SEVERE, Level.FINE);
        } catch (Throwable t) {
            logger.log(Level.SEVERE,"doSelect exception", t);
            transport.notifyException(Severity.FATAL, t);
        }

        return true;
    }


    基本上是一个reactor实现的样子,在AbstractNIOTransport类维护了一个SelectorRunner的数组,而Grizzly用于创建tcp server的类TCPNIOTransport正是继承于AbstractNIOTransport类,在它的start方法中调用了startSelectorRunners来创建并启动SelectorRunner数组:

private static final int DEFAULT_SELECTOR_RUNNERS_COUNT = 2;
 @Override
  public void start() throws IOException {

  if (selectorRunnersCount <= 0) {
                selectorRunnersCount = DEFAULT_SELECTOR_RUNNERS_COUNT;
            }
  startSelectorRunners();

}

 protected void startSelectorRunners() throws IOException {
        selectorRunners = new SelectorRunner[selectorRunnersCount];
        
        synchronized(selectorRunners) {
            for (int i = 0; i < selectorRunnersCount; i++) {
                SelectorRunner runner =
                        new SelectorRunner(this, SelectorFactory.instance().create());
                runner.start();
                selectorRunners[i] = runner;
            }
        }
    }


  可见Grizzly并没有采用一个单独的池对象来管理SelectorRunner,而是直接采用数组管理,默认数组大小是2。SelectorRunner实现了Runnable接口,它的start方法调用了一个线程池来运行自身。刚才我提到了说Grizzly的Accept是单独一个Selector来管理的,那么是如何表现的呢?答案在RoundRobinConnectionDistributor类,这个类是用于派发注册事件到相应的SelectorRunner上,它的派发方式是这样:

public Future<RegisterChannelResult> registerChannelAsync(
            SelectableChannel channel, int interestOps, Object attachment,
            CompletionHandler completionHandler) 
            throws IOException {
        SelectorRunner runner = getSelectorRunner(interestOps);
        
        return transport.getSelectorHandler().registerChannelAsync(
                runner, channel, interestOps, attachment, completionHandler);
    }
    
    private SelectorRunner getSelectorRunner(int interestOps) {
        SelectorRunner[] runners = getTransportSelectorRunners();
        int index;
        if (interestOps == SelectionKey.OP_ACCEPT || runners.length == 1) {
            index = 0;
        } else {
            index = (counter.incrementAndGet() % (runners.length - 1)) + 1;
        }
        
        return runners[index];
    }


    getSelectorRunner这个方法道出了秘密,如果是OP_ACCEPT,那么都使用数组中的第一个SelectorRunner,如果不是,那么就通过取模运算的结果+1从后面的SelectorRunner中取一个来注册。

    分析完mina2.0和grizzly2.0对Selector的管理后我们可以得到几个启示:

1、在处理大量连接的情况下,多个Selector比单个Selector好
2、多个Selector的情况下,处理OP_READ和OP_WRITE的Selector要与处理OP_ACCEPT的Selector分离,也就是说处理接入应该要一个单独的Selector对象来处理,避免IO读写事件影响接入速度。
3、Selector的数目问题,mina默认是cpu+2,而grizzly总共就2个,我更倾向于mina的策略,但是我认为应该对cpu个数做一个判断,如果CPU个数超过8个,那么更多的Selector线程可能带来比较大的线程切换的开销,mina默认的策略并非合适,幸好可以设置这个数值。

原文地址:http://click.aliyun.com/m/21432/              

转载于:https://www.cnblogs.com/iyulang/p/6878559.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/355556.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

js 编码 php解码,浅谈php和js中json的编码和解码

php中1)编码$jsonStr json_encode($array)2)解码$arr json_decode($jsonStr)echo json_encode("中文", JSON_UNESCAPED_UNICODE);添加参数&#xff1a;JSON_UNESCAPED_UNICODE即可。测试环境&#xff1a;PHP Version 5.5.36js中1. 编码var str obj.toJSONString()…

如果您在2015年编写过Java代码-这是您不容错过的趋势

去年我们有机会遇到的最有趣趋势的实用概述 在这篇文章中&#xff0c;我们将回顾构成我们2015年对话的5个主题和新发展。与其他许多年终总结保持较高水平的不同&#xff0c;我们将做一个更实际的操作不用流行语 。 好吧&#xff0c;没有太多*流行语。 与往常一样&#xff0c;对…

mel滤波器组频率响应曲线_了解二阶滤波器的奈奎斯特图

在之前的文章中&#xff0c;我介绍了奈奎斯特图&#xff0c;然后我们通过检查奈奎斯特曲线和截止频率与一阶无源滤波器之间的关系&#xff0c;更详细地探索了这些类型的图。在本文中&#xff0c;我们将查看二阶滤波器的奈奎斯特图。二阶过滤器当我说“二阶”滤波器时&#xff0…

CJOJ 免费航班

Description 小Z在MOI比赛中获得了大奖&#xff0c;奖品是一张特殊的机 票。使用这张机票&#xff0c;可以在任意一个国家内的任意城市之间的免费飞行&#xff0c;只有跨国飞行时才会有额外的费用。小Z获得了一张地图&#xff0c;地图上有城市之间的飞机航班和 费用。已知从每个…

java perl5compiler,Java中正则表达式使用方法详解(四)

3.2 HTML处理实例一下面一个任务是分析HTML页面内FONT标记的所有属性。HTML页面内典型的FONT标记如下所示程序将按照如下形式&#xff0c;输出每一个FONT标记的属性在这种情况下&#xff0c;我建议你使用两个正则表达式。第一个如图十一所示&#xff0c;它从字体标记提取出“&q…

java 缓存接口,java项目中,针对缓存问题的处理方式【接口中的处理方式】

1、在service包中&#xff0c;分别建立了关于缓存的一系列的接口、类等&#xff0c;封装到一个工具包中&#xff1b;临时缓存的接口(代码部分)&#xff1a;packagecom.tools;importjava.util.Date;public interfaceCacheTemplet {//设置添加永久缓存,(缓存唯一索引&#xff0c;…

【hh】我胡汉三又回来了

hh 差不多半年没来机房了&#xff0c;高一的都已经碾压我100题了 开始得比较晚&#xff0c;估计比高一的早两三个月吧&#xff0c;停了这半年落下了不少。 但是没有关系啊&#xff0c;学OI纯粹是好玩嘛&#xff0c;一开始报名的时候根本不知道有联赛这回事&#xff08;其实报名…

python爬取知乎标题_python爬虫 爬取知乎文章标题及评论

目的&#xff1a;学习笔记2.首先我们试着爬取下来一篇文章的评论&#xff0c;通过搜索发现在 response里面我们并没有匹配到评论&#xff0c;说明评论是动态加载的。3.此时我们清空请求&#xff0c;收起评论&#xff0c;再次打开评论 4.完成上面操作后&#xff0c;我们选择XHR&…

php curl 要安装pear,MacOS 安装pear

1、下载Pearcurl -O https://pear.php.net/go-pear.phar2、安装Pearsudo php -d detect_unicode0 go-pear.phar安装过程需要进行简单的配置&#xff0c;如下Below is a suggested file layout for your new PEAR installation. Tochange individual locations, type the number…

CentOS7 下调教mysql记实 之一

迁移数据库时遇到错误&#xff1a; Error Code: 1418. This function has none of DETERMINISTIC, NO SQL, or READS SQL DATA in its declaration and binary logging is enabled (you *might* want to use the less safe log_bin_trust_function_creators variable) 解释&…

java 映射数组_Java中的数组,列表,集合,映射,元组,记录文字

java 映射数组有时&#xff0c;当我对JavaScript的强大功能和表现力感到兴奋时&#xff0c;我发现自己错过了Java世界中的一两个功能。 除了lambda表达式/闭包或任何您想称为“匿名函数”的东西之外&#xff0c;它还对数组&#xff0c;数组&#xff0c;列表&#xff0c;集合&am…

知道接口地址 如何传数据_如何选显示器连接线?四种主流接口要知道

前两天家里电脑显示器的线坏了&#xff0c;火急火燎的买了根线&#xff0c;谁知道买回来之后接口不匹配&#xff0c;不能用。显示器为什么要有这么多接口呢&#xff1f;这些接口又有什么区别呢&#xff1f;必须把它搞清楚&#xff01;这不&#xff0c;经过我的不屑努力&#xf…

docker rabbitmq php扩展,Docker开启RabbitMQ延时消息队列

前言经常在开发中会遇到一些不需要同步执行的业务&#xff0c;那我们就需要用到消息队列来进行异步执行&#xff0c;但是对于某些业务就还需要用到延时的功能&#xff0c;比如订单支付超时关闭&#xff0c;那么这个时候我们就需要开启消息队列的延时功能&#xff0c;当然也有朋…

[转]Eclipse插件开发之基础篇(3) 插件的测试与调试

原文地址&#xff1a;http://www.cnblogs.com/liuzhuo/archive/2010/08/17/eclipse_plugin_1_1_2.html 1. 使用JUnit对插件进行测试 Eclipse中已经嵌入了JUnit&#xff0c;我们可以使用JUnit为插件进行单体测试。一般的JUnit是不可以对插件部分(对Eclipse的API依赖的部分)进行测…

您应该考虑将应用程序升级到Spring 4的5个理由

Spring Framework于2004年首次发布&#xff0c;是顶级Java框架之一。 Spring 4已于2013年12月发布&#xff0c;它是支持Java 8的第一个框架版本。了解为什么应该考虑将应用程序升级到Spring 4。 注意&#xff1a;我最初将此博客文章写在公司博客http://blog.goyello.com上 。 …

大整数乘法c语言代码_大整数乘法

大整数乘法和我们小学学过的乘法公式一样&#xff08;如下图&#xff09;&#xff0c;就是按位相乘&#xff0c;两个数中的每一位彼此相乘&#xff0c;然后将相同列的结果加起来&#xff0c;最后统一处理进位即可。#include <iostream> #include <cstring> using n…

13.5.SolrCloud集群使用手册之数据导入

转载请出自出处:http://www.cnblogs.com/hd3013779515/ 1.使用curl命令方式 SolrCloud时会根据路由规则路由到各个shard。 删除所有数据 curl http://192.168.137.171:8080/solr-cloud/myc_shard1_replica1/update?committrue -H "Content-Type: text/xml" --data-b…

dematel matlab,决策与实验室方法,DEMATEL分析方法介绍

DEMATEL实施步骤第一步&#xff1a;从研究目的出发&#xff0c;确定研究指标或元素。量化各元素之间的相互关系。得到直接影响矩阵。第二步&#xff1a;通过归一化原始关系矩阵。得到规范直接影响矩阵。第三步&#xff1a;由规范化直接影响矩阵。计算得到综合影响矩阵。。第四步…

python如何打印字符串_如何在Python中打印“漂亮”字符串输出

Standard Python string formatting就足够了。 # assume that your data rows are tuples template "{0:8}|{1:10}|{2:15}|{3:7}|{4:10}" # column widths: 8, 10, 15, 7, 10 print template.format("CLASSID", "DEPT", "COURSE NUMBER&qu…

mysql5.7环境,MySQL-5.7-线上生产环境部署

环境信息&#xff1a;Centos-7.2.1511MySQL 5.7.22业务名称core本篇文章数据库安装用于线上生成所使用&#xff0c;所安装的数据库使用为Percona版本&#xff0c;同样本站高可用部署都是基于本篇文章基础进行的。部署搭建&#xff1a;安装相关依赖并下载MySQL移动到指定目录yum…