一、前言
前段时间公司预研了设备app端与服务端的交互方案,出于多方面考量最终选用了阿里云的微服务队列MQTT方案,基于此方案,本人主要实践有:
1. 封装了RocketMQ实现MQTT订阅与发布的实现细节;
2. 实现了注解式分发处理,可利用如MqttController
, MqttTopicMapping
等相关自定义注解的方式来统一订阅MQTT的Topic以及消息处理的分发;
3. 使用了一套请求和响应的同步机制来达到PUB/SUB异步通信的伪同步调用。
Github 地址点此链接
二、RocketMQ的接入细节
1. 为什么服务端要使用RocketMQ接入
阿里云微消息队列MQTT是在以消息队列 RocketMQ 为核心存储的基础上,实现更适合移动互联网和IoT领域的无状态网关,两者之间具备天然的数据互通性。MQTT实例本身并不提供消息数据持久化功能,消息数据持久化需要搭配后端的消息存储实例来使用。因此现阶段每一个阿里云MQTT实例都必须配套一个消息存储实例,即RocketMQ实例来提供消息数据持久化功能,因此他们之间可以说是消息互通的,即可用RocketMQ订阅的方式来消费用MQTT协议发布的消息,同理也可用 MQTT协议订阅的方式来消费RocketMQ发布的消息。
帮助文档也给出了以下两种产品的区别说明:
微消息队列MQTT基于MQTT协议实现,单个客户端的处理能力较弱。因此,微消息队列MQTT适用于拥有大量在线客户端(很多企业设备端过万,甚至上百万),但每个客户端消息较少的场景。
相比之下,消息队列RocketMQ是面向服务端的消息引擎,主要用于服务组件之间的解耦、异步通知、削峰填谷等,服务器规模较小(极少企业服务器规模过万),但需要大量的消息处理,吞吐量要求高。因此,消息队列RocketMQ适用于服务端进行大批量的数据处理和分析的场景。
基于以上区别,官方也推荐在移动端设备上使用微消息队列MQTT,而在服务端应用中则使用消息队列RocketMQ,具体则可以通过 MQTT SDK 以公网访问方式来实现设备间的通信,通过MQ SDK以内网方式来实现服务端通信。
2. RocketMQ如何对接
RocketMQ与MQTT在消息结构和一些属性字段上都有一定的映射关系,具体内容(摘自帮助文档)如下。
微消息队列MQTT使用MQTT协议接入,而消息队列RocketMQ使用的是私有协议,因此,两者的关键概念存在如下映射关系。
如上图所示,MQTT协议中Topic是多级结构,而消息队列RocketMQ的Topic 仅有一级,因此,MQTT中的一级Topic映射到消息队列RocketMQ的Topic,而二级和三级Topic则映射到消息队列RocketMQ的消息属性(Properties)中。
消息队列 RocketMQ 协议中的消息(Message)可以拥有自定义属性(Properties),而MQTT协议目前的版本不支持属性,但为了方便对MQTT协议中的Header信息和设备信息进行溯源,MQTT的部分信息将被映射到 RocketMQ的消息属性中,方便使用消息队列RocketMQ的SDK接入的用户获取。
目前,微消息队列MQTT和消息队列RocketMQ支持的属性字段映射表如下图所示。使用消息队列RocketMQ的SDK的应用和使用消息队列MQTT的SDK的应用进行交互时,可以通过读写这些属性字段来达到信息获取或者设置的目的。
3. RocketMQ对MQTT消息订阅的实现
Properties properties = new Properties();
// 在控制台创建的Group ID
properties.put(PropertyKeyConst.GROUP_ID, "xxx");
// 阿里云AccessKey
properties.put(PropertyKeyConst.AccessKey, "xxx");
// 阿里云SecretKey
properties.put(PropertyKeyConst.SecretKey, "xxx");
// 在RocketMQ控制台的实例基本信息中可查看到的TCP协议接入点
properties.put(PropertyKeyConst.NAMESRV_ADDR,"xxx");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("topic", "*", new MessageListener() { //订阅全部 Tagpublic Action consume(Message message, ConsumeContext context) {//获得mqtt消息中的第一级topicString mqttFirstTopic = message.getTopic();//获得mqtt消息中除去1级后的所有topicString mqttSecondTopic = message.getUserProperties(PropertyKeyConst.MqttSecondTopic);//获得mqtt消息中的messageIdString messageId = message.getUserProperties("UNIQ_KEY");//获得mqtt消息中的消息体String messageBody = new String(message.getBody());//...return Action.CommitMessage;}});
consumer.start();
实现主要注意2点:
- 这边的 MQ 只需要订阅 MQTT 的一级 Topic 。如果 MQTT 会发布2个 Topic 的消息
robot/alarm
和robot/task/test
,则在此处只需要订阅robot
这个第一级Topic即可。 - MQTT 的一些属性字段可以从 RocketMQ 消息
Message
的userProperties
字段中获得,比如上面代码中通过message.getUserProperties(PropertyKeyConst.MqttSecondTopic);
可以获得 MQTT 中的 除去1级后的所有 Topic 字符串,如上述举例的2个 Topic 可分别获得/alarm
和/task/test
。 具体能够获得哪些字段可以参考上一节的属性字段映射表,也可自行查看PropertyKeyConst
类中定义的一些字符串常量来大概知晓。
使用阿里云MQTT控制台发送一个MQTT消息,如图所示:
在程序中加一个断点获得当前Message
对象的字段如下:
上图可看到userProperties
中的一些值,比如qoslevel
,mqttSecondTopic
等,这些字段都可以在PropertyKeyConst
类中找到对应的字符串常量,但是UNIQ_KEY
,cleansessionflag
等PropertyKeyConst
类中并没有对应的字符串常量,这边暂时就message.getUserProperties("UNIQ_KEY")
这样使用自定义字符量来获得。
4. RocketMQ对MQTT消息发布的实现
Properties properties = new Properties();
// 在控制台创建的Group ID
properties.put(PropertyKeyConst.GROUP_ID, "xxx");
// 阿里云AccessKey
properties.put(PropertyKeyConst.AccessKey, "xxx");
// 阿里云SecretKey
properties.put(PropertyKeyConst.SecretKey, "xxx");
// 在RocketMQ控制台的实例基本信息中可查看到的TCP协议接入点
properties.put(PropertyKeyConst.NAMESRV_ADDR,"xxx");
//设置发送超时时间,单位毫秒
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
Producer producer = ONSFactory.createProducer(properties);
// 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
producer.start();//发送一个mqtt消息
String parentTopic = topic.substring(0, topic.indexOf("/"));
String subTopic = topic.substring(topic.indexOf("/"));
Message msg = new Message(parentTopic, "", message.getBytes());
msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic);
msg.putUserProperties(PropertyKeyConst.MqttQOS, qos);
msg.putUserProperties("cleansessionflag", "" + cleanSessionFlag);
SendResult result = producer.send(msg);
- 该代码仅实现了普通消息的同步发送,若需发送顺序消息、延时消息等,可参考SDK帮助文档创建不同的
Producer
实现即可。 - 上述代码将需要发送的MQTT全量Topic拆分成1级与2级,1级Topic设置为MQ中的Topic参数,2级Topic字符串则设为
userProperties
中PropertyKeyConst.MqttSecondTopic
的,其他属相如qoslevel
和cleansessionflag
等也是通过userProperties
的相关字段来设置。
三、注解式分发处理的实现
1. 前置知识点
1.1 BeanPostProcessor
BeanPostProcessor是Spring IOC容器给我们提供的一个扩展接口。BeanPostProcessor接口定义了两个方法:
public interface BeanPostProcessor {// 前置处理Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException;// 后置处理Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException;
}
Spring中Bean的整个生命周期如图所示:
postProcessBeforeInitialization()方法与postProcessAfterInitialization()分别对应图中前置处理和后置处理两个步骤将执行的方法。这两个方法中都传入了bean对象实例的引用,为扩展容器的对象实例化过程提供了很大便利,在这儿几乎可以对传入的实例执行任何操作。
可以看到,Spring容器通过BeanPostProcessor给了我们一个机会对Spring管理的bean进行再加工,注解、AOP等功能的实现均大量使用了BeanPostProcessor。通过实现BeanPostProcessor的接口,在其中处理方法中判断bean对象上是否有自定义的一些注解,如果有,则可以对这个bean实例继续进行其他操作,这也是本例中使用该接口要实现的主要目的。
1.2 ApplicationListener
在IOC的容器的启动过程,当所有的bean都已经处理完成之后,spring ioc容器会有一个发布事件的动作。从 AbstractApplicationContext 的源码中就可以看出:
protected void finishRefresh() {// Initialize lifecycle processor for this context.initLifecycleProcessor();// Propagate refresh to lifecycle processor first.getLifecycleProcessor().onRefresh();// Publish the final event.publishEvent(new ContextRefreshedEvent(this));// Participate in LiveBeansView MBean, if active.LiveBeansView.registerApplicationContext(this);
}
因此当所有的bean都初始化完成并被成功装载后会触发ContextRefreshedEvent
事件。
ApplicationListener是spring中用来监听事件(ApplicationEvent)的传递,每个实现了ApplicationListener接口的bean都会收到ApplicationEvent对象的通知,每个ApplicationListener可根据事件类型只接收处理自己感兴趣的事件,因此利用实现ApplicationListener的接口可以收到监听ContextRefreshedEvent
动作,然后可以写自己的一些处理逻辑,比如初始化环境,准备测试数据、加载一些数据到内存等等。用法如下:
@Component
public class TestApplicationListener implements ApplicationListener<ContextRefreshedEvent>{@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {//todo:一些处理逻辑}}
1.3 反射
Java反射机制是在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法;对于任意一个对象,都能够调用它的任意一个方法和属性;这种动态获取的信息以及动态调用对象的方法的功能称为java语言的反射机制。
在Java中,Class类与java.lang.reflect类库一起对反射技术进行了全力的支持。获取Class对象有三种方式:
- 通过实例对象获得:
Class<?> class = object.getClass();
- 通过类名获得:
Class<?> class = ClassName.class;
- 通过类名全路径获得:
Class<?> class = Class.forName("类名全路径");
反射包中常用的类主要有
- Constructor,表示的类的构造方法信息,利用它可以在运行时动态创建对象
- Field,表示类的成员变量信息,通过它可以在运行时动态修改成员变量的属性值(包含private)
- Method,表示类的成员方法信息,通过它可以动态调用对象的方法(包含private)
下面说明一下本例中用到的一些反射api:
//获得Class对象
Class clazz= obj.getClass();//判断注解B是否在此A上
boolean isAnnotation= A.isAnnotationPresent(B.class);//获得该clazz上的注解对象
B b=clazz.getAnnotation(B.class));//获得本类以及父类或者父接口中所有的公共方法
Method[] methods=clazz.getMethods();//获取方法上的所有参数
Parameter[] parameters = method.getParameters();//执行某对象的方法,owner为该对象,paramValues为入参数组,method为Method对象
method.invoke(owner, paramValues);
2. 整体实现思路
- 自定义注解
MqttController
,MqttTopicMapping
,MqttMessageId
,MqttMessageBody
; - 利用BeanPostProcessor,获得所有注解了
MqttController
的bean及其注解值,获得其所有注解了MqttTopicMapping
的Method方法及其注解值,利用两者的注解值作为其key,分别将bean,Method为value放入不同的map中,记录所有注解了MqttController
的注解值作为下一步需要订阅的Topic; - 利用ApplicationListener在所有bean加载完成后使用实例化的
mqConsumer
来订阅所有需要订阅的Topic; - 在mq订阅的处理方法中,根据消息的全Topic在上述步骤的map中获得其对应的bean和Method,同时根据
MqttMessageId
,MqttMessageBody
来设置相关参数,使用method.invoke(owner, paramValues);
实现方法的调用,来达到消息的处理分发。
3. 实现细节
3.1 自定义注解
@MqttController
在类上使用,其中parentTopic值为需要监听的1级Topic,其中使用@Component
可以使其注解的类实例化为为Bean对象放入到Spring容器中,基于此才能在利用BeanPostProcessor中获得其对象。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface MqttController {/*** 监听的父topic** @return 监听的父topic*/String parentTopic();
}
@MqttTopicMapping
在方法上使用,其中subTopic的值为需要订阅的子Topic,与1级Topic共同组成MQTT的Topic
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MqttTopicMapping {/*** 订阅的子topic,默认可以只订阅1级topic** @return 订阅的子topic*/String subTopic() default "";
}
MqttMessageBody
在方法参数上使用,使得参数自动获得messageBody对象
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MqttMessageBody {}
MqttMessageId
在方法参数上使用,使得参数自动获得messageId的值
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MqttMessageId {}
自定义注解的使用示例如下:
@Slf4j
@MqttController(parentTopic = "robot1")
public class MqttRobot1 {@MqttTopicMappingpublic void dealFirstTopic() {log.info("MqttRobot1.dealAlarm 收到消息啦,只处理了一级topic");}@MqttTopicMapping(subTopic = "alarm")public void dealAlarm(@MqttMessageId String messageId, @MqttMessageBody AlarmVo alarmVo) {log.info("MqttRobot1.dealAlarm 收到消息啦");log.info("messageId:{}", messageId);log.info("alarmVo:{}", alarmVo);}@MqttTopicMapping(subTopic = "task")public void dealTask() {log.info("MqttRobot1.dealTask 收到消息啦");}
}
3.2 提取Method和Bean对象
在MqttHandlerFactory类中定义以下几个容器,分别存储mqtt处理类的bean,mqtt处理方法以及parentTopic列表
/*** 用于存储mqtt处理类的bean,key为parentTopic/subTopic*/
private static Map<String, Object> mqttControllers = new HashMap<>();/*** 用于存储mqtt处理方法,key为parentTopic/subTopic*/
private static Map<String, Method> mqttHandlers = new HashMap<>();/*** 存储parentTopic列表*/
private static Set<String> parentTopicSet = new HashSet<>();
利用BeanPostProcessor接口来处理实现了自定义注解的Bean对象。
具体代码及注释如下:
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {Class beanClazz = bean.getClass();//使用的MqttController注解的bean对象if (beanClazz.isAnnotationPresent(MqttController.class)) {//获得MqttController的注解值//存储parentTopic列表String parentTopic = ((MqttController) beanClazz.getAnnotation(MqttController.class)).parentTopic();MqttHandlerFactory.getParentTopicSet().add(parentTopic);for (Method method : beanClazz.getMethods()) {//获得MqttTopicMapping的Method对象if (method.isAnnotationPresent(MqttTopicMapping.class)) {//获得MqttTopicMapping的注解值String subTopic = method.getAnnotation(MqttTopicMapping.class).subTopic();String realTopic;if ("".equals(subTopic)) {realTopic = parentTopic + "/";} else {realTopic = (parentTopic + "/" + subTopic + "/").replaceAll("/+", "/");}if (null != MqttHandlerFactory.getMqttHandler(realTopic)) {throw new MqttBeansException(bean.getClass().getSimpleName() + " topic 重复定义,值为" + realTopic);}//存储mqtt处理类的beanMqttHandlerFactory.registerMqttHandler(realTopic, method);//存储mqtt处理方法MqttHandlerFactory.registerMqttController(realTopic, bean);log.info("MqttHandler Mapped "{}" onto {}", realTopic, method.toString());}}}return bean;
}
3.3 mq的消息订阅与处理
实现ApplicationListener接口在所有Bean对象加载完之后根据前面记录的parentTopicSet作为所有需要订阅的1级Topic开始订阅。
在订阅消息处理中从message信息可以获得其对应的1级Topic与2级Topic,将其处理成MQTT的全Toic并从前面记录的mqttHandlers,mqttControllers中获得对应的Method对象及Bean对象,从message信息中提取对应的messageId级messageBody并设置为使用了@MqttMessageBody
和@MqttMessageId
的注解的参数中,利用反射method.invoke(MqttHandlerFactory.getMqttController(mqttTopic), paramValues);
来实现方法的调用。
具体代码及注释如下:
@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {//...//设置mqConsumer实例化所需propertiesConsumer mqConsumer = ONSFactory.createConsumer(properties);Set<String> parentTopicSet = MqttHandlerFactory.getParentTopicSet();if (parentTopicSet.size() == 0) {log.warn("当前应用并未有任何topic订阅");}//根据parentTopic和subTopic订阅parentTopicSet.forEach(parentTopic -> {log.info("Add a new rocketMq subscription,topic:{}", parentTopic);mqConsumer.subscribe(parentTopic, "*", (message, context) -> {log.debug("MqReceive Message: " + message);//获得topicString mqttFirstTopic = message.getTopic();String mqttSecondTopic = message.getUserProperties(PropertyKeyConst.MqttSecondTopic);if (null == mqttSecondTopic) {//只有1级topic的情况mqttSecondTopic = "/";}if (!"/".equals(mqttSecondTopic.substring(mqttSecondTopic.length() - 1))) {mqttSecondTopic += "/";}String mqttTopic = mqttFirstTopic + mqttSecondTopic;Method method = MqttHandlerFactory.getMqttHandler(mqttTopic);if (null == method) {log.warn("当前没有处理该topic的handler,topic:{}", mqttTopic);return Action.CommitMessage;} else {//获得mqtt的一些数据String messageId = message.getUserProperties("UNIQ_KEY");String messageBody = new String(message.getBody());//处理入参Parameter[] parameters = method.getParameters();Object[] paramValues = new Object[parameters.length];for (int i = 0; i < parameters.length; i++) {if (parameters[i].isAnnotationPresent(MqttMessageId.class)) {//@MqttMessageId注解的参数paramValues[i] = messageId;} else if (parameters[i].isAnnotationPresent(MqttMessageBody.class)) {//@MqttMessageBody注解的参数Class parameterClazz = parameters[i].getType();try {paramValues[i] = JSONObject.parseObject(messageBody, parameterClazz);} catch (Exception e) {log.error("mqttMessageBody 格式错误,messageId:{},messageBody:{}", messageId, messageBody);// return Action.ReconsumeLater;return Action.CommitMessage;}} else {//自己定义的一些参数就给null把paramValues[i] = null;}}try {method.invoke(MqttHandlerFactory.getMqttController(mqttTopic), paramValues);} catch (Exception e) {log.error("处理失败啦");}}return Action.CommitMessage;});});mqConsumer.start();log.info("MqConsumer Started");}
四、MQTT同步调用的实现
MQTT协议是基于PUB/SUB的异步通信模式,不适用于服务端同步控制设备端返回结果的场景。通过制定一套请求和响应的同步机制,可以无需改动MQTT协议来达到同步调用的目的。
1. 整体实现思路
MQTT的同步调用实际上是使用了两个异步调用完成的,即生产者调用消费者的同时,自己也作为消费者等待某一队列的返回消息,消费者接受到生产者的消息同时,也作为消息发送者发送一消息给生产者。
具体同步调用机制示意如下:
首先服务端和设备端服务端都订阅了相关的Topic,服务端发起同步调用即发布一个示意需同步返回的message到指定request Topic,设备端接收到该message后处理完业务逻辑则会将调用结果发布一个返回message到request消息体中携带的response Topic中,最后服务端接收到设备端返回的message可以从消息体中获得其调用结果。
整个调用过程中客户端需要做的工作有:
- 订阅request Topic
- 收到消息,判断消息是否是同步消息,是的话则处理完业务逻辑后异步发送特别的response Topic
服务端需要做的工作有:
- 统一订阅特别的设备的response Topic
- 发送消息到特别的request Topic
- Future.get(timeout) 模式处理request和response的关系
- 异步超时请求线程处理(处理超时请求,30ms运行一次)
- 消息监控和异常补救
服务端处理异步为同步调用的逻辑借鉴了Dubbo底层将Netty的异步调用转化成同步的方式,下面在实现细节中会具体阐述。
2. 实现细节
MQTT同步调用的代码如下:
public String publishSync(String topic, String qos, boolean cleanSessionFlag, Object data, int timeout)throws MqttRemoteException {String parentTopic = topic.substring(0, topic.indexOf("/"));String subTopic = topic.substring(topic.indexOf("/"));String mId = UUID.randomUUID().toString().replaceAll("-", "");MqttMessage mqttMessage = new MqttMessage(mId, replyParentTopic + "/" + mId, data);Message msg = new Message(parentTopic, "", JSON.toJSONString(mqttMessage).getBytes());msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic);msg.putUserProperties(PropertyKeyConst.MqttQOS, qos);msg.putUserProperties("cleansessionflag", "" + cleanSessionFlag);MqttFuture mqttFuture = new MqttFuture(mqttMessage, timeout);try {producer.send(msg);} catch (ONSClientException e) {mqttFuture.cancel();throw e;}return mqttFuture.get();
}
前部分就是正常RocketMQ发布MQTT消息的代码,但是发送的消息是自定义的MqttMessage,同时这边在调用producer.send(msg);
前先构建了一个MqttFuture对象然后发送完消息后使用mqttFuture.get();
来获得同步调用的结果。
MqttMessage定义如下,其中syncFlag
表示该消息是否需要同步返回,mId
表示该消息的唯一id,用于本地判断消息返回具体映射哪个同步调用的key,replyTopic
表示该消息需要返回消息的Topic,data
则是具体业务数据。
@Data
public class MqttMessage implements Serializable {private static final long serialVersionUID = 6648680154051903549L;/*** 是否需要同步返回,默认为true*/private boolean syncFlag = true;/*** 生成的id,用uuid生成把*/private final String mId;/*** 客户端返回消息的Topic*/private final String replyTopic;/*** 发送数据*/private Object data;public MqttMessage(String mId, String replyTopic, Object data) {this.mId = mId;this.replyTopic = replyTopic;this.data = data;}
}
MqttFuture对象则是用来处理同步调用的逻辑,每一个MqttFuture对象都有有一个mId作为唯一标识,发送的message消息体,调用返回结果MqttResponse(包括正常mqtt返回或者超时等异常返回结果)和同步调用超时时间,还有一个锁及其创建的Condition,用来处理线程的等待与通知,后面会对其具体逻辑进行分析。每创建一个新的MqttFuture对象都会将其放入到存储MqttFuture的Map中,key即为该消息的mId。
@Slf4j
@Data
public class MqttFuture {public static final int DEFAULT_TIMEOUT = 1000;public static final Map<String, MqttFuture> FUTURES = new ConcurrentHashMap<>();private final Lock lock = new ReentrantLock();private final Condition done = lock.newCondition();/*** 唯一id*/private final String mId;/*** 发送的message消息体*/private final MqttMessage message;/*** 设置的同步调用超时时间*/private final int timeout;/*** 等待开始时间*/private final long start = System.currentTimeMillis();/*** 返回结果*/private volatile MqttResponse response;public MqttFuture(MqttMessage message, int timeout) {this.message = message;this.timeout = timeout;this.mId = message.getMId();FUTURES.put(mId, this);}//...
调用返回结果MqttResponse定义如下,如果是正常成功返回则返回状态是OK且会有对应的消息体messageResult,如果超时或者其他异常情况则会返回对应的错误消息errorMessage。
@Data
public class MqttResponse {/*** ok状态,正常返回result,否则返回errorMessage*/public static final Integer OK = 20000;/*** 客户端超时未处理*/public static final Integer TIMEOUT = 40001;/*** 服务端主动取消*/public static final Integer CANCEL = 40002;private Integer mStatus = OK;/*** request生成的messageId*/private String mId;/*** 收到的消息体*/private String messageResult;/*** 状态不是成功返回的错误信息*/private String errorMessage;public MqttResponse(String mId) {this.mId = mId;}
}
在上述MQ同步调用代码中,若调用producer.send(msg);
同步发送mqtt消息失败的话,则会调用mqttFuture.cancel();
来取消该MqttFture对象,代码如下,这边主要是设置了异常的相应response避过将该MqttFuture对象从存储MqttFuture的Map中移除。
@Slf4j
@Data
public class MqttFuture {//...public void cancel() {MqttResponse errorResult = new MqttResponse(mId);errorResult.setMStatus(MqttResponse.CANCEL);errorResult.setErrorMessage("主动请求取消");response = errorResult;FUTURES.remove(mId);}//...
若调用producer.send(msg);
同步发送mqtt消息成功的话则会调用mqttFuture.get();
来获得其同步调用的结果,具体代码如下,首先判断是否调用已完成(有响应结果mqttResponse,包括正常获得返回结果或者超时等其他异常的情况),若完成的话则直接返回调用结果或者抛出相应的异常,若没有调用完成则获得锁并循环判断是否调用完成,没有的话则调用done.await(timeout, TimeUnit.MILLISECONDS);
来实现该线程的超时等待直至其他线程调用该Condition的signal方法将其唤醒。
@Slf4j
@Data
public class MqttFuture {//...public String get() throws MqttRemoteException {return this.get(timeout);}public String get(int timeout) throws MqttRemoteException {if (timeout <= 0) {timeout = DEFAULT_TIMEOUT;}if (!isDone()) {long start = System.currentTimeMillis();lock.lock();try {while (!isDone()) {done.await(timeout, TimeUnit.MILLISECONDS);if (isDone() || System.currentTimeMillis() - start > timeout) {break;}}} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}if (!isDone()) {throw new MqttTimeoutException("Waiting client-side response timeout");}}if (response == null) {throw new IllegalStateException("response cannot be null");}if (response.getMStatus().equals(MqttResponse.OK)) {return response.getMessageResult();}if (response.getMStatus().equals(MqttResponse.TIMEOUT)) {throw new MqttTimeoutException("Waiting client-side response timeout");}throw new MqttRemoteException(response.getErrorMessage());}/*** 判断是否有response结果** @return 是否返回结果*/public boolean isDone() {return response != null;}//...
将上述等待唤醒的代码如下,只需调用下面的received方法并传入相应的MqttResponse结果即可,该方法会在后面讲的判断调用超时和正常mqtt消息结果返回的情况中调用。
@Slf4j
@Data
public class MqttFuture {//...private void doReceived(MqttResponse res) {lock.lock();try {response = res;done.signal();} finally {lock.unlock();}}public static void received(MqttResponse response) {MqttFuture future = FUTURES.remove(response.getMId());if (future != null) {future.doReceived(response);} else {log.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response);}}//...
后台会开启一个线程来扫描超时任务,主要是遍历上述存储MqttFuture的Map,如果对应的MqttFuture并没有完成(获得对应的response)且调用时间已超过设置的超时时间,则设置一个超时异常的MqttResponse并调用MqttFuture.received(timeoutResponse);
来唤醒上述代码中的done.await(timeout, TimeUnit.MILLISECONDS);
的线程等待。该线程每30s会遍历一个存储MqttFuture的Map。
@Slf4j
@Data
public class MqttFuture {//...private static class RemotingInvocationTimeoutScan implements Runnable {@Overridepublic void run() {while (true) {try {for (MqttFuture future : FUTURES.values()) {if (future == null || future.isDone()) {continue;}if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {//当前mqtt请求已超时MqttResponse timeoutResponse = new MqttResponse(future.getMId());timeoutResponse.setMStatus(MqttResponse.TIMEOUT);MqttFuture.received(timeoutResponse);}}//每30ms扫一次Thread.sleep(30);} catch (Throwable e) {log.error("Exception when scan the timeout invocation of remoting.", e);}}}}static {Thread th = new Thread(new RemotingInvocationTimeoutScan(), "MqttResponseTimeoutScanTimer");th.setDaemon(true);th.start();}//...
为了处理正常的mqtt的消息返回,除了使用上一节中讲到的自定义注解MqttController
来订阅相关的Topic,还需要订阅特殊的一个Topic来处理mqtt同步调用的返回消息,订阅代码如下,根据收到的mId和messageBody设置对应的MqttResponse并调用MqttFuture.received(response);
来唤醒上述代码中的done.await(timeout, TimeUnit.MILLISECONDS);
的线程等待。
//订阅伪同步请求回复
log.info("Add a new rocketMq subscription,topic:{}", replyParentTopic);
mqConsumer.subscribe(replyParentTopic, "*", (message, context) -> {String mqttSecondTopic = message.getUserProperties(PropertyKeyConst.MqttSecondTopic);if (null == mqttSecondTopic) {mqttSecondTopic = "";}String mId = mqttSecondTopic.replaceAll("/", "");String messageBody = new String(message.getBody());MqttResponse response = new MqttResponse(mId);response.setMessageResult(messageBody);MqttFuture.received(response);return Action.CommitMessage;
});
五、demo使用
1. 项目结构
由于使用了springboot框架来实现该demo,所以项目结构如下:
其中mqtt工具包目录如下:
2. mqtt工具包的使用
2.1 在yml配置中添加相关配置
配置示例如下,其中xxx改为自己使用的即可
ali:mqtt:accessKey: xxxsecretKey: xxxgroupId: xxxnamesrvAddr: xxxsendMsgTimeoutMillis: 3000#消费者线程固定位50个consumeThreadNums: 50
# 用于同步调用返回发送的topicreplyParentTopic: xxx
2.2 添加工具包中的MqttConfig
@Import({ MqttConfig.class})
@Configuration
public class MqttConfigure {}
2.3 自定义注解的使用
@Slf4j
@MqttController(parentTopic = "robot1")
public class MqttRobot1 {@MqttTopicMappingpublic void dealFirstTopic() {log.info("MqttRobot1.dealAlarm 收到消息啦,只处理了一级topic");}@MqttTopicMapping(subTopic = "alarm")public void dealAlarm(@MqttMessageId String messageId, @MqttMessageBody AlarmVo alarmVo) {log.info("MqttRobot1.dealAlarm 收到消息啦");log.info("messageId:{}", messageId);log.info("alarmVo:{}", alarmVo);}@MqttTopicMapping(subTopic = "task")public void dealTask() {log.info("MqttRobot1.dealTask 收到消息啦");}
}
2.4 测试同步调用,模拟MQTT客户端消息返回代码
mqtt客户端实现代码示例参考阿里云官方demo https://github.com/AliwareMQ/lmq-demo
其中xxx的地方都改成自己的即可,下面代码中mqttClient2.publish(replyTopic, message);
即将结果发送到replyTopic中。
public class MqttClientTest {public static void main(String[] args) throws Exception {String instanceId = "xxx";String endPoint = "xxx";String accessKey = "xxx";String secretKey = "xxx";String clientId = "xxx";final String parentTopic = "xxx";//这边需自定义mqtt客户端topic,final String mq4IotTopic = parentTopic + "/" + "xxx" + "/xxx";final int qosLevel = 0;ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey,clientId);final MemoryPersistence memoryPersistence = new MemoryPersistence();final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);final MqttClient mqttClient2 = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);/*** 客户端设置好发送超时时间,防止无限阻塞*/mqttClient.setTimeToWait(5000);final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());mqttClient.setCallback(new MqttCallbackExtended() {@Overridepublic void connectComplete(boolean reconnect, String serverURI) {/*** 客户端连接成功后就需要尽快订阅需要的 topic*/System.out.println("connect success");executorService.submit(new Runnable() {@Overridepublic void run() {try {final String topicFilter[] = { mq4IotTopic };final int[] qos = { qosLevel };mqttClient.subscribe(topicFilter, qos);} catch (MqttException e) {e.printStackTrace();}}});}@Overridepublic void connectionLost(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));JSONObject jsonObject = JSON.parseObject(new String(mqttMessage.getPayload()));String mId = jsonObject.getString("mId");String replyTopic = jsonObject.getString("replyTopic");String result = mId + "回复啦";MqttMessage message = new MqttMessage(result.getBytes());message.setQos(qosLevel);//这边会将结果发送到replyTopic中mqttClient2.publish(replyTopic, message);System.out.println("发送啦");}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);}});mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());mqttClient2.connect(connectionOptionWrapper.getMqttConnectOptions());Thread.sleep(Long.MAX_VALUE);}
}
3. demo体验
启动项目可以在控制台看到有如下日志:
从日志中可以看出程序自动处理了自定义注解的Mqtt消息处理的映射,并根据mqtt的1级Topic进行了RocketMQ的相关订阅。
提供了一个测试MQTT消息简单发送接口如下:
@GetMapping("/publish")
public String doPublish(@RequestParam("topic") String topic, @RequestParam("message") String message) {try {return mqttClient.publish(topic, message);} catch (ONSClientException e) {return e.getMessage();}
}
使用 http://localhost:8080/mqtt/publish?topic=robot1/alarm&message={id:"1",code:"heheh"}来调用该接口。
可以在控制台看到如下日志打印:
由于本demo中使用自定义注解订阅了该Topic,所以调用该接口发送消息之后也会被本demo成功接收,并分发到对应的处理函数中,因此调用了该接口后可以在控制台看到如上日志打印,可以看到MQTT消息成功发布,订阅到该消息并实现了MQTT消息的处理分发。
提供了一个测试MQTT同步调用的接口如下:
@GetMapping("/publish-sync")public String publishSync(@RequestParam("topic") String topic, @RequestParam("message") String message) {try {return mqttClient.publishSync(topic, message, 5000);} catch (MqttRemoteException e) {return e.getMessage();}
}
使用http://localhost:8080/mqtt/publish-sync?topic=robot1/alarm/GID_ROBOT@@@DEVICEID_001&message=hehehe来调用接口。
可以看到下图的结果:
这是MQTT同步调用超时的情况,因为此时还没有开启对应的MQTT客户端,因此发送的MQTT消息并没有客户端进行回应,所以出现了调用超时的情况,如果运行上述模拟MQTT客户端消息返回的代码后再次调用该接口,可以看到同步调用成功返回了对应的结果,如图所示。
源码地址如下,仅供学习参考
DavidDingXu/panda-mqttgithub.com六、参考
- 阿里云微消息队列 MQTT帮助文档
- 阿里云消息队列 RocketMQ帮助文档
本文原创,欢迎转载,转载请注明出处,如有不正确的地方恳请各位看官指正。