java thrift连接池_由浅入深了解Thrift之客户端连接池化

一、问题描述

在上一篇《由浅入深了解Thrift之服务模型和序列化机制》文章中,我们已经了解了thrift的基本架构和网络服务模型的优缺点。如今的互联网圈中,RPC服务化的思想如火如荼。我们又该如何将thrift服务化应用到我们的项目中哪?实现thrift服务化前,我们先想想这几个问题:服务注册、服务发现、服务健康检测、服务“Load Balance”、隐藏client和server端的交互细节、服务调用端的对象池化。

服务的注册、发现和健康检测,我们使用zookeeper可以很好的解决

服务“Load Balance",我们可以使用简单的算法“权重+随机”,当然也可以使用成熟复杂的算法

服务调用端的对象池化,我们可以使用common pool,使用简单又可以满足我们的需求

二、实现思路

1、thrift server端启动时,每个实例向zk集群以临时节点方式注册(这样,遍历zk上/server下有多少个临时节点就知道有哪些server实例)

thrift server端可以单机多端口多实例或多机部署多实例方式运行。

2、服务调用方实现一个连接池,连接池初始化时,通过zk将在线的server实例信息同步到本地并缓存,同时监听zk下的节点变化。

3、服务调用方与Server通讯时,从连接池中取一个可用的连接,用它实现RPC调用。

d0311a956f3af09138c6c3ecde8c0094.png

三、具体实现

1、thrift server端

thrift server端,向zk中注册server address

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

packagecom.wy.thriftpool.commzkpool;importjava.lang.instrument.IllegalClassFormatException;importjava.lang.reflect.Constructor;importorg.apache.thrift.protocol.TBinaryProtocol;importorg.apache.thrift.protocol.TBinaryProtocol.Factory;importorg.apache.thrift.server.TServer;importorg.apache.thrift.server.TThreadedSelectorServer;importorg.apache.thrift.transport.TFramedTransport;importorg.apache.thrift.transport.TNonblockingServerSocket;importorg.springframework.beans.factory.InitializingBean;importcom.wy.thrift.service.UserService.Processor;importcom.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter;importcom.wy.thriftpool.commzkpool.support.ThriftServerIpTransfer;importcom.wy.thriftpool.commzkpool.support.impl.LocalNetworkIpTransfer;/*** thrift server端,向zk中注册server address

*

*@authorwy

**/

public class ThriftServiceServerFactory implementsInitializingBean {//thrift server 服务端口

privateInteger port;//default 权重

private Integer priority = 1;//service实现类

privateObject service;//thrift server 注册路径

privateString configPath;privateThriftServerIpTransfer ipTransfer;//thrift server注册类

privateThriftServerAddressReporter addressReporter;//thrift server开启服务

privateServerThread serverThread;

@Overridepublic void afterPropertiesSet() throwsException {if (ipTransfer == null) {

ipTransfer= newLocalNetworkIpTransfer();

}

String ip=ipTransfer.getIp();if (ip == null) {throw new NullPointerException("cant find server ip...");

}

String hostname= ip + ":" + port + ":" +priority;

Class extends Object> serviceClass =service.getClass();

ClassLoader classLoader=Thread.currentThread().getContextClassLoader();

Class>[] interfaces =serviceClass.getInterfaces();if (interfaces.length == 0) {throw new IllegalClassFormatException("service-class should implements Iface");

}//reflect,load "Processor";

Processor> processor = null;for (Class>clazz : interfaces) {

String cname=clazz.getSimpleName();if (!cname.equals("Iface")) {continue;

}

String pname= clazz.getEnclosingClass().getName() + "$Processor";try{

Class> pclass =classLoader.loadClass(pname);if (!pclass.isAssignableFrom(Processor.class)) {continue;

}

Constructor> constructor =pclass.getConstructor(clazz);

processor= (Processor>) constructor.newInstance(service);break;

}catch(Exception e) {//TODO

}

}if (processor == null) {throw new IllegalClassFormatException("service-class should implements Iface");

}//需要单独的线程,因为serve方法是阻塞的.

serverThread = newServerThread(processor, port);

serverThread.start();//report

if (addressReporter != null) {

addressReporter.report(configPath, hostname);

}

}class ServerThread extendsThread {privateTServer server;

ServerThread(Processor> processor, int port) throwsException {//设置传输通道

TNonblockingServerSocket serverTransport = newTNonblockingServerSocket(port);//设置二进制协议

Factory protocolFactory = newTBinaryProtocol.Factory();

TThreadedSelectorServer.Args tArgs= newTThreadedSelectorServer.Args(serverTransport);

tArgs.processor(processor);

tArgs.transportFactory(newTFramedTransport.Factory());

tArgs.protocolFactory(protocolFactory);int num = Runtime.getRuntime().availableProcessors() * 2 + 1;

tArgs.selectorThreads(num);

tArgs.workerThreads(num* 10);//网络服务模型

server = newTThreadedSelectorServer(tArgs);

}

@Overridepublic voidrun() {try{

server.serve();

}catch(Exception e) {//TODO

}

}public voidstopServer() {

server.stop();

}

}public voidclose() {

serverThread.stopServer();

}public voidsetService(Object service) {this.service =service;

}public voidsetPriority(Integer priority) {this.priority =priority;

}public voidsetPort(Integer port) {this.port =port;

}public voidsetIpTransfer(ThriftServerIpTransfer ipTransfer) {this.ipTransfer =ipTransfer;

}public voidsetAddressReporter(ThriftServerAddressReporter addressReporter) {this.addressReporter =addressReporter;

}public voidsetConfigPath(String configPath) {this.configPath =configPath;

}

}

