ActiveMQ + MQTT 集群搭建(虚机版本) + Springboot使用配置

文章目录

  • 前言
  • 一、ActiveMQ、 MQTT是什么?
    • 1.ActiveMQ介绍
    • 2.MQTT介绍
  • 二、集群搭建步骤
    • 1.下载apache-activemq-5.15.12-bin.tar.gz
    • 2.上传apache-activemq-5.15.12-bin.tar.gz到服务器并解压文件到文件夹clusters、master、slave三个文件夹下面形成三个节点(平等节点)
    • 3.分别修改三个节点的配置文件:activemq.xml,修改连接协议端口号和配置网络代理,修改如下:
    • 4.分别修改三个节点的WEB配置文件:jetty.xml,修改端口号,修改如下:
    • 5.分别到bin目录下运行activemq文件启动activemq,命令如下:
    • 6.全部节点启动后,进入web端界面查询集群配置情况:
  • 三、项目连接配置步骤
    • 1.Activemq连接服务配置:
    • 2.MQTT生产者连接配置:
    • 3.MQTT消费者连接配置:
    • 4.项目使用:
  • 总结


前言

随着技术的不断迭代,在分布式系统中应用消息组件进行通信已经是非常常见的方式,而为了保障消息中间件的高可用性就需要对中间件进行集群化部署,这是应用程序发展的必经之路。


一、ActiveMQ、 MQTT是什么?

1.ActiveMQ介绍

ActiveMQ官网
ActiveMQ是一个开源的、基于Java的消息中间件(Message Oriented Middleware,MOM)实现。它提供了可靠的异步消息传递的功能,用于在分布式系统中进行应用程序之间的通信。

以下是ActiveMQ的一些主要特点和功能:

1.1、 异步消息传递:ActiveMQ支持发布-订阅和点对点模式的消息传递。应用程序可以通过发送和接收消息来进行异步通信。

1.2、持久化和持久订阅:ActiveMQ可以将消息持久化到磁盘,以确保即使在消息发送者和接收者之间的断开连接或重启后,消息也能被正确接收。

1.3、 多种消息传递模式:ActiveMQ支持多种消息传递模式,包括点对点队列、主题订阅和点对点回复等。

1.4、基于JMS标准:ActiveMQ完全支持Java消息服务(JMS)规范,是JMS的一种实现。JMS提供了一系列的API和协议,用于在Java应用程序之间进行消息传递。

1.5、高可用性和故障转移:ActiveMQ支持故障转移和高可用性,可以通过配置多个broker实现自动故障转移和消息备份。

1.6、多种协议支持:ActiveMQ支持多种协议,如AMQP、STOMP、OpenWire、MQTT等。这使得ActiveMQ可以与不同的客户端和应用程序进行集成和通信。

1.7、 插件体系结构:ActiveMQ具有可扩展的插件体系结构,允许开发人员根据需求添加自定义功能和扩展。

1.8、可视化管理工具:ActiveMQ提供了可视化的管理界面,用于监控和管理消息队列、主题、连接等。

作为一种成熟而强大的消息中间件解决方案,ActiveMQ被广泛用于构建可靠的分布式系统、实现异步通信、实现解耦和提高应用程序的可伸缩性等场景。

2.MQTT介绍

MQTT(Message Queuing Telemetry Transport)是一种轻量级、开放、简单的消息传输协议,专门针对物联网(IoT)领域设计。它具有低带宽和低功耗的特点,适用于在资源受限的设备上进行可靠的通信。

以下是MQTT协议的一些关键特点:

1.1、 轻量级:MQTT协议设计简单,通信报文开销小,传输数据量较小,适用于带宽有限的网络环境,能够满足物联网设备的资源限制。

1.2、发布/订阅模式:MQTT采用发布/订阅模式,包含发送消息的发布者和接收消息的订阅者。发布者将消息发布到特定的主题上,而订阅者通过订阅感兴趣的主题来接收消息。

1.3、QoS支持:MQTT支持三种不同的服务质量(QoS)级别:QoS 0(至多一次传输)、QoS 1(至少一次传输)和QoS 2(恰好一次传输)。这种级别的支持确保了消息的可靠性和传递保证。

1.4、消息保留:MQTT支持在特定主题上保留最新的消息。这意味着当订阅者订阅一个主题时,它将立即接收到最新的保留消息,而不仅仅是实时发送的消息。

