Thrift源码学习二——Server层

Thrift 提供了如图五种模式:TSimpleServer、TNonblockingServer、THsHaServer、TThreadPoolServer、TThreadSelectorServer

image.png​​

TSimpleServer、TThreadPoolServer 属于阻塞模型

TNonblockingServer、THsHaServer、TThreadedSelectorServer 属于非阻塞模型

TServer

TServer 为抽象类

public static class Args extends AbstractServerArgs<Args> {public Args(TServerTransport transport) {super(transport);}
}public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {final TServerTransport serverTransport;// 处理层工厂
  TProcessorFactory processorFactory;// 传输层工厂TTransportFactory inputTransportFactory = new TTransportFactory();TTransportFactory outputTransportFactory = new TTransportFactory();// 协议层工厂TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();
}

TServer 定义的对外方法

/*** The run method fires up the server and gets things going.*/public abstract void serve();
/*** Stop the server. This is optional on a per-implementation basis. Not* all servers are required to be cleanly stoppable.*/public void stop() {}

stop 并不是每个服务都需要优雅的退出,所以没有定义为抽象方法

抽象方法 serve() 由具体的 TServer 实例实现

image.png

TSimpleServer

TSimpleServer 实现比较简单,是单线程阻塞模型,只适合测试开发使用

serve 方法源码分析

public void serve() {// 启动监听 socket
  serverTransport.listen();// 设置服务状态setServing(true);// 不断的等待与处理 socket 请求while(!stopped) {// accept 一个业务 socket 请求client = serverTransport_.accept();if (client != null) {// 通过工厂获取 server 定义的处理层、传输层和协议层processor = processorFactory_.getProcessor(client);inputTransport = inputTransportFactory_.getTransport(client);outputTransport = outputTransportFactory_.getTransport(client);inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);if (eventHandler_ != null) {connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);}// 阻塞式处理while (true) {// 处理请求事件if (eventHandler_ != null) {eventHandler_.processContext(connectionContext, inputTransport, outputTransport);}// 如果处理层为异步,则退出if(!processor.process(inputProtocol, outputProtocol)) {break;}}}// 关闭
    eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);inputTransport.close();outputTransport.close();setServing(false);}
}

TSimpleServer 工作图

draw.io

TThreadPoolServer

ThreadPoolServer 解决了 TSimple 不支持并发和多连接的问题,引入了线程池

与 TSimple 相同,主线程负责阻塞式监听 socket,而剩下的业务处理则全部交由线程池去处理

public void serve() {// 主线程启动监听 socket
  serverTransport_.listen();// 设置服务状态stopped_ = false;setServing(true);// 等待并处理 socket 请求while (!stopped_) {TTransport client = serverTransport_.accept();// Runnable run 逻辑与 TSimpleServer 类似WorkerProcess wp = new WorkerProcess(client);int retryCount = 0;long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout);while(true) {// 交由线程池来处理
        executorService_.execute(wp);break;} }executorService_.shutdown();setServing(false);
}

TThreadPoolServer 的缺点:

处理能力的好坏受限于线程池的设置

TNoblockingServer

TNoblockingServer 是单线程工作,但该模式采用了 NIO,所有的 socket 被注册到 selector 中,通过一个线程循环 selector 来监控所有 socket,当有就绪的 socket 时,根据不同的请求做不同的动作(读取、写入数据或 accept 新连接)

TNoblockingServer 的 serve 方法在其父类 AbstractNonblockingServer 中定义

/*** Begin accepting connections and processing invocations.* 开始接受并处理调用*/
public void serve() {// start any IO threads// 注册一些监听 socket 的线程到 selector 中if (!startThreads()) {return;}// start listening, or exit// 开始监听if (!startListening()) {return;}// 设置服务状态setServing(true);// this will block while we serve// TNonblocking 中实现为 selectAcceptThread_.join(); // 主线程等待 selectAcceptThread 执行完毕// SelectAcceptThread 的 run 方法为 select();// 取出一个就绪的 socket
  waitForShutdown();setServing(false);// do a little cleanup
  stopListening();
}// SelectAcceptThread run 方法
public void run() {while (!stopped_) {select();processInterestChanges();}for (SelectionKey selectionKey : selector.keys()) {cleanupSelectionKey(selectionKey);}
}// SelectAcceptThread Select 过程
private void select() {try {// wait for io events.// NIO 取出一个
    selector.select();// process the io events we receivedIterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();// 遍历就绪的 socketwhile (!stopped_ && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selectedKeys.remove();// if the key is marked Accept, then it has to be the server// transport.// accept 新 socket 并将其注册到 selector 中if (key.isAcceptable()) {handleAccept();} else if (key.isReadable()) {// deal with reads// 处理读数据的 socket 请求
        handleRead(key);} else if (key.isWritable()) {// deal with writes// 处理写数据的 socket 请求
        handleWrite(key);} else {LOGGER.warn("Unexpected state in select! " + key.interestOps());}}} catch (IOException e) {LOGGER.warn("Got an IOException while selecting!", e);}
}// 接收新的连接
private void handleAccept() throws IOException {SelectionKey clientKey = null;TNonblockingTransport client = null;// accept the connectionclient = (TNonblockingTransport)serverTransport.accept();// 注册到 selector 中clientKey = client.registerSelector(selector, SelectionKey.OP_READ);// add this key to the mapFrameBuffer frameBuffer = createFrameBuffer(client, clientKey, SelectAcceptThread.this);clientKey.attach(frameBuffer);
}

