mqtt 发送消息过多_阿里云MQTT服务端注解式消息处理分发与同步调用实践小结

一、前言

前段时间公司预研了设备app端与服务端的交互方案,出于多方面考量最终选用了阿里云的微服务队列MQTT方案,基于此方案,本人主要实践有:

1. 封装了RocketMQ实现MQTT订阅与发布的实现细节;

2. 实现了注解式分发处理,可利用如MqttController, MqttTopicMapping等相关自定义注解的方式来统一订阅MQTT的Topic以及消息处理的分发;

3. 使用了一套请求和响应的同步机制来达到PUB/SUB异步通信的伪同步调用。

Github 地址点此链接

二、RocketMQ的接入细节

1. 为什么服务端要使用RocketMQ接入

阿里云微消息队列MQTT是在以消息队列 RocketMQ 为核心存储的基础上,实现更适合移动互联网和IoT领域的无状态网关,两者之间具备天然的数据互通性。MQTT实例本身并不提供消息数据持久化功能,消息数据持久化需要搭配后端的消息存储实例来使用。因此现阶段每一个阿里云MQTT实例都必须配套一个消息存储实例,即RocketMQ实例来提供消息数据持久化功能,因此他们之间可以说是消息互通的,即可用RocketMQ订阅的方式来消费用MQTT协议发布的消息,同理也可用 MQTT协议订阅的方式来消费RocketMQ发布的消息。

帮助文档也给出了以下两种产品的区别说明:

微消息队列MQTT基于MQTT协议实现,单个客户端的处理能力较弱。因此,微消息队列MQTT适用于拥有大量在线客户端(很多企业设备端过万,甚至上百万),但每个客户端消息较少的场景。
相比之下,消息队列RocketMQ是面向服务端的消息引擎,主要用于服务组件之间的解耦、异步通知、削峰填谷等,服务器规模较小(极少企业服务器规模过万),但需要大量的消息处理,吞吐量要求高。因此,消息队列RocketMQ适用于服务端进行大批量的数据处理和分析的场景。

基于以上区别,官方也推荐在移动端设备上使用微消息队列MQTT,而在服务端应用中则使用消息队列RocketMQ,具体则可以通过 MQTT SDK 以公网访问方式来实现设备间的通信,通过MQ SDK以内网方式来实现服务端通信。

2. RocketMQ如何对接

RocketMQ与MQTT在消息结构和一些属性字段上都有一定的映射关系,具体内容(摘自帮助文档)如下。

微消息队列MQTT使用MQTT协议接入,而消息队列RocketMQ使用的是私有协议,因此,两者的关键概念存在如下映射关系。

fb73025e85d4341240c5a9c1b93aec9e.png
MQ与MQTT消息结构映射关系

如上图所示,MQTT协议中Topic是多级结构,而消息队列RocketMQ的Topic 仅有一级,因此,MQTT中的一级Topic映射到消息队列RocketMQ的Topic,而二级和三级Topic则映射到消息队列RocketMQ的消息属性(Properties)中。

消息队列 RocketMQ 协议中的消息(Message)可以拥有自定义属性(Properties),而MQTT协议目前的版本不支持属性,但为了方便对MQTT协议中的Header信息和设备信息进行溯源,MQTT的部分信息将被映射到 RocketMQ的消息属性中,方便使用消息队列RocketMQ的SDK接入的用户获取。

目前,微消息队列MQTT和消息队列RocketMQ支持的属性字段映射表如下图所示。使用消息队列RocketMQ的SDK的应用和使用消息队列MQTT的SDK的应用进行交互时,可以通过读写这些属性字段来达到信息获取或者设置的目的。

6562f3a1a63d4e993ef95de3ce2290b9.png
属性字段映射关系

3. RocketMQ对MQTT消息订阅的实现

