EMQX 安装完成后,还需要搭建客户端来接收数据并做进一步处理。下面介绍如何基于若依(RuoYi)前后端分离版开源框架整合 EMQX。
1.application.yml 添加代码
mqtt:
  hostUrl: tcp://localhost:1883
  username: dev
  password: dev
  client-id: MQTT-CLIENT-DEV
  cleanSession: true
  reconnect: true
  timeout: 100
  keepAlive: 100
  defaultTopic: client/dev/report
  serverTopic: server/dev/report
  isOpen: true
  qos: 0
2.pom.xml 引入依赖
<!-- mqtt -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!--配置文件报错问题-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
<!--lombok-->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.22</version>
    <scope>provided</scope>
</dependency>
3.新建 MqttAcceptCallback
package com.ruoyi.iot.mqtt;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @Description : MQTT接受服务的回调类* @Author : lsyong* @Date : 2023/8/1 16:29*/@Component
public class MqttAcceptCallback implements MqttCallbackExtended {private static final Logger logger = LoggerFactory.getLogger(MqttAcceptCallback.class);@Autowiredprivate MqttAcceptClient mqttAcceptClient;@Autowiredprivate MqttProperties mqttProperties;/*** 客户端断开后触发** @param throwable*/@Overridepublic void connectionLost(Throwable throwable) {logger.info("连接断开,可以重连");if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) {logger.info("【emqx重新连接】....................................................");mqttAcceptClient.reconnection();}}/*** 客户端收到消息触发** @param topic 主题* @param mqttMessage 消息*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {logger.info("【接收消息主题】:" + topic);logger.info("【接收消息Qos】:" + mqttMessage.getQos());logger.info("【接收消息内容】:" + new String(mqttMessage.getPayload()));// int i = 1/0;}/*** 发布消息成功** @param token token*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {String[] topics = token.getTopics();for (String topic : topics) {logger.info("向主题【" + topic + "】发送消息成功!");}try {MqttMessage message = token.getMessage();byte[] payload = message.getPayload();String s = new String(payload, "UTF-8");logger.info("【消息内容】:" + s);} catch (Exception e) {logger.error("MqttAcceptCallback deliveryComplete error,message:{}", e.getMessage());e.printStackTrace();}}/*** 连接emq服务器后触发** @param b* @param s*/@Overridepublic void connectComplete(boolean b, String s) {logger.info("============================= 客户端【" + MqttAcceptClient.client.getClientId() + "】连接成功!=============================");// 以/#结尾表示订阅所有以test开头的主题// 订阅所有机构主题mqttAcceptClient.subscribe(mqttProperties.getDefaultTopic(), 0);}
}
4.新建 MqttAcceptClient
package com.ruoyi.iot.mqtt;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @Description : MQTT接受服务的客户端* @Author : lsyong* @Date : 2023/8/1 16:26*/
@Component
public class MqttAcceptClient {private static final Logger logger = LoggerFactory.getLogger(MqttAcceptClient.class);@Autowiredprivate MqttAcceptCallback mqttAcceptCallback;@Autowiredprivate MqttProperties mqttProperties;public static MqttClient client;private static MqttClient getClient() {return client;}private static void setClient(MqttClient client) {MqttAcceptClient.client = client;}/*** 客户端连接*/public void connect() {MqttClient client;try {client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(),new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setConnectionTimeout(mqttProperties.getTimeout());options.setKeepAliveInterval(mqttProperties.getKeepAlive());options.setAutomaticReconnect(mqttProperties.getReconnect());options.setCleanSession(mqttProperties.getCleanSession());MqttAcceptClient.setClient(client);// 设置回调client.setCallback(mqttAcceptCallback);client.connect(options);} catch (Exception e) {logger.error("MqttAcceptClient connect error,message:{}", e.getMessage());e.printStackTrace();}}/*** 重新连接*/public void reconnection() {try {client.connect();} catch (MqttException e) {logger.error("MqttAcceptClient reconnection error,message:{}", e.getMessage());e.printStackTrace();}}/*** 订阅某个主题** @param topic 主题* @param qos 连接方式*/public void subscribe(String topic, int qos) {logger.info("========================【开始订阅主题:" + topic + "】========================");try {client.subscribe(topic, qos);} catch (MqttException e) {logger.error("MqttAcceptClient subscribe error,message:{}", e.getMessage());e.printStackTrace();}}/*** 取消订阅某个主题** @param topic*/public void unsubscribe(String topic) {logger.info("========================【取消订阅主题:" + topic + "】========================");try {client.unsubscribe(topic);} catch (MqttException e) {logger.error("MqttAcceptClient unsubscribe error,message:{}", 
e.getMessage());e.printStackTrace();}}
}
5.新建 MqttCondition
package com.ruoyi.iot.mqtt;import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotatedTypeMetadata;/*** @Description : 自定义配置,通过这个配置,来控制启动项目的时候是否启动mqtt* @Author : lsyong* @Date : 2023/8/1 16:32*/
public class MqttCondition implements Condition {@Overridepublic boolean matches(ConditionContext context, AnnotatedTypeMetadata annotatedTypeMetadata) {//1、能获取到ioc使用的beanfactoryConfigurableListableBeanFactory beanFactory = context.getBeanFactory();//2、获取类加载器ClassLoader classLoader = context.getClassLoader();//3、获取当前环境信息Environment environment = context.getEnvironment();String isOpen = environment.getProperty("mqtt.isOpen");return Boolean.valueOf(isOpen);}
}
6.新建 MqttConfig
package com.ruoyi.iot.mqtt;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;/*** @Description : 启动服务的时候开启监听客户端* @Author : lsyong* @Date : 2023/8/1 16:35*/
@Configuration
public class MqttConfig {@Autowiredprivate MqttAcceptClient mqttAcceptClient;/*** 订阅mqtt** @return*/@Conditional(MqttCondition.class)@Beanpublic MqttAcceptClient getMqttPushClient() {mqttAcceptClient.connect();return mqttAcceptClient;}
}
7.新建 MqttProperties
package com.ruoyi.iot.mqtt;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;/*** @Description : MQTT配置信息* @Author : lsyong* @Date : 2023/8/1 16:25*/
@Component
@ConfigurationProperties("mqtt")
@Data
public class MqttProperties {/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接地址*/private String hostUrl;/*** 客户端Id,同一台服务器下,不允许出现重复的客户端id*/private String clientId;/*** 默认连接主题,以/#结尾表示订阅所有以test开头的主题*/private String defaultTopic;/*** 默认服务器发送主题前缀,格式:server:${env}:report:${topic}*/private String serverTopic;/*** 超时时间*/private int timeout;/*** 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端* 发送个消息判断客户端是否在线,但这个方法并没有重连的机制*/private int keepAlive;/*** 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连* 接记录,这里设置为true表示每次连接到服务器都以新的身份连接*/private Boolean cleanSession;/*** 是否断线重连*/private Boolean reconnect;/*** 启动的时候是否关闭mqtt*/private Boolean isOpen;/*** 连接方式*/private Integer qos;/*** 获取默认主题,以/#结尾表示订阅所有以test开头的主题** @return*/public String getDefaultTopic() {return defaultTopic + "/#";}/*** 获取服务器发送主题,格式:server/${env}/report/${topic}** @param topic* @return*/public String getServerTopic(String topic) {return serverTopic + "/" + topic;}
}
8.新建 MqttSendCallBack
package com.ruoyi.iot.mqtt;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;/*** @Description : MQTT发送客户端的回调类* @Author : lsyong* @Date : 2023/8/1 16:31*/
@Component
public class MqttSendCallBack implements MqttCallbackExtended {private static final Logger logger = LoggerFactory.getLogger(MqttSendCallBack.class);/*** 客户端断开后触发** @param throwable*/@Overridepublic void connectionLost(Throwable throwable) {logger.info("连接断开,可以重连");}/*** 客户端收到消息触发** @param topic 主题* @param mqttMessage 消息*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {logger.info("【接收消息主题】: " + topic);logger.info("【接收消息Qos】: " + mqttMessage.getQos());logger.info("【接收消息内容】: " + new String(mqttMessage.getPayload()));}/*** 发布消息成功** @param token token*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {String[] topics = token.getTopics();for (String topic : topics) {logger.info("向主题【" + topic + "】发送消息成功!");}try {MqttMessage message = token.getMessage();byte[] payload = message.getPayload();String s = new String(payload, "UTF-8");logger.info("【消息内容】:" + s);} catch (Exception e) {logger.error("MqttSendCallBack deliveryComplete error,message:{}", e.getMessage());e.printStackTrace();}}/*** 连接emq服务器后触发** @param b* @param s*/@Overridepublic void connectComplete(boolean b, String s) {logger.info("============================= 客户端【" + MqttAcceptClient.client.getClientId() + "】连接成功!=============================");}
}
9.新建 MqttSendClient
package com.ruoyi.iot.mqtt;import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;/*** @Description : MQTT发送客户端* @Author : lsyong* @Date : 2023/8/1 16:30*/
@Component
public class MqttSendClient {private static final Logger logger = LoggerFactory.getLogger(MqttSendClient.class);@Autowiredprivate MqttSendCallBack mqttSendCallBack;@Autowiredprivate MqttProperties mqttProperties;public MqttClient connect() {MqttClient client = null;try {String uuid = UUID.randomUUID().toString().replaceAll("-", "");client = new MqttClient(mqttProperties.getHostUrl(), uuid, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setConnectionTimeout(mqttProperties.getTimeout());options.setKeepAliveInterval(mqttProperties.getKeepAlive());options.setCleanSession(true);options.setAutomaticReconnect(false);// 设置回调client.setCallback(mqttSendCallBack);client.connect(options);} catch (Exception e) {logger.error("MqttSendClient connect error,message:{}", e.getMessage());e.printStackTrace();}return client;}/*** 发布消息** @param retained 是否保留* @param topic 主题,格式: server:${env}:report:${topic}* @param content 消息内容*/public void publish(boolean retained, String topic, String content) {MqttMessage message = new MqttMessage();message.setQos(mqttProperties.getQos());message.setRetained(retained);message.setPayload(content.getBytes());MqttDeliveryToken token;MqttClient mqttClient = connect();try {mqttClient.publish(mqttProperties.getServerTopic(topic), message);} catch (MqttException e) {logger.error("MqttSendClient publish error,message:{}", e.getMessage());e.printStackTrace();} finally {disconnect(mqttClient);close(mqttClient);}}/*** 关闭连接** @param mqttClient*/public static void disconnect(MqttClient mqttClient) {try {if (mqttClient != null)mqttClient.disconnect();} catch (MqttException e) {logger.error("MqttSendClient disconnect error,message:{}", e.getMessage());e.printStackTrace();}}/*** 释放资源** @param mqttClient*/public static void close(MqttClient mqttClient) {try {if (mqttClient != null)mqttClient.close();} catch 
(MqttException e) {logger.error("MqttSendClient close error,message:{}", e.getMessage());e.printStackTrace();}}
}
10.新建测试类 MqttController
package com.ruoyi.iot.mqtt;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @Description : 测试类* @Author : lsyong* @Date : 2023/8/1 16:35*/
@RestController
@RequestMapping("/mqtt")
public class MqttController {@Autowiredprivate MqttSendClient mqttSendClient;@GetMapping(value = "/publishTopic")public String publishTopic(String topic, String sendMessage) {System.out.println("topic:" + topic);System.out.println("message:" + sendMessage);this.mqttSendClient.publish(false, topic, sendMessage);return "topic:" + topic + "\nmessage:" + sendMessage;}}
放开测试类的访问权限,在com.ruoyi.framework.config 路径下的 SecurityConfig 类中添加如下代码
.antMatchers("/mqtt/**").permitAll()
11.启动项目进行测试
如果连接不上,确认emqx是否启动成功,详细可以查看Windows安装EMQX(搭建MQTT服务)-CSDN博客
连接成功后,可以登录 EMQX 控制台进行查看。
浏览器访问 http://localhost:8080/mqtt/publishTopic?sendMessage=你好啊
控制台打印