一、简介
MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件 。此处使用RabbitMQ。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
二、环境准备
2.1 Erlang安装
使用rabbitMQ首先需要安装Erlang环境,因为rabbitMQ是用Erlang语言编写的。
2.1.1 下载安装
官网下载:https://www.erlang.org/patches/otp-26.0 (比较慢,不推荐)
百度网盘下载:https://pan.baidu.com/s/1xU4syn14Bh7QR-skjm_hOg (推荐)
提取码:az1t
2.1.2 环境变量
进入高级系统设置
环境变量: 变量名-ERLANG_HOME 变量值-文件安装路径
配置path: 配置完上面的之后,找到系统变量中的path点击编辑,然后新建:%ERLANG_HOME%\bin
验证: 进入cmd,输入 erl -version 显示版本号就说明安装成功
2.2 RabbtiMQ安装
2.2.1 下载安装
官网下载:http://www.rabbitmq.com/download.html
下载后一通傻瓜式安装即可。
2.2.2 环境变量
变量名-RABBITMQ_SERVER 变量值-文件安装路径
编辑path,点击新建按钮,输入%RABBITMQ_SERVER%\sbin,点击确定
2.2.3 安装mqtt插件
rabbitmq-plugins enable rabbitmq_mqtt
2.2.4 管理控制台安装
rabbitmq-plugins enable rabbitmq_management
2.2.5 访问测试
登录测试: 浏览器输入 http://localhost:15672 ,输入用户名:guest,密码:guest
三、代码实现
3.1 引入依赖
<!-- rabbitmq -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency><!--mqtt依赖包-->
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.0</version>
</dependency>
3.2 yml配置
# mqtt配置
mqtt:url: ***********username: ***********password: ***********# 间隔时间keep-alive-interval: 60# 超时时间completion-timeout: 30000# 会话保持,默认为falseclean-session: false# 自动连接,默认为trueautomatic-reconnect: true# 生产者配置producer:# 很重要client-id: producer1topic: demo-topic# 传输质量 QoS 0:最多分发一次 QoS 1:至少分发一次(默认) QoS 2:只分发一次qos: 1# 消费者配置subscriber:# 很重要client-id: subscriber1topic: demo-topic# 传输质量 QoS 0:最多分发一次 QoS 1:至少分发一次(默认) QoS 2:只分发一次qos: 0
3.3 配置属性类
package com.qiangesoft.mqtt.config;import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.stereotype.Component;/*** mqtt配置** @author qiangesoft* @date 2024-04-24*/
@Data
@Component
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperty {/*** 服务地址*/private String url;/*** 账号*/private String username;/*** 密码*/private String password;/*** 间隔时间*/private int keepAliveInterval;/*** 超时时间*/private int completionTimeout;/*** 会话保持,默认为false*/private boolean cleanSession;/*** 自动连接,默认为true*/private boolean automaticReconnect = true;/*** 生产者*/private Client producer = new Client();/*** 消费者*/private Client subscriber = new Client();@Datapublic class Client {/*** 客户端id*/private String clientId;/*** 默认主题*/private String topic;/*** 传输质量* QoS 0:最多分发一次* QoS 1:至少分发一次(默认)* QoS 2:只分发一次*/private int qos = 1;}@Beanpublic MqttPahoClientFactory mqttPahoClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();// 连接参数MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{url});if (StringUtils.isNotBlank(this.username)) {options.setUserName(this.username);}if (StringUtils.isNotBlank(this.password)) {options.setPassword(this.password.toCharArray());}// 心跳时间options.setKeepAliveInterval(this.keepAliveInterval);// 断开是否自动重联options.setAutomaticReconnect(this.automaticReconnect);// 保持session,客户端上线后会接受到它离线的这段时间的消息options.setCleanSession(this.cleanSession);// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
// options.setWill("willTopic", WILL_DATA, 2, false);factory.setConnectionOptions(options);return factory;}
}
package com.qiangesoft.mqtt.constant;/*** mqtt通用常量信息** @author qiangesoft* @date 2024-04-24*/
public class MqttConstant {/*** 生产者管道*/public static final String OUTBOUND_CHANNEL = "outboundChannel";/*** 消费者管道*/public static final String INBOUND_CHANNEL = "inboundChannel";
}
3.4 生产者
package com.qiangesoft.mqtt.producer;import com.qiangesoft.mqtt.config.MqttProperty;
import com.qiangesoft.mqtt.constant.MqttConstant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;/*** mqtt生产者** @author qiangesoft* @date 2024-04-24*/
@Configuration
public class MqttProducerConfig {@Autowiredprivate MqttProperty mqttProperty;@Autowiredprivate MqttPahoClientFactory mqttPahoClientFactory;/*** 消息生产通道** @return*/@Bean(name = MqttConstant.OUTBOUND_CHANNEL)public MessageChannel outboundChannel() {return new DirectChannel();}/*** 消息发布** @return*/@Bean@ServiceActivator(inputChannel = MqttConstant.OUTBOUND_CHANNEL)public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperty.getProducer().getClientId(), mqttPahoClientFactory);messageHandler.setAsync(false);messageHandler.setDefaultQos(mqttProperty.getProducer().getQos());messageHandler.setDefaultTopic(mqttProperty.getProducer().getTopic());return messageHandler;}
}
3.5 消费者
package com.qiangesoft.mqtt.subscriber;import com.qiangesoft.mqtt.config.MqttProperty;
import com.qiangesoft.mqtt.constant.MqttConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;import java.util.Objects;/*** mqtt消费者** @author qiangesoft* @date 2024-04-24*/
@Slf4j
@Configuration
public class MqttSubscriberConfig {@Autowiredprivate MqttProperty mqttProperty;@Autowiredprivate MqttPahoClientFactory mqttPahoClientFactory;/*** 消息订阅通道** @return*/@Bean(name = MqttConstant.INBOUND_CHANNEL)public MessageChannel inboundChannel() {return new DirectChannel();}/*** 消息订阅通道绑定** @return*/@Beanpublic MessageProducer mqttInbound() {MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperty.getSubscriber().getClientId(),mqttPahoClientFactory, mqttProperty.getSubscriber().getTopic());adapter.setCompletionTimeout(mqttProperty.getCompletionTimeout());adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(mqttProperty.getSubscriber().getQos());adapter.setOutputChannel(inboundChannel());return adapter;}/*** 消息订阅** @return*/@Bean@ServiceActivator(inputChannel = MqttConstant.INBOUND_CHANNEL)public MessageHandler messageHandler() {return message -> {try {String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();log.info("订阅主题为: {}", topic);String payload = message.getPayload().toString();log.info("订阅接收到消息:{}", payload);} catch (Exception e) {e.printStackTrace();}};}}
3.6 消息发送网关
package com.qiangesoft.mqtt.service;import com.qiangesoft.mqtt.constant.MqttConstant;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** 消息发送网关** @author qiangesoft* @date 2024-04-24*/
@Component
@MessagingGateway(defaultRequestChannel = MqttConstant.OUTBOUND_CHANNEL)
public interface MqttGateway {/*** 发送到mqtt** @param payload 消息内容*/void sendToMqtt(String payload);/*** 发送到mqtt** @param topic 主题* @param payload 消息内容*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);/*** 发送到mqtt** @param topic 主题* @param qos qos* @param payload 消息内容*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}
3.7 发送测试
package com.qiangesoft.mqtt.controller;import com.qiangesoft.mqtt.service.MqttGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** 控制器** @author qiangesoft* @date 2024-04-24*/
@RestController
@RequestMapping("/mqtt")
public class MqttController {@Autowiredprivate MqttGateway mqttGateway;@GetMapping("/send")public String send(String message) {mqttGateway.sendToMqtt(message);return "success";}
}