ActiveMQ源码解析 建立连接

作为一个消息中间件,有客户端和服务端两部分代码,这次的源码解析系列主要从客户端的代码入手,分成建立连接、消息发送、消息消费三个部分。趁着我昨天弄明白了源码编译的兴奋劲头还没过去,今天研究一下建立连接的部分。

如果读起来吃力,代码部分可以略过,我把主要的功能点给加粗。

通常来说,客户端使用MQ的API建立时,可以分成两个步骤:

  1. 对于连接的配置,比如服务器IP地址,用户名和密码等等
  2. 建立连接并启动
    客户端示例代码:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(username,password,url);  
ActiveMQConnection connection = connectionFactory.createConnection();  
connection.start();

可以看到主要的方法是ActiveMQConnectionFactory的构造函数,和createConnection(),以及connection中的start()方法。

ActiveMQConnectionFactory中的createConnection
构造函数比较简单,直接把传入的用户名密码和url放在变量里

public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {setUserName(userName);setPassword(password);setBrokerURL(brokerURL.toString());
}

createConnection方法指向了createActiveMQConnection方法,该方法中主要做的事情有三个:

  1. 建立Transport和通过Transport建立Connection
  2. 配置Connection,建立好的Transport对象会被放到Connection对象中
  3. 启动Transport
//建立Transport和通过Transport建立Connection
Transport transport = createTransport();
connection = createActiveMQConnection(transport, factoryStats);            
//配置
connection.setUserName(userName);            
connection.setPassword(password);            
configureConnection(connection);
//启动Transport
transport.start();

configureConnection(connection);这个方法的作用是对实例化出的ActiveMQConnetion对象中的参数的一系列配置,代码有点长就不上了。
对于我们来说其实主要想看的是连接是如何建立起来的,也就是

Transport transport = createTransport();
connection = createActiveMQConnection(transport, factoryStats);      

createTransport();方法中包含了对客户端传入的url的初步校验,主要是验证URL的合法性,而后调用工厂类TransportFactory.connection(url)来进行连接的建立。

