java thrift连接池_由浅入深了解Thrift之客户端连接池化

一、问题描述

在上一篇《由浅入深了解Thrift之服务模型和序列化机制》文章中,我们已经了解了thrift的基本架构和网络服务模型的优缺点。如今的互联网圈中,RPC服务化的思想如火如荼。我们又该如何将thrift服务化应用到我们的项目中哪?实现thrift服务化前,我们先想想这几个问题:服务注册、服务发现、服务健康检测、服务“Load Balance”、隐藏client和server端的交互细节、服务调用端的对象池化。

服务的注册、发现和健康检测,我们使用zookeeper可以很好的解决

服务“Load Balance",我们可以使用简单的算法“权重+随机”,当然也可以使用成熟复杂的算法

服务调用端的对象池化,我们可以使用common pool,使用简单又可以满足我们的需求

二、实现思路

1、thrift server端启动时,每个实例向zk集群以临时节点方式注册(这样,遍历zk上/server下有多少个临时节点就知道有哪些server实例)

thrift server端可以单机多端口多实例或多机部署多实例方式运行。

2、服务调用方实现一个连接池,连接池初始化时,通过zk将在线的server实例信息同步到本地并缓存,同时监听zk下的节点变化。

3、服务调用方与Server通讯时,从连接池中取一个可用的连接,用它实现RPC调用。

d0311a956f3af09138c6c3ecde8c0094.png

三、具体实现

1、thrift server端

thrift server端,向zk中注册server address

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

packagecom.wy.thriftpool.commzkpool;importjava.lang.instrument.IllegalClassFormatException;importjava.lang.reflect.Constructor;importorg.apache.thrift.protocol.TBinaryProtocol;importorg.apache.thrift.protocol.TBinaryProtocol.Factory;importorg.apache.thrift.server.TServer;importorg.apache.thrift.server.TThreadedSelectorServer;importorg.apache.thrift.transport.TFramedTransport;importorg.apache.thrift.transport.TNonblockingServerSocket;importorg.springframework.beans.factory.InitializingBean;importcom.wy.thrift.service.UserService.Processor;importcom.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter;importcom.wy.thriftpool.commzkpool.support.ThriftServerIpTransfer;importcom.wy.thriftpool.commzkpool.support.impl.LocalNetworkIpTransfer;/*** thrift server端,向zk中注册server address

*

*@authorwy

**/

public class ThriftServiceServerFactory implementsInitializingBean {//thrift server 服务端口

privateInteger port;//default 权重

private Integer priority = 1;//service实现类

privateObject service;//thrift server 注册路径

privateString configPath;privateThriftServerIpTransfer ipTransfer;//thrift server注册类

privateThriftServerAddressReporter addressReporter;//thrift server开启服务

privateServerThread serverThread;

@Overridepublic void afterPropertiesSet() throwsException {if (ipTransfer == null) {

ipTransfer= newLocalNetworkIpTransfer();

}

String ip=ipTransfer.getIp();if (ip == null) {throw new NullPointerException("cant find server ip...");

}

String hostname= ip + ":" + port + ":" +priority;

Class extends Object> serviceClass =service.getClass();

ClassLoader classLoader=Thread.currentThread().getContextClassLoader();

Class>[] interfaces =serviceClass.getInterfaces();if (interfaces.length == 0) {throw new IllegalClassFormatException("service-class should implements Iface");

}//reflect,load "Processor";

Processor> processor = null;for (Class>clazz : interfaces) {

String cname=clazz.getSimpleName();if (!cname.equals("Iface")) {continue;

}

String pname= clazz.getEnclosingClass().getName() + "$Processor";try{

Class> pclass =classLoader.loadClass(pname);if (!pclass.isAssignableFrom(Processor.class)) {continue;

}

Constructor> constructor =pclass.getConstructor(clazz);

processor= (Processor>) constructor.newInstance(service);break;

}catch(Exception e) {//TODO

}

}if (processor == null) {throw new IllegalClassFormatException("service-class should implements Iface");

}//需要单独的线程,因为serve方法是阻塞的.

serverThread = newServerThread(processor, port);

serverThread.start();//report

if (addressReporter != null) {

addressReporter.report(configPath, hostname);

}

}class ServerThread extendsThread {privateTServer server;

ServerThread(Processor> processor, int port) throwsException {//设置传输通道

TNonblockingServerSocket serverTransport = newTNonblockingServerSocket(port);//设置二进制协议

Factory protocolFactory = newTBinaryProtocol.Factory();

TThreadedSelectorServer.Args tArgs= newTThreadedSelectorServer.Args(serverTransport);

tArgs.processor(processor);

tArgs.transportFactory(newTFramedTransport.Factory());

tArgs.protocolFactory(protocolFactory);int num = Runtime.getRuntime().availableProcessors() * 2 + 1;

tArgs.selectorThreads(num);

tArgs.workerThreads(num* 10);//网络服务模型

server = newTThreadedSelectorServer(tArgs);

}

@Overridepublic voidrun() {try{

server.serve();

}catch(Exception e) {//TODO

}

}public voidstopServer() {

server.stop();

}

}public voidclose() {

serverThread.stopServer();

}public voidsetService(Object service) {this.service =service;

}public voidsetPriority(Integer priority) {this.priority =priority;

}public voidsetPort(Integer port) {this.port =port;

}public voidsetIpTransfer(ThriftServerIpTransfer ipTransfer) {this.ipTransfer =ipTransfer;

}public voidsetAddressReporter(ThriftServerAddressReporter addressReporter) {this.addressReporter =addressReporter;

}public voidsetConfigPath(String configPath) {this.configPath =configPath;

}

}