View Code

thrift server address注册到zk

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

packagecom.wy.thriftpool.commzkpool.support.impl;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.imps.CuratorFrameworkState;importorg.apache.zookeeper.CreateMode;importcom.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter;/*** thrift server address注册到zk

*

*@authorwy

**/

public class DynamicAddressReporter implementsThriftServerAddressReporter {privateCuratorFramework zookeeper;publicDynamicAddressReporter() {

}publicDynamicAddressReporter(CuratorFramework zookeeper) {this.zookeeper =zookeeper;

}public voidsetZookeeper(CuratorFramework zookeeper) {this.zookeeper =zookeeper;

}

@Overridepublic void report(String service, String address) throwsException {if (zookeeper.getState() ==CuratorFrameworkState.LATENT) {

zookeeper.start();

zookeeper.newNamespaceAwareEnsurePath(service);

}

zookeeper.create().creatingParentsIfNeeded()

.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)

.forPath(service+ "/i_", address.getBytes("utf-8"));

}public voidclose() {

zookeeper.close();

}

}

View Code

。。。

spring配置文件

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

View Code

2、服务调用端

连接池实现

杯了个具,为啥就不能提交。代码在评论中。

连接池工厂,负责与Thrift server通信

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

packagecom.wy.thriftpool.commzkconnpool;importjava.net.InetSocketAddress;importorg.apache.commons.pool.PoolableObjectFactory;importorg.apache.thrift.transport.TSocket;importorg.apache.thrift.transport.TTransport;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importcom.wy.thriftpool.commzkpool.support.ThriftServerAddressProvider;/*** 连接池工厂,负责与Thrift server通信

*

*@authorwy

**/

public class ThriftPoolFactory implements PoolableObjectFactory{private final Logger logger =LoggerFactory.getLogger(getClass());//超时设置

public inttimeOut;private finalThriftServerAddressProvider addressProvider;privatePoolOperationCallBack callback;publicThriftPoolFactory(ThriftServerAddressProvider addressProvider, PoolOperationCallBack callback) {super();this.addressProvider =addressProvider;this.callback =callback;

}public ThriftPoolFactory(ThriftServerAddressProvider addressProvider, PoolOperationCallBack callback, inttimeOut) {super();this.addressProvider =addressProvider;this.callback =callback;this.timeOut =timeOut;

}/*** 创建对象*/@Overridepublic TTransport makeObject() throwsException {try{

InetSocketAddress address=addressProvider.selector();

TTransport transport= new TSocket(address.getHostName(), address.getPort(), this.timeOut);

transport.open();if (callback != null) {

callback.make(transport);

}returntransport;

}catch(Exception e) {

logger.error("creat transport error:", e);throw newRuntimeException(e);

}

}/*** 销毁对象*/@Overridepublic void destroyObject(TTransport transport) throwsException {if (transport != null &&transport.isOpen()) {

transport.close();

}

}/*** 检验对象是否可以由pool安全返回*/@Overridepublic booleanvalidateObject(TTransport transport) {try{if (transport != null && transport instanceofTSocket) {

TSocket thriftSocket=(TSocket) transport;if(thriftSocket.isOpen()) {return true;

}else{return false;

}

}else{return false;

}

}catch(Exception e) {return false;

}

}

@Overridepublic void activateObject(TTransport obj) throwsException {//TODO Auto-generated method stub

}

@Overridepublic void passivateObject(TTransport obj) throwsException {//TODO Auto-generated method stub

}public static interfacePoolOperationCallBack {//创建成功是执行

voidmake(TTransport transport);//销毁之前执行

voiddestroy(TTransport transport);

}

}

