netty客户端源码

随笔记录。

 

//创建一个ChannelFactory(客户端代码)

ChannelFactory factory = new NioClientSocketChannelFactory(

                  Executors.newCachedThreadPool(),

                  Executors.newCachedThreadPool());

// NioClientSocketChannelFactory构造方法

public NioClientSocketChannelFactory(

            Executor bossExecutor, Executor workerExecutor,

            int bossCount, int workerCount) {

        ...

        // 线程池

        this.bossExecutor = bossExecutor;

// 线程池

        this.workerExecutor = workerExecutor;

       // 构建ChannelSink,NioClientSocketPipelineSink实例

       // bossCount默认1,workerCount默认Runtime.getRuntime().availableProcessors() * 2

        sink = new NioClientSocketPipelineSink(

                bossExecutor, workerExecutor, bossCount, workerCount);

}

 

// NioClientSocketPipelineSink构造方法

NioClientSocketPipelineSink(Executor bossExecutor, Executor workerExecutor,

            int bossCount, int workerCount) {

        this.bossExecutor = bossExecutor;

      

        bosses = new Boss[bossCount];

        for (int i = 0; i < bosses.length; i ++) {

            bosses[i] = new Boss(i + 1);

        }

       

        workers = new NioWorker[workerCount];

        for (int i = 0; i < workers.length; i ++) {

            workers[i] = new NioWorker(id, i + 1, workerExecutor);

        }

}

 

// 创建Bootstrap并设置factory(客户端代码)

ClientBootstrap bootstrap = new ClientBootstrap(factory);

// Bootstrap类set方法

public void setFactory(ChannelFactory factory) {

        …

        this.factory = factory;

}

 

// 设置ChannelPipelineFactory,实现getPipeline方法,返回一个ChannelPipeline实现类

// (客户端代码)

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

              public ChannelPipeline getPipeline() {

                  ChannelPipeline pipeline = Channels.pipeline();

                  pipeline.addLast("encode",new StringEncoder());

                  pipeline.addLast("decode",new StringDecoder());

                  pipeline.addLast("handler1",new TimeClientHandler());

                  return pipeline;

              }

          });

DefaultChannelPipeline类addLast方法

public synchronized void addLast(String name, ChannelHandler handler) {

    if (name2ctx.isEmpty()) {

        // 初始化name2ctx,head,tail

        init(name, handler);

    } else {

        …

        DefaultChannelHandlerContext oldTail = tail;

        DefaultChannelHandlerContext    newTail =   new DefaultChannelHandlerContext(oldTail, null, name, handler);

        …

        // 最新的DefaultChannelHandlerContext放入tail以及更新到oldTail.next中

        oldTail.next = newTail;

        tail = newTail;

        name2ctx.put(name, newTail);

        …

    }

}

// 客户端发起连接请求(客户端代码)

bootstrap.connect (new InetSocketAddress("127.0.0.1", 8080));

// connect源代码解读

ClientBootstrap类connect方法

public ChannelFuture connect(final SocketAddress remoteAddress,

final SocketAddress localAddress) {

         …

        ChannelPipeline pipeline;

        try {

           // 返回 DefaultChannelPipeline对象实例

            pipeline = getPipelineFactory().getPipeline();

        } catch (Exception e) {

            throw new ChannelPipelineException("Failed to initialize a pipeline.", e);

        }

        // Set the options.

        // 返回NioClientSocketChannelFactory实例,并创建NioClientSocketChannel实例

        Channel ch = getFactory().newChannel(pipeline);

        ch.getConfig().setOptions(getOptions());

        // Bind.

        if (localAddress != null) {

            ch.bind(localAddress);

        }

        // Connect.

        return ch.connect(remoteAddress);

}

NioClientSocketChannelFactory类newChannel方法

public SocketChannel newChannel(ChannelPipeline pipeline) {

        //this为NioClientSocketChannelFactory实例

       //pipeline为DefaultChannelPipeline实例

       //sink为NioClientSocketPipelineSink实例

       // sink.nextWorker返回一个NioWorker实例

        return new NioClientSocketChannel(this, pipeline, sink, sink.nextWorker());

}

NioClientSocketChannel类构造方法