1.5、心跳机制:MQTT协议定义了心跳机制,通过发送心跳报文,保持客户端和代理服务器之间的连接有效性。如果客户端长时间没有发送心跳,代理服务器将断开连接。

1.6、安全性支持:MQTT提供了基于TLS/SSL的加密和身份验证机制,以确保消息的机密性和安全性。

1.7、广泛的应用:MQTT广泛应用于物联网领域,例如传感器网络、远程监测、智能家居、工业自动化等。

MQTT协议的轻量级和简单性使得它成为连接大量设备和传输数据的理想选择,尤其是在资源受限的物联网环境中。它以其可靠性、灵活性和互通性在物联网行业得到了广泛应用。


二、集群搭建步骤

1.下载apache-activemq-5.15.12-bin.tar.gz

官网下载地址

2.上传apache-activemq-5.15.12-bin.tar.gz到服务器并解压文件到文件夹clusters、master、slave三个文件夹下面形成三个节点(平等节点)

注意:三个节点是公平节点。一开始我是想做集群节点clusters做数据分发,然后master和slave做主从的。后面发现存在问题,在资源较少情况下clusters、clusters、slave都为单节点的情况下,clusters一挂掉,集群关系就破裂了,没有节点给master和slave做数据分发了,这样的配置不友好。
于是我就把三个节点配置成了平等节点,任何节点宕机都能正常运行。

3.分别修改三个节点的配置文件:activemq.xml,修改连接协议端口号和配置网络代理,修改如下:

3.1、配置默认的传输协议OpenWire 和 支持硬件的传输协议MQTT;
3.2、配置网络代理networkConnectors,做节点间数据传输;
3.3、duplex设置为true,则一个连接上可以双向流动消息(双工连接),默认值为false,默认情况下,在两个提供者之间的连接上的消息流动方向是单向(单工连接);
3.4、修改三个节点的brokerName为localhost_clusters、localhost_master、localhost_slave;

<transportConnectors><!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --><transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=10000&amp;wireFormat.maxFrameSize=104857600"/><transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=10000&amp;wireFormat.maxFrameSize=104857600"/></transportConnectors><!-- 配置网络代理,cluster 节点需要与 master 跟 slave 进行穿透 --><networkConnectors><networkConnector name="network-clusters" uri="static:(tcp://192.168.10.41:61617,tcp://192.168.10.41:61618)" duplex="true" /></networkConnectors><transportConnectors><!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --><transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=10000&amp;wireFormat.maxFrameSize=104857600"/><transportConnector name="mqtt" uri="mqtt://0.0.0.0:2884?maximumConnections=10000&amp;wireFormat.maxFrameSize=104857600"/></transportConnectors><!-- 配置网络代理,master 节点需要与 cluster 跟 slave 进行穿透 --><networkConnectors><networkConnector name="network-master" uri="static:(tcp://192.168.10.41:61616,tcp://192.168.10.41:61618)" duplex="true" /></networkConnectors><transportConnectors><!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --><transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=10000&amp;wireFormat.maxFrameSize=104857600"/><transportConnector name="mqtt" uri="mqtt://0.0.0.0:2885?maximumConnections=10000&amp;wireFormat.maxFrameSize=104857600"/></transportConnectors><!-- 配置网络代理,slave 节点需要与 master 跟 cluster 进行穿透 --><networkConnectors><networkConnector name="network-slave" uri="static:(tcp://192.168.10.41:61616,tcp://192.168.10.41:61617)" duplex="true" /></networkConnectors>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost_clusters" dataDirectory="${activemq.data}"><broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost_master" dataDirectory="${activemq.data}"><broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost_slave" dataDirectory="${activemq.data}">

4.分别修改三个节点的WEB配置文件:jetty.xml,修改端口号,修改如下:

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"><!-- the default port number for the web console --><property name="host" value="0.0.0.0"/><property name="port" value="8161"/>
</bean><bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"><!-- the default port number for the web console --><property name="host" value="0.0.0.0"/><property name="port" value="8171"/>
</bean><bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"><!-- the default port number for the web console --><property name="host" value="0.0.0.0"/><property name="port" value="8181"/>
</bean>

5.分别到bin目录下运行activemq文件启动activemq,命令如下:

sh activemq start

6.全部节点启动后,进入web端界面查询集群配置情况:

6.1、进入web端界面http://192.168.10.41:8161、http://192.168.10.41:8171、http://192.168.10.41:8181,登录账号密码admin/admin,到Network查看是否有另外两个节点的连接情况,若有另外两个节点的连接信息并且Remote Address为true,则集群建立完毕;

