Thrift源码学习二——Server层

Thrift 提供了如图五种模式:TSimpleServer、TNonblockingServer、THsHaServer、TThreadPoolServer、TThreadSelectorServer

image.png​​

TSimpleServer、TThreadPoolServer 属于阻塞模型

TNonblockingServer、THsHaServer、TThreadedSelectorServer 属于非阻塞模型

TServer

TServer 为抽象类

public static class Args extends AbstractServerArgs<Args> {public Args(TServerTransport transport) {super(transport);}
}public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {final TServerTransport serverTransport;// 处理层工厂
  TProcessorFactory processorFactory;// 传输层工厂TTransportFactory inputTransportFactory = new TTransportFactory();TTransportFactory outputTransportFactory = new TTransportFactory();// 协议层工厂TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();
}

TServer 定义的对外方法

/*** The run method fires up the server and gets things going.*/public abstract void serve();
/*** Stop the server. This is optional on a per-implementation basis. Not* all servers are required to be cleanly stoppable.*/public void stop() {}

stop 并不是每个服务都需要优雅的退出,所以没有定义为抽象方法

抽象方法 serve() 由具体的 TServer 实例实现

image.png

TSimpleServer

TSimpleServer 实现比较简单,是单线程阻塞模型,只适合测试开发使用

serve 方法源码分析

public void serve() {// 启动监听 socket
  serverTransport.listen();// 设置服务状态setServing(true);// 不断的等待与处理 socket 请求while(!stopped) {// accept 一个业务 socket 请求client = serverTransport_.accept();if (client != null) {// 通过工厂获取 server 定义的处理层、传输层和协议层processor = processorFactory_.getProcessor(client);inputTransport = inputTransportFactory_.getTransport(client);outputTransport = outputTransportFactory_.getTransport(client);inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);if (eventHandler_ != null) {connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);}// 阻塞式处理while (true) {// 处理请求事件if (eventHandler_ != null) {eventHandler_.processContext(connectionContext, inputTransport, outputTransport);}// 如果处理层为异步,则退出if(!processor.process(inputProtocol, outputProtocol)) {break;}}}// 关闭
    eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);inputTransport.close();outputTransport.close();setServing(false);}
}

TSimpleServer 工作图

draw.io

TThreadPoolServer

ThreadPoolServer 解决了 TSimple 不支持并发和多连接的问题,引入了线程池

与 TSimple 相同,主线程负责阻塞式监听 socket,而剩下的业务处理则全部交由线程池去处理

public void serve() {// 主线程启动监听 socket
  serverTransport_.listen();// 设置服务状态stopped_ = false;setServing(true);// 等待并处理 socket 请求while (!stopped_) {TTransport client = serverTransport_.accept();// Runnable run 逻辑与 TSimpleServer 类似WorkerProcess wp = new WorkerProcess(client);int retryCount = 0;long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout);while(true) {// 交由线程池来处理
        executorService_.execute(wp);break;} }executorService_.shutdown();setServing(false);
}

TThreadPoolServer 的缺点:

处理能力的好坏受限于线程池的设置

TNoblockingServer

TNoblockingServer 是单线程工作,但该模式采用了 NIO,所有的 socket 被注册到 selector 中,通过一个线程循环 selector 来监控所有 socket,当有就绪的 socket 时,根据不同的请求做不同的动作(读取、写入数据或 accept 新连接)

TNoblockingServer 的 serve 方法在其父类 AbstractNonblockingServer 中定义

/*** Begin accepting connections and processing invocations.* 开始接受并处理调用*/
public void serve() {// start any IO threads// 注册一些监听 socket 的线程到 selector 中if (!startThreads()) {return;}// start listening, or exit// 开始监听if (!startListening()) {return;}// 设置服务状态setServing(true);// this will block while we serve// TNonblocking 中实现为 selectAcceptThread_.join(); // 主线程等待 selectAcceptThread 执行完毕// SelectAcceptThread 的 run 方法为 select();// 取出一个就绪的 socket
  waitForShutdown();setServing(false);// do a little cleanup
  stopListening();
}// SelectAcceptThread run 方法
public void run() {while (!stopped_) {select();processInterestChanges();}for (SelectionKey selectionKey : selector.keys()) {cleanupSelectionKey(selectionKey);}
}// SelectAcceptThread Select 过程
private void select() {try {// wait for io events.// NIO 取出一个
    selector.select();// process the io events we receivedIterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();// 遍历就绪的 socketwhile (!stopped_ && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selectedKeys.remove();// if the key is marked Accept, then it has to be the server// transport.// accept 新 socket 并将其注册到 selector 中if (key.isAcceptable()) {handleAccept();} else if (key.isReadable()) {// deal with reads// 处理读数据的 socket 请求
        handleRead(key);} else if (key.isWritable()) {// deal with writes// 处理写数据的 socket 请求
        handleWrite(key);} else {LOGGER.warn("Unexpected state in select! " + key.interestOps());}}} catch (IOException e) {LOGGER.warn("Got an IOException while selecting!", e);}
}// 接收新的连接
private void handleAccept() throws IOException {SelectionKey clientKey = null;TNonblockingTransport client = null;// accept the connectionclient = (TNonblockingTransport)serverTransport.accept();// 注册到 selector 中clientKey = client.registerSelector(selector, SelectionKey.OP_READ);// add this key to the mapFrameBuffer frameBuffer = createFrameBuffer(client, clientKey, SelectAcceptThread.this);clientKey.attach(frameBuffer);
}