NioClientSocketChannel(

            ChannelFactory factory, ChannelPipeline pipeline,

            ChannelSink sink, NioWorker worker) {

        //  新创建一个SocketChannel(newSocket() = > SocketChannel.open())

        super(null, factory, pipeline, sink, newSocket(), worker);

        fireChannelOpen(this);

    }

继续看父类NioSocketChannel构造方法

public NioSocketChannel(

            Channel parent, ChannelFactory factory,

            ChannelPipeline pipeline, ChannelSink sink,

            SocketChannel socket, NioWorker worker) {

        super(parent, factory, pipeline, sink);

        this.socket = socket;

        this.worker = worker;

        config = new DefaultNioSocketChannelConfig(socket.socket());

}

继续看父类AbstractChannel构造方法

protected AbstractChannel(

            Channel parent, ChannelFactory factory,

            ChannelPipeline pipeline, ChannelSink sink) {

                   // 传入了一个null值

        this.parent = parent;

                   // NioClientSocketChannelFactory实例

        this.factory = factory;

                   // DefaultChannelPipeline实例

        this.pipeline = pipeline;

        id = allocateId(this);

        pipeline.attach(this, sink);

}

DefaultChannelPipeline类attach方法

public void attach(Channel channel, ChannelSink sink) {

        …

                   // NioClientSocketChannel实例

        this.channel = channel;

                   // NioClientSocketPipelineSink实例

        this.sink = sink;

}

 

// ClientBootstrap类connect方法中ch.connect(remoteAddress)

//类AbstractChannel

public ChannelFuture connect(SocketAddress remoteAddress) {

        return Channels.connect(this, remoteAddress);

}

//类Channels

public static ChannelFuture connect(Channel channel, SocketAddress remoteAddress) {

        if (remoteAddress == null) {

            throw new NullPointerException("remoteAddress");

        }

        ChannelFuture future = future(channel, true);

                   // DefaultChannelPipeline

                   // 新建一个ChannelState实例DownstreamChannelStateEvent

        channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(

                channel, future, ChannelState.CONNECTED, remoteAddress));

        return future;

}

//类NioClientSocketPipelineSink

public void eventSunk(

            ChannelPipeline pipeline, ChannelEvent e) throws Exception {

        if (e instanceof ChannelStateEvent) {

            ChannelStateEvent event = (ChannelStateEvent) e;

            NioClientSocketChannel channel =

                (NioClientSocketChannel) event.getChannel();

            ChannelFuture future = event.getFuture();

            ChannelState state = event.getState();

            Object value = event.getValue();

 

            switch (state) {

            case OPEN:

                if (Boolean.FALSE.equals(value)) {

                    channel.worker.close(channel, future);

                }

                break;

            case BOUND:

                if (value != null) {

                    bind(channel, future, (SocketAddress) value);

                } else {

                    channel.worker.close(channel, future);

                }

                break;

            case CONNECTED:

                if (value != null) {

                                               //第一次客户端发起连接

                    connect(channel, future, (SocketAddress) value);

                } else {

                    channel.worker.close(channel, future);

                }

                break;

            case INTEREST_OPS:

                channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());

                break;

            }

        } else if (e instanceof MessageEvent) {

            MessageEvent event = (MessageEvent) e;

            NioSocketChannel channel = (NioSocketChannel) event.getChannel();

            boolean offered = channel.writeBuffer.offer(event);

            assert offered;

            channel.worker.writeFromUserCode(channel);

        }

}

private void connect(

            final NioClientSocketChannel channel, final ChannelFuture cf,

            SocketAddress remoteAddress) {

        try {

// channel.socket在初始化NioClientSocketChannel时创建

//nio发起连接,因为设置了socket.configureBlocking(false)

//connect方法立即返回,返回值为false

//此时服务端已经收到了客户端发送的connect事件并进行处理

            if (channel.socket.connect(remoteAddress)) {

                channel.worker.register(channel, cf);

            } else {

                channel.getCloseFuture().addListener(new ChannelFutureListener() {

                    public void operationComplete(ChannelFuture f)

                            throws Exception {

                        if (!cf.isDone()) {

                            cf.setFailure(new ClosedChannelException());

                        }

                    }

                });

                cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

                channel.connectFuture = cf;

                                     //注册事件,nextBoss()返回一个Runnable实例

                nextBoss().register(channel);

            }

 

        } catch (Throwable t) {

            cf.setFailure(t);

            fireExceptionCaught(channel, t);

            channel.worker.close(channel, succeededFuture(channel));

        }

}