我们客户端在建立连接的时候,有可能有TCP、UDP等等协议,AMQ实现了简单工厂类FactoryFinder,在TransportFactory.connection(url)方法中,先是通过FactoryFinder根据用户输入的url(比如tcp://192.168.0.1)来找到使用的协议工厂TcpTransportFactory,然后使用TcpTransportFactory中的类来进行连接的建立。这个过程从代码上来看有点曲折:

  1. TransportFactory的connect()调用findTransportFactory方法
  2. findTransportFactory调用FactoryFinder类的newInstance方法
  3. newInstance调用ObjectFactory类的create方法
  4. ObejctFactory是一个接口类,实现类是StandaloneObjectFactory,其中的create方法调用自身的loadClass方法
  5. loadClass方法中最终找到正确的类,返回至TransportFactory中
  6. 如果是tcp连接,最终得到的就是一个实例化的TcpTransportFactory类
public abstact class TransportFactory {
……private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");public static Transport connect(URI location) throws Exception {TransportFactory tf = findTransportFactory(location);return tf.doConnect(location);}public static TransportFactory findTransportFactory(URI location) throws IOException {//拆分urlString scheme = location.getScheme();if (scheme == null) {throw new IOException("Transport not scheme specified: [" + location + "]");}TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);if (tf == null) {// 调用FactoryFinder找到正确的TransportFactorytry {tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);TRANSPORT_FACTORYS.put(scheme, tf);} catch (Throwable e) {throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);}}return tf;}
……
}
public class FactoryFinder {
……//通过ObjectFactory来找到正确的TransportFactorypublic Object newInstance(String key) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException {return objectFactory.create(path+key);}
……
}

ObjectFactory的设计也是很有趣的。AMQ在代码中的说法是之所以这么实现是因为这样如果用户想自己写一个ObjectFactory,也可以支持。

/*** The strategy that the FactoryFinder uses to find load and instantiate Objects* can be changed out by calling the* {@link org.apache.activemq.util.FactoryFinder#setObjectFactory(org.apache.activemq.util.FactoryFinder.ObjectFactory)}* method with a custom implementation of ObjectFactory.** The default ObjectFactory is typically changed out when running in a specialized container* environment where service discovery needs to be done via the container system.  For example,* in an OSGi scenario.*/public interface ObjectFactory {/*** @param path the full service path* @return*/public Object create(String path) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException;}

Anyway,我们现在通过这么曲折的过程得到了一个实例化的TcpTransportFactory对象,下一步应该是调用doConnect(url)方法进行连接的建立了。因为TcpTransportFactory继承了TransportFactory类,doConnect方法仍然是在TransportFactory中的:

public Transport doConnect(URI location) throws Exception {try {//把url里关于Transport的配置提取出来,WireFormat基本都可以看成是url的配置。//如果使用Openwire(默认协议),那么WireFormat就是openwire的相关配置。//见http://activemq.apache.org/configuring-wire-formats.htmlMap<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));if( !options.containsKey("wireFormat.host") ) {options.put("wireFormat.host", location.getHost());}WireFormat wf = createWireFormat(options);//建立socket连接Transport transport = createTransport(location, wf);//装饰者模式,在连接上包装上MutexTransportFilter、WireFormatNegotiator、InactivityMonitor、ResponseCorrelator四个功能Transport rc = configure(transport, wf, options);//remove autoIntrospectionSupport.extractProperties(options, "auto.");if (!options.isEmpty()) {throw new IllegalArgumentException("Invalid connect parameters: " + options);}return rc;} catch (URISyntaxException e) {throw IOExceptionSupport.create(e);}}

这个方法中主要有三个重要功能:

  1. 配置wireformat
  2. 建立TcpTransport连接
  3. 在连接上包装四大辅助功能
    其中配置WireFormat可以不看,建立TcpTransport其实是在调用createTransport(location, wf);时引用了TcpTransport的构造函数,代码如下:
public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,URI localLocation) throws UnknownHostException, IOException {this.wireFormat = wireFormat;this.socketFactory = socketFactory;try {this.socket = socketFactory.createSocket();} catch (SocketException e) {this.socket = null;}this.remoteLocation = remoteLocation;this.localLocation = localLocation;this.initBuffer = null;setDaemon(false);}

这里直接调用了socketFactory.createSocket();,使用的是默认的方法,所以无法指定本地网卡建立连接。我看了下其实可以用socketFactory.createSocket(host, port, localHost, localPort)来改写,改写后就可以指定本地IP和端口了。

此外,查了下网络上的资料,四大辅助后续再看:

MutexTransportFilter类实现了对每个请求的同步锁,同一时间只允许发送一个请求,如果有第二个请求需要等待第一个请求发送完毕才可继续发送。

WireFormatNegotiator类实现了在客户端连接broker的时候先发送数据解析相关的协议信息,例如解析版本号,是否使用缓存等信息。

InactivityMonitor类实现了连接成功后启动心跳检查机制,客户端每10秒发送一次心跳信息,服务端每30秒读一次心跳信息,如果没有读到则会断开连接,心跳检测是相互的,客户端也会每30秒读取服务端发送来的心跳信息,如果没有读到也一样会断开连接。

ResponseCorrelator类实现了异步请求但需要获取响应信息否则就会阻塞等待功能。

ActiveMQConnection的Start()
在使用AMQ的过程中,很多用户问我为什么Connection需要start(),不能在createConnection的时候直接start了吗?而且不调用start方法其实不影响发送,但是会影响接收。抱着这样的疑惑,我们来看一下源码。

 /*** Starts (or restarts) a connection's delivery of incoming messages. A call* to <CODE>start</CODE> on a connection that has already been started is* ignored.** @throws JMSException if the JMS provider fails to start message delivery*                 due to some internal error.* @see javax.jms.Connection#stop()*/@Overridepublic void start() throws JMSException {checkClosedOrFailed();ensureConnectionInfoSent();if (started.compareAndSet(false, true)) {for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {ActiveMQSession session = i.next();session.start();}}}

源码里直接对start方法加了注释,说明start就是启动connection可以接收消息的功能。其实源码里可以很明显看出来start里包含了几个步骤:

  1. 检查连接是否关闭或失效
  2. 确认客户端的ConnectionInfo是否被送到服务器
  3. 启动这个Connection中的每一个Session

我好奇的是第二步,看看源码

 /*** Send the ConnectionInfo to the Broker** @throws JMSException*/protected void ensureConnectionInfoSent() throws JMSException {synchronized(this.ensureConnectionInfoSentMutex) {// Can we skip sending the ConnectionInfo packet??if (isConnectionInfoSentToBroker || closed.get()) {return;}//TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?if (info.getClientId() == null || info.getClientId().trim().length() == 0) {info.setClientId(clientIdGenerator.generateId());}syncSendPacket(info.copy(), getConnectResponseTimeout());this.isConnectionInfoSentToBroker = true;// Add a temp destination advisory consumer so that// We know what the valid temporary destinations are on the// broker without having to do an RPC to the broker.ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());if (watchTopicAdvisories) {advisoryConsumer = new AdvisoryConsumer(this, consumerId);}}}

从源码里还能看到讨论和待办……我觉得我已经深入核心了……这个方法里做了两件事,

发送ConnectionInfo的数据包到服务端,如果info里没有用户自己设定的clientID,还会自动帮忙生成一个。发送的时候调用的是syncSendPacket方法,很明显是个同步发送,需要服务端同步返回response才算发送成功,我理解这里应该是一个试探连接的步骤。
建立一个通往临时目的地的消费者。所以其实每一个ActiveMQConnection的连接中都自动包含了一个消费者。我临时写了个客户端试了下,的确是存在的。
在这里插入图片描述
奇葩的是我就算不调用connection.start()方法,直接发送消息,这个临时消费者也是存在的,所以肯定是在消息发送的时候的哪个地方调用了connection的start方法。

至于为什么不调用start()方法就无法消费,我看了下session的start方法:

/*** Start this Session.** @throws JMSException*/protected void start() throws JMSException {started.set(true);for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {ActiveMQMessageConsumer c = iter.next();c.start();}executor.start();}

原来是在session的start方法里启动了这个session里的consumer,想想session的建立过程的确是不需要调用session.start方法的,但是我们一般是先调用start方法,而后建立consumer,这个逻辑顺序还是有点错乱……
等下一次研究接收端的源码时再深入吧。

本次发现的源码优化点

  1. socket建立时,使用不同的createSocket方法,指定本机IP和端口。
  2. 项目用到了advisory message,每当agent建立/断开连接的时候,ActiveMQ.Advisory.Connection中会生成一条消息,这个消息中带上了ConnectionInfo。项目就是使用这个来即时检测agent的在线和离线状态的。因此如果我们改一下ConnectionInfo,加上agent的一些重要信息,比如agent版本号,操作系统类型,真实IP地址等等,会在获取agent信息的即时性上得到很大的提高。

我真的去试了一下……在ConnectionInfo里添加了一条test=test,然后重新编译服务端和客户端的依赖jar包,开启MQ的logging plugins,并且用客户端去监听了一下ActiveMQ.Advisory.Connection,得到了这样的结果。
在这里插入图片描述

ConnectionInfo {commandId = 1, 
responseRequired = true, 
connectionId = ID:Air.local-51230-1502000963732-1:1, 
clientId = ID:Air.local-51230-1502000963732-0:1, 
clientIp = tcp://127.0.0.1:51231, 
userName = null, password = *****, 
test = test, 
brokerPath = null, 
brokerMasterConnector = false, 
manageable = true, 
clientMaster = true, 
faultTolerant = true, 
failoverReconnect = false}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/249538.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

原生Js_实现广告弹窗

广告样式当页面加载后5s刷新在右下角 <!DOCTYPE html> <html><head><meta charset"utf-8" /><title>Gary图片轮播</title><style type"text/css">#ad{width:300px;height: 300px;background-color:antiquewhite…

springcloud注册中心eureka

1、前提 springcloud的注册中心是以springboot为基础搭建起来的。 开发工具&#xff1a;IDEA 项目管理工具&#xff1a;maven 2、搭建步骤 创建一个web项目&#xff08;建议使用IDEA工具构建项目&#xff09;修改pom文件 <dependency><groupId>org.springframework…

Nancy in .Net Core学习笔记 - 视图引擎

前文中我们介绍了Nancy中的路由&#xff0c;这一篇我们来介绍一下Nancy中的视图引擎。 Nancy中如何返回一个视图(View) 在ASP.NET Mvc中&#xff0c;我们使用ViewResult类来返回一个视图。Nancy中也提供了类似的功能, 在NancyModule类中&#xff0c;Nancy提供了一个ViewRendere…

设计模式之组合模式(Composite 模式)

引入composite模式 在计算机文件系统中&#xff0c;有文件夹的概念&#xff0c;文件夹里面既可以放入文件也可以放入文件夹&#xff0c;但是文件中却不能放入任何东西。文件夹和文件构成了一种递归结构和容器结构。 虽然文件夹和文件是不同的对象&#xff0c;但是他们都可以被放…

HierarchicalBeanFactory接口

HierarchicalBeanFactory 提供父容器的访问功能.至于父容器的设置,需要找ConfigurableBeanFactory的setParentBeanFactory(接口把设置跟获取给拆开了!). HierarchicalBeanFactory源码具体&#xff1a; 1、第一个方法返回本Bean工厂的父工厂。这个方法实现了工厂的分层。 2、第…

C++: C++函数声明的时候后面加const

C: C函数声明的时候后面加const 转自&#xff1a;http://blog.csdn.net/zhangss415/article/details/7998123 非静态成员函数后面加const&#xff08;加到非成员函数或静态成员后面会产生编译错误&#xff09;&#xff0c;表示成员函数隐含传入的this指针为const指针&#xff0…

【计蒜客习题】消除字符串

问题描述 蒜头君喜欢中心对称的字符串&#xff0c;即回文字符串。现在蒜头君手里有一个字符串 SS&#xff0c;蒜头君每次都会进行这样的操作&#xff1a;从 SS 中挑选一个回文的子序列&#xff0c;将其从字符串 SS 中去除&#xff0c;剩下的字符重组成新的字符串 SS。 蒜头君想…

Training a classifier

你已经学习了如何定义神经网络&#xff0c;计算损失和执行网络权重的更新。 现在你或许在思考。 What about data? 通常当你需要处理图像&#xff0c;文本&#xff0c;音频&#xff0c;视频数据&#xff0c;你能够使用标准的python包将数据加载进numpy数组。之后你能够转换这些…

ListableBeanFactory接口

ListableBeanFactory获取bean时,Spring 鼓励使用这个接口定义的api. 还有个Beanfactory方便使用.其他的4个接口都是不鼓励使用的. 提供容器中bean迭代的功能,不再需要一个个bean地查找.比如可以一次获取全部的bean(太暴力了),根据类型获取bean.在看SpringMVC时,扫描包路径下的…

面向对象之三大特性:继承,封装,多态

python面向对象的三大特性&#xff1a;继承&#xff0c;封装&#xff0c;多态。 1. 封装: 把很多数据封装到⼀个对象中. 把固定功能的代码封装到⼀个代码块, 函数, 对象, 打包成模块. 这都属于封装的思想. 具体的情况具体分析. 比如. 你写了⼀个很⽜B的函数. 那这个也可以被称为…

configurablebeanfactory

ConfigurableBeanFactory定义BeanFactory的配置.ConfigurableBeanFactory中定义了太多太多的api,比如类加载器,类型转化,属性编辑器,BeanPostProcessor,作用域,bean定义,处理bean依赖关系,合并其他ConfigurableBeanFactory,bean如何销毁. ConfigurableBeanFactory同时继承了Hi…

外观模式

一、什么是外观模式   有些人可能炒过股票&#xff0c;但其实大部分人都不太懂&#xff0c;这种没有足够了解证券知识的情况下做股票是很容易亏钱的&#xff0c;刚开始炒股肯定都会想&#xff0c;如果有个懂行的帮帮手就好&#xff0c;其实基金就是个好帮手&#xff0c;支付宝…

OC内存管理

OC内存管理 一、基本原理 &#xff08;一&#xff09;为什么要进行内存管理。 由于移动设备的内存极其有限&#xff0c;所以每个APP所占的内存也是有限制的&#xff0c;当app所占用的内存较多时&#xff0c;系统就会发出内存警告&#xff0c;这时需要回收一些不需要再继续使用的…

面试题集锦

1. L1范式和L2范式的区别 (1) L1范式是对应参数向量绝对值之和 (2) L1范式具有稀疏性 (3) L1范式可以用来作为特征选择&#xff0c;并且可解释性较强&#xff08;这里的原理是在实际Loss function 中都需要求最小值&#xff0c;根据L1的定义可知L1最小值只有0&#xff0c;故可以…

Spring注解配置工作原理源码解析

一、背景知识 在【Spring实战】Spring容器初始化完成后执行初始化数据方法一文中说要分析其实现原理&#xff0c;于是就从源码中寻找答案&#xff0c;看源码容易跑偏&#xff0c;因此应当有个主线&#xff0c;或者带着问题、目标去看&#xff0c;这样才能最大限度的提升自身代…

Spring--Context

应用上下文 Spring通过应用上下文&#xff08;Application Context&#xff09;装载bean的定义并把它们组装起来。Spring应用上下文全权负责对象的创建和组装。Spring自带了多种应用上下文的实现&#xff0c;它们之间主要的区别仅仅在于如何加载配置。 1.AnnotationConfigApp…

了解PID控制

2019-03-07 【小记】 了解PID控制 比例 - 积分 - 微分 积分 --- 记忆过去 比例 --- 了解现在 微分 --- 预测未来 转载于:https://www.cnblogs.com/skullboyer/p/10487884.html

program collections

Java byte & 0xff byte[] b new byte[1];b[0] -127;System.out.println("b[0]:"b[0]"; b[0]&0xff:"(b[0] & 0xff));//output:b[0]:-127; b[0]&0xff:129计算机内二进制都是补码形式存储&#xff1a; b[0]: 补码&#xff0c;10000001&…

Spring ConfigurationClassPostProcessor Bean解析及自注册过程

一bean的自注册过程 二,自注册过程说明 1 configurationclassparser解析流程 1、处理PropertySources注解&#xff0c;配置信息的解析 2、处理ComponentScan注解&#xff1a;使用ComponentScanAnnotationParser扫描basePackage下的需要解析的类(SpringBootApplication注解也包…

2019第二周作业

基础作业 实验代码 #include<stdlib.h> int main(void) {FILE*fp;int num[4],i,b,max;char op;if((fpfopen("c:\\tmj.txt","r"))NULL){ printf("File open error!\n"); exit(0);}for(i0;i<4;i){fscanf(fp,"%d%c",&nu…