1.工作及使用背景
工作中需要跟收集各种硬件或传感器数据用于Web展示及统计计算分析,如电表、流量计、泵、控制器等物联网设备。
目前的思路及解决策略是使用力控或者杰控等组态软件实现数据的转储(也会涉及收费问题),通过组态软件自带的转储工具将数据转储到关系型数据库,如MySQL、sqlLite、Postgresql等。然后在BS架构后台程序中通过定时刷数据或者查询时计算的方式进行统计分析计算。
但上述解决方案实际上是实现简单,但是数据统计时机有潜在的偏差风险,且逻辑设计非常别扭,数据库压力大等问题,理论上应该通过消息队列来接收实时数据参与计算的方式,Web系统只负责展示计算统计之后的结果,这样无论是时效还是数据准确性更容易保证,实时数据存储的数据库压力也不存在(可做数据校验用,也可不用),逻辑也不显别扭。
2.开发环境及工具
JDK1.8、maven、Mosquitto、IDEA、postman
3.框架结构及文件声明
因为我用的现成的框架,所以启动模块和业务模块分开了。实际开发调试中完全可以放一起也没关系。
MqttClientConnectorPool | 对外提供一个初始化的Mqtt客户端,在服务启动时初始化 |
MqttMsgSender | 对外提供一个可以执行消息发送的方法 |
MqttMsgSubscriber | 初始化一个Mqtt客户端,并根据配置订阅topic |
TestController | 接收web请求的调用消息发送,用于测试 |
BusinessApplicationStartup | 服务启动时执行,调用MqttClientConnectorPool初始化一个客户端并调起MqttMsgSubscriber的监听等待 |
BusinessApplicationShutdown | 服务正常终止时调用,关闭服务启动默认创建的Mqtt客户端 |
MqttBrokerServer | SpringBoot服务启动类 |
4.具体实现逻辑及代码
4.1 maven依赖
<properties><MQTTv3.version>1.2.5</MQTTv3.version>
</properties><dependencyManagement><dependencies><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.MQTTv3</artifactId><version>${MQTTv3.version}</version></dependency></dependencies>
</dependencyManagement>或者直接使用
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.MQTTv3</artifactId><version>1.2.5</version>
</dependency>
4.2 MqttClientConnectorPool
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;@Slf4j
public class MqttClientConnectorPool {public static MqttClient mqttClient;/*** 连接MQTT客户端* @return 获取MQTT连队对象*/public static MqttClient connectMQTT() {if (mqttClient != null){log.info("已存在,我深深的脑海!");return mqttClient;}try {// broker及连接信息String broker = "tcp://127.0.0.1:1883";String username = "admin";String password = "123456";String clientId = System.currentTimeMillis() + "";//创建MQTT客户端(指定broker、客户端id、消息持久策略)mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());//创建连接参数配置MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());//是否清除会话options.setCleanSession(true);//连接超时时间options.setKeepAliveInterval(20);//是否自动重连options.setAutomaticReconnect(true);mqttClient.connect(options);log.info("MqttClient 服务启动broker初始化!");} catch (MqttException e){log.error("MqttClient connect Error:{}", e.getMessage());e.printStackTrace();}return mqttClient;}/*** 关闭MQTT客户端* @param client client*/public static void closeClient(MqttClient client){try {// 断开连接client.disconnect();// 关闭客户端client.close();} catch (MqttException e){log.error("MqttClient disconnect or close Error:{}", e.getMessage());e.printStackTrace();}}/*** 关闭MQTT客户端*/public static void closeStaticClient(){try {if (mqttClient != null){// 断开连接mqttClient.disconnect();// 关闭客户端mqttClient.close();}} catch (MqttException e){log.error("MqttClient disconnect or close Error:{}", e.getMessage());e.printStackTrace();}}
}
4.3 MqttMsgSender
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;@Slf4j
public class MqttMsgSender {public void sendMessage(MqttClient client,String topic,String content,int qos){MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);try{client.publish(topic,message);} catch (MqttException e){log.error("MqttClient publish text info Error:{}!", e.getMessage());e.printStackTrace();}}
}
4.4 MqttMsgSubscriber
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;@Slf4j
public class MqttMsgSubscriber {String broker = "tcp://127.0.0.1:1883";String topic = "/deviceUp";String username = "admin";String password = "123456";String clientId = System.currentTimeMillis() + "";int qos = 1;public void readSubscribeTopicMessage(){try {MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());// 连接参数MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());//是否清除会话options.setCleanSession(true);options.setConnectionTimeout(60);options.setKeepAliveInterval(60);client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable throwable) {log.error("连接丢失");}@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {log.info("topic为: " + topic);log.info("qos为: " + mqttMessage.getQos());log.info("消息内容为: " + new String(mqttMessage.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {// 当消息被完全传送出去后调用log.info("交付完成 ---Delivery complete!");// 可以在这里处理一些发送完成后的清理工作}});client.connect(options);client.subscribe(topic, qos);} catch (MqttException e){log.error("MqttMsgSubscriber 连接启动异常:{}", e.getMessage());} catch (Exception e){log.error("MqttMsgSubscriber 读取消息异常:{}", e.getMessage());}}}
4.5 TestController
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.web.bind.annotation.*;import java.util.List;@Slf4j
@RestController
@RequestMapping()
public class TestController {@GetMapping("/test/mqtt/{msg}")public String testSendMqttMsg(@PathVariable("msg") String msg){log.info("消息内容:{}.", msg);MqttClient mqttClient = MqttClientConnectorPool.connectMQTT();MqttMsgSender sender = new MqttMsgSender();String content = "{" + " \"deviceNo\": \"" + msg + "\"," + " \"val\": 232.5" + "}";String topic = "/deviceUp";int qos = 1;if (null != mqttClient){sender.sendMessage(mqttClient, topic, content, qos);} else {log.info("MqttClient为空,无法发送!");return "失败!";}return "成功!";}}
4.6 BusinessApplicationStartup
import 包路径(可以删掉这一行手动导入).MqttClientConnectorPool;
import 包路径(可以删掉这一行手动导入).MqttMsgSubscriber;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;@Slf4j
@Order(10)
@Component
public class BusinessApplicationStartup implements ApplicationRunner {@Overridepublic void run(ApplicationArguments args) throws Exception {log.info("MqttClientConnectorPool ===================== Startup");MqttClientConnectorPool.connectMQTT();log.info("MqttClientConnectorPool ===================== recoveryAllJob Over !");log.info("MqttMsgSubscriber ===================== Startup");// 先订阅等待MqttMsgSubscriber subscriber = new MqttMsgSubscriber();subscriber.readSubscribeTopicMessage();}
}
4.7 BusinessApplicationShutdown
import 包路径(可以删掉这一行手动导入).MqttClientConnectorPool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class BusinessApplicationShutdown implements ApplicationListener<ContextClosedEvent> {@Overridepublic void onApplicationEvent(ContextClosedEvent contextClosedEvent) {log.info("服务终止! shutdown hook, ContextClosedEvent");MqttClientConnectorPool.closeStaticClient();}}
4.8 MqttBrokerServer
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;@EnableScheduling
@SpringBootApplication
public class MqttBrokerServer {public static void main(String[] args) {SpringApplication.run(MqttBrokerServer.class, args);}}
5.其他备注
5.1 需要Mqtt(Broker)服务器
如果是直接使用示例代码的Mqtt服务器(Broker)配置,需要在自己电脑上安装Mqtt服务器,如mosquitto、EMQX等,具体自行搜索,或者使用公用的Mqtt服务器(我没测试试过)
// 📢注意,当前Broker本人未测试
String broker = "tcp://broker.emqx.io:1883";
String topic = "mqtt/test";
String username = "emqx";
String password = "public";
5.2 调试地址
如果配置文件没配置[server.servlet.context-path],就不需要我自己/backend这一段
6.参考文章
MQTT协议介绍及Java教程
https://baijiahao.baidu.com/s?id=1801542244354727565&wfr=spider&for=pc
7.喜欢作者
暂无