ActiveMQ的传输协议 一、是什么 二、协议 1.TCP(默认) 2.NIO 3.AMQP 4.STOMP 5.SSL 6.MQTT 7 WS 三、NIO配置案例 1.修改activemq.xml 2.重启 3.生产者/消费者 4.性能提升 4.1 配置 4.2 生产者/消费者
一、是什么
官网地址:http://activemq.apache.org/configuring-version-5-transports.html ActiveMQ支持的client-broker通讯协议有:TVP、NIO、UDP、SSL、Http(s)、VM。 其中配置Transport Connector的文件在ActiveMQ安装目录的conf/activemq.xml中的<transportConnectors>标签之内。 URI描述信息的头部都是采用协议名称,唯独在进行openwire协议描述时,URI头却采用的“tcp://······”。这是因为ActiveMQ中默认的消息协议就是openwire。
二、协议
1.TCP(默认)
Transmission Control Protocol(TCP) 1.这是默认的Broker配置,TCP的Client监听端口61616 2.在网络传输数据前,必须要先序列化数据,消息是通过一个叫wire protocol的来序列化成字节流。 3.TCP连接的URI形式如:tcp://HostName:port?key=value&key=value,后面的参数是可选的。 4.TCP传输的的优点: TCP协议传输可靠性高,稳定性强 高效率:字节流方式传递,效率很高 有效性、可用性:应用广泛,支持任何平台 协议参数文档地址:https://activemq.apache.org/components/classic/documentation/tcp-transport-reference
2.NIO
New I/O API Protocol(NIO) 1.NIO协议和TCP协议类似,但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的client调用和服务器端有更多的负载。
2.适合使用NIO协议的场景: 可能有大量的Client去连接到Broker上,一般情况下,大量的Client去连接Broker是被操作系统的线程所限制的。因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议。 可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能。 3.NIO连接的URI形式:nio://hostname:port?key=value&key=value 协议参数文档地址:https://activemq.apache.org/components/classic/documentation/nio-transport-reference 默认端口:61618 NIO和TCP协议的编码是一样的,所以只需要替换URL的协议即可实现切换。
3.AMQP
Advanced Message Queuing Protocol(AMQP) 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件限制。 默认端口:5672 注意:编码和TCP不一样
4.STOMP
Streaming Text Orientation Message Protocol(STOMP) 是流文本定向消息协议
,是一种为MOM(Message Oriented Middleware,面向消息中间件)设计的简单文本协议。 默认端口:61613
5.SSL
Secure Sockets Layer Protocol(SSL)
6.MQTT
Message Queuing Telemetry Transport(MQTT):消息队列遥测传输) IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当作传感器和致动器(比如通过Twitter让房屋联网)的通信协议。 默认端口:1883 扩展:https://github.com/fusesource/mqtt-client
7 WS
三、NIO配置案例
1.修改activemq.xml
打开activemq的配置文件,在active安装目录下:conf/activemq.xml 将下面的内容复制到<transportConnectors>标签内
< transportConnector name = " nio" uri = " nio://0.0.0.0:61618?trace=true" />
如果你不特别指定ActiveMQ的网络监听端口,那么这些端口都讲使用BIO网络IO模型 所以为了首先提高单节点的网络吞吐性能,我们需要明确指定ActiveMQ网络IO模型。 如上所示:URI格式头以“nio”开头,表示这个端口使用以TCP协议为基础的NIO网络IO模型。
2.重启
./bin/active
去控制台查看,是否加成功了。
3.生产者/消费者
NIO和TCP协议的编码是一样的,所以只需要替换URL的协议即可实现切换。
package com. qingsi. activemq ; import org. apache. activemq. ActiveMQConnectionFactory ; import javax. jms. * ; public class JmsProduce { public static final String ACTIVEMQ_URL = "nio://192.168.86.128:61618" ; public static final String QUEUE_NAME = "transport_nio" ; public static void main ( String [ ] args) throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory ( ACTIVEMQ_URL ) ; Connection connection = activeMQConnectionFactory. createConnection ( ) ; connection. start ( ) ; Session session = connection. createSession ( false , Session . AUTO_ACKNOWLEDGE ) ; Queue queue = session. createQueue ( QUEUE_NAME ) ; MessageProducer producer = session. createProducer ( queue) ; for ( int i = 0 ; i < 3 ; i++ ) { TextMessage textMessage = session. createTextMessage ( "tx msg--" + i) ; producer. send ( textMessage) ; } producer. close ( ) ; session. close ( ) ; connection. close ( ) ; } }
4.性能提升
问题:URI格式以"nio"开头,代表这个端口使用TCP协议为基础的NIO网络模型。但是这样的设置方式,只能使这个端口支持Openwire协议。 需要将所有的BIO模型,都替换成NIO模型,性能得到提升。
4.1 配置
官网地址:https://activemq.apache.org/components/classic/documentation/auto 将下面的配置,写入到activemq.xml
< transportConnector name = " auto+nio" uri = " auto+nio://0.0.0.0:61608?maximumConnections=1000& wireFormat.maxFrameSize=104857600& org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20& org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50" />
./bin/activemq restart
4.2 生产者/消费者
都是只改动了URL,其他的代码一样。所以下面举例了生产者的URL 只要
package com. qingsi. activemq ; import org. apache. activemq. ActiveMQConnectionFactory ; import javax. jms. * ; public class JmsProduce { public static final String ACTIVEMQ_URL = "nio://192.168.86.128:61608" ; public static final String QUEUE_NAME = "nio_auto" ; public static void main ( String [ ] args) throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory ( ACTIVEMQ_URL ) ; Connection connection = activeMQConnectionFactory. createConnection ( ) ; connection. start ( ) ; Session session = connection. createSession ( false , Session . AUTO_ACKNOWLEDGE ) ; Queue queue = session. createQueue ( QUEUE_NAME ) ; MessageProducer producer = session. createProducer ( queue) ; for ( int i = 0 ; i < 3 ; i++ ) { TextMessage textMessage = session. createTextMessage ( "tx msg--" + i) ; producer. send ( textMessage) ; } producer. close ( ) ; session. close ( ) ; connection. close ( ) ; } }