一、EMQX介绍
EMQX是大规模分布式MQTT消息服务器,可以高效可靠连接海量物联网设备,实时处理分发消息与事件流数据,助力构建关键业务的物联网与云应用。EMQX 作为物联网应用开发和物联网平台搭建必须用到的基础设施软件,主要在边缘和云端实现物联网设备互联与设备上云,提供物联网设备接入、协议处理、消息路由、数据存储、流数据处理等核心能力。
二、EMQX安装
访问逛网下载安装包:下载 EMQX
解压zip文件得到软件目录
运行EMQX,打开cmd命令窗口,进入软件bin目录,输入emqx start
命令启动软件
登录emqx控制台,访问http://127.0.0.1:18083/,默认用户名、密码是admin、public。
三、Java实现发送和订阅消息
3.1 基础代码案例
- 引入pom依赖
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version>
</dependency>
- 订阅和发布消息相关代码
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class App {public static void main(String[] args) {String subTopic = "testtopic/#";String pubTopic = "testtopic/1";String content = "Hello World";int qos = 2;String broker = "tcp://127.0.0.1:1883";String clientId = "emqx_test";MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(broker, clientId, persistence);// MQTT 连接选项MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName("用户名");connOpts.setPassword("密码".toCharArray());// 保留会话connOpts.setCleanSession(true);MqttCallback callback = new OnMessageCallback();// 设置回调client.setCallback(callback);// 建立连接System.out.println("Connecting to broker: " + broker);client.connect(connOpts);System.out.println("Connected");System.out.println("Publishing message: " + content);// 订阅主题client.subscribe(subTopic);// 消息发布所需参数MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);// 发布消息client.publish(pubTopic, message);System.out.println("Message published");// client.disconnect();
// System.out.println("Disconnected");
// client.close();
// System.exit(0);} catch (MqttException me) {System.out.println("reason " + me.getReasonCode());System.out.println("msg " + me.getMessage());System.out.println("loc " + me.getLocalizedMessage());System.out.println("cause " + me.getCause());System.out.println("excep " + me);me.printStackTrace();}}
}
- 接收消息相关代码
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;public class OnMessageCallback implements MqttCallback {public void connectionLost(Throwable cause) {// 连接丢失后,一般在这里面进行重连System.out.println("连接断开,可以做重连");}public void messageArrived(String topic, MqttMessage message) throws Exception {// subscribe后得到的消息会执行到这里面System.out.println("接收消息主题:" + topic);System.out.println("接收消息Qos:" + message.getQos());System.out.println("接收消息内容:" + new String(message.getPayload()));}public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete());}
}