View Code

thrift server address注册到zk

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

packagecom.wy.thriftpool.commzkpool.support.impl;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.imps.CuratorFrameworkState;importorg.apache.zookeeper.CreateMode;importcom.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter;/*** thrift server address注册到zk

*

*@authorwy

**/

public class DynamicAddressReporter implementsThriftServerAddressReporter {privateCuratorFramework zookeeper;publicDynamicAddressReporter() {

}publicDynamicAddressReporter(CuratorFramework zookeeper) {this.zookeeper =zookeeper;

}public voidsetZookeeper(CuratorFramework zookeeper) {this.zookeeper =zookeeper;

}

@Overridepublic void report(String service, String address) throwsException {if (zookeeper.getState() ==CuratorFrameworkState.LATENT) {

zookeeper.start();

zookeeper.newNamespaceAwareEnsurePath(service);

}

zookeeper.create().creatingParentsIfNeeded()

.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)

.forPath(service+ "/i_", address.getBytes("utf-8"));

}public voidclose() {

zookeeper.close();

}

}

View Code

。。。

spring配置文件

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

View Code

2、服务调用端

连接池实现

杯了个具,为啥就不能提交。代码在评论中。

连接池工厂,负责与Thrift server通信

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

packagecom.wy.thriftpool.commzkconnpool;importjava.net.InetSocketAddress;importorg.apache.commons.pool.PoolableObjectFactory;importorg.apache.thrift.transport.TSocket;importorg.apache.thrift.transport.TTransport;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importcom.wy.thriftpool.commzkpool.support.ThriftServerAddressProvider;/*** 连接池工厂,负责与Thrift server通信

*

*@authorwy

**/

public class ThriftPoolFactory implements PoolableObjectFactory{private final Logger logger =LoggerFactory.getLogger(getClass());//超时设置

public inttimeOut;private finalThriftServerAddressProvider addressProvider;privatePoolOperationCallBack callback;publicThriftPoolFactory(ThriftServerAddressProvider addressProvider, PoolOperationCallBack callback) {super();this.addressProvider =addressProvider;this.callback =callback;

}public ThriftPoolFactory(ThriftServerAddressProvider addressProvider, PoolOperationCallBack callback, inttimeOut) {super();this.addressProvider =addressProvider;this.callback =callback;this.timeOut =timeOut;

}/*** 创建对象*/@Overridepublic TTransport makeObject() throwsException {try{

InetSocketAddress address=addressProvider.selector();

TTransport transport= new TSocket(address.getHostName(), address.getPort(), this.timeOut);

transport.open();if (callback != null) {

callback.make(transport);

}returntransport;

}catch(Exception e) {

logger.error("creat transport error:", e);throw newRuntimeException(e);

}

}/*** 销毁对象*/@Overridepublic void destroyObject(TTransport transport) throwsException {if (transport != null &&transport.isOpen()) {

transport.close();

}

}/*** 检验对象是否可以由pool安全返回*/@Overridepublic booleanvalidateObject(TTransport transport) {try{if (transport != null && transport instanceofTSocket) {

TSocket thriftSocket=(TSocket) transport;if(thriftSocket.isOpen()) {return true;

}else{return false;

}

}else{return false;

}

}catch(Exception e) {return false;

}

}

@Overridepublic void activateObject(TTransport obj) throwsException {//TODO Auto-generated method stub

}

@Overridepublic void passivateObject(TTransport obj) throwsException {//TODO Auto-generated method stub

}public static interfacePoolOperationCallBack {//创建成功是执行

voidmake(TTransport transport);//销毁之前执行

voiddestroy(TTransport transport);

}

}

