基于事件的 NIO 多线程服务器

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

JDK1.4 的 NIO 有效解决了原有流式 IO 存在的线程开销的问题,在 NIO 中使用多线程,主要目的已不是为了应对每个客户端请求而分配独立的服务线程,而是通过多线程充分使用用多个 CPU 的处理能力和处理中的等待时间,达到提高服务能力的目的。 

多线程的引入,容易为本来就略显复杂的 NIO 代码进一步降低可读性和可维护性。引入良好的设计模型,将不仅带来高性能、高可靠的代码,也将带来一个惬意的开发过程。

线程模型

NIO 的选择器采用了多路复用(Multiplexing)技术,可在一个选择器上处理多个套接字, 通过获取读写通道来进行 IO 操作。由于网络带宽等原因,在通道的读、写操作中是容易出现等待的, 所以在读、写操作中引入多线程,对性能提高明显,而且可以提高客户端的感知服务质量。所以本文的模型将主要通过使用读、写线程池 来提高与客户端的数据交换能力。

如下图所示,服务端接受客户端请求后,控制线程将该请求的读通道交给读线程池,由读线程池分配线程完成对客户端数据的读取操作;当读线程完成读操作后,将数据返回控制线程,进行服务端的业务处理;完成 业务处理后,将需回应给客户端的数据和写通道提交给写线程池,由写线程完成向客户端发送回应数据的操作。

(NIO 多线程服务器模型)

(NIO 多线程服务器模型)

同时整个服务端的流程处理,建立于事件机制上。在 [接受连接->读->业务处理->写 >关闭连接 ]这个 过程中,触发器将触发相应事件,由事件处理器对相应事件分别响应,完成服务器端的业务处理。 
下面我们就来详细看一下这个模型的各个组成部分。

相关事件定义 在这个模型中,我们定义了一些基本的事件:

(1)onAccept:当服务端收到客户端连接请求时,触发该事件。通过该事件我们可以知道有新的客户端呼入。该事件可用来控制服务端的负载。例如,服务器可设定同时只为一定数量客户端提供服务,当同时请求数超出数量时,可在响应该事件时直接抛出异常,以拒绝新的连接。 

(2)onAccepted:当客户端请求被服务器接受后触发该事件。该事件表明一个新的客户端与服务器正式建立连接。 

(3)onRead:当客户端发来数据,并已被服务器控制线程正确读取时,触发该事件 。该事件通知各事件处理器可以对客户端发来的数据进行实际处理了。需要注意的是,在本模型中,客户端的数据读取是由控制线程交由读线程完成的,事件处理器不需要在该事件中进行专门的读操作,而只需将控制线程传来的数据进行直接处理即可。 

(4)onWrite:当客户端可以开始接受服务端发送数据时触发该事件,通过该事件,我们可以向客户端发送回应数据。 在本模型中,事件处理器只需要在该事件中设置 

(5)onClosed:当客户端与服务器断开连接时触发该事件。 

(6)onError:当客户端与服务器从连接开始到最后断开连接期间发生错误时触发该事件。通过该事件我们可以知道有什么错误发生。


事件回调机制的实现

在这个模型中,事件采用广播方式,也就是所有在册的事件处理器都能获得事件通知。这样可以将不同性质的业务处理,分别用不同的处理器实现,使每个处理器的业务功能尽可能单一。 
如下图:整个事件模型由监听器、事件适配器、事件触发器、事件处理器组成。

(事件模型)

