从netty-example分析Netty组件

分析netty从源码开始

准备工作:

1.下载源代码:https://github.com/netty/netty.git

    我下载的版本为4.1

2. eclipse导入maven工程。

 

netty提供了一个netty-example工程,

分类如下:

Fundamental

  • Echo ‐ the very basic client and server
  • Discard ‐ see how to send an infinite data stream asynchronously without flooding the write buffer
  • Uptime ‐ implement automatic reconnection mechanism

Text protocols

  • Telnet ‐ a classic line-based network application
  • Quote of the Moment ‐ broadcast a UDP/IP packet
  • SecureChat ‐ an TLS-based chat server, derived from the Telnet example

Binary protocols

  • ObjectEcho ‐ exchange serializable Java objects
  • Factorial ‐ write a stateful client and server with a custom binary protocol
  • WorldClock ‐ rapid protocol protyping with Google Protocol Buffers integration

HTTP

  • Snoop ‐ build your own extremely light-weight HTTP client and server
  • File server ‐ asynchronous large file streaming in HTTP
  • Web Sockets (Client & Server) ‐ add a two-way full-duplex communication channel to HTTP using Web Sockets
  • SPDY (Client & Server) ‐ implement SPDY protocol
  • CORS demo ‐ implement cross-origin resource sharing

Advanced

  • Proxy server ‐ write a highly efficient tunneling proxy server
  • Port unification ‐ run services with different protocols on a single TCP/IP port

UDT

  • Byte streams ‐ use UDT in TCP-like byte streaming mode
  • Message flow ‐ use UDT in UDP-like message delivery mode
  • Byte streams in symmetric peer-to-peer rendezvous connect mode
  • Message flow in symmetric peer-to-peer rendezvous connect mode

我们的分析从这里开始,netty是client-server形式的,我们以最简单的discard示例开始,其服务器端代码如下:

/*** Discards any incoming data.*/
public final class DiscardServer {static final boolean SSL = System.getProperty("ssl") != null;static final int PORT = Integer.parseInt(System.getProperty("port", "8009"));public static void main(String[] args) throws Exception {// Configure SSL.final SslContext sslCtx;if (SSL) {SelfSignedCertificate ssc = new SelfSignedCertificate();sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();} else {sslCtx = null;}EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ChannelPipeline p = ch.pipeline();if (sslCtx != null) {p.addLast(sslCtx.newHandler(ch.alloc()));}p.addLast(new DiscardServerHandler());}});// Bind and start to accept incoming connections.ChannelFuture f = b.bind(PORT).sync();// Wait until the server socket is closed.// In this example, this does not happen, but you can do that to gracefully// shut down your server.
            f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}
}

上面的代码中使用了下面几个类:

1. EventLoopGroup

实现类为NioEventLoopGroup,其层次结构为:

  EventExecutorGroup为所有类的父接口,它通过next()方法来提供EventExecutor供使用。除此以外,它还负责处理它们的生命周期,允许以优雅的方式关闭。

  EventExecutor是一种特殊的EventExcutorGroup,它提供了一些便利的方法来查看一个线程是否在一个事件循环中执行过,除此以外,它也扩展了EventExecutorGroup,从而提供了一个通用的获取方法的方式。

  EventLoopGroup是一种特殊的EventExcutorGroup,它提供了channel的注册功能,channel在事件循环中将被后面的selection来获取到。

  NioEventLoopGroup继承自MultithreadEventLoopGroup,基于channel的NIO selector会使用该类。

2.ServerBootstrap使ServerChannel容易自举。

  group(EventLoopGroup parentGroup, EventLoopGroup childGroup)方法设置父EventLoopGroup和子EventLoopGroup。这些EventLoopGroup用来处理所有的事件和ServerChannel和Channel的IO。

   channel(Class<? extends C> channelClass)方法用来创建一个Channel实例。创建Channel实例要不使用此方法,如果channel实现是无参构造要么可以使用channelFactory来创建。

  handler(ChannelHandler handler)方法,channelHandler用来处理请求的。

  childHandler(ChannelHandler childHandler)方法,设置用来处理请求的channelHandler。