Properties properties = new Properties();
// 在控制台创建的Group ID
properties.put(PropertyKeyConst.GROUP_ID, "xxx");
// 阿里云AccessKey 
properties.put(PropertyKeyConst.AccessKey, "xxx");
// 阿里云SecretKey
properties.put(PropertyKeyConst.SecretKey, "xxx");
// 在RocketMQ控制台的实例基本信息中可查看到的TCP协议接入点
properties.put(PropertyKeyConst.NAMESRV_ADDR,"xxx");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("topic", "*", new MessageListener() { //订阅全部 Tagpublic Action consume(Message message, ConsumeContext context) {//获得mqtt消息中的第一级topicString mqttFirstTopic = message.getTopic();//获得mqtt消息中除去1级后的所有topicString mqttSecondTopic = message.getUserProperties(PropertyKeyConst.MqttSecondTopic);//获得mqtt消息中的messageIdString messageId = message.getUserProperties("UNIQ_KEY");//获得mqtt消息中的消息体String messageBody = new String(message.getBody());//...return Action.CommitMessage;}});
consumer.start();

实现主要注意2点:

  • 这边的 MQ 只需要订阅 MQTT 的一级 Topic 。如果 MQTT 会发布2个 Topic 的消息 robot/alarmrobot/task/test ,则在此处只需要订阅 robot 这个第一级Topic即可。
  • MQTT 的一些属性字段可以从 RocketMQ 消息 MessageuserProperties 字段中获得,比如上面代码中通过 message.getUserProperties(PropertyKeyConst.MqttSecondTopic); 可以获得 MQTT 中的 除去1级后的所有 Topic 字符串,如上述举例的2个 Topic 可分别获得 /alarm/task/test。 具体能够获得哪些字段可以参考上一节的属性字段映射表,也可自行查看 PropertyKeyConst 类中定义的一些字符串常量来大概知晓。

使用阿里云MQTT控制台发送一个MQTT消息,如图所示:

f451754dda09e418f7b4d64a925c7019.png
MQTT控制台发送消

在程序中加一个断点获得当前Message对象的字段如下:

1c4d74b33f6cdf96c40039d87619bf90.png
Message消息体

上图可看到userProperties中的一些值,比如qoslevelmqttSecondTopic等,这些字段都可以在PropertyKeyConst 类中找到对应的字符串常量,但是UNIQ_KEYcleansessionflagPropertyKeyConst 类中并没有对应的字符串常量,这边暂时就message.getUserProperties("UNIQ_KEY")这样使用自定义字符量来获得。

4. RocketMQ对MQTT消息发布的实现

Properties properties = new Properties();
// 在控制台创建的Group ID
properties.put(PropertyKeyConst.GROUP_ID, "xxx");
// 阿里云AccessKey 
properties.put(PropertyKeyConst.AccessKey, "xxx");
// 阿里云SecretKey
properties.put(PropertyKeyConst.SecretKey, "xxx");
// 在RocketMQ控制台的实例基本信息中可查看到的TCP协议接入点
properties.put(PropertyKeyConst.NAMESRV_ADDR,"xxx");
//设置发送超时时间,单位毫秒
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
Producer producer = ONSFactory.createProducer(properties);
// 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
producer.start();//发送一个mqtt消息
String parentTopic = topic.substring(0, topic.indexOf("/"));
String subTopic = topic.substring(topic.indexOf("/"));
Message msg = new Message(parentTopic, "", message.getBytes());
msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic);
msg.putUserProperties(PropertyKeyConst.MqttQOS, qos);
msg.putUserProperties("cleansessionflag", "" + cleanSessionFlag);
SendResult result = producer.send(msg);
  • 该代码仅实现了普通消息的同步发送,若需发送顺序消息、延时消息等,可参考SDK帮助文档创建不同的Producer实现即可。
  • 上述代码将需要发送的MQTT全量Topic拆分成1级与2级,1级Topic设置为MQ中的Topic参数,2级Topic字符串则设为userPropertiesPropertyKeyConst.MqttSecondTopic的,其他属相如qoslevelcleansessionflag等也是通过userProperties的相关字段来设置。

三、注解式分发处理的实现

1. 前置知识点

1.1 BeanPostProcessor

BeanPostProcessor是Spring IOC容器给我们提供的一个扩展接口。BeanPostProcessor接口定义了两个方法:

public interface BeanPostProcessor {// 前置处理Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException;// 后置处理Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException;
}

Spring中Bean的整个生命周期如图所示:

992704fcfcdeba2ece274adf068d09a5.png
Bean生命周期

postProcessBeforeInitialization()方法与postProcessAfterInitialization()分别对应图中前置处理和后置处理两个步骤将执行的方法。这两个方法中都传入了bean对象实例的引用,为扩展容器的对象实例化过程提供了很大便利,在这儿几乎可以对传入的实例执行任何操作。

可以看到,Spring容器通过BeanPostProcessor给了我们一个机会对Spring管理的bean进行再加工,注解、AOP等功能的实现均大量使用了BeanPostProcessor。通过实现BeanPostProcessor的接口,在其中处理方法中判断bean对象上是否有自定义的一些注解,如果有,则可以对这个bean实例继续进行其他操作,这也是本例中使用该接口要实现的主要目的。

1.2 ApplicationListener

在IOC的容器的启动过程,当所有的bean都已经处理完成之后,spring ioc容器会有一个发布事件的动作。从 AbstractApplicationContext 的源码中就可以看出:

protected void finishRefresh() {// Initialize lifecycle processor for this context.initLifecycleProcessor();// Propagate refresh to lifecycle processor first.getLifecycleProcessor().onRefresh();// Publish the final event.publishEvent(new ContextRefreshedEvent(this));// Participate in LiveBeansView MBean, if active.LiveBeansView.registerApplicationContext(this);
}

因此当所有的bean都初始化完成并被成功装载后会触发ContextRefreshedEvent事件。

ApplicationListener是spring中用来监听事件(ApplicationEvent)的传递,每个实现了ApplicationListener接口的bean都会收到ApplicationEvent对象的通知,每个ApplicationListener可根据事件类型只接收处理自己感兴趣的事件,因此利用实现ApplicationListener的接口可以收到监听ContextRefreshedEvent动作,然后可以写自己的一些处理逻辑,比如初始化环境,准备测试数据、加载一些数据到内存等等。用法如下:

@Component
public class TestApplicationListener  implements ApplicationListener<ContextRefreshedEvent>{@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {//todo:一些处理逻辑}}

1.3 反射

Java反射机制是在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法;对于任意一个对象,都能够调用它的任意一个方法和属性;这种动态获取的信息以及动态调用对象的方法的功能称为java语言的反射机制。

在Java中,Class类与java.lang.reflect类库一起对反射技术进行了全力的支持。获取Class对象有三种方式:

  • 通过实例对象获得:Class<?> class = object.getClass();
  • 通过类名获得:Class<?> class = ClassName.class;
  • 通过类名全路径获得:Class<?> class = Class.forName("类名全路径");

反射包中常用的类主要有

  • Constructor,表示的类的构造方法信息,利用它可以在运行时动态创建对象
  • Field,表示类的成员变量信息,通过它可以在运行时动态修改成员变量的属性值(包含private)
  • Method,表示类的成员方法信息,通过它可以动态调用对象的方法(包含private)

下面说明一下本例中用到的一些反射api:

//获得Class对象
Class clazz= obj.getClass();//判断注解B是否在此A上
boolean isAnnotation= A.isAnnotationPresent(B.class);//获得该clazz上的注解对象
B b=clazz.getAnnotation(B.class));//获得本类以及父类或者父接口中所有的公共方法
Method[] methods=clazz.getMethods();//获取方法上的所有参数
Parameter[] parameters = method.getParameters();//执行某对象的方法,owner为该对象,paramValues为入参数组,method为Method对象
method.invoke(owner, paramValues);

2. 整体实现思路

  1. 自定义注解MqttControllerMqttTopicMappingMqttMessageIdMqttMessageBody
  2. 利用BeanPostProcessor,获得所有注解了MqttController的bean及其注解值,获得其所有注解了MqttTopicMapping的Method方法及其注解值,利用两者的注解值作为其key,分别将bean,Method为value放入不同的map中,记录所有注解了MqttController的注解值作为下一步需要订阅的Topic;
  3. 利用ApplicationListener在所有bean加载完成后使用实例化的mqConsumer来订阅所有需要订阅的Topic;
  4. 在mq订阅的处理方法中,根据消息的全Topic在上述步骤的map中获得其对应的bean和Method,同时根据MqttMessageIdMqttMessageBody来设置相关参数,使用method.invoke(owner, paramValues);实现方法的调用,来达到消息的处理分发。

3. 实现细节

3.1 自定义注解

@MqttController在类上使用,其中parentTopic值为需要监听的1级Topic,其中使用@Component可以使其注解的类实例化为为Bean对象放入到Spring容器中,基于此才能在利用BeanPostProcessor中获得其对象。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface MqttController {/*** 监听的父topic** @return 监听的父topic*/String parentTopic();
}

@MqttTopicMapping在方法上使用,其中subTopic的值为需要订阅的子Topic,与1级Topic共同组成MQTT的Topic

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MqttTopicMapping {/*** 订阅的子topic,默认可以只订阅1级topic** @return 订阅的子topic*/String subTopic() default "";
}

MqttMessageBody在方法参数上使用,使得参数自动获得messageBody对象

@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MqttMessageBody {}

MqttMessageId在方法参数上使用,使得参数自动获得messageId的值

@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MqttMessageId {}

自定义注解的使用示例如下:

@Slf4j
@MqttController(parentTopic = "robot1")
public class MqttRobot1 {@MqttTopicMappingpublic void dealFirstTopic() {log.info("MqttRobot1.dealAlarm 收到消息啦,只处理了一级topic");}@MqttTopicMapping(subTopic = "alarm")public void dealAlarm(@MqttMessageId String messageId, @MqttMessageBody AlarmVo alarmVo) {log.info("MqttRobot1.dealAlarm 收到消息啦");log.info("messageId:{}", messageId);log.info("alarmVo:{}", alarmVo);}@MqttTopicMapping(subTopic = "task")public void dealTask() {log.info("MqttRobot1.dealTask 收到消息啦");}
}

3.2 提取Method和Bean对象

在MqttHandlerFactory类中定义以下几个容器,分别存储mqtt处理类的bean,mqtt处理方法以及parentTopic列表

/*** 用于存储mqtt处理类的bean,key为parentTopic/subTopic*/
private static Map<String, Object> mqttControllers = new HashMap<>();/*** 用于存储mqtt处理方法,key为parentTopic/subTopic*/
private static Map<String, Method> mqttHandlers = new HashMap<>();/*** 存储parentTopic列表*/
private static Set<String> parentTopicSet = new HashSet<>();

利用BeanPostProcessor接口来处理实现了自定义注解的Bean对象。

具体代码及注释如下:

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {Class beanClazz = bean.getClass();//使用的MqttController注解的bean对象if (beanClazz.isAnnotationPresent(MqttController.class)) {//获得MqttController的注解值//存储parentTopic列表String parentTopic = ((MqttController) beanClazz.getAnnotation(MqttController.class)).parentTopic();MqttHandlerFactory.getParentTopicSet().add(parentTopic);for (Method method : beanClazz.getMethods()) {//获得MqttTopicMapping的Method对象if (method.isAnnotationPresent(MqttTopicMapping.class)) {//获得MqttTopicMapping的注解值String subTopic = method.getAnnotation(MqttTopicMapping.class).subTopic();String realTopic;if ("".equals(subTopic)) {realTopic = parentTopic + "/";} else {realTopic = (parentTopic + "/" + subTopic + "/").replaceAll("/+", "/");}if (null != MqttHandlerFactory.getMqttHandler(realTopic)) {throw new MqttBeansException(bean.getClass().getSimpleName() + " topic 重复定义,值为" + realTopic);}//存储mqtt处理类的beanMqttHandlerFactory.registerMqttHandler(realTopic, method);//存储mqtt处理方法MqttHandlerFactory.registerMqttController(realTopic, bean);log.info("MqttHandler Mapped "{}" onto {}", realTopic, method.toString());}}}return bean;
}

3.3 mq的消息订阅与处理

实现ApplicationListener接口在所有Bean对象加载完之后根据前面记录的parentTopicSet作为所有需要订阅的1级Topic开始订阅。

在订阅消息处理中从message信息可以获得其对应的1级Topic与2级Topic,将其处理成MQTT的全Toic并从前面记录的mqttHandlers,mqttControllers中获得对应的Method对象及Bean对象,从message信息中提取对应的messageId级messageBody并设置为使用了@MqttMessageBody@MqttMessageId的注解的参数中,利用反射method.invoke(MqttHandlerFactory.getMqttController(mqttTopic), paramValues);来实现方法的调用。

具体代码及注释如下:

@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {//...//设置mqConsumer实例化所需propertiesConsumer mqConsumer = ONSFactory.createConsumer(properties);Set<String> parentTopicSet = MqttHandlerFactory.getParentTopicSet();if (parentTopicSet.size() == 0) {log.warn("当前应用并未有任何topic订阅");}//根据parentTopic和subTopic订阅parentTopicSet.forEach(parentTopic -> {log.info("Add a new rocketMq subscription,topic:{}", parentTopic);mqConsumer.subscribe(parentTopic, "*", (message, context) -> {log.debug("MqReceive Message: " + message);//获得topicString mqttFirstTopic = message.getTopic();String mqttSecondTopic = message.getUserProperties(PropertyKeyConst.MqttSecondTopic);if (null == mqttSecondTopic) {//只有1级topic的情况mqttSecondTopic = "/";}if (!"/".equals(mqttSecondTopic.substring(mqttSecondTopic.length() - 1))) {mqttSecondTopic += "/";}String mqttTopic = mqttFirstTopic + mqttSecondTopic;Method method = MqttHandlerFactory.getMqttHandler(mqttTopic);if (null == method) {log.warn("当前没有处理该topic的handler,topic:{}", mqttTopic);return Action.CommitMessage;} else {//获得mqtt的一些数据String messageId = message.getUserProperties("UNIQ_KEY");String messageBody = new String(message.getBody());//处理入参Parameter[] parameters = method.getParameters();Object[] paramValues = new Object[parameters.length];for (int i = 0; i < parameters.length; i++) {if (parameters[i].isAnnotationPresent(MqttMessageId.class)) {//@MqttMessageId注解的参数paramValues[i] = messageId;} else if (parameters[i].isAnnotationPresent(MqttMessageBody.class)) {//@MqttMessageBody注解的参数Class parameterClazz = parameters[i].getType();try {paramValues[i] = JSONObject.parseObject(messageBody, parameterClazz);} catch (Exception e) {log.error("mqttMessageBody 格式错误,messageId:{},messageBody:{}", messageId, messageBody);//                              return Action.ReconsumeLater;return Action.CommitMessage;}} else {//自己定义的一些参数就给null把paramValues[i] = null;}}try {method.invoke(MqttHandlerFactory.getMqttController(mqttTopic), paramValues);} catch (Exception e) {log.error("处理失败啦");}}return Action.CommitMessage;});});mqConsumer.start();log.info("MqConsumer Started");}

四、MQTT同步调用的实现

MQTT协议是基于PUB/SUB的异步通信模式,不适用于服务端同步控制设备端返回结果的场景。通过制定一套请求和响应的同步机制,可以无需改动MQTT协议来达到同步调用的目的。

1. 整体实现思路

MQTT的同步调用实际上是使用了两个异步调用完成的,即生产者调用消费者的同时,自己也作为消费者等待某一队列的返回消息,消费者接受到生产者的消息同时,也作为消息发送者发送一消息给生产者。

具体同步调用机制示意如下:

02a61bb89f885b09ae613bf35d540f0f.png
同步调用示意图

首先服务端和设备端服务端都订阅了相关的Topic,服务端发起同步调用即发布一个示意需同步返回的message到指定request Topic,设备端接收到该message后处理完业务逻辑则会将调用结果发布一个返回message到request消息体中携带的response Topic中,最后服务端接收到设备端返回的message可以从消息体中获得其调用结果。

整个调用过程中客户端需要做的工作有:

  1. 订阅request Topic
  2. 收到消息,判断消息是否是同步消息,是的话则处理完业务逻辑后异步发送特别的response Topic

服务端需要做的工作有:

  1. 统一订阅特别的设备的response Topic
  2. 发送消息到特别的request Topic
  3. Future.get(timeout) 模式处理request和response的关系
  4. 异步超时请求线程处理(处理超时请求,30ms运行一次)
  5. 消息监控和异常补救

服务端处理异步为同步调用的逻辑借鉴了Dubbo底层将Netty的异步调用转化成同步的方式,下面在实现细节中会具体阐述。

2. 实现细节

MQTT同步调用的代码如下:

public String publishSync(String topic, String qos, boolean cleanSessionFlag, Object data, int timeout)throws MqttRemoteException {String parentTopic = topic.substring(0, topic.indexOf("/"));String subTopic = topic.substring(topic.indexOf("/"));String mId = UUID.randomUUID().toString().replaceAll("-", "");MqttMessage mqttMessage = new MqttMessage(mId, replyParentTopic + "/" + mId, data);Message msg = new Message(parentTopic, "", JSON.toJSONString(mqttMessage).getBytes());msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic);msg.putUserProperties(PropertyKeyConst.MqttQOS, qos);msg.putUserProperties("cleansessionflag", "" + cleanSessionFlag);MqttFuture mqttFuture = new MqttFuture(mqttMessage, timeout);try {producer.send(msg);} catch (ONSClientException e) {mqttFuture.cancel();throw e;}return mqttFuture.get();
}

前部分就是正常RocketMQ发布MQTT消息的代码,但是发送的消息是自定义的MqttMessage,同时这边在调用producer.send(msg);前先构建了一个MqttFuture对象然后发送完消息后使用mqttFuture.get();来获得同步调用的结果。

MqttMessage定义如下,其中syncFlag表示该消息是否需要同步返回,mId表示该消息的唯一id,用于本地判断消息返回具体映射哪个同步调用的key,replyTopic表示该消息需要返回消息的Topic,data则是具体业务数据。

@Data
public class MqttMessage implements Serializable {private static final long serialVersionUID = 6648680154051903549L;/*** 是否需要同步返回,默认为true*/private boolean syncFlag = true;/*** 生成的id,用uuid生成把*/private final String mId;/*** 客户端返回消息的Topic*/private final String replyTopic;/*** 发送数据*/private Object data;public MqttMessage(String mId, String replyTopic, Object data) {this.mId = mId;this.replyTopic = replyTopic;this.data = data;}
}

MqttFuture对象则是用来处理同步调用的逻辑,每一个MqttFuture对象都有有一个mId作为唯一标识,发送的message消息体,调用返回结果MqttResponse(包括正常mqtt返回或者超时等异常返回结果)和同步调用超时时间,还有一个锁及其创建的Condition,用来处理线程的等待与通知,后面会对其具体逻辑进行分析。每创建一个新的MqttFuture对象都会将其放入到存储MqttFuture的Map中,key即为该消息的mId。

@Slf4j
@Data
public class MqttFuture {public static final int DEFAULT_TIMEOUT = 1000;public static final Map<String, MqttFuture> FUTURES = new ConcurrentHashMap<>();private final Lock lock = new ReentrantLock();private final Condition done = lock.newCondition();/*** 唯一id*/private final String mId;/*** 发送的message消息体*/private final MqttMessage message;/*** 设置的同步调用超时时间*/private final int timeout;/*** 等待开始时间*/private final long start = System.currentTimeMillis();/*** 返回结果*/private volatile MqttResponse response;public MqttFuture(MqttMessage message, int timeout) {this.message = message;this.timeout = timeout;this.mId = message.getMId();FUTURES.put(mId, this);}//...

调用返回结果MqttResponse定义如下,如果是正常成功返回则返回状态是OK且会有对应的消息体messageResult,如果超时或者其他异常情况则会返回对应的错误消息errorMessage。

@Data
public class MqttResponse {/*** ok状态,正常返回result,否则返回errorMessage*/public static final Integer OK = 20000;/*** 客户端超时未处理*/public static final Integer TIMEOUT = 40001;/*** 服务端主动取消*/public static final Integer CANCEL = 40002;private Integer mStatus = OK;/*** request生成的messageId*/private String mId;/*** 收到的消息体*/private String messageResult;/*** 状态不是成功返回的错误信息*/private String errorMessage;public MqttResponse(String mId) {this.mId = mId;}
}

在上述MQ同步调用代码中,若调用producer.send(msg);同步发送mqtt消息失败的话,则会调用mqttFuture.cancel();来取消该MqttFture对象,代码如下,这边主要是设置了异常的相应response避过将该MqttFuture对象从存储MqttFuture的Map中移除。

@Slf4j
@Data
public class MqttFuture {//...public void cancel() {MqttResponse errorResult = new MqttResponse(mId);errorResult.setMStatus(MqttResponse.CANCEL);errorResult.setErrorMessage("主动请求取消");response = errorResult;FUTURES.remove(mId);}//...

若调用producer.send(msg);同步发送mqtt消息成功的话则会调用mqttFuture.get();来获得其同步调用的结果,具体代码如下,首先判断是否调用已完成(有响应结果mqttResponse,包括正常获得返回结果或者超时等其他异常的情况),若完成的话则直接返回调用结果或者抛出相应的异常,若没有调用完成则获得锁并循环判断是否调用完成,没有的话则调用done.await(timeout, TimeUnit.MILLISECONDS);来实现该线程的超时等待直至其他线程调用该Condition的signal方法将其唤醒。

@Slf4j
@Data
public class MqttFuture {//...public String get() throws MqttRemoteException {return this.get(timeout);}public String get(int timeout) throws MqttRemoteException {if (timeout <= 0) {timeout = DEFAULT_TIMEOUT;}if (!isDone()) {long start = System.currentTimeMillis();lock.lock();try {while (!isDone()) {done.await(timeout, TimeUnit.MILLISECONDS);if (isDone() || System.currentTimeMillis() - start > timeout) {break;}}} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}if (!isDone()) {throw new MqttTimeoutException("Waiting client-side response timeout");}}if (response == null) {throw new IllegalStateException("response cannot be null");}if (response.getMStatus().equals(MqttResponse.OK)) {return response.getMessageResult();}if (response.getMStatus().equals(MqttResponse.TIMEOUT)) {throw new MqttTimeoutException("Waiting client-side response timeout");}throw new MqttRemoteException(response.getErrorMessage());}/*** 判断是否有response结果** @return 是否返回结果*/public boolean isDone() {return response != null;}//...

将上述等待唤醒的代码如下,只需调用下面的received方法并传入相应的MqttResponse结果即可,该方法会在后面讲的判断调用超时和正常mqtt消息结果返回的情况中调用。

@Slf4j
@Data
public class MqttFuture {//...private void doReceived(MqttResponse res) {lock.lock();try {response = res;done.signal();} finally {lock.unlock();}}public static void received(MqttResponse response) {MqttFuture future = FUTURES.remove(response.getMId());if (future != null) {future.doReceived(response);} else {log.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response);}}//...

后台会开启一个线程来扫描超时任务,主要是遍历上述存储MqttFuture的Map,如果对应的MqttFuture并没有完成(获得对应的response)且调用时间已超过设置的超时时间,则设置一个超时异常的MqttResponse并调用MqttFuture.received(timeoutResponse);来唤醒上述代码中的done.await(timeout, TimeUnit.MILLISECONDS);的线程等待。该线程每30s会遍历一个存储MqttFuture的Map。

@Slf4j
@Data
public class MqttFuture {//...private static class RemotingInvocationTimeoutScan implements Runnable {@Overridepublic void run() {while (true) {try {for (MqttFuture future : FUTURES.values()) {if (future == null || future.isDone()) {continue;}if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {//当前mqtt请求已超时MqttResponse timeoutResponse = new MqttResponse(future.getMId());timeoutResponse.setMStatus(MqttResponse.TIMEOUT);MqttFuture.received(timeoutResponse);}}//每30ms扫一次Thread.sleep(30);} catch (Throwable e) {log.error("Exception when scan the timeout invocation of remoting.", e);}}}}static {Thread th = new Thread(new RemotingInvocationTimeoutScan(), "MqttResponseTimeoutScanTimer");th.setDaemon(true);th.start();}//...

为了处理正常的mqtt的消息返回,除了使用上一节中讲到的自定义注解MqttController来订阅相关的Topic,还需要订阅特殊的一个Topic来处理mqtt同步调用的返回消息,订阅代码如下,根据收到的mId和messageBody设置对应的MqttResponse并调用MqttFuture.received(response);来唤醒上述代码中的done.await(timeout, TimeUnit.MILLISECONDS);的线程等待。

//订阅伪同步请求回复
log.info("Add a new rocketMq subscription,topic:{}", replyParentTopic);
mqConsumer.subscribe(replyParentTopic, "*", (message, context) -> {String mqttSecondTopic = message.getUserProperties(PropertyKeyConst.MqttSecondTopic);if (null == mqttSecondTopic) {mqttSecondTopic = "";}String mId = mqttSecondTopic.replaceAll("/", "");String messageBody = new String(message.getBody());MqttResponse response = new MqttResponse(mId);response.setMessageResult(messageBody);MqttFuture.received(response);return Action.CommitMessage;
});

五、demo使用

1. 项目结构

由于使用了springboot框架来实现该demo,所以项目结构如下:

75e97126796c2b204d7d9d6295b43d66.png
demo项目结

其中mqtt工具包目录如下:

8489402e6d28594778b8dbc51170e04d.png
mqtt工具包结构

2. mqtt工具包的使用

2.1 在yml配置中添加相关配置

配置示例如下,其中xxx改为自己使用的即可

ali:mqtt:accessKey: xxxsecretKey: xxxgroupId: xxxnamesrvAddr: xxxsendMsgTimeoutMillis: 3000#消费者线程固定位50个consumeThreadNums: 50
#    用于同步调用返回发送的topicreplyParentTopic: xxx

2.2 添加工具包中的MqttConfig

@Import({ MqttConfig.class})
@Configuration
public class MqttConfigure {}

2.3 自定义注解的使用

@Slf4j
@MqttController(parentTopic = "robot1")
public class MqttRobot1 {@MqttTopicMappingpublic void dealFirstTopic() {log.info("MqttRobot1.dealAlarm 收到消息啦,只处理了一级topic");}@MqttTopicMapping(subTopic = "alarm")public void dealAlarm(@MqttMessageId String messageId, @MqttMessageBody AlarmVo alarmVo) {log.info("MqttRobot1.dealAlarm 收到消息啦");log.info("messageId:{}", messageId);log.info("alarmVo:{}", alarmVo);}@MqttTopicMapping(subTopic = "task")public void dealTask() {log.info("MqttRobot1.dealTask 收到消息啦");}
}

2.4 测试同步调用,模拟MQTT客户端消息返回代码

mqtt客户端实现代码示例参考阿里云官方demo https://github.com/AliwareMQ/lmq-demo

其中xxx的地方都改成自己的即可,下面代码中mqttClient2.publish(replyTopic, message);即将结果发送到replyTopic中。

public class MqttClientTest {public static void main(String[] args) throws Exception {String instanceId = "xxx";String endPoint = "xxx";String accessKey = "xxx";String secretKey = "xxx";String clientId = "xxx";final String parentTopic = "xxx";//这边需自定义mqtt客户端topic,final String mq4IotTopic = parentTopic + "/" + "xxx" + "/xxx";final int qosLevel = 0;ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey,clientId);final MemoryPersistence memoryPersistence = new MemoryPersistence();final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);final MqttClient mqttClient2 = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);/*** 客户端设置好发送超时时间,防止无限阻塞*/mqttClient.setTimeToWait(5000);final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());mqttClient.setCallback(new MqttCallbackExtended() {@Overridepublic void connectComplete(boolean reconnect, String serverURI) {/*** 客户端连接成功后就需要尽快订阅需要的 topic*/System.out.println("connect success");executorService.submit(new Runnable() {@Overridepublic void run() {try {final String topicFilter[] = { mq4IotTopic };final int[] qos = { qosLevel };mqttClient.subscribe(topicFilter, qos);} catch (MqttException e) {e.printStackTrace();}}});}@Overridepublic void connectionLost(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));JSONObject jsonObject = JSON.parseObject(new String(mqttMessage.getPayload()));String mId = jsonObject.getString("mId");String replyTopic = jsonObject.getString("replyTopic");String result = mId + "回复啦";MqttMessage message = new MqttMessage(result.getBytes());message.setQos(qosLevel);//这边会将结果发送到replyTopic中mqttClient2.publish(replyTopic, message);System.out.println("发送啦");}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);}});mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());mqttClient2.connect(connectionOptionWrapper.getMqttConnectOptions());Thread.sleep(Long.MAX_VALUE);}
}

3. demo体验

启动项目可以在控制台看到有如下日志:

130792937b1d3c44e434e340c1af2018.png
MQTT处理Topic映射日志

28b9eac961f933d79ed8bd9307e393b9.png
MQTT订阅Topic日志

从日志中可以看出程序自动处理了自定义注解的Mqtt消息处理的映射,并根据mqtt的1级Topic进行了RocketMQ的相关订阅。

提供了一个测试MQTT消息简单发送接口如下:

@GetMapping("/publish")
public String doPublish(@RequestParam("topic") String topic, @RequestParam("message") String message) {try {return mqttClient.publish(topic, message);} catch (ONSClientException e) {return e.getMessage();}
}

使用 http://localhost:8080/mqtt/publish?topic=robot1/alarm&message={id:"1",code:"heheh"}来调用该接口。

可以在控制台看到如下日志打印:

7e955305f9c3939b875b87ee0dab5c1e.png
MQTT订阅的消息日志

由于本demo中使用自定义注解订阅了该Topic,所以调用该接口发送消息之后也会被本demo成功接收,并分发到对应的处理函数中,因此调用了该接口后可以在控制台看到如上日志打印,可以看到MQTT消息成功发布,订阅到该消息并实现了MQTT消息的处理分发。

提供了一个测试MQTT同步调用的接口如下:

@GetMapping("/publish-sync")public String publishSync(@RequestParam("topic") String topic, @RequestParam("message") String message) {try {return mqttClient.publishSync(topic, message, 5000);} catch (MqttRemoteException e) {return e.getMessage();}
}

使用http://localhost:8080/mqtt/publish-sync?topic=robot1/alarm/GID_ROBOT@@@DEVICEID_001&message=hehehe来调用接口。

可以看到下图的结果:

456d585067873e676cce0e8a65b8c9d9.png
MQTT同步调用超时

这是MQTT同步调用超时的情况,因为此时还没有开启对应的MQTT客户端,因此发送的MQTT消息并没有客户端进行回应,所以出现了调用超时的情况,如果运行上述模拟MQTT客户端消息返回的代码后再次调用该接口,可以看到同步调用成功返回了对应的结果,如图所示。

abbfc919190ca8a25ea649f7217e726d.png
MQTT同步调用成功返回

源码地址如下,仅供学习参考

DavidDingXu/panda-mqtt​github.com
460b53ce50b2c889a573697c96827e8f.png

六、参考

  • 阿里云微消息队列 MQTT帮助文档
  • 阿里云消息队列 RocketMQ帮助文档

本文原创,欢迎转载,转载请注明出处,如有不正确的地方恳请各位看官指正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/534271.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

css引入矢量图标_WEB 项目开发中的图标管理解决方案

相信很多前端开发人员在项目开发过程中都会遇到这样一个问题&#xff1a;页面的图标发生改动&#xff0c;需要往图标文件中追加新的图标&#xff0c;却因为图标文件已经打包好而无从下手&#xff0c;重新制作一份图标文件吧&#xff0c;要考虑替换整个项目的图标&#xff0c;工…

判断按键值_Pygame(九)按键事件(2)

Pygame(九)按键事件(2)前情提要前情提要作业解析完整代码# /usr/bin/python3# Author: 爱编程的章老师# Time: 2021/1/7 0007# E-mail: Bluesand2010163.comimport pygame, sys, timefrom random import randintdef homework():作业 挡板接球 一个宽100 ,高10的棕色挡板,…

动态添加input_前端提效必备:动态模版生成

前言在日常开发中&#xff0c;我们需要不停的新建页面和组件。以 Vue 项目为例&#xff0c;我们在新建一个页面的时候&#xff0c;需要经历一遍又一遍重复的过程&#xff1a;1、先新建一个文件夹2、然后新建一个 .vue 文件&#xff0c;写上 、", "" ],"…

在dom最前面插入_JavaScript中的DOM

1. 关于DOM文档对象模型(DocumentObject Model)&#xff0c;是基于浏览器编程的一套API接口&#xff0c;W3C出台的推荐标准&#xff0c;每个浏览器都有一些细微的差别&#xff0c;其中以Mozilla(火狐)的浏览器最与标准接近。通过 DOM&#xff0c;可以访问所有的 HTML元素&#…

当代最值得收藏的画家作品_当代最具潜力和收藏价值的十大画家

原标题&#xff1a;当代最具潜力和收藏价值的十大画家当代从事绘画的人成千上万&#xff0c;哪些名家作品值得收藏&#xff1f;当前都有哪些“潜力股”&#xff0c;相关专家综合市场分析&#xff0c;纯从艺术水准上列出值得收藏的“潜力股”&#xff0c;供爱好书画收藏的各界人…

mysql union all 别名_mysql union和union all

如下先创建2个表&#xff0c;aa bb.CREATE table aa(uid int(20) not null,name VARCHAR(30) not null)engineinnodb default charsetutf8mb4 COLLATE utf8mb4_general_ci;INSERT INTO aa(uid, name) VALUES (10, 张芳);INSERT INTO aa(uid, name) VALUES (11, 王凯);INSERT IN…

ci框架 乱码 mysql_mysql容器乱码问题

在docker-compose.yml文件中定义mysql导入utf-8的万国码services:mysql:image:mysql:5.7# command: [--character-set-serverutf8mb4, --collation-serverutf8mb4_unicode_ci]volumes:-./data/docker/mysql:/var/lib/mysql-./mysql/:/docker-entrypoint-initdb.d/-./conf/mys…

mysql分表 查询 优化_MySQL性能管理及架构(查询优化、分库分表)一遍文章搞定...

相关配置参数&#xff1a;slow_query_log # 启动停止记录慢查日志&#xff0c;慢查询日志默认是没有开启的可以在配置文件中开启(on)slow_query_log_file # 指定慢查日志的存储路径及文件&#xff0c;日志存储和数据从存储应该分开存储long_query_time # 指定记录慢查询日志SQL…

mysql临时表 清空_在数据库中临时表什么时候会被清除呢

展开全部我们仍使用 实验 05 中的环境&#xff0c;略去准备数据的过程。我们仍然使用两个会话&#xff0c;62616964757a686964616fe59b9ee7ad9431333433646439一个会话 run&#xff0c;用于运行主 SQL&#xff1b;另一个会话 ps&#xff0c;用于进行 performance_schema 的观察…

python医学图像分割_基于cv2的医学图像分割

例如&#xff0c;图像如下所示&#xff1a;import cv2import numpy as npimg cv2.imread("mdb168.pgm",0)import matplotlib.pyplot as pltplt.imshow(img, cmap"gray")我想删除图像中所有的伪影和不必要的部分。在为此&#xff0c;我首先对图像进行二值化…

ubuntu下使用python将ppt转成图片_Ubuntu下使用Python实现游戏制作中的切分图片功能...

本文实例讲述了Ubuntu下使用Python实现游戏制作中的切分图片功能。分享给大家供大家参考&#xff0c;具体如下&#xff1a;why拿到一个人物行走的素材&#xff0c;要用TexturePacker打包。TexturePacker打包后&#xff0c;助于游戏加载图片效率&#xff0c;且比较好管理。目前得…

世上最简单的mysql_mysql这样学最简单|基本操作上

这是数据库系列的第一篇文章&#xff0c;主要是对mysql的基本操作有一个了解。本系列的教程会先从基础出发&#xff0c;逐步过渡到优化。一、前提在这里我们不会从如何去安装数据库开始讲起&#xff0c;而是在安装完之后从操作数据库开始&#xff0c;文中所有的代码均在我自己的…

zabbix4.0添加mysql报警_Zabbix4.0系统告警“Zabbix server is not running”

第一步&#xff1a; 查看系统日志&#xff0c;进一步确认原因1 cat /var/log/zabbix/zabbix_server.log问题出现在数据库。第二步 数据库确认1 mysql -u root -p #root用户登陆数据库如果登录不成功&#xff0c;就看一下登录密码是否正确以及zabbix用户是否有权限登录数据库。1…

影响索引的mysql函数_mysql索引对排序的影响实例分析

本文实例讲述了mysql索引对排序的影响。分享给大家供大家参考&#xff0c;具体如下&#xff1a;索引不仅能提高查询速度&#xff0c;还可以添加排序速度&#xff0c;如果order by 后面的语句用到了索引&#xff0c;那么将会提高排序的速度。测试1、创建测试表&#xff1a;t15表…

php如何对 mysql 中text类型拆分存入一个数组_PHP递归实现无限级分类,可选返回字符串和数组...

正 文&#xff1a;在一些复杂的系统中&#xff0c;要求对信息栏目进行无限级的分类&#xff0c;以增强系统的灵活性。那么PHP是如何实现无限级分类的呢&#xff1f;我们在本文中使用递归算法并结合mysql数据表实现无限级分类。递归&#xff0c;简单的说就是一段程序代码的重复调…

python 开发框架 ant_Golang/python语言开发的分布式游戏服务器框架 mqant

软件介绍mqantmqant 是一款基于 Golang 语言的简洁&#xff0c;高效&#xff0c;高性能的分布式游戏服务器框架&#xff0c;研发的初衷是要实现一款能支持高并发&#xff0c;高性能&#xff0c;高实时性的游戏服务器框架&#xff0c;也希望 mqant 未来能够做即时通讯和物联网方…

java w3c xml_org.w3c.dom(java dom)解析XML文档

首先来了解点Java DOM 的 API:1.解析器工厂类&#xff1a;DocumentBuilderFactory创建的方法&#xff1a;DocumentBuilderFactory dbf DocumentBuilderFactory.newInstance();2.解析器&#xff1a;DocumentBuilder创建方法&#xff1a;通过解析器工厂类来获得 DocumentBuilder…

mysql migrations_Code First Migrations更新数据库结构(数据迁移)

背景 code first起初当修改model后&#xff0c;要持久化至数据库中时&#xff0c;总要把原数据库给删除掉再创建(DropCreateDatabaseIfModelChanges)&#xff0c;此时就会产生一个问题&#xff0c;当我们的旧数据库中包含一些测试数据时&#xff0c;当持久化更新后&#xff0c;…

java 机器码 虚拟机_Java虚拟机:源码到机器码

无论什么语言写的代码&#xff0c;其到最后都是通过机器码运行的&#xff0c;无一例外。那么对于 Java 语言来说&#xff0c;其从源代码到机器码&#xff0c;这中间到底发生了什么呢&#xff1f;这就是今天我们要聊的。如下图所示&#xff0c;编译器可以分为&#xff1a;前端编…

docker 远程连接 文件看不到_开发提升十倍生产力: IDEA 远程一键部署 Spring Boot 到 Docker...

一、开发前准备二、新建项目《Java 2019 超神之路》《Dubbo 实现原理与源码解析 —— 精品合集》《Spring 实现原理与源码解析 —— 精品合集》《MyBatis 实现原理与源码解析 —— 精品合集》《Spring MVC 实现原理与源码解析 —— 精品合集》《Spring Boot 实现原理与源码解析…