引入依赖
<!-- Eclipse Paho MQTT v3 client library -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>
启动类
@SpringBootApplication
public class AxiosDemoApplication implements CommandLineRunner {@Autowiredprivate MqttService mqttService;public static void main(String[] args) {SpringApplication.run(AxiosDemoApplication.class, args);}@Overridepublic void run(String... args) throws Exception {// 在新线程中启动 MQTT 客户端new Thread(() -> mqttService.startMqttClient()).start();}
}
MqttService类
这是 MQTT 客户端服务类,使用前需要把其中的主题、Broker 的 IP 地址、用户名和密码等改成你自己的配置。
package com.leo.springboothd.service;import com.leo.springboothd.OnMessageCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MqttService {@Autowiredprivate OnMessageCallback onMessageCallback;public void startMqttClient() {String subTopic = "topic"; // 订阅的主题String pubTopic = "topic";String content = "Hello World66+6";int qos = 2;String broker = "tcp://ip"; // 你自己的 IPString clientId = "java";MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(broker, clientId, persistence);// MQTT 连接选项MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName("java");connOpts.setPassword("1234".toCharArray());// 保留会话connOpts.setCleanSession(true);// 建立连接System.out.println("Connecting to broker: " + broker);client.connect(connOpts);// 连接成功后设置回调client.setCallback(onMessageCallback);System.out.println("Connected");System.out.println("Publishing message: " + content);// 订阅client.subscribe(subTopic);// 消息发布所需参数MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);client.publish(pubTopic, message);System.out.println("Message published");// 主循环,保持程序运行while (true) {try {Thread.sleep(1000); // 每秒检查一次} catch (InterruptedException e) {e.printStackTrace();}}} catch (MqttException me) {System.out.println("reason " + me.getReasonCode());System.out.println("msg " + me.getMessage());System.out.println("loc " + me.getLocalizedMessage());System.out.println("cause " + me.getCause());System.out.println("excep " + me);me.printStackTrace();}}
}
OnMessageCallback类
package com.leo.springboothd;import com.leo.springboothd.pojo.Mes;
import com.leo.springboothd.service.MesService;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;@Component
public class OnMessageCallback implements MqttCallback {private final MesService mesService;@Autowiredpublic OnMessageCallback(MesService mesService) {this.mesService = mesService;}@Overridepublic void connectionLost(Throwable cause) {// 连接丢失后,一般在这里面进行重连System.out.println("连接断开,可以做重连");}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {// subscribe后得到的消息会执行到这里面System.out.println("接收消息主题: " + topic);System.out.println("接收消息Qos: " + message.getQos());System.out.println("接收消息内容: " + new String(message.getPayload()));// 将消息封装到 Mes 对象中Mes mes = new Mes();mes.setId((long) message.getId());mes.setTopic(topic);mes.setPayload(new String(message.getPayload()));mes.setQos(message.getQos());mes.setTimestamp(LocalDateTime.now());// 保存消息到数据库mesService.save(mes);}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete());}
}
MesService接口
package com.leo.springboothd.service;

import com.leo.springboothd.pojo.Mes;

/**
 * Service abstraction for storing {@link Mes} records.
 */
public interface MesService {

    /**
     * Stores the given message record.
     *
     * @param mes the record to store
     */
    void save(Mes mes);
}
MesService接口实现类
MesServiceImpl1
package com.leo.springboothd.service.impl;import com.leo.springboothd.mapper.MesMapper;
import com.leo.springboothd.pojo.Mes;
import com.leo.springboothd.service.MesService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MesServiceImpl1 implements MesService {@Autowiredprivate MesMapper mesMapper;@Overridepublic void save(Mes mes) {mesMapper.save(mes);System.out.println(mes);}
}
MesMapper
package com.leo.springboothd.mapper;import com.leo.springboothd.pojo.Mes;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;@Mapper
/**
 * MyBatis mapper for the {@code Message} table.
 */
public interface MesMapper {

    /**
     * Inserts one row into {@code Message}, binding the {@code id},
     * {@code topic}, {@code payload}, {@code qos} and {@code timestamp}
     * properties of the given object.
     *
     * @param mes the record to insert
     */
    @Insert("INSERT INTO Message (id, topic, payload, qos, timestamp) "
            + "VALUES (#{id}, #{topic}, #{payload}, #{qos}, #{timestamp})")
    void save(Mes mes);
}
数据库
-- One row per stored message; the columns after num mirror the INSERT
-- statement used by MesMapper.
CREATE TABLE Message (
    num       BIGINT AUTO_INCREMENT PRIMARY KEY, -- auto-generated row key
    id        BIGINT       NOT NULL,             -- message id
    qos       INT          NOT NULL,             -- QoS level of the message
    topic     VARCHAR(255) NOT NULL,             -- topic of the message
    payload   TEXT         NOT NULL,             -- message body as text
    timestamp DATETIME     NOT NULL              -- time set when the record was created
) COMMENT='订阅信息表';