TNonblockingServer 模式的缺点:

其还是采用单线程顺序来完成,当业务处理比较复杂耗时,该模式的效率将会下降

TNonblockingServer 工作图:

draw.io

THsHaServer

THsHaServer 是 TNoblockingServer 的子类,处理逻辑基本相同,不同的是,在处理读取请求时,THsHaServer 将处理过程交由线程池来完成,主线程直接返回进行下一次循环,提高了效率

THsHaServer 模式的缺点:

其主线程需要完成对所有 socket 的监听一级数据的写操作,当大请求量时,效率较低

TThreadedSelectorServer

TThreadedSelectorServer 是 Thrift 目前提供的最高级模式,生产环境的首选,其对 TNonblockingServer 进行了扩展

TThreadedSelectorServer 源码中一些关键的属性

public static class Args extends AbstractNonblockingServerArgs<Args> {// 在已接收的连接中选择线程的个数public int selectorThreads = 2;// 执行线程池 ExecutorService 的线程个数private int workerThreads = 5;// 执行请求具体任务的线程池private ExecutorService executorService = null;
}
// The thread handling all accepts
private AcceptThread acceptThread;
// Threads handling events on client transports
private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>();
// This wraps all the functionality of queueing and thread pool management
// for the passing of Invocations from the selector thread(s) to the workers
// (if any).
private final ExecutorService invoker;
/*** 循环模式的负载均衡器,用于为新的连接选择 SelectorThread*/
protected static class SelectorThreadLoadBalancer {}
  1. AcceptThread 线程对象,用于监听 socket 的新连接

  2. 多个 SelectorThread 线程对象,用于处理 socket 的读写操作

  3. 一个负载均衡对象 SelectorThreadLoadBalancer,用于决定将 AcceptThread 接收到的 socket 请求分配给哪个 SelectorThread 线程

  4. SelectorThread 线程执行过读写操作后,通过 ExecutorService 线程池来完成此次调用的具体执行

SelectorThread 对象源码解析

/*** 多个 SelectorThread 负责处理 socket 的 I/O 操作*/
protected class SelectorThread extends AbstractSelectThread {/*** The work loop. Handles selecting (read/write IO), dispatching, and* managing the selection preferences of all existing connections.* 选择(处理 socket 的网络读写 IO),分配和管理现有连接*/public void run() {while (!stopped_) {select();}}private void select() {// process the io events we receivedIterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();while (!stopped_ && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selectedKeys.remove();// skip if not validif (!key.isValid()) {cleanupSelectionKey(key);continue;}if (key.isReadable()) {// deal with reads
        handleRead(key);} else if (key.isWritable()) {// deal with writes
        handleWrite(key);} else {LOGGER.warn("Unexpected state in select! " + key.interestOps());}}}
}

AcceptThread 对象源码解析

/*** 在服务器传输中选择线程(监听 socket 请求)并向 IO 选择器(SelectorThread)提供新连接*/
protected class AcceptThread extends Thread {// The listen socket to accept onprivate final TNonblockingServerTransport serverTransport;private final Selector acceptSelector;// 负载均衡器,决定将连接分配给哪个 SelectorThreadprivate final SelectorThreadLoadBalancer threadChooser;public void run() {while (!stopped_) {select();}}private void select() {// process the io events we receivedIterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();while (!stopped_ && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selectedKeys.remove();// 处理接收的新情求if (key.isAcceptable()) {handleAccept();} else {LOGGER.warn("Unexpected state in select! " + key.interestOps());}}}/*** Accept a new connection.*/private void handleAccept() {final TNonblockingTransport client = doAccept();if (client != null) {// 从负载均衡器中,获取 SelectorThread 线程final SelectorThread targetThread = threadChooser.nextThread();if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {doAddAccept(targetThread, client);} else {// FAIR_ACCEPTinvoker.submit(new Runnable() {public void run() {// 将选择到的线程和连接放入 线程池 处理// 用 targetThread 线程取处理一个给接受的链接 client,如果新连接的队列处于满的状态,则将处于阻塞状态
            doAddAccept(targetThread, client);}});}}}private TNonblockingTransport doAccept() {return (TNonblockingTransport) serverTransport.accept();}// 用 targetThread 线程取处理一个给接受的链接 client,如果新连接的队列处于满的状态,则将处于阻塞状态private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {if (!thread.addAcceptedConnection(client)) {client.close();}}
}