// Boss内部类 NioClientSocketPipelineSink

void register(NioClientSocketChannel channel) {

    Runnable registerTask = new RegisterTask(this, channel);

    Selector selector;

 

    synchronized (startStopLock) {

        if (!started) {

            // Open a selector if this worker didn't start yet.

            try {

                // 打开一个选择器

                this.selector = selector =  Selector.open();

            } catch (Throwable t) {

                throw new ChannelException(

                        "Failed to create a selector.", t);

            }

 

            // Start the worker thread with the new Selector.

            boolean success = false;

            try {

                //启动线程,消费任务队列

//bossExecutor是客户端代码Executors.newCachedThreadPool()所创建

// nio的selector.select(500)操作

                DeadLockProofWorker.start(

                        bossExecutor,

                        new ThreadRenamingRunnable(

                                this, "New I/O client boss #" + id + '-' + subId));

                success = true;

            } finally {

                if (!success) {

                    // Release the Selector if the execution fails.

                    try {

                        selector.close();

                    } catch (Throwable t) {

                        logger.warn("Failed to close a selector.", t);

                    }

                    this.selector = selector = null;

                    // The method will return to the caller at this point.

                }

            }

        } else {

            // Use the existing selector if this worker has been started.

            selector = this.selector;

        }

 

        assert selector != null && selector.isOpen();

 

        started = true;

        //写入队列一个注册任务

        boolean offered = registerTaskQueue.offer(registerTask);

        assert offered;

    }

 

    if (wakenUp.compareAndSet(false, true)) {

        selector.wakeup();

    }

}

//类DeadLockProofWorker

public static void start(final Executor parent, final Runnable runnable) {

  //parent为bossExecutor,即一个线程池

        ......

//开启一个子线程

        parent.execute(new Runnable() {

            public void run() {

                PARENT.set(parent);

                try {

                   // ThreadRenamingRunnable实例

                    runnable.run();

                } finally {

                    PARENT.remove();

                }

            }

        });

}

//类ThreadRenamingRunnable

public void run() {

       ......

        // Run the actual runnable and revert the name back when it ends.

        try {

          //runnable为Boss实例

            runnable.run();

        } finally {

            if (renamed) {

                // Revert the name back if the current thread was renamed.

                // We do not check the exception here because we know it works.

                currentThread.setName(oldThreadName);

            }

        }

}

// Boss内部类 NioClientSocketPipelineSink中

public void run() {

    boolean shutdown = false;

    Selector selector = this.selector;

    long lastConnectTimeoutCheckTimeNanos = System.nanoTime();

    for (;;) {

        wakenUp.set(false);

 

        try {

            // 设置超时阻塞

            int selectedKeyCount = selector.select(500);

 

            if (wakenUp.get()) {

                selector.wakeup();

            }

            // 消费队列中的事件

            //nio中register操作

            processRegisterTaskQueue();

 

            if (selectedKeyCount > 0) {

                //处理选择器获取到的事件

                processSelectedKeys(selector.selectedKeys());

            }

            ……

        } catch (Throwable t) {

           ……

        }

    }

}

 

private void processRegisterTaskQueue() {

for (;;) {

         //获取事件,task为registerTaskQueue.offer(registerTask);RegisterTask实例

        final Runnable task = registerTaskQueue.poll();

        if (task == null) {

            break;

        }

       //执行NioClientSocketPipelineSink中的内部类RegisterTask的Run方法

        task.run();

    }

}

 

//内部类RegisterTask NioClientSocketPipelineSink中

public void run() {

try {

         // nio socket注册,只有完成注册以后,才能和服务端进行通信

        channel.socket.register(

                boss.selector, SelectionKey.OP_CONNECT, channel);

    } catch (ClosedChannelException e) {

        channel.worker.close(channel, succeededFuture(channel));

    }

   ……

}

private void processSelectedKeys(Set<SelectionKey> selectedKeys) {

    for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {

        SelectionKey k = i.next();

        i.remove();

 

        if (!k.isValid()) {

            close(k);

            continue;

        }

 

        if (k.isConnectable()) {

            //完成客户端连接

            connect(k);

        }

    }

}

 