View Code

连接池管理

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

packagecom.wy.thriftpool.commzkconnpool;importorg.apache.thrift.transport.TSocket;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;/*** 连接池管理

*

*@authorwy

**/@Servicepublic classConnectionManager {private final Logger logger =LoggerFactory.getLogger(getClass());//保存local对象

ThreadLocal socketThreadSafe = new ThreadLocal();//连接提供池

@AutowiredprivateConnectionProvider connectionProvider;publicTSocket getSocket() {

TSocket socket= null;try{

socket=connectionProvider.getConnection();

socketThreadSafe.set(socket);returnsocketThreadSafe.get();

}catch(Exception e) {

logger.error("error ConnectionManager.invoke()", e);

}finally{

connectionProvider.returnCon(socket);

socketThreadSafe.remove();

}returnsocket;

}

}

View Code

spring配置文件

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

View Code

参考:http://www.cnblogs.com/mumuxinfei/p/3876187.html

由于本人经验有限,文章中难免会有错误,请浏览文章的您指正或有不同的观点共同探讨!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/556857.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue 进阶组件实战应用 -- 父子组件传值的应用实例(子父组件传值的两种触发方式)

基础的子组件和父组件通信已经搞定了,可以看此博客 父子组件传值基础应用 需求 现在需求是在一个父页面引用子组件,不只是要实现基本的父子组件传值。并且子组件给父组件传值的触发条件要在父页面触发。 目前小编采用的方式是使用ref 属性this.emit 方法…

学习Spring Boot:(一)入门

微服务 微服务其实是服务化思路的一种最佳实践方向,遵循SOA(面向服务的架构)的思路,各个企业在服务化治理上面的道路已经走得很远了,整个软件交付链上各个环节的基础设施逐渐成熟了,微服务就诞生了。 微服务…

学习Spring Boot:(二)启动原理

前言 主要了解前面的程序入口 SpringBootApplication 这个注解的结构。 正文 参考《SpringBoot揭秘 快速构建微服务体系》第三章的学习,总结下。 SpringBootApplication背后的秘密 Target(ElementType.TYPE) Retention(RetentionPolicy.RUNTIME) Documented In…

学习Spring Boot:(四)应用日志

前言 应用日志是一个系统非常重要的一部分,后来不管是开发还是线上,日志都起到至关重要的作用。这次使用的是 Logback 日志框架。 正文 Spring Boot在所有内部日志中使用Commons Logging,但是默认配置也提供了对常用日志的支持&#xff0c…

学习Spring Boot:(五)使用 devtools热部署

前言 spring-boot-devtools 是一个为开发者服务的一个模块,其中最重要的功能就是自动应用代码更改到最新的App上面去。原理是在发现代码有更改之后,重新启动应用,但是比速度比手动停止后再启动还要更快,更快指的不是节省出来的手工…

学习Spring Boot:(六) 集成Swagger2

前言 Swagger是用来描述和文档化RESTful API的一个项目。Swagger Spec是一套规范,定义了该如何去描述一个RESTful API。类似的项目还有RAML、API Blueprint。 根据Swagger Spec来描述RESTful API的文件称之为Swagger specification file,它使用JSON来表…

java的队列实现方法_Java实现队列的三种方法集合

数组实现队列//数组实现队列class queue{int[] a new int[5];int i 0;//入队操作public void in(int m) {a[i] m;}// 出队列操作 取出最前面的值 通过循环遍历把所有的数据向前一位public int out() {int index 0;int temp a[0];for(int j 0;j < i;j) {a[j] a[j 1];…

php 简转繁体,php如何实现简体繁体转换

思路&#xff1a;根据中文简体、繁体对应的数据表&#xff0c;将其整理成一个以简体字为键&#xff0c;繁体字为值的一个一维数组&#xff0c;类似下面这样的一个数组结构&#xff1a;$dataarray(侧>側,厂>廠);在线学习视频分享&#xff1a;php视频教程好了&#xff0c;根…

学习Spring Boot:(八)Mybatis使用分页插件PageHelper

首先Mybqtis可以通过SQL 的方式实现分页很简单&#xff0c;只要在查询SQL 后面加上limit #{currIndex} , #{pageSize}就可以了。 本文主要介绍使用拦截器的方式实现分页。 实现原理 拦截器实现了拦截所有查询需要分页的方法&#xff0c;并且利用获取到的分页相关参数统一在s…