3. ChannelInitializer一种特殊的ChannelInboundHandler

  当Channel注册到它的eventLoop中时,ChannelInitializer提供了一个方便初始化channel的方法。该类的实现通常用来设置ChannelPipeline的channel,通常使用在Bootstrap#handler(ChannelHandler),ServerBootstrap#handler(ChannelHandler)和ServerBootstrap#childHandler(ChannelHandler)三个场景中。示例:

 public class MyChannelInitializer extends ChannelInitializer{public void initChannel({@link Channel} channel) {channel.pipeline().addLast("myHandler", new MyHandler());}}

然后:

ServerBootstrap bootstrap = ...;
...
bootstrap.childHandler(new MyChannelInitializer());

注意:这个类标示为可共享的,因此实现类重用时需要时安全的。

4. ChannelPipeline相关

  理解ChannelPipeline需要先理解ChannelHandler,

 4.1 ChannelHandler

  处理一个IO事件或者翻译一个IO操作,并且传递给ChannelPineline的下一个handler。

   你可以使用ChannelHandlerAdapter来替代它

   因为这个接口有太多接口需要实现,因此你只有实现ChannelHandlerAdapter就可以代替实现这个接口。

  Context对象

  ChannelHandlerContext封装了ChannelHandler。ChannelHandler应该通过context对象与它所属的ChannelPipeLine进行交互。通过使用context对象,ChannelHandler可以传递上行或者下行事件,或者动态修改pipeline,或者存储特定handler的信息(使用AttributeKey)。

  状态管理

  一个channelHandler通常需要存储一些状态信息。最简单最值得推荐的方法是使用member变量:

 public interface Message {// your methods here
  }public class DataServerHandler extends  SimpleChannelInboundHandler<Message> {private boolean loggedIn;{@code @Override}protected void messageReceived( ChannelHandlerContext ctx, Message message) {Channel ch = e.getChannel();if (message instanceof LoginMessage) {authenticate((LoginMessage) message);loggedIn = true;} else (message instanceof GetDataMessage) {if (loggedIn) {ch.write(fetchSecret((GetDataMessage) message));} else {fail();}}}...}

注意:handler的状态附在ChannePipelineContext上,因此可以增加相同的handler实例到不同的pipeline上:

  public class DataServerInitializer extends ChannelInitializer<Channel> {private static final DataServerHandler SHARED = new DataServerHandler();@Overridepublic void initChannel(Channel channel) {channel.pipeline().addLast("handler", SHARED);}}

 @Sharable注解

  在上面的示例中,使用了一个AttributeKey,你可能注意到了@Sharable注解。

  如果一个ChannelHandler使用@sharable进行注解,那就意味着你仅仅创建了一个handler一次,可以添加到一个或者多个ChannelPipeline多次而不会产生竞争。

  如果没有指定该注解,你必须每次都创建一个新的handler实例,并且增加到一个ChannelPipeline,因为它没有像member变量一样,它有一个非共享的状态。

4.2  ChannelPipeline

      ChanelPipeline是一组ChanelHandler的集合,它处理或者解析Channel的Inbound事件和OutBound操作。ChannelPipeline的实现是Intercepting Filter的一种高级形式,这样用户可以控制事件如何处理,一个pipeline内部ChannelHandler如何交互。

   pipeline事件流程

  上图描述了IO事件如何被一个ChannelPipeline的ChannelHandler处理的。一个IO事件被一个ChannelInBoundHandler处理或者ChannelOutboundHandler,然后通过调用在ChannelHandlerContext中定义的事件传播方法传递给最近的handler,传播方法有ChannelHandlerContext#filreChannelRead(Object)和ChannelHandlerContext#write(Object)。

   一个Inbound事件通常由Inbound handler来处理,如上如左上。一个Inbound handler通常处理在上图底部IO线程产生的Inbound数据。Inbound数据通过真实的输入操作如SocketChannel#read(ByteBuffer)来获取。如果一个inbound事件越过了最上面的inbound handler,该事件将会被抛弃到而不会通知你或者如果你需要关注,打印出日志。

  一个outbound事件由上图的右下的outbound handler来处理。一个outbound handler通常由outbound流量如写请求产生或者转变的。如果一个outbound事件越过了底部的outbound handler,它将由channel关联的IO线程处理。IO线程通常运行的是真实的输出操作如SocketChannel#write(byteBuffer).