private void connect(SelectionKey k) {

    NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();

try {

         //nio完成客户端连接

        if (ch.socket.finishConnect()) {

            k.cancel();

            //NioWorker类注册

            ch.worker.register(ch, ch.connectFuture);

        }

    } catch (Throwable t) {

       .......

    }

}

 

类NioWorker负责读写事件注册处理

未完待续...

转载于:https://www.cnblogs.com/liuxinan/p/6073424.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/489600.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

汉字的ascii码值范围_ASCII代码

同学们&#xff0c;我们都知道计算机只能接受二进制信息&#xff0c;很显然直接给出英文的ABCD计算机并不认识&#xff0c;那为何敲击键盘屏幕就可以显示出对应的字符呢&#xff1f;为了解决这个问题&#xff0c;计算机采用一套编码&#xff0c;每个编码都是唯一的&#xff0c;…

从ICLR提交论文看机器学习的趋势和风口

大数据文摘出品来源&#xff1a;deepsense2013年才举办第一届的ICLR&#xff08;The International Conference on Learning Representations&#xff09;发展迅猛&#xff0c;如今已成为是最重要的国际机器学习会议之一&#xff0c;甚至可以和ICML&#xff0c;NeurIPS和CVPR这…

php 登录安全认证,介绍几种常用的web安全认证方式

本文为大家介绍了五种常用的web安全认证方式&#xff0c;具有一定的参考价值&#xff0c;希望能对大家有所帮助。1、Http Basic Auth这是一种最古老的安全认证方式&#xff0c;这种方式就是简单的访问API的时候&#xff0c;带上访问的username和password&#xff0c;由于信息会…

日期选择控件-laydate

laydate控件非常简单易用&#xff0c;只需要调用一个个函数就可以轻松实现日期时间选择。 <% page language"java" import"java.util.*" pageEncoding"UTF-8"%><%String path request.getContextPath();String basePath request.getS…

python decorator. decorator_Python中decorator使用实例

在我以前介绍 Python 2.4 特性的Blog中已经介绍过了decorator了&#xff0c;不过&#xff0c;那时是照猫画虎&#xff0c;现在再仔细描述一下它的使用。关于decorator的详细介绍在 Python 2.4中的Whats new中已经有介绍&#xff0c;大家可以看一下。如何调用decorator基本上调用…

php文件上传到虚拟主机,php源码上传到虚拟主机(php源码上传到服务器)

php网站的源码在上传到虚拟主机之前&#xff0c;需要做什么修改本人小白&#xff0c;只知道需。这个啊&#xff0c;倒是简单&#xff0c;你下载个ftp软件&#xff0c;登陆上传即可&#xff0c;不过要注意传对目录&#xff0c;一般的虚拟主机都有好几目录的&#xff0c;要传合适…

这个“大脑”收获一份大奖!

来源&#xff1a;新华社第17届亚洲-太平洋通讯社组织&#xff08;亚通组织&#xff09;全体大会8日在韩国首尔闭幕。大会颁发了亚通组织卓越通讯社品质奖&#xff0c;中国新华通讯社与越南通讯社分别获奖。这是亚通组织主席、阿塞拜疆国家新闻社社长阿斯兰阿斯兰诺夫&#xff0…

arm-linux-gnueabi和arm-linux-gnueabihf 的区别

转载整理自&#xff1a;http://www.cnblogs.com/xiaotlili/p/3306100.html 一、 什么是ABI和EABI1 、ABI ABI(二进制应用程序接口-Application Binary Interface (ABI) for the ARM Architecture)在计算机中&#xff0c;应用二进制接口描述了应用程序&#xff08;或者其他类型&…

检查用户名是否存在的servlet代码怎么写_Servlet详解!!!

1 掌握 请求转发2 掌握 请求重定向3 掌握cookie1. 请求转发介绍(1) 为什么需要请求转发?以此请求的处理需要多个Servlet的联动操作,第一个Servlet需要用到其他Servlet已经声明的逻辑处理代码(2) 请求转发的本质是什么&#xff1f;其实就是在一个Servlet中调用其他的Servlet2. …

学习人工智能必须攻克三道门槛:数学基础、英语水平与编程技术

来源&#xff1a;搜狐广义的说&#xff0c;人工智能包含诸多不同方法&#xff0c;其主旨是让程序像一个智能体一样解决问题。机器学习是实现人工智能的一种方法&#xff0c;它不完全依靠预先设计&#xff0c;而是从数据中进行总结&#xff0c;达到模拟记忆、推理的作用。包括诸…