TNonblockingServer 模式的缺点:

其还是采用单线程顺序来完成,当业务处理比较复杂耗时,该模式的效率将会下降

TNonblockingServer 工作图:

draw.io

THsHaServer

THsHaServer 是 TNoblockingServer 的子类,处理逻辑基本相同,不同的是,在处理读取请求时,THsHaServer 将处理过程交由线程池来完成,主线程直接返回进行下一次循环,提高了效率

THsHaServer 模式的缺点:

其主线程需要完成对所有 socket 的监听一级数据的写操作,当大请求量时,效率较低

TThreadedSelectorServer

TThreadedSelectorServer 是 Thrift 目前提供的最高级模式,生产环境的首选,其对 TNonblockingServer 进行了扩展

TThreadedSelectorServer 源码中一些关键的属性

public static class Args extends AbstractNonblockingServerArgs<Args> {// 在已接收的连接中选择线程的个数public int selectorThreads = 2;// 执行线程池 ExecutorService 的线程个数private int workerThreads = 5;// 执行请求具体任务的线程池private ExecutorService executorService = null;
}
// The thread handling all accepts
private AcceptThread acceptThread;
// Threads handling events on client transports
private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>();
// This wraps all the functionality of queueing and thread pool management
// for the passing of Invocations from the selector thread(s) to the workers
// (if any).
private final ExecutorService invoker;
/*** 循环模式的负载均衡器,用于为新的连接选择 SelectorThread*/
protected static class SelectorThreadLoadBalancer {}
  1. AcceptThread 线程对象,用于监听 socket 的新连接

  2. 多个 SelectorThread 线程对象,用于处理 socket 的读写操作

  3. 一个负载均衡对象 SelectorThreadLoadBalancer,用于决定将 AcceptThread 接收到的 socket 请求分配给哪个 SelectorThread 线程

  4. SelectorThread 线程执行过读写操作后,通过 ExecutorService 线程池来完成此次调用的具体执行

SelectorThread 对象源码解析

/*** 多个 SelectorThread 负责处理 socket 的 I/O 操作*/
protected class SelectorThread extends AbstractSelectThread {/*** The work loop. Handles selecting (read/write IO), dispatching, and* managing the selection preferences of all existing connections.* 选择(处理 socket 的网络读写 IO),分配和管理现有连接*/public void run() {while (!stopped_) {select();}}private void select() {// process the io events we receivedIterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();while (!stopped_ && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selectedKeys.remove();// skip if not validif (!key.isValid()) {cleanupSelectionKey(key);continue;}if (key.isReadable()) {// deal with reads
        handleRead(key);} else if (key.isWritable()) {// deal with writes
        handleWrite(key);} else {LOGGER.warn("Unexpected state in select! " + key.interestOps());}}}
}

AcceptThread 对象源码解析

/*** 在服务器传输中选择线程(监听 socket 请求)并向 IO 选择器(SelectorThread)提供新连接*/
protected class AcceptThread extends Thread {// The listen socket to accept onprivate final TNonblockingServerTransport serverTransport;private final Selector acceptSelector;// 负载均衡器,决定将连接分配给哪个 SelectorThreadprivate final SelectorThreadLoadBalancer threadChooser;public void run() {while (!stopped_) {select();}}private void select() {// process the io events we receivedIterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();while (!stopped_ && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selectedKeys.remove();// 处理接收的新情求if (key.isAcceptable()) {handleAccept();} else {LOGGER.warn("Unexpected state in select! " + key.interestOps());}}}/*** Accept a new connection.*/private void handleAccept() {final TNonblockingTransport client = doAccept();if (client != null) {// 从负载均衡器中,获取 SelectorThread 线程final SelectorThread targetThread = threadChooser.nextThread();if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {doAddAccept(targetThread, client);} else {// FAIR_ACCEPTinvoker.submit(new Runnable() {public void run() {// 将选择到的线程和连接放入 线程池 处理// 用 targetThread 线程取处理一个给接受的链接 client,如果新连接的队列处于满的状态,则将处于阻塞状态
            doAddAccept(targetThread, client);}});}}}private TNonblockingTransport doAccept() {return (TNonblockingTransport) serverTransport.accept();}// 用 targetThread 线程取处理一个给接受的链接 client,如果新连接的队列处于满的状态,则将处于阻塞状态private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {if (!thread.addAcceptedConnection(client)) {client.close();}}
}