TThreadedSelectorServer 工作图

draw.io

参考资料

  • Thrift server端的几种工作模式分析:http://blog.csdn.net/houjixin/article/details/42779915
  • Thrift 网络服务模型:http://www.cnblogs.com/mumuxinfei/p/3875165.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/539229.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux top 命令可视化_25个Linux性能监控工具

一段时间以来&#xff0c;我们在网上向读者介绍了如何为Linux以及类Linux操作系统配置多种不同的性能监控工具。在这篇文章中我们将罗列一系列使用最频繁的性能监控工具&#xff0c;并对介绍到的每一个工具提供了相应的简介链接&#xff0c;大致将其划分为两类&#xff0c;基于…

base64是哪个jar包的_涨知识 | 用maven轻松管理jar包

前言相信只要做过 Java 开发的童鞋们&#xff0c;对 Ant 想必都不陌生&#xff0c;我们往往使用 Ant 来构建项目&#xff0c;尤其是涉及到特别繁杂的工作量&#xff0c;一个 build.xml 能够完成编译、测试、打包、部署等很多任务&#xff0c;这在很大的程度上解放了程序员们的双…

Hive数据类型

概述 Hive的内置数据类型可以分为两大类&#xff1a;(1)、基础数据类型&#xff1b;(2)、复杂数据类型。 基础数据类型 数据类型 所占字节 开始支持版本 TINYINT 1byte&#xff0c;-128 ~ 127 SMALLINT 2byte&#xff0c;-32,768 ~ 32,767 INT 4byte,-2,147,483,648 ~ 2,14…

JMS(Java消息服务)与消息队列ActiveMQ基本使用(一)

最近的项目中用到了mq&#xff0c;之前自己一直在码农一样的照葫芦画瓢。最近几天研究了下&#xff0c;把自己所有看下来的文档和了解总结一下。 一. 认识JMS 1.概述 对于JMS,百度百科&#xff0c;是这样介绍的&#xff1a;JMS即Java消息服务&#xff08;Java Message Service&…

python单词反转_python文本 字符串逐字符反转以及逐单词反转

python文本 字符串逐字符反转以及逐单词反转 场景&#xff1a; 字符串逐字符反转以及逐单词反转 首先来看字符串逐字符反转&#xff0c;由于python提供了非常有用的切片&#xff0c;所以只需要一句就可以搞定了 >>> aabc edf degd >>> a[::-1] dged fde cba …

hive复合数据类型之struct

概述 STRUCT&#xff1a;STRUCT可以包含不同数据类型的元素。这些元素可以通过”点语法”的方式来得到所需要的元素&#xff0c;比如user是一个STRUCT类型&#xff0c;那么可以通过user.address得到这个用户的地址。 操作实例 1、创建表 create table student_test(id int,in…

pycharm 运行celery_Celery全面学习笔记

来源介绍Celery 是 Distributed Task Queue&#xff0c;分布式任务队列。分布式决定了可以有多个 worker 的存在&#xff0c;队列表示其是异步操作。Celery 核心模块Celery有一下5个核心角色Task就是任务&#xff0c;有异步任务和定时任务Broker中间人&#xff0c;接收生产者发…

hive复合数据类型之array

概述 ARRAY&#xff1a;ARRAY类型是由一系列相同数据类型的元素组成&#xff0c;这些元素可以通过下标来访问。比如有一个ARRAY类型的变量fruits&#xff0c;它是由[apple,orange,mango]组成&#xff0c;那么我们可以通过fruits[1]来访问元素orange&#xff0c;因为ARRAY类型的…

Exploit开发系列教程-Mona 2 SEH

P3nro5e 2015/07/10 10:580x00 Mona 2 前言 & 准备Mona 2是一种非常有用的插件&#xff0c;它由Corelan Team开发。起初是为Immunity Debugger写的&#xff0c;现在它适用于WinDbg调试器。你将需要为WinDbg x86 和 WinDbg x64安装一些工具&#xff1a;安装Python 2.7 (从这…

