MQTT是基于代理的轻量级的消息发布订阅传输协议。
1、下载安装代理
进入mosquitto下载地址:Download | Eclipse Mosquitto,进行下载,以win版本为例
下载完成后,在本地文件夹找到下载的代理安装文件
使用管理员身份打开安装
安装时选择自己的安装地址,其他不需要修改,直到安装结束
安装完成后需要做一下修改,进入安装路径
找到该配置文件,修改端口号,修改用户名密码
pwfile.example文件就在当前安装路径下,需要进行配置绝对路径
以管理员身份打开CMD,并切换到d:/mosquitto目录下,运行以下命令添加用户名和密码:
mosquitto_passwd.exe -c D:\softwork\mqtt\mosquitto\pwfile.example admin
提示连续两次输入密码、创建成功。命令解释: -c 创建一个用户、D:\softwork\mqtt\mosquitto\pwfile.example 是将用户创建到 pwfile.example 文件中、admin 是用户名。
修改用户名密码后mosquitto需要重启生效,可以在服务里面找到该服务重启
2、创建Spring Boot项目
2.1、pom.xml文件中引入依赖
<!-- MQTT 客户端依赖 -->
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version>
</dependency>
2.2、application.properties配置文件添加MQTT配置
# MQTT 配置
mqtt.broker=tcp://127.0.0.1:7788
mqtt.clientId=spring-boot-client
mqtt.username=admin
mqtt.password=admin
mqtt.topic=sensor
mqtt.qos=1
mqtt.cleanSession=true
2.3、创建消息发布类
package com.mqtt.mqttproject.service;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;/*** @ Author : Gridsum* @ Description :*/
@Service
public class MqttPublisher {@Autowiredprivate MqttClient mqttClient;@Value("${mqtt.topic}")private String topic;@Value("${mqtt.qos}")private int qos;public void publish(String message) throws MqttException {MqttMessage mqttMessage = new MqttMessage(message.getBytes());mqttMessage.setQos(qos);mqttClient.publish(topic, mqttMessage);}
}
2.4、创建消息订阅类
package com.mqtt.mqttproject.service;import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;/*** @ Author : Gridsum* @ Description :*/
@Service
public class MqttSubscriber {@Autowiredprivate MqttClient mqttClient;@Value("${mqtt.topic}")private String topic;@Value("${mqtt.qos}")private int qos;@PostConstructpublic void subscribe() throws MqttException {mqttClient.subscribe(topic, qos, new IMqttMessageListener() {@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("Received message: " + new String(message.getPayload()));}});}
}
2.5、测试消息发布
@Controller
public class BasicController {@Autowiredprivate MqttPublisher mqttPublisher;// http://127.0.0.1:8080/hello?name=lisi@RequestMapping("/hello")@ResponseBodypublic String hello(@RequestParam(name = "name", defaultValue = "unknown user") String name) {try {mqttPublisher.publish("Hello " + name + ", MQTT from Spring Boot!");} catch (MqttException e) {e.printStackTrace();}return "Hello " + name;}
}
2.6、启动服务调用接口,发送消息,会打印收到消息