TThreadedSelectorServer 工作图

draw.io

参考资料

  • Thrift server端的几种工作模式分析:http://blog.csdn.net/houjixin/article/details/42779915
  • Thrift 网络服务模型:http://www.cnblogs.com/mumuxinfei/p/3875165.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/539229.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

base64是哪个jar包的_涨知识 | 用maven轻松管理jar包

前言相信只要做过 Java 开发的童鞋们&#xff0c;对 Ant 想必都不陌生&#xff0c;我们往往使用 Ant 来构建项目&#xff0c;尤其是涉及到特别繁杂的工作量&#xff0c;一个 build.xml 能够完成编译、测试、打包、部署等很多任务&#xff0c;这在很大的程度上解放了程序员们的双…

JMS(Java消息服务)与消息队列ActiveMQ基本使用(一)

最近的项目中用到了mq&#xff0c;之前自己一直在码农一样的照葫芦画瓢。最近几天研究了下&#xff0c;把自己所有看下来的文档和了解总结一下。 一. 认识JMS 1.概述 对于JMS,百度百科&#xff0c;是这样介绍的&#xff1a;JMS即Java消息服务&#xff08;Java Message Service&…

hive复合数据类型之struct

概述 STRUCT&#xff1a;STRUCT可以包含不同数据类型的元素。这些元素可以通过”点语法”的方式来得到所需要的元素&#xff0c;比如user是一个STRUCT类型&#xff0c;那么可以通过user.address得到这个用户的地址。 操作实例 1、创建表 create table student_test(id int,in…

hive复合数据类型之array

概述 ARRAY&#xff1a;ARRAY类型是由一系列相同数据类型的元素组成&#xff0c;这些元素可以通过下标来访问。比如有一个ARRAY类型的变量fruits&#xff0c;它是由[apple,orange,mango]组成&#xff0c;那么我们可以通过fruits[1]来访问元素orange&#xff0c;因为ARRAY类型的…

hive复合数据类型之map

概述 MAP&#xff1a;MAP包含key->value键值对&#xff0c;可以通过key来访问元素。比如”userlist”是一个map类型&#xff0c;其中username是key&#xff0c;password是value&#xff1b;那么我们可以通过userlist[username]来得到这个用户对应的password&#xff1b; 操…

Beego框架使用

为什么80%的码农都做不了架构师&#xff1f;>>> Beego Web项目目录结构 new 命令是新建一个 Web 项目&#xff0c;我们在命令行下执行 bee new <项目名> 就可以创建一个新的项目。但是注意该命令必须在 $GOPATH/src 下执行。最后会在 $GOPATH/src 相应目录下…

oracle下lag和lead分析函数

Lag和Lead分析函数可以在同一次查询中取出同一字段的前N行的数据(Lag)和后N行的数据(Lead)作为独立的列。 这种操作可以代替表的自联接&#xff0c;并且LAG和LEAD有更高的效率。 语法&#xff1a; [sql] view plaincopy /*语法*/ lag(exp_str,offset,defval) over() Lead(…

jtessboxeditorfx 界面显示不出来_macOS 使用 XQuartz 支持 X11 实现 Linux 图形化界面显示...

更多奇技淫巧欢迎订阅博客&#xff1a;https://fuckcloudnative.io前言在 Windows 中相信大家已经很熟悉使用 Xmanager(Xshell), MobaXterm, SecureCRT 通过 X11 实现 Linux 图形化界面显示&#xff0c;我的需求是在 macOS 下使用 iTerm2 作为 Terminal 实现 X11 图形化界面显示…

EntityFramework Core 2.0 Explicitly Compiled Query(显式编译查询)

前言 EntityFramework Core 2.0引入了显式编译查询&#xff0c;在查询数据时预先编译好LINQ查询便于在请求数据时能够立即响应。显式编译查询提供了高可用场景&#xff0c;通过使用显式编译的查询可以提高查询性能。EF Core已经使用查询表达式的散列来表示自动编译和缓存查询&a…

