1. MQTT简介
MQTT是一种物联网消息协议,为Message Queuing Telemetry Transport的缩写,即消息队列传输探测,协议基于发布订阅模式进行通信,有开销低、带宽小、轻量的特点,通常应用在物联网数据采集、移动应用、智能硬件、电力、能源等领域。
相关概念
三种身份:
- 客户端(Client):MQTT 客户端是发送和接收消息的应用程序。
- 服务器(Broker):也叫“代理”,服务器是处理消息的应用程序,位于发布者和订阅者中间,负责接收消息,并按照某种规则发送给订阅者。
- 主题(Topic): 主题是消息的标识符,用于区分不同类型的消息。
MQTT 消息
MQTT传输的消息可以分为:主题(topic)和负载(payload)两部分
- 主题,可以理解为消息的类型
- 负载,可以理解为消息的内容
消息服务质量QoS(Quality of Service)
Qos用于保证在不同的网络环境下消息传递的可靠性,分为3个等级
- 0 消息最多传递一次,消息发布完全依赖底层TCP/IP网络,可能会发生消息丢失, 也就是发出去就不管了,也被叫做“即发即弃”
- 1 消息传递至少 1 次,确保消息到达,但消息重复可能会发生,发送者将会存储发送的信息直到发送者收到一次来自接收者的PUBACK格式的应答。
- 2 消息仅传送一次,确保消息到达一次
2. SpringBoot集成Mqtt
Spring集成Mqtt常用的有两种方式,一种是直接使用Mqtt的客户端库,如Eclipse Paho
,另外一种是spring integration mqtt
第一种:使用Mqtt客户端库
依赖引入:org.eclipse.paho.client.mqttv3
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.0</version>
</dependency>
服务端配置
public class MqttSendMsgService {private static String clientId = "test";private static String username = "admin";private static String password = "xxxxxx";private static String broker = "tcp://xxxxx:1883";public ReturnT<String> mqttSend(String param) {MqttClient client;try {client = new MqttClient(broker, clientId, new MemoryPersistence());client.setCallback(new MqttCallback() {public void connectionLost(Throwable cause) {System.out.println("Connection lost: " + cause.getMessage());}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {System.out.println("Message arrived: " + mqttMessage.getPayload());}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("Delivery complete");}});MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName(username);connOpts.setPassword(password.toCharArray());client.connect(connOpts);log.info("Connected to MQTT Broker!");//主题String topic="test/simple";//消息String content="发送测试";MqttMessage message = new MqttMessage();message.setQos(1);message.setRetained(false);message.setPayload(content.getBytes());//消息发送client.publish(topic,message);} catch (MqttException e) {e.printStackTrace();}return ReturnT.SUCCESS;}
}
上面这种使用起来比较简单,生产环境使用最多的还是下面这种
第二种:使用 Spring integration进行集成,这里以发送消息为例
依赖引入
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.14</version>
</dependency>
添加yaml配置
mqtt.url = tcp://xxxxx:1883
mqtt.username = admin
mqtt.password = 123456
mqtt.clientId = test
mqtt.defaultTopic = /test/send
mqtt.keepAliveInterval = 60
mqtt.automaticReconnect = true
mqtt.cleanSession = false
mqtt.connectionTimeout = 30
mqtt.maxInflight = 1024
添加对应的属性配置类
@Component
public class MqttConfigProperties {@Value("${mqtt.url}")private String url;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.clientId}")private String clientId;@Value("${mqtt.defaultTopic}")private String defaultTopic;@Value("${mqtt.keepAliveInterval}")private Integer keepAliveInterval;@Value("${mqtt.automaticReconnect}")private Boolean automaticReconnect;@Value("${mqtt.cleanSession}")private Boolean cleanSession;@Value("${mqtt.connectionTimeout}")private Integer connectionTimeout;@Value("${mqtt.maxInflight}")private Integer maxInflight;
}
创建客户端配置类
@Configuration
@IntegrationComponentScan
public class MqttConfig {@Autowiredprivate MqttConfigProperties mqttConfigProperties;@Beanpublic MqttConnectOptions mqttConnectOptions() {log.info("初始化mqtt信息{}", JSON.toJSON(mqttConfigProperties));MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttConfigProperties.getUsername());options.setPassword(mqttConfigProperties.getPassword().toCharArray());options.setServerURIs(new String[]{mqttConfigProperties.getUrl()});options.setKeepAliveInterval(mqttConfigProperties.getKeepAliveInterval());options.setAutomaticReconnect(mqttConfigProperties.getAutomaticReconnect());options.setCleanSession(mqttConfigProperties.getCleanSession());options.setConnectionTimeout(mqttConfigProperties.getConnectionTimeout());options.setMaxInflight(mqttConfigProperties.getMaxInflight());return options;}@Beanpublic MqttPahoClientFactory mqttPahoClientFactory(MqttConnectOptions mqttConnectOptions) {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(mqttConnectOptions);return factory;}// 推送通道@Beanpublic MessageChannel mqttOutputChannel() {return new DirectChannel();}@Bean@ServiceActivator(inputChannel = "mqttOutputChannel")public MessageHandler sendHandler(MqttPahoClientFactory mqttPahoClientFactory) {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfigProperties.getClientId() + "-publish", mqttPahoClientFactory);messageHandler.setAsync(true);messageHandler.setDefaultQos(1);messageHandler.setDefaultTopic(mqttConfigProperties.getDefaultTopic());log.info("初始化mqttOutputChannel...");return messageHandler;}}
发送网关接口
@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {/*** 发送消息** @param topic* @param data*/void send(@Header(MqttHeaders.TOPIC) String topic, String data);
}
这样,在发送消息时,直接将消息网关注入,调用发送方法就可以发送了
mqttGateway.send(topic, JSONObject.toJSONString(msg));
参考:
https://mqtt.org/