1、ActiveMQ 的网络连接
activeMQ 如果要实现扩展性和高可用性的要求的话,就需要用用到网络连接模式。
NetworkConnector:主要用来配置 broker 与 broker 之间的通信连接
如上图所示,MQ 服务器1 和MQ 服务器2 通过 NewworkConnector 相连,则生产者1和生产者2发送消息,消费者3和消费者4都可以接收到,而生产者3和生产者4发送的消息,消费者1和消费者2同样也可以接收到 ,这样就增强了消息服务器的可用性。
静态网络连接
假设 MQ 服务器1 的 ip 地址是192.168.110.60, MQ 服务器2 的 ip 地址是192.168.110.61
配置说明
修改 MQ 服务器1 的 activeMQ.xml,在 broker 节点里增加如下配置:
<networkConnectors> <networkConnector uri="static://(tcp://192.168.110.60:61616,tcp://192.168.110.61:61616)"/>
</networkConnectors>
修改 MQ 服务器2 的 activeMQ.xml,在 broker 节点里增加如下配置:
<networkConnectors> <networkConnector uri="static://(tcp://192.168.110.61:61616,tcp://192.168.110.60:61616)"/>
</networkConnectors>
重启两台服务器。
生产者连接到 MQ 服务器1(tcp://192.168.110.60:61616)发送消息,消费者连接到 MQ 服务器2(tcp://192.168.110.61:61616)可以获取消息,同时,生产者连接到 MQ 服务器2(tcp://192.168.110.61:61616)发送消息,消费者连接到 MQ 服务器1(tcp://192.168.110.60:61616)也可以获取消息。
两个 broker 通过一个 static 的协议来进行网络连接,一个 Consumer 连接到 broker2 的一个地址上,当 Producer 在 broker1 上以相同的地址发送消息时,此时消息会被转移到Broker2上,也就是说 broker1 会转发消息到 broker2 上。
2、消息回流机制
上面的配置,存在一个问题,如果生产者1向 MQ 服务器1发送10条消息,消费者3从 MQ 服务器2获取了10条消息,此时,万一 MQ 服务器2宕机了,此时这10条消息就会丢失,MQ 服务器1上的消息被 MQ 服务器2拿走了,MQ 服务器1上的消费者读取不到这些消息了。
activemq 从5.6版本开始,在 destinationPolicy 上新增了一个选项 replayWhenNoConsumers 属性设置为 true,这个属性可以用来解决当一个 broker上有需要转发的消息但是没有消费者时,把消息回流到它原始的 broker。同时把enableAudit 设置为false,为了防止消息回流后被当作重复消息而不被分发。
在两台服务器的 activeMQ.xml中,通过如下配置,即可完成消息回流处理。
<policyEntry queue=">" enableAudit="false"> <networkBridgeFilterFactory> <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/> </networkBridgeFilterFactory>
</policyEntry>
3、容错的链接
前面讲述的都是 Client 配置连接到指定的 broker 上,但是如果 broker 的连接失败怎么办呢?默认的情况下,这种协议用于随机的去选择一个链接去连接,如果连接失败了,那么会连接到其他的broker上。默认配置定义了延迟重新连接,意味着传输将会在10s后自动去重新连接可用的broker.当然所以有的重新连接参数都可以根据应用的需要而配置的。
在客户端程序中创建 ConnectionFactory 时使用如下代码即可做到容错的连接配置:
//随机连接不同的broker
ConnectionFactory factory = new ActiveMQConnectionFacory("failover:(tcp://192.168.110.60:61616,tcp://192.168.110.61:61616)?randomize=true")
Failover 协议可用的配置参数
4、动态网络连接
ActiveMQ 使用 Multicast (组播)协议建立服务与远程的 broker 服务的网络链接,实现 broker 的发现机制。
配置一个组播的ip地址,当前网络内所有的 broker 回向组播的服务端发送消息,证明自己是存活的,并且建立连接,这种动态连接的方式会一直不断地进行连接,会消耗网络资源,因此,不建议使用动态网络连接。