一、概述
近期做的一个项目需要传输数据给第三方。根据协定,采用MQTT进行数据的发送和订阅。一般来说,不通系统进行数据对接,一般采用RESTFul接口,走http。mqtt的话,顾名思义,就是一个消息队列。相比RESTFul接口,MQTT方式也许有个好处就是,数据传输给对方后,对方可以收到一个提醒。这个提醒来自于消息队列,不用自己搞。利用这个提醒,也许可以做点啥。除此之外,我不知道还有什么更多的好处。
MQTT的要素:
1)broker,经纪人,即代理地址,如:tcp://10.0.2.18:1883
2)clientID,客户端ID,如"Client001"; 客户端标识,可以自定义,但不能跟receiver同名
3)topic,// 要发布的主题,接收端据此接收,如”monkey/huaguo-moutain“。主题一经定义,可以多次使用。
4)qos,质量服务等级 0,1,2。2最高。
依赖:
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version>
</dependency>
二、发送
如果单纯发送,客户端无须安装mqtt。Java中发送消息代码如下:
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class MqttPublisher {String broker = "tcp://127.0.0.1:1883"; // 替换为你的 MQTT 服务器地址String clientId = "Client001"; // 客户端标识String topic = "mqttdemo/mytopic001"; // 要发布的主题int qos = 2; // 质量服务等级MqttClient client = null;public MqttPublisher(String broker, String clientId, String topic, int qos) throws MqttException {this.broker = broker;this.clientId = clientId;this.topic = topic;this.qos = qos;this.client = new MqttClient(broker, clientId, new MemoryPersistence());MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true);System.out.println("发布者正在连接到broker代理 : " + broker);this.client.connect(connOpts);System.out.println("发布者连接成功!");}public void SendMessage(String content){try {//System.out.println("发布者发送的消息: " + content);MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);this.client.publish(topic, message);System.out.println("发布者已经发送消息!");} catch (Exception e) {e.printStackTrace();}}public void DisconnectMqtt() throws MqttException {this.client.disconnect();System.out.println("Disconnected");}
}
三、订阅
如果想接收mqtt消息,本机则要安装mqtt服务。windows可安装一个名为mosquitto的软件。但是,它天然好像不对外,如果想被外部访问,比如局域网的其他机器访问,要做一些设置。具体如何设置,我还不清楚。
java中接收消息代码如下:
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class MqttReceiver {String broker = "tcp://127.0.0.1:1883"; // 替换为你的 MQTT 服务器地址String clientId = "Client002"; // 客户端标识String topic = "mqttdemo/mytopic001"; // 订阅的主题int qos = 2; // 质量服务等级MqttReceiver(String broker, String clientId, String topic, int qos){this.broker = broker;this.clientId = clientId;this.topic = topic;this.qos = qos;}public void StartMqttReceiver(){try {MqttClient sampleClient = new MqttClient(this.broker, this.clientId, new MemoryPersistence());MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true);System.out.println("正在连接到 broker 代理: " + this.broker);sampleClient.connect(connOpts);System.out.println("接受者连接成功!");// 设置回调sampleClient.setCallback(new MqttCallback() {public void connectionLost(Throwable cause) {System.out.println("链接丢失!");}public void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("接收者接收到了消息: " + topic + " : " + new String(message.getPayload()));}public void deliveryComplete(IMqttDeliveryToken token) {// Called when a message has been delivered}});// 订阅sampleClient.subscribe(this.topic, this.qos);System.out.println("订阅的topic是: " + this.topic);} catch (Exception e) {e.printStackTrace();}}
}
四、测试
订阅和发送,可以是同一个IP,也就是自己发给自己。但注意订阅和发送的clientID不能相同。比如以下代码,自己发给自己,特别利于测试:
import org.eclipse.paho.client.mqttv3.MqttException;public class MainClass {public static void main(String[] args) throws MqttException, InterruptedException {String recever_broker = "tcp://127.0.0.1:1883"; // 替换为你的 MQTT 服务器地址String recever_clientId = "ClientReceiver001"; // 客户端标识String recever_topic = "ocean/south/message/status"; // 订阅的主题int recever_qos = 2; // 质量服务等级MqttReceiver mqttReceiver = new MqttReceiver(recever_broker, recever_clientId, recever_topic, recever_qos);mqttReceiver.StartMqttReceiver();String publisher_broker = "tcp://127.0.0.1:1883"; // 替换为你的 MQTT 服务器地址String publisher_clientId = "ClientPublisher002"; // 客户端标识String publisher_topic = "ocean/south/message/status"; // 发布的主题int publisher_qos = 2; // 质量服务等级MqttPublisher mqttPublisher = new MqttPublisher(publisher_broker, publisher_clientId, publisher_topic,publisher_qos);int cnt = 0;while(true){cnt ++;mqttPublisher.SendMessage("{\"msg\":\"send some message to you!\",\"data\":"+Integer.toString(cnt)+"}");Thread.sleep(1000);}}
}
五、重连
以上例子还比较简单,需要考虑连接失败或突然断掉的时候重连。
1、发送端重连
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.nio.charset.StandardCharsets;public class MqttPublisher extends Thread {String broker; //"tcp://10.0.2.18:1883";String clientId; //"Client001"; 客户端标识,可以自定义,但不能跟receiver同名String topic; // 要发布的主题,接收端据此接收int qos; // 质量服务等级 0,1,2。2最高。MqttClient client = null;public MqttPublisher(String broker, String clientId, String topic, int qos) throws MqttException {this.broker = broker;this.clientId = clientId;this.topic = topic;this.qos = qos;this.client = new MqttClient(broker, clientId, new MemoryPersistence());_connect();}public boolean sendMessage(String content) {boolean ok = false;MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));message.setQos(qos);int count = 2;while (!ok && count > 0) {count--;try {this.client.publish(topic, message);ok = true;} catch (MqttException e) {System.err.println(e.getMessage());reconnect(1);} catch (Exception e) {System.err.println(e.getMessage());}}return ok;}public void disconnectMqtt() throws MqttException {this.client.disconnect();System.out.println("Disconnected");}public boolean reconnect(long retryTimes) {boolean ok = false;long count = retryTimes;while (!ok && count > 0) {count--;ok = _reconnect();}return ok;}private boolean _reconnect() {boolean ok = false;try {// 关闭现有连接if (this.client != null && client.isConnected()) {this.client.disconnect();}// 重新连接ok = _connect();} catch (MqttException e) {// 处理重新连接失败的情况System.err.println(e.getCause());}return ok;}private boolean _connect() {boolean ok = false;MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true);System.out.print(String.format("正在连接到远程mqtt服务器: %s ......", broker));try {this.client.connect(connOpts);ok = true;System.out.println("连接成功!");} catch (MqttException e) {System.out.println("连接失败!");}return ok;}}
2、订阅端重连
订阅端重连当时是采用这样的思路:连接失败时延迟10秒后执行第一次重连尝试,之后每隔30秒执行一次,直到成功。
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class MqttReceiver {String broker; //"tcp://10.0.2.18:1883";String clientId; //"Client001"; 客户端标识,可以自定义,但不能跟publish同名String topic; // 要发布的主题,接收端据此接收int qos; // 质量服务等级 0,1,2。2最高。private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);public MqttReceiver(String broker, String clientId, String topic, int qos) {this.broker = broker;this.clientId = clientId;this.topic = topic;this.qos = qos;}public void StartMqttReceiver() {MqttClient sampleClient = null;try {sampleClient = new MqttClient(this.broker, this.clientId, new MemoryPersistence());MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true);// 设置回调MqttClient finalSampleClient = sampleClient;sampleClient.setCallback(new MqttCallback() {public void connectionLost(Throwable cause) {System.out.println("监听mqtt服务器连接丢失!");scheduleReconnect(finalSampleClient);}public void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println(String.format("接收到消息(%s): %s", topic, new String(message.getPayload())));}public void deliveryComplete(IMqttDeliveryToken token) {//消息传递完成//System.out.println("a message has been delivered");}});// 订阅sampleClient.subscribe(this.topic, this.qos);sampleClient.connect(connOpts);System.out.println(String.format("正在连接到监听mqtt服务器: %s 成功", this.broker));} catch(MqttException e){System.out.println(String.format("正在连接到监听mqtt服务器: %s 失败:%s", this.broker, e.getMessage()));scheduleReconnect(sampleClient);} catch (Exception e) {System.out.println(String.format("正在连接到监听mqtt服务器: %s 失败:%s", this.broker, e.getMessage()));}}private void scheduleReconnect(MqttClient sampleClient) {if(sampleClient == null) return;final Runnable reconnectTask = () -> {try {if (!sampleClient.isConnected()) {// 尝试重新连接sampleClient.connect();// 重新订阅sampleClient.subscribe(topic, qos);System.out.println("重新连接监听mqtt服务器成功!");// 取消定时任务,因为连接成功了//scheduler.shutdownNow();}} catch (MqttException e) {System.out.println("重新连接监听mqtt服务器出现异常: " + e.getMessage());}};// 延迟10秒后执行第一次重连尝试,之后每隔30秒执行一次scheduler.scheduleWithFixedDelay(() -> {reconnectTask.run();}, 10, 30, TimeUnit.SECONDS);}}