示例,假设我们创建下面这样一个pipeline:

 ChannelPipeline} p = ...;p.addLast("1", new InboundHandlerA());p.addLast("2", new InboundHandlerB());p.addLast("3", new OutboundHandlerA());p.addLast("4", new OutboundHandlerB());p.addLast("5", new InboundOutboundHandlerX());

  在上例中,inbound开头的handler意味着它是一个inbound handler。outbound开头的handler意味着它是一个outbound handler。上例的配置中当一个事件进入inbound时handler的顺序是1,2,3,4,5;当一个事件进入outbound时,handler的顺序是5,4,3,2,1.在这个最高准则下,ChannelPipeline跳过特定handler的处理来缩短stack的深度:

  3,4没有实现ChannelInboundHandler,因而一个inbound事件的处理顺序是1,2,5.

  1,2没有实现ChannelOutBoundhandler,因而一个outbound事件的处理顺序是5,4,3

  若5同时实现了ChannelInboundHandler和channelOutBoundHandler,一个inbound和一个outbound事件的执行顺序分别是125和543.

  一个事件跳向下一个handler

  如上图所示,一个handler触发ChannelHandlerContext中的事件传播方法,然后传递到下一个handler。这些方法有:

  inbound 事件传播方法:

  

      ChannelHandlerContext#fireChannelRegistered()ChannelHandlerContext#fireChannelActive()ChannelHandlerContext#fireChannelRead(Object)ChannelHandlerContext#fireChannelReadComplete()ChannelHandlerContext#fireExceptionCaught(Throwable)ChannelHandlerContext#fireUserEventTriggered(Object)ChannelHandlerContext#fireChannelWritabilityChanged()ChannelHandlerContext#fireChannelInactive()ChannelHandlerContext#fireChannelUnregistered()

 

  outbound事件传播方法:

ChannelHandlerContext#bind(SocketAddress, ChannelPromise)
ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext#write(Object, ChannelPromise)
ChannelHandlerContext#flush()
ChannelHandlerContext#read()
ChannelHandlerContext#disconnect(ChannelPromise)
ChannelHandlerContext#close(ChannelPromise)
ChannelHandlerContext#deregister(ChannelPromise)

下面的示例展示了事件是如何传播的:

  public class MyInboundHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext} ctx) {System.out.println("Connected!");ctx.fireChannelActive();}}public clas MyOutboundHandler extends ChannelOutboundHandlerAdapter {@Overridepublic void close(ChannelHandlerContext} ctx, ChannelPromise} promise) {System.out.println("Closing ..");ctx.close(promise);}}

  创建一个pipeline

  在pipeline中,一个用户一般由一个或者多个ChannelHandler来接收IO事件(例如读)和IO操作请求(如写或者close)。例如,一个典型的服务器pipeline通常具有以下几个handler,但最多有多少handler取决于协议和业务逻辑的复杂度:

Protocol Decoder--将二进制数据(如ByteBuffer)转换成一个java对象

Protocol Encoder--将一个java对象转换成二进制数据。

Business Logic Handler--处理真实的业务逻辑(如数据库访问)。