.net 导出excel_Qt编写的项目作品18-数据导出到Excel及Pdf和打印数据

一、功能特点原创导出数据机制&#xff0c;不依赖任何office组件或者操作系统等第三方库&#xff0c;尤其是支持嵌入式linux。10万行数据9个字段只需要2秒钟完成。只需要四个步骤即可开始急速导出大量数据到Excel。同时提供直接写入数据接口和多线程写入数据接口&#xff0c;不…

图像增强_Keras 常用的图像增强方式

欢迎关注 “小白玩转Python”&#xff0c;发现更多 “有趣”在使用神经网络和深度学习模型时&#xff0c;需要进行数据准备。对于更复杂的物体识别任务&#xff0c;也越来越需要增加数据量。数据增加意味着增加数据量。换句话说&#xff0c;拥有更大的数据集意味着更健壮的模型…

Facebook产品经理的三年叙事与协作思考

产品经理和研发工程师的关系经常被大家调侃&#xff0c;可偏偏就有同时受到研发和设计都喜欢的“别人家的产品经理”&#xff0c;沟通协调、对接需求、项目把控面面俱到还有好人缘。有没有人天生就是产品经理&#xff1f;产品经理的工作就是写需求写需求和写需求么&#xff1f;…

sis新地址_坚若磐石不掉速,老平台升级新选择,入手昱联Asint 500G SSD

我是文章的原作者&#xff0c;文章首发于&#xff1a;什么值得买爱折腾的老狐狸​zhiyou.smzdm.com首发文章链接&#xff1a;坚若磐石不掉速&#xff0c;老平台升级新选择&#xff0c;入手昱联Asint 500G SSD _值客原创_什么值得买​post.smzdm.com虽然说&#xff0c;现在越来越…

进度条设置_为你的练习设置进度条

在我们的日常练习中&#xff0c;遇到最多的一个问题就是不知道自己练得怎么样了&#xff1f;还需不需要继续&#xff0c;或者调整练习方法。这种问题大多出现在自学吉他的学生当中&#xff0c;因为得不到老师的反馈&#xff0c;自己练得对不对&#xff0c;够不够&#xff0c;都…

Python之路(第二篇):Python基本数据类型字符串(一)

一、基础1、编码 UTF-8:中文占3个字节 GBK&#xff1a;中文占2个字节 Unicode、UTF-8、GBK三者关系 ascii码是只能表示英文字符&#xff0c;用8个字节表示英文&#xff0c;unicode是统一码&#xff0c;世界通用码&#xff0c;规定采用2个字节对世界各地不同文字进行编码&#x…

python ftp下载文件_文件上传下载Python

点击上方蓝字关注我&#xff01;图片来源 pexels.com简单实现文件上传、下载1 Server端 # -*- coding: utf-8 -*-import jsonimport os__author__ sange# Time : 2020/8/17 下午5:26# Author : sange# File : tcpserver_socket.py# Software: PyCharmimport socketserv…

react json转换_Typescript + React 新手篇

极链科技前端工程师茅丹丹前言 TS是什么Type Type (标准JS)。TS的官方网站&#xff1a;Type is a typed superset of Java that compiles to plain Java。Type是一个编译到纯JS的有类型定义的JS超集。 TS优点 TS 最大的优势是它提供了强大的静态分析能力&#xff0c;结合 TSL…

android listview 滑动条显示_第七十六回:Android中UI控件之RecyclerView基础

各位看官们&#xff0c;大家好&#xff0c;上一回中咱们说的是Android中UI控件之ListView优化的例子&#xff0c;这一回咱们说的例子是UI控件之RecyclerView。闲话休提&#xff0c;言归正转。让我们一起Talk Android吧&#xff01;看官们&#xff0c;我们在前面章回中介绍了Lis…

Hive的数据模型-外部表

概述 包含External 的表叫外部表 删除外部表只删除metastore的元数据&#xff0c;不删除hdfs中的表数据 外部表 只有一个过程&#xff0c;加载数据和创建表同时完成&#xff0c;并不会移动到数据仓库目录中&#xff0c;只是与外部数据建立一个链接。当删除一个 外部表 时&…

centos默认安装mysql_centos6.x默认安装mysql5.7

1. yum 安装 mysql5.7 yum 源yum localinstall mysql57-community-release-el6-8.noarch.rpm2. 查看是否成功安装MySQL Yum Repositoryyum repolist enabled|grep ""mysql.*-community.*3. 安装mysqlyum install mysql-community-server4.开启数据库服务service mys…