(事件模型)

  1. 监听器(Serverlistener):这是一个事件接口,定义需监听的服务器事件,如果您需要定义更多的事件,可在这里进行扩展。 

     public interface Serverlistener { public void onError(String error); public void onAccept() throws Exception; public void onAccepted(Request request) throws Exception; public void onRead(Request request) throws Exception; public void onWrite(Request request, Response response) throws Exception; public void onClosed(Request request) throws Exception; }
  2. 事件适配器(EventAdapter):对 Serverlistener 接口实现一个适配器 (EventAdapter),这样的好处是最终的事件处理器可以只处理所关心的事件。 

     public abstract class EventAdapter implements Serverlistener { public EventAdapter() { } public void onError(String error) {} public void onAccept() throws Exception {} public void onAccepted(Request request)  throws Exception {} public void onRead(Request request)  throws Exception {} public void onWrite(Request request, Response response)  throws Exception {} public void onClosed(Request request)  throws Exception {} }
  3. 事件触发器(Notifier):用于在适当的时候通过触发服务器事件,通知在册的事件处理器对事件做出响应。触发器以 Singleton 模式实现,统一控制整个服务器端的事件,避免造成混乱。 

    public class Notifier { private static Arraylist listeners = null; private static Notifier instance = null; private Notifier() { listeners = new Arraylist(); } /** * 获取事件触发器* @return 返回事件触发器*/ public static synchronized Notifier getNotifier() { if (instance == null) { instance = new Notifier(); return instance; } else return instance; } /** * 添加事件监听器* @param l 监听器*/ public void addlistener(Serverlistener l) { synchronized (listeners) { if (!listeners.contains(l)) listeners.add(l); } } public void fireOnAccept() throws Exception { for (int i = listeners.size() - 1; i >= 0; i--) ( (Serverlistener) listeners.get(i)).onAccept(); } ....// other fire method }
  4. 事件处理器(Handler):继承事件适配器,对感兴趣的事件进行响应处理,实现业务处理。以下是一个简单的事件处理器实现,它响应 onRead 事件,在终端打印出从客户端读取的数据。 

     public class ServerHandler extends EventAdapter { public ServerHandler() { } public void onRead(Request request) throws Exception { System.out.println("Received: " + new String(data)); } }
  5. 事件处理器的注册。为了能让事件处理器获得服务线程的事件通知,事件处理器需在触发器中注册。 

     ServerHandler handler = new ServerHandler(); Notifier.addlistener(handler);


实现 NIO 多线程服务器

NIO 多线程服务器主要由主控服务线程、读线程和写线程组成。

(线程模型)

(线程模型)

  1. 主控服务线程(Server):主控线程将创建读、写线程池,实现监听、接受客户端请求,同时将读、写通道提交由相应的读线程(Reader)和写服务线程 (Writer) ,由读写线程分别完成对客户端数据的读取和对客户端的回应操作。

    public class Server implements Runnable { .... private static int MAX_THREADS = 4; public Server(int port) throws Exception { .... // 创建无阻塞网络套接selector = Selector.open(); sschannel = ServerSocketChannel.open(); sschannel.configureBlocking(false); address = new InetSocketAddress(port); ServerSocket ss = sschannel.socket(); ss.bind(address); sschannel.register(selector, SelectionKey.OP_ACCEPT); } public void run() { System.out.println("Server started ..."); System.out.println("Server listening on port: " + port); // 监听while (true) { try { int num = 0; num = selector.select(); if (num > 0) { Set selectedKeys = selector.selectedKeys(); Iterator it = selectedKeys.iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) it.next(); it.remove(); // 处理 IO 事件if ( (key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) { // Accept the new connection ServerSocketChannel ssc = (ServerSocketChannel) key.channel();notifier.fireOnAccept(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); // 触发接受连接事件Request request = new Request(sc); notifier.fireOnAccepted(request); // 注册读操作 , 以进行下一步的读操作sc.register(selector,  SelectionKey.OP_READ, request);} else if ( (key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ ) { // 提交读服务线程读取客户端数据Reader.processRequest(key);  key.cancel(); } else if ( (key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE ) { // 提交写服务线程向客户端发送回应数据Writer.processRequest(key);  key.cancel(); } } } else { addRegister();  // 在 Selector 中注册新的写通道} } catch (Exception e) { notifier.fireOnError("Error occured in Server: " + e.getMessage()); continue; } } } .... 
    }
  2. 读线程(Reader):使用线程池技术,通过多个线程读取客户端数据,以充分利用网络数据传输的时间,提高读取效率。

    public class Reader extends Thread { public void run() { while (true) { try { SelectionKey key; synchronized (pool) { while (pool.isEmpty()) { pool.wait(); } key = (SelectionKey) pool.remove(0); } // 读取客户端数据,并触发 onRead 事件read(key);  } catch (Exception e) { continue; } } } .... }
  3. 写线程(Writer):和读操作一样,使用线程池,负责将服务器端的数据发送回客户端。

     public final class Writer extends Thread { public void run() { while (true) { try { SelectionKey key; synchronized (pool) { while (pool.isEmpty()) { pool.wait(); } key = (SelectionKey) pool.remove(0); } // 向客户端发送数据,然后关闭连接,并分别触发 onWrite,onClosed 事件write(key); } catch (Exception e) { continue; } } } .... }


具体应用

NIO 多线程模型的实现告一段落,现在我们可以暂且将 NIO 的各个 API 和烦琐的调用方法抛于脑后,专心于我们的实际应用中。 
我们用一个简单的 TimeServer(时间查询服务器)来看看该模型能带来多么简洁的开发方式。 
在这个 TimeServer 中,将提供两种语言(中文、英文)的时间查询服务。我们将读取客户端的查询命令(GB/EN),并回应相应语言格式的当前时间。在应答客户的请求的同时,服务器将进行日志记录。做为示例,对日志记录,我们只是简单地将客户端的访问时间和 IP 地址输出到服务器的终端上。

  1. 实现时间查询服务的事件处理器(TimeHandler): 

    public class TimeHandler extends EventAdapter { public TimeHandler() { } public void onWrite(Request request, Response response) throws Exception { String command = new String(request.getDataInput()); String time = null; Date date = new Date(); // 判断查询命令if (command.equals("GB")) { // 中文格式DateFormat cnDate = DateFormat.getDateTimeInstance(DateFormat.FulL, DateFormat.FulL, Locale.CHINA); time = cnDate.format(date); } else { // 英文格式DateFormat enDate = DateFormat.getDateTimeInstance(DateFormat.FulL, DateFormat.FulL, Locale.US); time = enDate.format(date); } response.send(time.getBytes()); } }
  2. 实现日志记录服务的事件处理器(LogHandler):

    public class LogHandler extends EventAdapter { public LogHandler() { } public void onClosed(Request request) throws Exception { String log = new Date().toString() + " from " + request.getAddress().toString(); System.out.println(log); } public void onError(String error) { System.out.println("Error: " + error); } 
    }
  3. 启动程序:

    public class Start { public static void main(String[] args) { try { LogHandler loger = new LogHandler(); TimeHandler timer = new TimeHandler(); Notifier notifier = Notifier.getNotifier(); notifier.addlistener(loger); notifier.addlistener(timer); System.out.println("Server starting ..."); Server server = new Server(5100); Thread tServer = new Thread(server); tServer.start(); } catch (Exception e) { System.out.println("Server error: " + e.getMessage()); System.exit(-1); } } }


小结

通过例子我们可以看到,基于事件回调的 NIO 多线程服务器模型,提供了清晰直观的实现方式,可让开发者从 NIO 及多线程的技术细节中摆脱出来,集中精力关注具体的业务实现。


附录

演示程序( nioserver.zip)。


转载于:https://my.oschina.net/u/167082/blog/205428

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/291764.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

随机给出三十道四则运算题目

这是课上练习,应用了随机函数,涉及是三个部分第一操作数、运算符、第二操作数,这三个部分都是随机产生的:第一、第二操作数可以应用随机函数产生符合条件的数值,运算符的产生可以在0-3,之间产生随机整数,分…

php反转数字_【PHP】php实现数组反转

php里面有个函数可以反转数组,工作中也经常用到,非常方便。今天来自己实现这样的功能。$arr [2,5,6,1,8,16,12];function reverse($arr){$left 0;$right count($arr) -1;$temp [];while ($left < $right){$temp[$left] $arr[$right];$temp[$right] $arr[$left];$left;…

第6章 C控制语句:循环

学习笔记——《C Prime Plus》 第6章 C控制语句&#xff1a;循环6.1 再探 while 循环6.1.1 程序注释6.1.2 C风格读取循环6.2 while 语句6.2.1 终止 while 循环6.2.2 何时终止循环6.2.3 while&#xff1a;入口循环条件6.2.4 语法要点6.3 _Bool 类型6.4 不确定循环和计数循环6.5 …

Android之用tcpdump常用抓包命令使用总结

1、搞好Android手机抓包环境 1 手机需要root 2 把tcpdump工具 push到手机 /data/local 目录下去,至于怎么搞,读者百度。 2、常见tcpdump抓包命令介绍 -w 把包数据直接写入文件而不进行分析和打印输出. 这些包数据可在随后通过-r 选项来重新…

openresty 前端开发进阶一之http后端

2019独角兽企业重金招聘Python工程师标准>>> 做前端开发&#xff0c;大多数情况下&#xff0c;都需要跟后端打交道&#xff0c;而最常见的方式则是通过http请求&#xff0c;进行通信。 在openresty中&#xff0c;通过http跟后端整合通信的方式又很多种&#xff0c;各…

php集成环境

WampServer转载于:https://www.cnblogs.com/longhs/p/3583495.html

第7章 C控制语句:分支和跳转

学习笔记——《C Prime Plus》 第7章 C控制语句&#xff1a;分支和跳转7.1 if 语句7.2 if else 语句7.2.1 介绍 getchar() 和 putchar()7.4 一个统计单词的程序7.1 if 语句 下程序读取一列数据&#xff0c;每个数据都表示每日的最低温度&#xff08;℃&#xff09;&#xff0c…

Objective-C中的self和super

1.有过面向对象的人知道&#xff0c;self相当于this&#xff0c;super相当于调用父类的方法 2.self是类的隐藏的参数&#xff0c;指向当前调用方法的类&#xff0c;另一个隐藏参数是_cmd&#xff0c;代表当前类方法的selector。 super并不是隐藏的参数&#xff0c;它只是一个”…

ip校验和及udp校验和的计算方法

一、ip校验和的计算: 计算方法: 1. ip包头(共20个字节)按照每16个bit作为一个值依次进行相加 2. 将计算结果的进位加到低16位上 3. 将结果取反 ip包头的内存内容 eg: 45 00 00 20 0F B8 00 00 80 11 00 00 C0 A8 0A 9F C0 A8 0A C7 将 0x4500 0x0020 0x0FB8 0x0000 0x…

【Blog.Core开源】网关统一集成下游服务文档

一般看到公众号更新&#xff0c;就是大概率要开始上班了&#x1f602;上回书咱们说到了《【Blog.Core开源】快速预览Admin界面效果》&#xff0c;这样我们就可以专注于后端开发&#xff0c;而且也能快速的实现效果的预览。那今天我们继续来往下走&#xff0c;说一说网关相关的内…

android4.3 截屏功能的尝试与失败分析

1.背景 上一篇讲了在源码中捕获到了android手机的截屏函数&#xff08;同时按下电源键与音量减&#xff0c;详情http://blog.csdn.net/buptgshengod/article/details/19911909&#xff09;&#xff0c;经过一周的研究还是没有在手机上实现系统截屏功能&#xff0c;总结下尝试的…

numpy方法总结

2019独角兽企业重金招聘Python工程师标准>>> 一、数组方法 创建数组&#xff1a;arange()创建一维数组&#xff1b;array()创建一维或多维数组&#xff0c;其参数是类似于数组的对象&#xff0c;如列表等 反过来转换则可以使用numpy.ndarray.tolist()函数&#xff0…

一些移动端开发的细节记录

好久不来写东西了,最近太忙,给自己搞的很累,对自己选择的道路有些不自信了. 决定干够半年后,大概四月中旬,会离职休息两三个月,去几个喜欢的地方看看,锻炼打球减肥,再把最近想要做过的项目整理一下,编写一些自己的插件和库,把之前积累的一些书过一下. 对前端的热爱没有丝毫改变…

阿里云离线数据仓库

阿里云离线数据仓库第1章 数据仓库概念第2章 项目需求及架构设计2.1 项目需求分析2.2 阿里云技术框架2.2.1 技术选型2.2.2 系统数据流程设计第3章 数据生成模块3.1 埋点数据基本格式3.2 事件日志数据3.2.1 商品列表页&#xff08;loading&#xff09;3.2.2 商品曝光&#xff08…

C++之extern和string的find函数和substr函数和data()函数使用总结

1、extern使用总结 网上看的例子,特么没有一个看懂的,为什么会用到这个extern呢?因为有一个cpp文件需要另外一个cpp文件的里面的值,第一反应想到的是static,因为java 里面如果在变量前面加了static,一切就好说了,class.成员变量,然后特么我也去找c++里面的static…

技术分享 | 混合云模式下SaaS端前端最佳实践

导读&#xff1a;集成开放平台采用的是混合云部署架构&#xff0c;包含两个大的组件&#xff0c;管理控制台和引擎。管理控制台是SaaS的&#xff0c;部署在公有云&#xff0c;按租户隔离。引擎部署在客户私有云。一套SaaS版的管理控制台如何适配不同客户的引擎&#xff0c;本文…

统计素数个数

10:判决素数个数总时间限制:1000ms 内存限制:65536kB描述 输入两个整数X和Y&#xff0c;输出两者之间的素数个数&#xff08;包括X和Y&#xff09;。输入 两个整数X和Y&#xff08;1 < X,Y < 105&#xff09;。输出 输出一个整数&#xff0c;表示X&#xff0c;…

记一则Hadoop DataNode OOM故障,以及解决方案

一、故障症状最近公司一个集群跑大任务时&#xff0c;datanode日志报DataXceiveServer: Exiting due to:java.lang.OutOfMemoryError: unable to create new native thread异常&#xff0c;然后计算节点上的DataNode直接挂掉。DataNode异常日志截图如下&#xff1a;2014-03-06 …

阿里云实时数据仓库

阿里云实时数据仓库——学习笔记 课程目标 学习搭建一个实时数据仓库&#xff0c;掌握数据采集、存储、计算、输出、展示等整个业务流程。整个实时数据仓库系统是在阿里云架构上搭建&#xff0c;掌握并学会运用各个服务组件&#xff0c;及各个组件之间如何联动。前置知识要求&…

[转]svn常用命令

谢谢原作者:http://blog.sina.com.cn/s/blog_963453200101eiuq.html 1、检出svn co http://路径(目录或文件的全路径) [本地目录全路径] --username 用户名 --password 密码svn co svn://路径(目录或文件的全路径) [本地目录全路径] --username 用户名 --password 密码…