# 安装 EMQX
请参考【https://blog.csdn.net/chenhz2284/article/details/139551293?spm=1001.2014.3001.5502】
# 下面是示例代码
【pom.xml】
<!-- Spring Boot web starter. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.3.12.RELEASE</version>
</dependency>

<!-- Eclipse Paho MQTT v5 client (MqttClient, MqttCallback, MqttMessage, ...). -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
    <version>1.2.5</version>
</dependency>

<!-- fastjson2 JSON library. -->
<dependency>
    <groupId>com.alibaba.fastjson2</groupId>
    <artifactId>fastjson2</artifactId>
    <version>2.0.49</version>
</dependency>

<!-- Logback, the SLF4J backend that prints the log output. -->
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.11</version>
</dependency>
<!-- NOTE(review): the Java classes are annotated with @Slf4j, which presumably comes from
     Lombok; no Lombok dependency is visible in this fragment - confirm it is declared elsewhere. -->
【MyMqttCallback.java】
package com.chz.myMqttV5.demo1;@Slf4j
public class MyMqttCallback implements MqttCallback
{private String clientId;public MyMqttCallback(String clientId){this.clientId = clientId;}public void connectComplete(boolean reconnect, String serverURI) {log.info("{}::MyMqttCallback, reconnect={}, serverURI={}", clientId, reconnect, serverURI);}public void disconnected(MqttDisconnectResponse disconnectResponse) {log.info("{}::disconnected, disconnectResponse={}", clientId, disconnectResponse.getReasonString());}public void deliveryComplete(IMqttToken token) {log.info("{}::deliveryComplete, disconnectResponse={}", clientId, token.isComplete());}public void messageArrived(String topic, MqttMessage message) throws Exception {log.info("{}::messageArrived, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));}public void mqttErrorOccurred(MqttException exception) {log.info("{}::mqttErrorOccurred, disconnectResponse={}", clientId, exception.getMessage());}public void authPacketArrived(int reasonCode, MqttProperties properties) {log.info("{}::authPacketArrived, reasonCode={}", clientId, reasonCode);}
}
【MyDemo1MqttV5Client.java】
package com.chz.myMqttV5.demo1;@Slf4j
public class MyDemo1MqttV5Client {private static String clientId = MyDemo1MqttV5Client.class.getSimpleName();public static void main(String[] args) throws InterruptedException {String broker = "tcp://192.168.44.228:1883";int subQos = 1;int pubQos = 1;String msg;try {MqttClient client = new MqttClient(broker, clientId);MqttConnectionOptions options = new MqttConnectionOptions();client.setCallback(new MyMqttCallback(clientId));client.connect(options);client.subscribe("device/#", subQos);for(int i=0; i<100; i++){msg = "I am "+clientId+":" + i;MqttMessage message = new MqttMessage(msg.getBytes());message.setQos(pubQos);client.publish("device/1", message);Thread.sleep(3000L);}client.disconnect();client.close();} catch (MqttException e) {e.printStackTrace();}}
}
【MyDemo1MqttV5Sender.java】
package com.chz.myMqttV5.demo1;@Slf4j
public class MyDemo1MqttV5Sender {public static void main(String[] args) throws InterruptedException {String broker = "tcp://192.168.44.228:1883";String clientId = "MyDemo1MqttV5Sender";int subQos = 1;int pubQos = 1;String msg;try {MqttClient client = new MqttClient(broker, clientId);MqttConnectionOptions options = new MqttConnectionOptions();client.setCallback(new MyMqttCallback(clientId));client.connect(options);client.subscribe("device/#", subQos);for(int i=0; i<100; i++){msg = "I am "+clientId+":" + i;MqttMessage message = new MqttMessage(msg.getBytes());message.setQos(pubQos);client.publish("device/2", message);Thread.sleep(3000L);}client.disconnect();client.close();} catch (MqttException e) {e.printStackTrace();}}
}
运行【MyDemo1MqttV5Client、MyDemo1MqttV5Sender】，查看输出日志：
【MyDemo1MqttV5Client】的日志:
【MyDemo1MqttV5Sender】的日志：