View Code

连接池管理

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

packagecom.wy.thriftpool.commzkconnpool;importorg.apache.thrift.transport.TSocket;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;/*** 连接池管理

*

*@authorwy

**/@Servicepublic classConnectionManager {private final Logger logger =LoggerFactory.getLogger(getClass());//保存local对象

ThreadLocal socketThreadSafe = new ThreadLocal();//连接提供池

@AutowiredprivateConnectionProvider connectionProvider;publicTSocket getSocket() {

TSocket socket= null;try{

socket=connectionProvider.getConnection();

socketThreadSafe.set(socket);returnsocketThreadSafe.get();

}catch(Exception e) {

logger.error("error ConnectionManager.invoke()", e);

}finally{

connectionProvider.returnCon(socket);

socketThreadSafe.remove();

}returnsocket;

}

}

View Code

spring配置文件

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

View Code

参考:http://www.cnblogs.com/mumuxinfei/p/3876187.html

由于本人经验有限,文章中难免会有错误,请浏览文章的您指正或有不同的观点共同探讨!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/556857.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue 进阶组件实战应用 -- 父子组件传值的应用实例(子父组件传值的两种触发方式)

基础的子组件和父组件通信已经搞定了,可以看此博客 父子组件传值基础应用 需求 现在需求是在一个父页面引用子组件,不只是要实现基本的父子组件传值。并且子组件给父组件传值的触发条件要在父页面触发。 目前小编采用的方式是使用ref 属性this.emit 方法…

学习Spring Boot:(一)入门

微服务 微服务其实是服务化思路的一种最佳实践方向,遵循SOA(面向服务的架构)的思路,各个企业在服务化治理上面的道路已经走得很远了,整个软件交付链上各个环节的基础设施逐渐成熟了,微服务就诞生了。 微服务…

java有几种变量_java有多少种变量?java类变量怎么使用?

相信有很多刚入行学习java技术的人,对java有多少种变量都不是很清楚,清楚的了解java变量java人员才可以写出好代码,那么java有多少种变量?今天我们就来讲解一下。成员变量:就是声明为类的属性的变量。静态变量(也叫做类变量)&…

学习Spring Boot:(二)启动原理

前言 主要了解前面的程序入口 SpringBootApplication 这个注解的结构。 正文 参考《SpringBoot揭秘 快速构建微服务体系》第三章的学习,总结下。 SpringBootApplication背后的秘密 Target(ElementType.TYPE) Retention(RetentionPolicy.RUNTIME) Documented In…

java 虚基类_重拾C++之虚函数和虚基类以及抽象类

一、引言好久没接触过C了,今天突然要用一点感觉号蛋疼,用惯了python感觉C一点都不会了。声明了一个类的对象居然用这种方法,脑子绝对是被驴(python)踢了class A{...}aA();//尼玛这都能行,被踢大了二、虚函数和一般函数虚函数就是加…

学习Spring Boot:(三)配置文件

前言 Spring Boot使用习惯优于配置(项目中存在大量的配置,此外还内置了一个习惯性的配置,让你无需手动进行配置)的理念让你的项目快速运行起来。 正文 使用配置文件注入属性 Spring Boot 默认的配置文件src/main/java/resourc…

c语言中闰年的流程图_C语言-算法与流程图

《C语言-算法与流程图》由会员分享,可在线阅读,更多相关《C语言-算法与流程图(22页珍藏版)》请在人人文库网上搜索。1、目录,第一章 绪论 第二章 算法与流程图 第三章 数据类型、运算符和表达式 第四章 程序的控制结构 第五章 函数 第六章 数组 第七章 指…

学习Spring Boot:(四)应用日志

前言 应用日志是一个系统非常重要的一部分,后来不管是开发还是线上,日志都起到至关重要的作用。这次使用的是 Logback 日志框架。 正文 Spring Boot在所有内部日志中使用Commons Logging,但是默认配置也提供了对常用日志的支持&#xff0c…

java用户登录窗口怎么删除_从窗口中删除 Headers 栏 . 窗口过程由不同的用户启动...