python集合的元素可以是_Python集合的元素中,为什么不可以是包含嵌套列表的元组?...

你有一个误解&#xff0c;hash算法针对的是元素的内容&#xff0c;并不是针对指针&#xff0c;所以指针不变不等于可hash。 如果你想深究细节的话&#xff0c;可以看tuple的源码&#xff1a; static Py_hash_t tuplehash(PyTupleObject *v) { Py_uhash_t x; /* Unsigned for de…

python lib库_python_lib基础库

1&#xff1a;argv传递给python脚本的命令行参数列表&#xff0c;argv[0]是脚本的名字(他是平台独立的&#xff0c;不管他是一个路径全名或不是)&#xff0c;如果使用了-c参数选项&#xff0c;argv[0]会被设置为字符串-c&#xff0c;如果没有脚本名传递给python解释器&#xff…

hive复合数据类型之map

概述 MAP&#xff1a;MAP包含key->value键值对&#xff0c;可以通过key来访问元素。比如”userlist”是一个map类型&#xff0c;其中username是key&#xff0c;password是value&#xff1b;那么我们可以通过userlist[username]来得到这个用户对应的password&#xff1b; 操…

Beego框架使用

为什么80%的码农都做不了架构师&#xff1f;>>> Beego Web项目目录结构 new 命令是新建一个 Web 项目&#xff0c;我们在命令行下执行 bee new <项目名> 就可以创建一个新的项目。但是注意该命令必须在 $GOPATH/src 下执行。最后会在 $GOPATH/src 相应目录下…

oracle下lag和lead分析函数

Lag和Lead分析函数可以在同一次查询中取出同一字段的前N行的数据(Lag)和后N行的数据(Lead)作为独立的列。 这种操作可以代替表的自联接&#xff0c;并且LAG和LEAD有更高的效率。 语法&#xff1a; [sql] view plaincopy /*语法*/ lag(exp_str,offset,defval) over() Lead(…

802d简明调试手册_SINUMERIK-828D简明调试手册.pdf

SINUMERIK 828D / 828D BASIC简明调试手册SINUMERIKAnswers for industry. SIEMENSABC01.2012 ASINUMERIK 828D / 828D BASIC V04.04SP01123PLC 45NC 67PLC 891011121314151617PLC 18i1 11.1 11.1.1 NC 31.1.2 31.2

jtessboxeditorfx 界面显示不出来_macOS 使用 XQuartz 支持 X11 实现 Linux 图形化界面显示...

更多奇技淫巧欢迎订阅博客&#xff1a;https://fuckcloudnative.io前言在 Windows 中相信大家已经很熟悉使用 Xmanager(Xshell), MobaXterm, SecureCRT 通过 X11 实现 Linux 图形化界面显示&#xff0c;我的需求是在 macOS 下使用 iTerm2 作为 Terminal 实现 X11 图形化界面显示…

EntityFramework Core 2.0 Explicitly Compiled Query(显式编译查询)

前言 EntityFramework Core 2.0引入了显式编译查询&#xff0c;在查询数据时预先编译好LINQ查询便于在请求数据时能够立即响应。显式编译查询提供了高可用场景&#xff0c;通过使用显式编译的查询可以提高查询性能。EF Core已经使用查询表达式的散列来表示自动编译和缓存查询&a…

Oracle Minus关键字 不包含 取差集

Oracle Minus关键字   SQL中的MINUS关键字   SQL中有一个MINUS关键字&#xff0c;它运用在两个SQL语句上&#xff0c;它先找出第一条SQL语句所产生的结果&#xff0c;然后看这些结果有没有在第二个SQL语句的结果 中。如果有的话&#xff0c;那这一笔记录就被去除&#xff0…

python扫描器甄别操作系统类型_20189317 《网络攻防技术》 第三周作业

一.教材内容总结1.网络踩点&#xff1a;web搜索与挖掘、DNS和IP查询、网络拓扑侦察(1)网络踩点目标确定(2)技术手段&#xff1a;web信息搜索与挖掘、DNS和IP查询、网络拓扑侦察(3)web信息搜索与挖掘&#xff1a;基本搜索与挖掘技巧、高级搜索与挖掘技巧、编程实现google搜索、元…

python 网页重定向_小试牛刀:python爬虫爬取springer开放电子书.

首先声明,本文旨在记录反思,并没有资源,代码也不具有借鉴意义(水平实在不行.某天,水群的时候发现群友发了一个文件,里面是疫情时期springer开放的免费电子书名单,同时还附有下载链接,总共有400多本,这要是一个一个下载不得累死个人,只下载自己感兴趣的书也是一个好主意,但是,我…