让我们用下面的示例展示:

 static final  EventExecutorGroup group = new  DefaultEventExecutorGroup(16);...ChannelPipeline} pipeline = ch.pipeline();pipeline.addLast("decoder", new MyProtocolDecoder());pipeline.addLast("encoder", new MyProtocolEncoder());// Tell the pipeline to run MyBusinessLogicHandler's event handler methods// in a different thread than an I/O thread so that the I/O thread is not blocked by// a time-consuming task.// If your business logic is fully asynchronous or finished very quickly, you don't// need to specify a group.pipeline.addLast(group, "handler", new MyBusinessLogicHandler());

  线程安全

  因为ChannelPipeline是线程安全的,一个channelhandler可以在任意时间内增加或者删除。例如,当有敏感信息交换时,你可以插入一个加密handler,然后当信息交换结束后删除该handler。

 4.3 Channel

 Channel是网络socket的一个纽带或者一个处理IO操作如读、写、连接、绑定的组件。一个Channel提供如下信息:

    当前channel的状态,如它是否开启?是否连接?

    Channel的ChannelConfig的配置参数,如接受缓存大小;

    channel支持的IO操作,如读、写、连接、绑定;

    channel支持的ChannelPipeline,它处理所有的IO事件和channel关联的请求。

  所有的IO操作都是异步的。

  在Netty中所有的IO操作都是异步的。这意味着所有的IO调用将立即返回,但不保证在调用结束时请求的IO操作都已经执行完毕。而是在请求操作处理完成、失败或者取消时返回一个ChannelFuture来通知。

     Channel是继承性的。

  一个Channel可以它如何创建的来获取它的父Channel(#parent()方法)。例如:一个由ServerSocketChannel接受的SocketChannel调用parent()方法时返回ServerSocketChannel。

  继承的结构依赖于Channel的所属transport实现。例如,你可以新写一个Channel实现,它创建了一个共享同一个socket连接的子channel,如BEEP和SSH

 向下去获取特定transport操作。

  一些transport会暴露一些该transport特定的操作。Channel向下转换到子类型可以触发这些操作。例如:老的IO datargram transport,DatagramChannel提供了多播的join和leave操作。

  释放资源

  当Channel处理完后,一定记得调用close()或者close(ChannelPromise)来释放资源。

5. channelFuture

channelFuture是异步IO操作的返回值。

  在Netty中所有的IO操作都是异步的。这意味着所有的IO调用将立即返回,但不保证在调用结束时请求的IO操作都已经执行完毕。而是在请求操作处理完成、失败或者取消时返回一个ChannelFuture来通知。

  当一个IO操作开始时,创建一个新的future。ChannelFuture要么是uncompleted,要么是completed。新的future开始时是uncompleted---既不是成功、失败,也不是取消,因为IO操作还没有开始呢。若IO操作结束时future要么成功,要么失败或者取消,标记为completed的future有更多特殊的意义,例如失败的原因。请注意:即使失败和取消也属于completed状态。

  有很多方法可以查询IO操作是否完成:等待完成,检索IO操作的结果。同样也允许你增加ChannelFutureListenner,这样你可以在IO操作完成后获得通知。

  在尽可能的情况下,推荐addListenner()方法而不是await()方法,当IO操作完成后去完成接下来的其它任务时去获取通知。

6.ChannelHandlerContext

  对ChannelHandler相关信息的包装。

小结

  netty处理请求的总流程是经过ChannelPipeline中的多个ChannelHandler后,返回结果ChannelFuture。如下图所示:

具体I/O操作调用的流程, 
应用->Channel的I/O操作->调用Pipeline相应的I/O操作->调用ChannelHandlerContext的相应I/O操作->调用ChannelHandler的相应操作->Channel.UnSafe中相关的I/O操作。 
应用为什么不直接调用Channel.UnSafe接口中的I/O操作呢,而要绕一个大圈呢?因为它是框架,要支持扩展。 
执行者完成操作后,是如何通知命令者的呢?一般流程是这样的: 
Channel.UnSafe中执行相关的I/O操作,根据操作结果->调用ChannelPipeline中相应发fireXXXX()接口->调用ChannelHandlerContext中相应的fireXXXX()接口->调用ChannelHandler中相应方法->调用应用中的相关逻辑。

 

 参考文献:

【1】http://www.jiancool.com/article/71493268111/

【2】http://blog.csdn.net/pingnanlee/article/details/11973769

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/459442.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

cep

cep posted on 2015-12-16 17:03 秦瑞It行程实录 阅读(...) 评论(...) 编辑 收藏 转载于:https://www.cnblogs.com/ruiy/p/5051673.html

RHCS集群原理概述

一、 什么是RHCSRHCS是Red Hat Cluster Suite的缩写&#xff0c;也就是红帽集群套件&#xff0c;RHCS是一个能够提供高可用性、高可靠性、负载均衡、存储共享且经济廉价的集群工具集合&#xff0c;它将集群系统中三大集群架构融合一体&#xff0c;可以给web应用、数据库应用等提…

Linux学习笔记11——文件I/O之二

一、文件共享 内核使用三种数据结构表示打开的文件&#xff0c;它们之间的关系决定了在文件共享方面一个进程对另一个进程可能产生的影响。 1、每个进程在进程表中都有一个记录项&#xff0c;记录项中包含有一张打开文件描述表  2、内核为所有打开文件维持一张文件表  3、每…

Git Proxy开关

2019独角兽企业重金招聘Python工程师标准>>> 这个是配合ShadowSocks使用的&#xff0c;在~/.bash_aliases或者~/.bash_profile中设置以下代码&#xff1a; #git proxy enable alias gitpe"git config --global http.proxy socks5://127.0.0.1:1080;git config …

平衡二叉查找树插入节点操作( AVLTree ):旋转、调整平衡

AVL树的插入 在向一棵本来高度平衡的AVL树中插入一个新节点时&#xff0c;如果树中某个结点的平衡因子的绝对值 > 1&#xff0c;则出现了不平衡。设新插入结点为P&#xff0c;从结点P到根节点的路径上&#xff0c;每个结点为根的子树的高度都可能增加1&#xff0c;因此在每…

Fork/Join框架介绍

转http://www.infoq.com/cn/articles/fork-join-introduction/ 1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架&#xff0c; 是一个把大任务分割成若干个小任务&#xff0c;最终汇总每个小任务结果后得到大任务结果的框架。 我们再通过Fork和…

为什么析构函数可以能声明为虚函数,构造函数不可以

转自&#xff1a;http://blog.csdn.NET/chen825919148/article/details/8020550 构造函数不能声明为虚函数&#xff0c;析构函数可以声明为虚函数&#xff0c;而且有时是必须声明为虚函数。 不建议在构造函数和析构函数里面调用虚函数。 构造函数不能声明为虚函数的原因是: 1 …

信号集操作函数,信号未决、阻塞、递达

转载&#xff1a;信号集操作函数&#xff0c;信号阻塞与未决 一&#xff0c;信号集及相关操作函数 信号集被定义为一种数据类型&#xff1a; typedef struct { unsigned long sig[_NSIG_WORDS]&#xff1b; } sigset_t 信号集用来描述信号的集合&#xff0c;每个信号占用一位&a…

线程安全和可重入函数的联系与区别

1、 线程安全&#xff1a; 线程安全是多线程访问时&#xff0c;采用了加锁机制&#xff0c;当一个线程访问该类的某个数据时&#xff0c;进行保护&#xff0c;其他线程不能进行访问直到该线程访问完&#xff0c;其他线程才可以使用。不会出现数据不一致或数据污染。 线程不…

C++11 多线程 基础

C11开始支持多线程编程&#xff0c;之前多线程编程都需要系统的支持&#xff0c;在不同的系统下创建线程需要不同的API如pthread_create()&#xff0c;Createthread()&#xff0c;beginthread()等&#xff0c;使用起来都比较复杂&#xff0c;C11提供了新头文件<thread>、…

LB负载均衡集群--LVS

LB集群&#xff1a;LB集群是load balance 集群的简写&#xff0c;翻译成中文就是负载均衡集群。常用的负载均衡开源软件有nginx、lvs、keepalived &#xff0c;商业的硬件负载设备F5、Netscale。LB集群架构&#xff1a;当用户的请求过来时&#xff0c;会直接发到分发器&#xf…

2015 UESTC 搜索专题B题 邱老师降临小行星 记忆化搜索

邱老师降临小行星 Time Limit: 20 Sec Memory Limit: 256 MB 题目连接 http://acm.uestc.edu.cn/#/contest/show/61Description 人赢邱老师和任何男生比&#xff0c;都是不虚的。有一天&#xff0c;邱老师带妹子(们)来到了一个N行M列平面的小行星。对于每一个着陆地点&#xf…

优化表的数据类型

我们可以使用PROCEDURE ANALYSE()对当前已有应用的表类型的判断&#xff0c;该函数可以对数据表中的列的数据类型提出优化建议&#xff0c;可以根据应用的实际情况酌情考虑是否实施优化。语法&#xff1a; SELECT * FROM tbl_name PROCEDURE ANALYSE(); SELECT * FROM tb…

Linux 信号之mysleep

一、 用alarm和pause实现sleep(3)函数,称为mysleep。 1. main函数调用mysleep函数,后者调用sigaction注册了SIGALRM信号的处理函数sig_alrm。 2. 调用alarm(seconds)设定闹钟。 3. 调用pause等待,内核切换到别的进程运行。 4. seconds秒之后,闹钟超时,内核发SIGALRM给这个…

JAVA 操作系统已经来到第五个版本了 现陆续放出三个版本 这是第二个版本

1 package System2;2 3 import javax.swing.*;4 5 import java.awt.*;6 import java.awt.event.ActionEvent;7 import java.awt.event.ActionListener;8 import java.awt.event.KeyListener;9 import java.util.*;10 /**11 * 作者:范铭祥12 * 内容及功能&#xff1a; 显示框…

标准Web系统的架构分层

1、架构体系分层图 在上图中我们描述了Web系统架构中的组成部分。并且给出了每一层常用的技术组件/服务实现。需要注意以下几点&#xff1a; 系统架构是灵活的&#xff0c;根据需求的不同&#xff0c;不一定每一层的技术都需要使用。例如&#xff1a;一些简单的CRM系统可能在产…

数据链路层差错检测:CRC(循环冗余检验)

1、循环冗余检验&#xff08;CRC&#xff09;&#xff1a; 在发送端&#xff0c;先把数据划分为祖&#xff0c;假定每组K个比特。现假定待传送的数据M 101001&#xff08;k6&#xff09;。CRC运算就是在数据M后面添加提供差错检测的n位冗余码&#xff0c;然后构成一个帧发送出…

算法导论笔记:25所有节点对的最短路径问题

本章考虑在给定的有向加权图G(V, E)&#xff0c;对于所有的节点u,v∈V&#xff0c;找到一条从节点u到节点v的最短路径。希望以表格的形式表示输出&#xff1a;第u行第v列给出的是节点u到节点v的最短路径权重。 对于这个问题&#xff0c;如果是运行|V|次单源最短路径算法来解决所…

iOS开发~UI布局(二)storyboard中autolayout和size class的使用详解

一、概要&#xff1a;前一篇初步的描述了size class的概念&#xff0c;那么实际中如何使用呢&#xff0c;下面两个问题是我们一定会遇到的&#xff1a;1、Xcode6中增加了size class&#xff0c;在storyboard中如何使用&#xff1f; 2、auto layout该如何与size class配合来进行…

Linux:守护进程解析、如何实现守护进程

1、守护进程&#xff1a; 守护进程也称精灵进程&#xff08;Daemon&#xff09;&#xff0c;是运行在后台的⼀一种特殊进程。它独立于控制终端且周期性地执行某种任务或等待处理某些发生的事件。守护进程是⼀一种很有用的进程。Linux的大多数服务器就是用守护进程实现的。比如…