oracle 批量 重建索引,Oracle重建索引Shell脚本、SQL脚本分享

索引是提高数据库查询性能的有力武器。没有索引&#xff0c;就好比图书馆没有图书标签一样&#xff0c;找一本书自己想要的书比登天还难。然而索引在使用的过程中&#xff0c;尤其是在批量的DML的情形下会产生相应的碎片&#xff0c;以及B树高度会发生相应变化&#xff0c;因此…

pandas 学习(二)—— pandas 下的常用函数

import pandas as pd; 1. 数据处理函数 pd.isnull()/pd.notnull()&#xff1a;用于检测缺失数据&#xff1b;2. 辅助函数 pd.to_datetime()3. Series 与 DataFrame 的成员函数 drop(labels, axis0, levelNone, inplaceFalse, errors’raise’) 注意第一个参数&#xff08;label…

python中set index_python中set基础应用

set:类似dict,是一组key的集合&#xff0c;不存储value本质是无序和无重复元素的集合#创建#创建set需要一个list或者tuple或者dict作为输入集合s1set({1,2,3,4,5})s2set({1,2,2,5,3,3,5})s3set({1:"123",2:"daf"})print(s1)#{1, 2, 3, 4, 5}print(s2)#{1, …

【智能驾驶】自动驾驶深度感知技术对车和行人的检测

来源&#xff1a;小马智行第二场技术沙龙今天我主要想分享自动驾驶感知技术在探索的过程中&#xff0c;采用的传统方法和深度学习方法。传统方法不代表多传统&#xff0c;深度学习也不代表多深度。它们有各自的优点&#xff0c;也都能解决各自的问题&#xff0c;最终希望将其结…

matlab读取其他位置,将文件的数据读取到matlab中,进行编辑,然后将其保存到其他位置...

将文件的数据读取到matlab中&#xff0c;进行编辑&#xff0c;然后将其保存到其他位置 我有一个名为EXP1_SQ1_Template.txt的文件。这是一个简单的文本文件&#xff0c;包含以下8行&#xff1a;LOAD BOX 1 SUBJ M1_299633_D295158_JUN191910_Aut_ERROR2 EXPT St(m)_Se(n)_Rat1 …

oracle 11g安装过程中问题:找不到WFMLRSVCApp.ear

网上的方法是将两个压缩包解压到同一个目录中&#xff0c;我的方法是不再此解压&#xff0c;麻烦&#xff0c;直接将解压出的内容剪切过去&#xff0c;方便省事&#xff0c;原理也是相同的。解决方法&#xff1a; 将win64_11gR2_database_2of2解压出的文件&#xff0c;\win64_1…

python字符串随机排序_python 随机数使用方法,推导以及字符串,双色球小程序实例...

#随机数的使用import random #导入randomrandom.randint(0,9)#制定随机数0到9irandom.sample(range(1,34),6)#输出6个随机数&#xff0c;范围是1到34i.sort()#排序方法&#xff0c;排序时更改原数组&#xff0c;无返回值sorted(i)#排序函数&#xff0c;排序时不影响原数组&…

中国信通院《新型智慧城市发展研究报告》

来源&#xff1a;云头条本报告结合新时期我国新型智慧城市的建设重点&#xff0c;围绕顶层设计、体制机制、智能基础设施、智能运行中枢、智慧生活、智慧生产、智慧治理、智慧生态、技术创新与标准体系和安全保障体系等十大核心要素&#xff0c;深入分析研究了我国新型智慧城市…

python处理一亿条数据_Python基础数据处理库

Numpy 简介import numpy as npNumpy是应用Python进行科学计算的基础库。它的功能包括多维数组、基本线性代数、基本统计计算、随机模拟等。Numpy的核心功能是ndarray 类&#xff0c;即多维数组。多维数组是线性代数中非常广泛的概念&#xff0c;如一维数组就是向量&#xff0c;…

oracle 内存分析工具,IDE 中的分析工具

IDE 中的分析工具Oracle Solaris Studio IDE 提供的交互式图形分析工具可用于检查在 IDE 内部运行的项目的性能。分析工具使用 Oracle Solaris Studio 实用程序和操作系统实用程序来收集数据。可通过 "Profile Project"(分析项目)按钮使用分析工具。Monitor Project(…