学习Spring Boot:(九)统一异常处理

前言 开发的时候&#xff0c;每个controller的接口都需要进行捕捉异常的处理&#xff0c;以前有的是用切面做的&#xff0c;但是SpringMVC中就自带了ControllerAdvice &#xff0c;用来定义统一异常处理类&#xff0c;在 SpringBoot 中额外增加了 RestControllerAdvice。 使用…

php7 ast,PHP7 的抽象语法树(AST)带来的变化

什么是抽象语法树&#xff1f;抽象语法树(abstract syntax tree&#xff0c;AST)是源代码的抽象语法结构的树状表示&#xff0c;树上的每个节点都表示源代码中的一种结构&#xff0c;这所以说是抽象的&#xff0c;是因为抽象语法树并不会表示出真实语法出现的每一个细节&#x…

学习Spring Boot:(十)使用hibernate validation完成数据后端校验

前言 后台数据的校验也是开发中比较注重的一点&#xff0c;用来校验数据的正确性&#xff0c;以免一些非法的数据破坏系统&#xff0c;或者进入数据库&#xff0c;造成数据污染&#xff0c;由于数据检验可能应用到很多层面&#xff0c;所以系统对数据校验要求比较严格且追求可…

php 仿高德,仿高德路线规划滑动效果

因为项目有个界面要模仿高德地图路径规划滑动效果&#xff0c;因此写了demo&#xff0c;并简单说下分析过程。高德地图效果演示:仿高德路线规划滑动.gifdemo效果演示:高德地图规划滑动.gif一. 分析首先&#xff0c;我们可以看出这个滚动的视图应该是UIScrollView或者UIScrollVi…

学习Spring Boot:(十八)Spring Boot 中session共享

前言 前面我们将 Redis 集成到工程中来了&#xff0c;现在需要用它来做点实事了。这次为了解决分布式系统中的 session 共享的问题&#xff0c;将 session 托管到 Redis。 正文 引入依赖 除了上篇文章中引入 spring-boot-starter-data-redis&#xff0c;还需要 spring-sess…

学习Spring Boot:(十九)Shiro 中使用缓存

前言 在 shiro 中每次去拦截请求进行权限认证的时候&#xff0c;都会去数据库查询该用户的所有权限信息&#xff0c; 这个时候就是有一个问题了&#xff0c;因为用户的权限信息在短时间内是不可变的&#xff0c;每次查询出来的数据其实都是重复数据&#xff0c;没必要每次都去…

学习Spring Boot:(二十五)使用 Redis 实现数据缓存

前言 由于 Ehcache 存在于单个 java 程序的进程中&#xff0c;无法满足多个程序分布式的情况&#xff0c;需要将多个服务器的缓存集中起来进行管理&#xff0c;需要一个缓存的寄存器&#xff0c;这里使用的是 Redis。 正文 当应用程序要去缓存中读取数据&#xff0c;但是缓存…

学习Spring Boot:(二十七)Spring Boot 2.0 中使用 Actuator

前言 主要是完成微服务的监控&#xff0c;完成监控治理。可以查看微服务间的数据处理和调用&#xff0c;当它们之间出现了异常&#xff0c;就可以快速定位到出现问题的地方。 springboot - version: 2.0 正文 依赖 maven 项目 在 pom.xml 文件中加入 actuator 的依赖&…

php实现注销功能,laravel 实现用户登录注销并限制功能

在项目根目录输入&#xff1a; php artisan make:controller Admin/LoginControllerphp artisan make:model Model/Admin -m运行之后 项目中会新增两个PHP文件新创建了admins用户表&#xff0c;此用户表默认新建中只有主键&#xff0c;创建时间&#xff0c;编辑时间。我们接下来…

将ipynb文件转为py的简单方法(图文并茂)

打开可以使用jupyter命令的命令窗口&#xff08;如果没有jupyter则需要先安装jupyter&#xff09;&#xff0c;cd 命令进入到 ipynb 文件所在的文件夹&#xff0c;执行 jupyter nbconvert --to script xxx.ipynb 即可完成 ipynb 文件到 py 文件的转化&#xff0c;执行 jupyter …

学习Spring Boot:(二十八)Spring Security 权限认证

前言 主要实现 Spring Security 的安全认证&#xff0c;结合 RESTful API 的风格&#xff0c;使用无状态的环境。 主要实现是通过请求的 URL &#xff0c;通过过滤器来做不同的授权策略操作&#xff0c;为该请求提供某个认证的方法&#xff0c;然后进行认证&#xff0c;授权成…