6.2、图片如下:
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述


三、项目连接配置步骤

1.Activemq连接服务配置:

ActiveMQ连接配置开箱即用

failover是一种连接URL配置选项,用于指定多个ActiveMQ broker的连接地址。当一个broker发生故障或不可用时,客户端会自动尝试连接配置中的其他broker。以此机制来实现多节点的集群连接模式。

spring:activemq:broker-url: failover:(tcp://192.168.10.41:61616,tcp://192.168.10.41:61617,tcp://192.168.10.41:61618)user: adminpassword: adminpool:enabled: truepackages:trust-all: true

2.MQTT生产者连接配置:

注意:MQTT的连接配置是我自定义的,在项目里有相关代码配合使用;

mqtt:brokers: tcp://192.168.10.41:2884,tcp://192.168.10.41:2885,tcp://192.168.10.41:1883clientIds: dig-producer1,dig-producer2qos: 1userName: adminpassword: admin

3.MQTT消费者连接配置:

注意:MQTT的连接配置是我自定义的,在项目里有相关代码配合使用;

mqtt:topics: V5008Upload/#,V6800Upload/#qoss: 1,2brokers: tcp://192.168.10.41:2884,tcp://192.168.10.41:2885,tcp://192.168.10.41:1883clientIds: dig-consumer1,dig-consumer2userNames: adminwords: admin

4.项目使用:

4.1、ActiveMQ配置使用
activeMQ配置使用比较简单,也不是本文的重点,简单贴一点代码


import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;@Configuration
public class ActiveMqConfig {@Value("${spring.activemq.broker-url}")private String brokerUrl;/*** 队列模式(消息将按顺序一个一个地被消费,每个消息只能被一个消费者接收)*/@Beanpublic JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {// SimpleJmsListenerContainerFactory适用于JMS 1.1规范// 消息监听容器工厂SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();// 关闭事务factory.setSessionTransacted(false);// 手动确认消息factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);// 设置监听容器工厂的发布订阅域为队列模式,即采用点对点消息传递模式factory.setPubSubDomain(false);factory.setConnectionFactory(activeMQConnectionFactory);return factory;}/*** 配置名字为givenConnectionFactory的连接工厂** @return*/@Bean("givenConnectionFactory")public ActiveMQConnectionFactory connectionFactory() throws JMSException {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);// 自定义消费重试机制RedeliveryPolicy policy = new RedeliveryPolicy();// 消息处理失败重新处理次数,默认为5次policy.setMaximumRedeliveries(5);// 启用指数退避策略,以延长每次重试的间隔时间policy.setUseExponentialBackOff(Boolean.TRUE);// 设置初始重试延迟时间为0毫秒,意味着消息处理失败时立即进行重试policy.setInitialRedeliveryDelay(0);// 设置每次重试之间的延迟时间为3秒policy.setRedeliveryDelay(3000L);// 设置指数退避的增加倍数,每次重试的延迟时间将按比例增加policy.setBackOffMultiplier(2);// 设置最大重试延迟时间为20秒policy.setMaximumRedeliveryDelay(20000L);factory.setRedeliveryPolicy(policy);Connection connection = factory.createConnection();connection.start();return factory;}//    /**
//     *  发布-订阅模式(消息会被广播给所有订阅该主题的消费者)
//     */
//    @Bean("topicListener")
//    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory givenConnectionFactory) {
//        // 设置为发布订阅模式, 默认情况下使用生产消费者方式
//        // DefaultJmsListenerContainerFactory 2.0规范
//        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
//        bean.setSessionTransacted(false);
//        bean.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
//        bean.setPubSubDomain(true);
//        bean.setConnectionFactory(givenConnectionFactory);
//        return bean;
//    }}
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;@Service
public class ActivimqProducer {@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;/*** 发送队列模式** @param queueName* @param message*/public void sendMqQueue(String queueName, String message) {this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), message);}}
import com.test.common.enums.QueueType;
import com.test.local.mqtt.process.EquipmentAssetsProcess;
import com.test.local.mqtt.process.MqttDataProcessing;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;import javax.jms.Session;@Slf4j
@Component
public class ActivimqConsumer {@Autowiredprivate ThreadPoolTaskExecutor threadPoolTaskExecutor;@Autowiredprivate EquipmentAssetsProcess equipmentAssetsProcess;@JmsListener(destination = QueueType.LABEL_STATE, containerFactory = "jmsListenerContainerQueue")public void consumerLabelState(ActiveMQMessage activeMQMessage, String message, Session session) {if (StringUtils.isNotEmpty(message)) {threadPoolTaskExecutor.execute(new MqttDataProcessing(equipmentAssetsProcess,message,QueueType.LABEL_STATE,activeMQMessage,session));}}
}
import com.test.fastjson.JSON;
import com.test.common.constants.Constants;
import com.test.common.enums.PatternStatusEnum;
import com.test.common.enums.QueueType;
import com.test.common.redis.RedisCache;
import com.test.entity.TagInfo;
import com.test.local.entity.*;
import com.test.service.RegionCheckRecordService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQMessage;
import org.springframework.beans.factory.annotation.Autowired;import javax.jms.Session;
import java.util.UUID;@Slf4j
public class MqttDataProcessing implements Runnable {@Autowiredprivate RedisCache redisCache;@Autowiredprivate RegionCheckRecordService regionCheckRecordService;private String topic;private String message;private EquipmentAssetsProcess equipmentAssetsProcess;private ActiveMQMessage activeMQMessage;private Session session;public MqttDataProcessing(EquipmentAssetsProcess equipmentAssetsProcess,String message,String topic,ActiveMQMessage activeMQMessage,Session session) {this.topic = topic;this.message = message;this.equipmentAssetsProcess = equipmentAssetsProcess;this.activeMQMessage = activeMQMessage;this.session = session;}@SneakyThrows@Overridepublic void run() {String logId = UUID.randomUUID().toString().replace("-", "");try {if (QueueType.LABEL_STATE.equals(topic)) {  LabelState labelState = JSON.parseObject(message, LabelState.class); if (labelState.getData() != null && labelState.getData().size() > 0) {equipmentAssetsProcess.processLabelState(labelState, logId);}activeMQMessage.acknowledge();}} catch (Exception e) {// 重发session.recover();log.error("异常,重新消费!logId={},topic={},message={}", logId, topic, message, e);}}}

4.2、MQTT配置使用
对于MQTT的分布式我是这么理解的:

在消费端,同时连接多个节点进行消费,硬件发送的消息定义一个唯一id,此时会有ABC三个消费者等待硬件发送过来的消息,于是使用redisson的分布式锁lock.tryLock来限制消息只被消费一次。

在生产端,同时连接多个节点进行消息发送,因为我们的硬件只能连接到一个节点上面(硬件不能支持多节点代理消费)在一个节点宕机后才会去尝试连接备选节点,所有我们对所有节点都发送消息,保证该消息能被硬件接收并消费到,另外两个节点多发送的消息也不会造成问题(直接无视了),因为硬件同一时刻只能连接一个节点进行消费。

消费者配置

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Repository;@RefreshScope
@Repository
@Data
public class ConfigMqtt {private String[] topics = new String[]{"test1Upload/#","test2Upload/#","test3Upload/#"};// @Value("${mqtt.qoss}")private int[] qoss = new int[]{2,2,2};@Value("${mqtt.brokers}")private String[] brokers;@Value("${mqtt.clientIds}")private String[] clientIds;@Value("${mqtt.userNames}")private String userNames;@Value("${mqtt.words}")private String words;}
import com.alibaba.fastjson.JSON;
import com.test.common.redis.RedisCache;
import com.test.config.ConfigMqtt;
import com.test.util.HexConvert;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.TimeUnit;/*** 订阅者:订阅硬件mqtt主题信息,消费硬件发送消息,转换硬件消息发送到ActiveMQ队列,最终到其他微服务处理ActiveMQ队列的消息*/
@Slf4j
@Service
public class MqttSubscription {@Autowiredprivate ConfigMqtt configMqtt;@Autowiredprivate SubscriptionJSON subscriptionJSON;@Autowiredprivate SubscriptionHEX subscriptionHEX;@Resourceprivate ThreadPoolTaskExecutor threadPoolTaskExecutor;@Resourceprivate RedissonClient redissonClient;@Resourceprivate RedisCache redisCache;@Beanpublic void client() throws Exception {String[] hosts = configMqtt.getBrokers();String[] clientIds = configMqtt.getClientIds();// 多个for (int i = 0; i < hosts.length; i++) {String host = hosts[i];String clientId = clientIds[i];try {InetAddress ip4 = Inet4Address.getLocalHost();clientId = clientId + "-" + ip4.getHostAddress();} catch (UnknownHostException e) {log.error("MqttSubscription-client-configMqtt" + configMqtt);log.error("MqttSubscription-client-e" + e);}String finalClientId = clientId;threadPoolTaskExecutor.execute(() -> this.myClient(host, finalClientId));}}private void myClient(String host, String clientId) {try {String[] topics = configMqtt.getTopics();int[] qos = configMqtt.getQoss();String userName = configMqtt.getUserNames();String passWord = configMqtt.getWords();// host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存MqttClient client = new MqttClient(host, clientId, new MemoryPersistence());// MQTT的连接设置MqttConnectOptions options = new MqttConnectOptions();// todo:ch:设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,断线重连会消费断线期间的消息options.setCleanSession(true);// 设置连接的用户名options.setUserName(userName);// 设置连接的密码options.setPassword(passWord.toCharArray());// 设置超时时间 单位为秒options.setConnectionTimeout(10);// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制options.setKeepAliveInterval(20);// 自动重连options.setAutomaticReconnect(true);// todo:ch:设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息options.setWill("willTopic", (clientId + "与服务器断开连接").getBytes(), 0, false);// 设置回调函数client.setCallback(new MqttCallback() {public void connectionLost(Throwable cause) {while (true) {try {client.connect(options);client.subscribe(topics, qos);break;} catch (Exception e) {e.printStackTrace();log.error("mqtt客户端id-clientId:" + clientId);log.error("mqtt连接异常-e", e);log.error("mqtt连接异常-cause" + cause);try {Thread.sleep(5000);} catch (InterruptedException ex) {ex.printStackTrace();}}}}public void messageArrived(String topic, MqttMessage message) throws Exception {String key = "MQTT-";String uuid = "";// 消息存在if (message.getPayload().length > 0) {byte[] req = message.getPayload();if (topic.contains("test1") && configMqtt.getDataFormat().equals("JSON")) {String data = new String(req);Map<String, Object> map = JSON.parseObject(data);if (map.containsKey("uuid") && map.get("uuid") != null) {uuid = map.get("uuid").toString();}else {uuid = clientId;}} else if (topic.contains("test1") && configMqtt.getDataFormat().equals("HEX")) {String data = HexConvert.convertStringToHex(req);uuid = data.subSequence(data.length() - 8, data.length()).toString();} else if (topic.contains("test2") || topic.contains("test3")) {String data = HexConvert.convertStringToHex(req);uuid = data.subSequence(data.length() - 8, data.length()).toString();}if (!Strings.isNullOrEmpty(uuid)) {key += uuid;}}// 分布式锁,防止多应用节点产生重复消息RLock lock = redissonClient.getLock(key);try {// 加锁,等待30秒锁自动释放, 不在finally手动释放了,给予30秒的缓冲时间boolean resultLock = lock.tryLock(0, 30, TimeUnit.SECONDS);if (resultLock) {String data = new String(message.getPayload());log.info("mqtt-clientId:" + clientId);log.info("mqtt-key:" + key);
//                            log.info("message-ID:" + message.getId());log.info("messageArrived-topic" + topic);log.info("messageArrived-message" + data);if (topic.contains("test1") && configMqtt.getDataFormat().equals("JSON")) {// 处理硬件的消息,发送到ActiveMQ,最终在别的微服务进行消息消费......} else if (topic.contains("test1") && configMqtt.getDataFormat().equals("HEX")) {// 处理硬件的消息,发送到ActiveMQ,最终在别的微服务进行消息消费......} else if (topic.contains("test2") || topic.contains("test3")) {// 处理硬件的消息,发送到ActiveMQ,最终在别的微服务进行消息消费......}}} catch (Exception e) {e.printStackTrace();log.error("mqtt客户端id-clientId:" + clientId);log.error("mqtt发布信息异常-e", e);log.error("mqtt发布信息异常-topic" + topic);log.error("mqtt发布信息异常-message" + message.toString());}
//                    finally {
//                        if (lock.isLocked()) {
//                            if (lock.isHeldByCurrentThread()) {
//                                lock.unlock();
//                            }
//                        }
//                    }}public void deliveryComplete(IMqttDeliveryToken token) {}});// todo:是否需要永久重新连接,能否设定固定重连次数    或者固定多少秒重连一次(类似心跳机制)int retryCount = 0;while (!client.isConnected()) {try {Thread.sleep(getBackoffTime(retryCount));client.connect(options);client.subscribe(topics, qos);} catch (Exception e) {log.error("Reconnect attempt failed", e);retryCount++;}}} catch (MqttException e) {log.error("mqtt客户端id-clientId:" + clientId);log.error("mqtt连接错误:", e);}}private long getBackoffTime(int retryCount) {// 使用指数退避算法计算重连时间long waitTime = Math.min(1000 * (1 << retryCount), 60000); // 最大等待时间为60秒return waitTime;}
}

生产者配置

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Repository;@RefreshScope
@Repository
@Data
public class ConfigMqtt {@Value("${mqtt.brokers}")private String[] brokers;@Value("${mqtt.clientIds}")private String[] clientIds;@Value("${mqtt.qos}")private int qos;@Value("${mqtt.userName}")private String userName;@Value("${mqtt.password}")private String password;}
import com.test.local.config.ConfigMqtt;
import com.test.local.utils.HexConvert;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;/***  MQTT生产者,生产消息发送到硬件*/@Slf4j
@Service
public class MqttConnect {private volatile static MqttClient mqttClientSingleton;private volatile static List<MqttClient> mqttClientSingletonList = new ArrayList<>();@Autowiredprivate ConfigMqtt configMqtt;@Resourceprivate ThreadPoolTaskExecutor threadPoolTaskExecutor;private MqttConnect() {}/*** 创建 多个 mqtt 实例*/public static List<MqttClient> getMoreInstance(ConfigMqtt configMqtt) {String[] hosts = configMqtt.getBrokers();String[] clientIds = configMqtt.getClientIds();// 创建多个节点连接实例if (mqttClientSingletonList == null || mqttClientSingletonList.size() < hosts.length) {if (mqttClientSingletonList != null && !mqttClientSingletonList.isEmpty() && mqttClientSingletonList.size() < hosts.length) {mqttClientSingletonList.forEach(re -> {try {re.disconnect();re.close();} catch (Exception e) {}});// 清除原有实例mqttClientSingletonList.clear();}// 多个for (int i = 0; i < hosts.length; i++) {String clientIdRe = clientIds[i];String broker = hosts[i];String userName = configMqtt.getUserName();String password = configMqtt.getPassword();StringBuffer clientId = new StringBuffer();try {InetAddress ip4 = Inet4Address.getLocalHost();clientId.append(clientIdRe).append("-").append(ip4.getHostAddress()).append("-").append(HexConvert.getStringRandom(11));} catch (UnknownHostException e) {log.error("MqttClient-getInstance-e" + e);}MemoryPersistence persistence = new MemoryPersistence();synchronized (MqttConnect.class) {MqttClient mqttClient = null;try {// 创建客户端mqttClient = new MqttClient(broker, clientId.toString(), persistence);// 创建链接参数MqttConnectOptions connOpts = new MqttConnectOptions();// 在重新启动和重新连接时记住状态connOpts.setCleanSession(true);// 设置连接的用户名connOpts.setUserName(userName);connOpts.setPassword(password.toCharArray());// 建立连接mqttClient.connect(connOpts);mqttClientSingletonList.add(mqttClient);} catch (MqttException me) {log.error("reason " + me.getReasonCode());log.error("msg " + me.getMessage());log.error("loc " + me.getLocalizedMessage());log.error("cause " + me.getCause());log.error("excep " + me);log.error("发送连接mqtt异常" + me);try {mqttClient.disconnect();mqttClient.close();} catch (Exception e) {}//将 mqtt 置空mqttClient = null;me.printStackTrace();}}}}return mqttClientSingletonList;}/*** 发布消息给硬件*/public void publish(String version, String gateway, String content) {StringBuffer topic = new StringBuffer();topic.append(version).append("Download/").append(gateway);int qos = configMqtt.getQos();// mqtt多节点消息发送 -- 每个节点都发送一份消息让硬件消费List<MqttClient> clientList = MqttConnect.getMoreInstance(configMqtt);if (!clientList.isEmpty()) {clientList.forEach(client -> {threadPoolTaskExecutor.execute(() -> {try {// 创建消息MqttMessage message = new MqttMessage(content.getBytes());// 设置消息的服务质量message.setQos(qos);log.info("发送消息到MQTT供硬件消费");log.info("================client:"+client.getClientId());log.info("================topic:"+topic);log.info("================message:"+message);// 发布消息client.publish(topic.toString(), message);} catch (MqttException me) {log.error("reason " + me.getReasonCode());log.error("msg " + me.getMessage());log.error("loc " + me.getLocalizedMessage());log.error("cause " + me.getCause());log.error("excep " + me);log.error("发送连接mqtt异常" + me);}});});}}
}

总结

近期有时间总结了一下前段时间搭建ActiveMQ + MQTT集群并且在微服务中使用的流程。经此,牛马小陈同学巩固了中间件和分布式概念知识。MQTT的分布式使用是出于自己对分布式的理解然后手写的,目前能正常进行分布式消费,对于MQTT的理解还不是很深,很多处理非常的粗糙,欢迎各位新手同学一起学习、各路大佬批评指正,谢谢!

ActiveMQ + MQTT使用docker方式部署如下:
ActiveMQ + MQTT 集群搭建(docker版本)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/809831.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Win10下安装Anaconda

Anaconda是可以便捷获取包且对包能够进行管理&#xff0c;同时对环境可以统一管理的发行版本&#xff0c;它包含了conda、Python在内的超过180个科学包及其依赖项。 安装Anaconda Anaconda官方下载网址&#xff1a;https://www.anaconda.com/download 官网页面中&#xff0c…

Docker日志查看神器

探索Dozzle&#xff1a;简单实用的Docker日志查看工具 在容器化应用程序的开发和部署中&#xff0c;日志管理是至关重要的一环。为了便于查看和监控Docker容器的日志信息&#xff0c;开发人员和运维团队需要便捷的工具。Dozzle 就是这样一款简单实用的Docker日志查看工具&…

2024 Mathorcup高校数学建模挑战赛(A题)| PCI冲突问题 | 建模秘籍文章代码思路大全

铛铛&#xff01;小秘籍来咯&#xff01; 小秘籍团队独辟蹊径&#xff0c;以整数规划&#xff0c;多元回归等强大工具&#xff0c;构建了解决复杂问题的独特方案。深度学习, 混沌模型的妙用&#xff0c;为降低非法野生动物贸易提供新视角。通过综合分析&#xff0c;描绘出概率、…

基于springboot实现知识管理系统项目【项目源码+论文说明】计算机毕业设计

基于springboot实现知识管理系统演示 摘要 随着信息互联网信息的飞速发展&#xff0c;无纸化作业变成了一种趋势&#xff0c;针对这个问题开发一个专门适应师生作业交流形式的网站。本文介绍了知识管理系统的开发全过程。通过分析企业对于知识管理系统的需求&#xff0c;创建了…

日出6万单!美区“开塞露”卖疯了,保健赛道正式起飞!

质疑养生&#xff0c;理解养生&#xff0c;加入养生&#xff01; 从保温杯里泡枸杞&#xff0c;到桌上摆满保健品&#xff0c;"养生"已经从一种模糊的概念转变为了生活中的刚需。在加班、熬夜、脱发这些"亚健康"标签的围绕下&#xff0c;年轻人开始重视自…

k8s基础入门

前言 开始学习K8S了&#xff0c;下面就是笔记整理 简介 k8s是谷歌开源得容器管理系统&#xff0c;主要功能包括 基于容器得应用部署&#xff0c;维护和滚动升级负载均衡和服务发现跨机器和跨地区得集群调度自动伸缩无状态服务和有状态服务广泛得Volume支持插件保持扩展性 …

ELFK的部署

目录 一、ELFK&#xff08;FilebeatELK&#xff09; 1. Filebeat的作用 2. ELFK工作流程 3. ELFK的部署 3.1 服务器配置 3.2 服务器环境 3.3 安装filebeat 3.4 修改filebeat主配置文件 3.5 在logstash组件所在节点&#xff08;apache节点&#xff09;上新建一个logstas…

如何管好地铁站见新质生产力 | 图扑数字孪生

智慧车站建设是现代交通领域的重要发展方向&#xff0c;旨在通过集成先进的信息技术&#xff0c;提升车站的运营效率、安全性及乘客体验。基于既有的综合监控技术&#xff0c;通过集成多种传感器和数据采集设备&#xff0c;实现对车站设备、环境、客流、人员等对象群的智能感知…

这样画箱线图,为你的SCI论文增色!

高级箱线图的绘制 下面的箱线图比较美观&#xff0c;非常适合数据量不大、且分布明显的时候使用。 在论文撰写中&#xff0c;图表的清晰和吸引人的展示方式是至关重要的。箱线图&#xff08;Whisker Plot&#xff09;是一种展示数据分布的经典工具&#xff0c;它不仅可以清楚地…

第十一届蓝桥杯大赛第二场省赛试题 CC++ 研究生组-七段码

#include<iostream> using namespace std; const int N 10, M 7; int e[N][N] {0}, f[N], open[N];//e[i][j]表示i和j之间是否连通&#xff1b;f[i]表示结点i的父节点&#xff1b;open[i] 1表示结点i打开&#xff0c;0表示关闭 long long ans 0;int find(int x){if(…

【ROS2笔记一】ROS2的基本组件

1.ROS2的基本组件 与ROS1类似的&#xff0c;ROS2也具有node&#xff0c;topic&#xff0c;service&#xff0c;action之类的组件&#xff0c;并且也具有rqt等工具。 可以像使用ROS1的命令行的方式&#xff08;参这里【ROS学习笔记7】ROS中的常用命令行&#xff09;&#xff0…

Mysql内存表及使用场景(12/16)

内存表&#xff08;Memory引擎&#xff09; InnoDB引擎使用B树作为主键索引&#xff0c;数据按照索引顺序存储&#xff0c;称为索引组织表&#xff08;Index Organized Table&#xff09;。 Memory引擎的数据和索引分开存储&#xff0c;数据以数组形式存放&#xff0c;主键索…

【保姆级讲解Nginx】

&#x1f308;个人主页: 程序员不想敲代码啊 &#x1f3c6;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f44d;点赞⭐评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共…

拿到迅雷网盘推广授权,离财富自由更进一步

为了提高平台知名度、增加平台用户数量&#xff0c;迅雷网盘推出了网盘推广的新项目。网盘推广的奖励很丰厚&#xff0c;只要有人点击、下载你分享的资料&#xff0c;你就能获得佣金。听到这里&#xff0c;是不是已经有人心动了&#xff1f;心动的人一定要看完下面的网盘推广&a…

【开发篇】十七、基准测试框架JMH

文章目录 1、JMH2、运行方式二3、死代码与黑洞变量4、可视化分析5、案例&#xff1a;日期格式化方法性能测试6、总结7、整合到SpringBoot 判断一个方法的耗时 ⇒ endTime-startTime ⇒ 不准确&#xff0c;首先部分对象懒加载&#xff0c;第一次请求会慢一些&#xff0c;其次&am…

YOLOv8改进 | 检测头篇 | 自研超分辨率检测头HATHead助力超分辨率检测(混合注意力变换器检测头)

一、本文介绍 本文给大家带来的改进机制是由由我本人利用HAT注意力机制(超分辨率注意力机制)结合V8检测头去掉其中的部分内容形成一种全新的超分辨率检测头。混合注意力变换器(HAT)的设计理念是通过融合通道注意力和自注意力机制来提升单图像超分辨率重建的性能。通道注意…

多无人机集群协同避障

matlab2020a正常运行 场景1规划结果 场景2规划结果 场景3规划结果 代码地址&#xff1a; 多无人机集群协同避障效果&#xff08;5架&#xff09;资源-CSDN文库

【D3.js Tidy tree绘制树形图,单棵树,左右树,平移,拖拽,树形中的天花板实现,源码实现】

这里写自定义目录标题 D3.js Tidy tree绘制树形图,单棵树,左右树,平移,拖拽,树形中的天花板实现,源码实现D3 简介D3 官网有很多例子,这里说的是Tidy tree[树形图表svg][左侧关系->中间对象<-右侧关系 ] 树形实现 D3.js Tidy tree绘制树形图,单棵树,左右树,平移,拖拽,树形…

BM25和语言模型的改进研究

原文链接&#xff1a; BM25和语言模型的改进研究 摘要&#xff1a; 近期关于搜索引擎排名函数的研究报告指出&#xff0c;BM25和带Dirichlet平滑的语言模型有所改进。本研究通过在INEX 2009维基百科语料库上训练&#xff0c;然后在INEX 2010和9个TREC语料库上测试&#xff0…

ES6 全详解 let 、 const 、解构赋值、剩余运算符、函数默认参数、扩展运算符、箭头函数、新增方法,promise、Set、class等等

目录 ES6概念ECMAScript6简介ECMAScript 和 JavaScript 的关系ES6 与 ECMAScript 2015 的关系 1、let 、 const 、var 区别2、变量解构赋值1、数组解构赋值2、对象解构赋值3、字符串的解构赋值 3、展开剩余运算符1、**展开运算符(...)**2、**剩余运算符(...)** 4、函数的拓展函…