【pom.xml】
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.3.12.RELEASE</version>
</dependency>
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version>
</dependency>
<dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.49</version>
</dependency>
【MyDemo6MqttCallback.java】
package com.chz.myMqttV3.demo6;@Slf4j
public class MyDemo6MqttCallback implements MqttCallbackExtended {private MqttClient client;private MqttConnectOptions options;private String[] topics;public MyDemo6MqttCallback(MqttClient client, MqttConnectOptions options, String[] topics){this.client = client;this.options = options;this.topics = topics;}@SneakyThrows@Overridepublic void connectionLost(Throwable throwable) {log.error("connectionLost", throwable);while (!client.isConnected()) {log.info("emqx重新连接....................................................");client.connect(options);Thread.sleep(1000);}}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {log.info("messageArrived: topic={}, message={}", topic, new String(message.getPayload()));}@SneakyThrows@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {if( token!=null ){MqttMessage message = token.getMessage();String topic = token.getTopics()==null ? null : Arrays.asList(token.getTopics()).toString();String str = message==null ? null : new String(message.getPayload());log.info("deliveryComplete: topic={}, message={}", topic, str);} else {log.info("deliveryComplete: null");}}@SneakyThrows@Overridepublic void connectComplete(boolean reconnect, String serverURI) {log.info("connectComplete: reconnect={}, serverURI={}", reconnect, serverURI);if( topics.length > 0 ){int[] qosArr = new int[topics.length];Arrays.fill(qosArr, 2);MyDemo6MqttMessageListener[] listeners = new MyDemo6MqttMessageListener[topics.length];Arrays.fill(listeners, new MyDemo6MqttMessageListener());client.subscribe(topics, qosArr, listeners);}}
}
【MyDemo6MqttMessageListener.java】
package com.chz.myMqttV3.demo6;@Slf4j
public class MyDemo6MqttMessageListener implements IMqttMessageListener
{@Overridepublic void messageArrived(String topic, MqttMessage message) {log.info("messageArrived: topic={}, message={}", topic, new String(message.getPayload()));}
}
【MyDemo6MqttClient1Test.java】
package com.chz.myMqttV3.demo6;public class MyDemo6MqttClient1Test
{public static void main(String[] args) throws MqttException {MqttConnectOptions options = new MqttConnectOptions();options.setUserName("admin");options.setPassword("public".toCharArray());options.setCleanSession(true);options.setAutomaticReconnect(true);options.setConnectionTimeout(20);options.setKeepAliveInterval(10);MqttClient client = new MqttClient("tcp://192.168.44.228:1883", "MyDemo6MqttClient1Test", new MemoryPersistence());client.setCallback(new MyDemo6MqttCallback(client, options, new String[]{"device/#"}));client.connect(options);}
}
【MyDemo6MqttSenderTest.java】
package com.chz.myMqttV3.demo6;public class MyDemo6MqttSenderTest
{public static void main(String[] args) throws UnknownHostException, MqttException, InterruptedException {MqttConnectOptions options = new MqttConnectOptions();options.setUserName("admin");options.setPassword("public".toCharArray());options.setCleanSession(true);options.setAutomaticReconnect(true);options.setConnectionTimeout(20);options.setKeepAliveInterval(10);// 这里设置遗嘱消息options.setWill("device/1", "I am MyDemo6MqttSenderTest, I am dead!!!".getBytes(), 1, false);MqttClient client = new MqttClient("tcp://192.168.44.228:1883", "MyDemo6MqttSenderTest", new MemoryPersistence());client.setCallback(new MyDemo6MqttCallback(client, options, new String[]{}));client.connect(options);}
}
启动【MyDemo6MqttSenderTest、MyDemo6MqttClient1Test】,等两个进程都正常启动完之后,将【MyDemo6MqttSenderTest】进程杀掉。会发现【MyDemo6MqttClient1Test】自动收到消息【I am MyDemo6MqttSenderTest, I am dead!!!】