一、问题描述
在上一篇《由浅入深了解Thrift之服务模型和序列化机制》文章中,我们已经了解了thrift的基本架构和网络服务模型的优缺点。如今的互联网圈中,RPC服务化的思想如火如荼。我们又该如何将thrift服务化应用到我们的项目中哪?实现thrift服务化前,我们先想想这几个问题:服务注册、服务发现、服务健康检测、服务“Load Balance”、隐藏client和server端的交互细节、服务调用端的对象池化。
服务的注册、发现和健康检测,我们使用zookeeper可以很好的解决
服务“Load Balance",我们可以使用简单的算法“权重+随机”,当然也可以使用成熟复杂的算法
服务调用端的对象池化,我们可以使用common pool,使用简单又可以满足我们的需求
二、实现思路
1、thrift server端启动时,每个实例向zk集群以临时节点方式注册(这样,遍历zk上/server下有多少个临时节点就知道有哪些server实例)
thrift server端可以单机多端口多实例或多机部署多实例方式运行。
2、服务调用方实现一个连接池,连接池初始化时,通过zk将在线的server实例信息同步到本地并缓存,同时监听zk下的节点变化。
3、服务调用方与Server通讯时,从连接池中取一个可用的连接,用它实现RPC调用。
三、具体实现
1、thrift server端
thrift server端,向zk中注册server address
packagecom.wy.thriftpool.commzkpool;importjava.lang.instrument.IllegalClassFormatException;importjava.lang.reflect.Constructor;importorg.apache.thrift.protocol.TBinaryProtocol;importorg.apache.thrift.protocol.TBinaryProtocol.Factory;importorg.apache.thrift.server.TServer;importorg.apache.thrift.server.TThreadedSelectorServer;importorg.apache.thrift.transport.TFramedTransport;importorg.apache.thrift.transport.TNonblockingServerSocket;importorg.springframework.beans.factory.InitializingBean;importcom.wy.thrift.service.UserService.Processor;importcom.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter;importcom.wy.thriftpool.commzkpool.support.ThriftServerIpTransfer;importcom.wy.thriftpool.commzkpool.support.impl.LocalNetworkIpTransfer;/*** thrift server端,向zk中注册server address
*
*@authorwy
**/
public class ThriftServiceServerFactory implementsInitializingBean {//thrift server 服务端口
privateInteger port;//default 权重
private Integer priority = 1;//service实现类
privateObject service;//thrift server 注册路径
privateString configPath;privateThriftServerIpTransfer ipTransfer;//thrift server注册类
privateThriftServerAddressReporter addressReporter;//thrift server开启服务
privateServerThread serverThread;
@Overridepublic void afterPropertiesSet() throwsException {if (ipTransfer == null) {
ipTransfer= newLocalNetworkIpTransfer();
}
String ip=ipTransfer.getIp();if (ip == null) {throw new NullPointerException("cant find server ip...");
}
String hostname= ip + ":" + port + ":" +priority;
Class extends Object> serviceClass =service.getClass();
ClassLoader classLoader=Thread.currentThread().getContextClassLoader();
Class>[] interfaces =serviceClass.getInterfaces();if (interfaces.length == 0) {throw new IllegalClassFormatException("service-class should implements Iface");
}//reflect,load "Processor";
Processor> processor = null;for (Class>clazz : interfaces) {
String cname=clazz.getSimpleName();if (!cname.equals("Iface")) {continue;
}
String pname= clazz.getEnclosingClass().getName() + "$Processor";try{
Class> pclass =classLoader.loadClass(pname);if (!pclass.isAssignableFrom(Processor.class)) {continue;
}
Constructor> constructor =pclass.getConstructor(clazz);
processor= (Processor>) constructor.newInstance(service);break;
}catch(Exception e) {//TODO
}
}if (processor == null) {throw new IllegalClassFormatException("service-class should implements Iface");
}//需要单独的线程,因为serve方法是阻塞的.
serverThread = newServerThread(processor, port);
serverThread.start();//report
if (addressReporter != null) {
addressReporter.report(configPath, hostname);
}
}class ServerThread extendsThread {privateTServer server;
ServerThread(Processor> processor, int port) throwsException {//设置传输通道
TNonblockingServerSocket serverTransport = newTNonblockingServerSocket(port);//设置二进制协议
Factory protocolFactory = newTBinaryProtocol.Factory();
TThreadedSelectorServer.Args tArgs= newTThreadedSelectorServer.Args(serverTransport);
tArgs.processor(processor);
tArgs.transportFactory(newTFramedTransport.Factory());
tArgs.protocolFactory(protocolFactory);int num = Runtime.getRuntime().availableProcessors() * 2 + 1;
tArgs.selectorThreads(num);
tArgs.workerThreads(num* 10);//网络服务模型
server = newTThreadedSelectorServer(tArgs);
}
@Overridepublic voidrun() {try{
server.serve();
}catch(Exception e) {//TODO
}
}public voidstopServer() {
server.stop();
}
}public voidclose() {
serverThread.stopServer();
}public voidsetService(Object service) {this.service =service;
}public voidsetPriority(Integer priority) {this.priority =priority;
}public voidsetPort(Integer port) {this.port =port;
}public voidsetIpTransfer(ThriftServerIpTransfer ipTransfer) {this.ipTransfer =ipTransfer;
}public voidsetAddressReporter(ThriftServerAddressReporter addressReporter) {this.addressReporter =addressReporter;
}public voidsetConfigPath(String configPath) {this.configPath =configPath;
}
}
View Code
thrift server address注册到zk
packagecom.wy.thriftpool.commzkpool.support.impl;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.imps.CuratorFrameworkState;importorg.apache.zookeeper.CreateMode;importcom.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter;/*** thrift server address注册到zk
*
*@authorwy
**/
public class DynamicAddressReporter implementsThriftServerAddressReporter {privateCuratorFramework zookeeper;publicDynamicAddressReporter() {
}publicDynamicAddressReporter(CuratorFramework zookeeper) {this.zookeeper =zookeeper;
}public voidsetZookeeper(CuratorFramework zookeeper) {this.zookeeper =zookeeper;
}
@Overridepublic void report(String service, String address) throwsException {if (zookeeper.getState() ==CuratorFrameworkState.LATENT) {
zookeeper.start();
zookeeper.newNamespaceAwareEnsurePath(service);
}
zookeeper.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(service+ "/i_", address.getBytes("utf-8"));
}public voidclose() {
zookeeper.close();
}
}
View Code
。。。
spring配置文件
View Code
2、服务调用端
连接池实现
杯了个具,为啥就不能提交。代码在评论中。
连接池工厂,负责与Thrift server通信
packagecom.wy.thriftpool.commzkconnpool;importjava.net.InetSocketAddress;importorg.apache.commons.pool.PoolableObjectFactory;importorg.apache.thrift.transport.TSocket;importorg.apache.thrift.transport.TTransport;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importcom.wy.thriftpool.commzkpool.support.ThriftServerAddressProvider;/*** 连接池工厂,负责与Thrift server通信
*
*@authorwy
**/
public class ThriftPoolFactory implements PoolableObjectFactory{private final Logger logger =LoggerFactory.getLogger(getClass());//超时设置
public inttimeOut;private finalThriftServerAddressProvider addressProvider;privatePoolOperationCallBack callback;publicThriftPoolFactory(ThriftServerAddressProvider addressProvider, PoolOperationCallBack callback) {super();this.addressProvider =addressProvider;this.callback =callback;
}public ThriftPoolFactory(ThriftServerAddressProvider addressProvider, PoolOperationCallBack callback, inttimeOut) {super();this.addressProvider =addressProvider;this.callback =callback;this.timeOut =timeOut;
}/*** 创建对象*/@Overridepublic TTransport makeObject() throwsException {try{
InetSocketAddress address=addressProvider.selector();
TTransport transport= new TSocket(address.getHostName(), address.getPort(), this.timeOut);
transport.open();if (callback != null) {
callback.make(transport);
}returntransport;
}catch(Exception e) {
logger.error("creat transport error:", e);throw newRuntimeException(e);
}
}/*** 销毁对象*/@Overridepublic void destroyObject(TTransport transport) throwsException {if (transport != null &&transport.isOpen()) {
transport.close();
}
}/*** 检验对象是否可以由pool安全返回*/@Overridepublic booleanvalidateObject(TTransport transport) {try{if (transport != null && transport instanceofTSocket) {
TSocket thriftSocket=(TSocket) transport;if(thriftSocket.isOpen()) {return true;
}else{return false;
}
}else{return false;
}
}catch(Exception e) {return false;
}
}
@Overridepublic void activateObject(TTransport obj) throwsException {//TODO Auto-generated method stub
}
@Overridepublic void passivateObject(TTransport obj) throwsException {//TODO Auto-generated method stub
}public static interfacePoolOperationCallBack {//创建成功是执行
voidmake(TTransport transport);//销毁之前执行
voiddestroy(TTransport transport);
}
}
View Code
连接池管理
packagecom.wy.thriftpool.commzkconnpool;importorg.apache.thrift.transport.TSocket;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;/*** 连接池管理
*
*@authorwy
**/@Servicepublic classConnectionManager {private final Logger logger =LoggerFactory.getLogger(getClass());//保存local对象
ThreadLocal socketThreadSafe = new ThreadLocal();//连接提供池
@AutowiredprivateConnectionProvider connectionProvider;publicTSocket getSocket() {
TSocket socket= null;try{
socket=connectionProvider.getConnection();
socketThreadSafe.set(socket);returnsocketThreadSafe.get();
}catch(Exception e) {
logger.error("error ConnectionManager.invoke()", e);
}finally{
connectionProvider.returnCon(socket);
socketThreadSafe.remove();
}returnsocket;
}
}
View Code
spring配置文件
View Code
参考:http://www.cnblogs.com/mumuxinfei/p/3876187.html
由于本人经验有限,文章中难免会有错误,请浏览文章的您指正或有不同的观点共同探讨!