项目中用到的MQTT物联网通信协议,记录一下工具类,方便翻阅
用到的依赖:
<!--mqtt--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><!--分页插件--><!-- https://mvnrepository.com/artifact/com.github.pagehelper/pagehelper-spring-boot-starter --><dependency><groupId>com.github.pagehelper</groupId><artifactId>pagehelper-spring-boot-starter</artifactId><version>1.4.1</version></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.2</version></dependency>
MqttPushClient.Java
package com.youming.client.equipment.configure;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;/*** 发布连接类* @author wfeil211@foxmail.com* @date 2023-2-11 20:11:46*/
@Slf4j
@Component
public class MqttPushClient {@Lazy@Autowiredprivate PushCallback pushCallback;private static MqttClient client;public static void setClient(MqttClient client) {MqttPushClient.client = client;}public static MqttClient getClient() {return client;}public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {MqttClient client;try {client = new MqttClient(host, clientID, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setUserName(username);options.setPassword(password.toCharArray());options.setConnectionTimeout(timeout);options.setKeepAliveInterval(keepalive);// automaticReconnect 为 true 表示断线自动重连,但仅仅只是重新连接,并不订阅主题;在 connectComplete 回调函数重新订阅options.setAutomaticReconnect(true);MqttPushClient.setClient(client);try {//设置回调类client.setCallback(pushCallback);//client.connect(options);IMqttToken iMqttToken = client.connectWithResult(options);boolean complete = iMqttToken.isComplete();log.info("MQTT连接" + (complete ? "成功" : "失败"));} catch (Exception e) {e.printStackTrace();}} catch (Exception e) {e.printStackTrace();}}/*** 发布,默认qos为0,非持久化** @param topic 主题名* @param pushMessage 消息*/public void publish(String topic, String pushMessage) {publish(0, false, topic, pushMessage);}/*** 发布** @param qos* @param retained* @param topic* @param pushMessage*/public void publish(int qos, boolean retained, String topic, String pushMessage) {MqttMessage message = new MqttMessage();message.setQos(qos);message.setRetained(retained);message.setPayload(pushMessage.getBytes());MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);log.info("发送消息至"+topic+",消息内容:"+pushMessage);if (null == mTopic) {log.error("主题不存在:{}", mTopic);}try {mTopic.publish(message);} catch (Exception e) {log.error("mqtt发送消息异常:", e);}}}
MqttSubClient.Java
package com.youming.client.equipment.configure;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.stereotype.Component;/*** 订阅类* @author wfeil211@foxmail.com* @date 2023-2-11 20:11:46*/
@Slf4j
@Component
public class MqttSubClient {public void subScribeDataPublishTopic(String defaultTopic) {//订阅test_queue主题String mqtt_topic[] = defaultTopic.split(",");for (int i = 0; i < mqtt_topic.length; i++) {subscribe(mqtt_topic[i], 0);//订阅主题}}/*** 订阅某个主题,qos默认为0** @param topic*/public void subscribe(String topic) {subscribe(topic, 0);}/*** 订阅某个主题** @param topic 主题名* @param qos*/public void subscribe(String topic, int qos) {try {MqttClient client = MqttPushClient.getClient();if (client == null) return;client.subscribe(topic, qos);log.info("订阅主题:{}", topic);} catch (MqttException e) {e.printStackTrace();}}}
MqttConfig .java
package com.youming.client.equipment.configure;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;/*** 配置类* @author wfeil211@foxmail.com* @date 2023-2-11 20:11:46*/
@Component
@ConfigurationProperties("spring.mqtt")
public class MqttConfig {@Lazy@Autowiredprivate MqttPushClient mqttPushClient;@Lazy@Autowiredprivate MqttSubClient mqttSubClient;/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接地址*/private String hostUrl;/*** 客户Id*/private String clientId;/*** 默认连接话题*/private String defaultTopic;/*** 超时时间*/private int timeout;/*** 保持连接数*/private int keepalive;public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public String getHostUrl() {return hostUrl;}public void setHostUrl(String hostUrl) {this.hostUrl = hostUrl;}public String getClientId() {return clientId;}public void setClientId(String clientId) {this.clientId = clientId;}public String getDefaultTopic() {return defaultTopic;}public void setDefaultTopic(String defaultTopic) {this.defaultTopic = defaultTopic;}public int getTimeout() {return timeout;}public void setTimeout(int timeout) {this.timeout = timeout;}public int getKeepalive() {return keepalive;}public void setKeepalive(int keepalive) {this.keepalive = keepalive;}/*** 连接至mqtt服务器,获取mqtt连接** @return*/@Beanpublic MqttPushClient getMqttPushClient() {//连接至mqtt服务器,获取mqtt连接mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);//一连接mqtt,就订阅默认需要订阅的主题(如test_queue)mqttSubClient.subScribeDataPublishTopic(defaultTopic);return mqttPushClient;}}
PushCallback.Java
package com.youming.client.equipment.configure;import com.youming.client.equipment.api.IEquipmentDataHisService;
import com.youming.client.equipment.api.IEquipmentDataService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeUnit;/*** 回调类* @author wfeil211@foxmail.com* @date 2023-2-11 20:11:46*/
@Slf4j
@Component
public class PushCallback implements MqttCallback {@Lazy@Autowiredprivate MqttConfig mqttConfig;@Lazy@Autowiredprivate MqttSubClient mqttSubClient;@Autowiredprivate IEquipmentDataHisService equipmentDataHisService;@Autowiredprivate IEquipmentDataService equipmentDataService;@SneakyThrows@Overridepublic void connectionLost(Throwable cause) {// 连接丢失后,一般在这里面进行重连log.info("连接断开,正在重连");MqttClient client = MqttPushClient.getClient();while (true){if(client.isConnected()){log.info("重连成功");mqttSubClient.subScribeDataPublishTopic(mqttConfig.getDefaultTopic());return;}client.reconnect();try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}}}/*** 发送消息,消息到达后处理方法** @param token*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {log.info("deliveryComplete---------" + token.isComplete());}/*** 订阅主题接收到消息处理方法** @param topic* @param message*/@Overridepublic void messageArrived(String topic, MqttMessage message) throws UnsupportedEncodingException {// subscribe后得到的消息会执行到这里面,这里在控制台有输出log.info("接收消息主题 : " + topic);log.info("接收消息Qos : " + message.getQos());byte[] bytesData = new byte[message.getPayload().length];for (int i = 0; i < message.getPayload().length; i++) {bytesData[i] = (byte) message.getPayload()[i];}String frame = javax.xml.bind.DatatypeConverter.printHexBinary(bytesData).toLowerCase();log.info("接收消息内容 : " + frame);if(frame.length() == 96){equipmentDataService.asyncSaveOrUpdateEquipment(frame);}if(frame.length() == 74){equipmentDataHisService.asyncSaveOrUpdateEquipmentHis(frame);}}}