我正在使用此代码(在Windows 2003上)删除和调整窗口大小:Process process Process.GetProcessById(12121);IntPtr mwh process.MainWindowHandle;SetWindowLong(mwh, GWL_STYLE, WS_VISIBLE);ShowWindowAsync(mwh, 3);SetWindowPos(mwh, new IntPtr(0), 0, 0, 0, …

学习Spring Boot:(五)使用 devtools热部署

前言 spring-boot-devtools 是一个为开发者服务的一个模块,其中最重要的功能就是自动应用代码更改到最新的App上面去。原理是在发现代码有更改之后,重新启动应用,但是比速度比手动停止后再启动还要更快,更快指的不是节省出来的手工…

java单位数_java – 优化代码以查找给定数量N的阶乘的单位数

我在竞赛中尝试了一个问题,其确切陈述是这样的:Given a number N. The task is to find the unit digit of factorial of givennumber N.Input:First line of input contains number of testcases T. For each testcase, therewill be a single line containing N.O…

学习Spring Boot:(六) 集成Swagger2

前言 Swagger是用来描述和文档化RESTful API的一个项目。Swagger Spec是一套规范,定义了该如何去描述一个RESTful API。类似的项目还有RAML、API Blueprint。 根据Swagger Spec来描述RESTful API的文件称之为Swagger specification file,它使用JSON来表…

java的队列实现方法_Java实现队列的三种方法集合

数组实现队列//数组实现队列class queue{int[] a new int[5];int i 0;//入队操作public void in(int m) {a[i] m;}// 出队列操作 取出最前面的值 通过循环遍历把所有的数据向前一位public int out() {int index 0;int temp a[0];for(int j 0;j < i;j) {a[j] a[j 1];…

学习Spring Boot:(七)集成Mybatis

前面都是用的是spring data JPA&#xff0c;现在学习下Mybatis&#xff0c;而且现在Mybatis也像JPA那样支持注解形式了&#xff0c;也非常方便&#xff0c;学习一下。 数据库 mysql 5.7 添加依赖 在pom文件中添加&#xff1a; <mybatis.version>1.3.1</mybatis.ve…

php 简转繁体,php如何实现简体繁体转换

思路&#xff1a;根据中文简体、繁体对应的数据表&#xff0c;将其整理成一个以简体字为键&#xff0c;繁体字为值的一个一维数组&#xff0c;类似下面这样的一个数组结构&#xff1a;$dataarray(侧>側,厂>廠);在线学习视频分享&#xff1a;php视频教程好了&#xff0c;根…

学习Spring Boot:(八)Mybatis使用分页插件PageHelper

首先Mybqtis可以通过SQL 的方式实现分页很简单&#xff0c;只要在查询SQL 后面加上limit #{currIndex} , #{pageSize}就可以了。 本文主要介绍使用拦截器的方式实现分页。 实现原理 拦截器实现了拦截所有查询需要分页的方法&#xff0c;并且利用获取到的分页相关参数统一在s…

php递归删除文件,PHP 递归删除文件夹

用PHP实现递归删除整个文件夹。如果有什么不对的&#xff0c;请大家指教。/***遍历删除文件夹**param $dir 要删除文件夹的文件夹*/public function del_Dir($dir){$flag $this->is_empty_dir($dir);if( $flagfalse ){$dp opendir($dir);while(false ! $file readdir($dp…

学习Spring Boot:(九)统一异常处理

前言 开发的时候&#xff0c;每个controller的接口都需要进行捕捉异常的处理&#xff0c;以前有的是用切面做的&#xff0c;但是SpringMVC中就自带了ControllerAdvice &#xff0c;用来定义统一异常处理类&#xff0c;在 SpringBoot 中额外增加了 RestControllerAdvice。 使用…

php7 ast,PHP7 的抽象语法树(AST)带来的变化

什么是抽象语法树&#xff1f;抽象语法树(abstract syntax tree&#xff0c;AST)是源代码的抽象语法结构的树状表示&#xff0c;树上的每个节点都表示源代码中的一种结构&#xff0c;这所以说是抽象的&#xff0c;是因为抽象语法树并不会表示出真实语法出现的每一个细节&#x…

学习Spring Boot:(十)使用hibernate validation完成数据后端校验

前言 后台数据的校验也是开发中比较注重的一点&#xff0c;用来校验数据的正确性&#xff0c;以免一些非法的数据破坏系统&#xff0c;或者进入数据库&#xff0c;造成数据污染&#xff0c;由于数据检验可能应用到很多层面&#xff0c;所以系统对数据校验要求比较严格且追求可…