springboot当中使用EMQX(MQTT协议)

本篇博客主要围绕EMQX是什么?、能干什么?、怎么用? 三点来进行整理。

1、MQTT协议

1.1、MQTT简介

在了解EMQX前首先了解一下MQTT协议,MQTT 全称为 Message Queuing Telemetry Transport(消息队列遥测传输),是一种基于 发布/订阅 模式的 轻量级物联网消息传输协议。IBM 公司的安迪·斯坦福-克拉克及 Arcom 公司的阿兰·尼普于 1999 年撰写了该协议的第一个版本1,之后 MQTT 便以简单易实现、支持 QoS、轻量且省带宽等众多特性逐渐成为了 IoT 通讯的标准。

MQTT 协议每个消息最少仅需 2 个字节 (其中报头仅需 1 个字节,其余字节可以全部作为消息载荷)就可以完成通信,专为那些资源和空间有限、功耗敏感的硬件所打造。

1.2、MQTT 协议基本特点

使用发布/订阅消息模式,提供了一对多的消息分发和应用程序的解耦。
不关心负载内容的消息传输。
提供 3 种消息服务质量等级,满足不同投递需求。
很小的传输消耗和协议数据交换,最大限度减少网络流量。
提供连接异常断开时通知相关各方的机制。

1.3、MQTT 应用行业

MQTT 作为一种低开销,低带宽占用的即时通讯协议,可以用极少的代码和带宽为联网设备提供实时可靠的消息服务,它适用于硬件资源有限的设备及带宽有限的网络环境。因此,MQTT 协议广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等行业。

1.4、MQTT 协议原理

基于发布/订阅模式的 MQTT 协议中有三种角色:发布者(Publisher)、代理(Broker)、订阅者(Subscriber)。发布者向代理发布消息,代理向订阅者转发这些消息。通常情况下,客户端的角色是发布者和订阅者,服务器的角色是代理,但实际上,服务器也可能主动发布消息或者订阅主题,客串一下客户端的角色。
在这里插入图片描述

为了方便理解,MQTT 传输的消息可以简化为:主题(Topic)和载荷(Payload)两部分:

Topic,消息主题,订阅者向代理订阅主题后,一旦代理收到相应主题的消息,就会向订阅者转发该消息。
Payload,消息载荷(也可以理解为传输的数据),订阅者在消息中真正关心的部分,通常是业务相关的。

1.5、MQTT 协议基础概念

1.5.1、会话(Session)

每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话可以存在于一个网络连接之间,也可以跨越多个连续的网络连接存在。

1.5.2、订阅(Subscription)

订阅包含一个主题过滤器(Topic Filter)和一个最大的服务质量(QoS)等级。订阅与单个会话(Session)关联。会话可以包含多于一个的订阅。

1.5.3、主题名(Topic Name)

附加在应用消息上的一个标签,被用于匹配服务端已存在的订阅。服务端会向所有匹配订阅的客户端发送此应用消息。

1.5.4、主题过滤器(Topic Filter)

仅在订阅时使用的主题表达式,可以包含通配符,以匹配多个主题名。就是可以通过通配符达到,发一条消息,多个主题能接受到消息的效果。

1.5.5、载荷(Payload)

对于 PUBLISH 报文来说载荷就是业务消息(就是指发送的消息内容),它可以是任意格式(二进制、十六进制、普通字符串、JSON 字符串、Base64)的数据。

1.6、MQTT 协议进阶

1.6.1、消息服务质量(QoS)

MQTT 协议提供了 3 种消息服务质量等级(Quality of Service),它保证了在不同的网络环境下消息传递的可靠性。这里有一点要明白,必须先订阅,发布消息才会收到。假如没订阅,他发送消息了,我再订阅,这时候不管QoS设置几,都是收不到消息的。

1.6.1.1、QoS 0 - 最多分发一次

当 QoS 为 0 时,消息的分发依赖于底层网络的能力。发布者只会发布一次消息,接收者不会应答消息,发布者也不会储存和重发消息。消息在这个等级下具有最高的传输效率,但可能送达一次也可能根本没送达。

1.6.1.2、Qos 1 - 至少分发一次

当 QoS 为 1 时,可以保证消息至少送达一次。MQTT 通过简单的 ACK 机制来保证 QoS 1。

发送者:发布消息,并等待接收者的 PUBACK 报文的应答,在规定的时间内没有收到 PUBACK 的应答,发布者会将消息的 DUP 置为1 并重发消息。
接受者:接收到 QoS 为 1 的消息时应该回应 PUBACK 报文,可能因为网络延迟等原因没有及时发出,这时接收者可能会多次接受同一个消息,无论 DUP标志如何,接收者都会将收到的消息当作一个新的消息并发送 PUBACK 报文应答。
核心:就是发送消息的时候,接受者需要确认一次,规定时间内没有确认就会重新发。如果使用这种方式,写业务的时候需要保证幂等性。

1.6.1.3、QoS 2 - 只分发一次

当 QoS 为 2 时,发布者和订阅者通过两次会话来保证消息只被传递一次,这是最高等级的服务质量,消息丢失和重复都是不可接受的。使用这个服务质量等级会有额外的开销。

发送者:发布 QoS 为 2 的消息之后,消息储存起来并等待接收者回复 PUBREC 的消息。
接受者:收到一条 QoS 为 2 的消息时,他会处理此消息并返回一条 PUBREC 进行应答。
发送者:收到 PUBREC 消息后,丢弃掉之前的发布消息。保存 PUBREC 消息,并应答一个 PUBREL。等待接收者回复 PUBCOMP 消息
接受者:当接收者收到 PUBREL 消息之后,它会丢弃掉所有已保存的状态,并回复 PUBCOMP。
发送者:当发送者收到 PUBCOMP 消息之后会清空之前所保存的状态。
核心:发送消息的时候,接受者需要确认两次,来保证消息确实已经送到。

无论在传输过程中何时出现丢包,发送端都负责重发上一条消息。不管发送端是 Publisher(发送端) 还是 Broker(服务器),都是如此。因此,接收端也需要对每一条命令消息都进行应答。

1.6.2、QoS 在发布与订阅中的区别

发布时的 QoS 表示消息发送到服务端时使用的 QoS
订阅时的 QoS 表示服务端向自己转发消息时可以使用的最大 QoS

客户端 A 的发布 QoS 大于客户端 B 的订阅 QoS 时,服务端向客户端 B 转发消息时使用的 QoS 为客户端 B 的订阅QoS。
客户端 A 的发布 QoS 小于客户端 B 的订阅 QoS 时,服务端向客户端 B 转发消息时使用的 QoS 为客户端 A 的发布 QoS。
总结:接收端可以设置订阅Qos为2,这样就可以接所有qos等级消息。也就是发布消息qos为多少,那我这边接受消息就是多少。主要以发布消息的qos为准。

1.6.3、如何选择 MQTT QoS 等级

QoS 级别越高,流程越复杂,系统资源消耗越大。应用程序可以根据自己的网络场景和业务需求,选择合适的 QoS 级别。

以下情况下可以选择 QoS 0

可以接受消息偶尔丢失。
在同一个子网内部的服务间的消息交互,或其他客户端与服务端 网络非常稳定的场景。
以下情况下可以选择 QoS 1

对系统资源消耗较为关注,希望性能最优化。
消息不能丢失,但能接受并处理重复的消息。
以下情况下可以选择 QoS 2

不能忍受消息丢失(消息的丢失会造成生命或财产的损失),且不希望收到重复的消息。
数据完整性与及时性要求较高的银行、消防、航空等行业。

1.6.4、清除会话(Clean Session)

MQTT 客户端向服务器发起 CONNECT 请求时,可以通过 Clean Session 标志设置是否创建全新的会话。

Clean Session 设置为 0 时:

如果存在一个关联此客户标识符的会话,服务端必须基于此会话的状态恢复与客户端的通信。
如果不存在任何关联此客户标识符的会话,服务端必须创建一个新的会话。
Clean Session 设置为 1:

客户端和服务端必须丢弃任何已存在的会话,并开始一个新的会话。
总结:监听端建议设置为0,一般监听端,我们都会配置单例,并且项目启动就开始创建连接监听,设置为0,这样可以保证连接的唯一性,和消息的安全性。

1.6.5、保活心跳(Keep Alive)

MQTT 客户端向服务器发起 CONNECT 请求时,通过 Keep Alive 参数设置保活周期。

客户端在无报文发送时,按 Keep Alive 周期定时发送 2 字节的 PINGREQ 心跳报文,服务端收到 PINGREQ 报文后,回复 2 字节的 PINGRESP 报文。

服务端在 1.5 个心跳周期内,既没有收到客户端发布订阅报文,也没有收到 PINGREQ 心跳报文时,将断开客户端连接。

1.6.6、保留消息(Retained Message)

MQTT 客户端向服务器发布(PUBLISH)消息时,可以设置保留消息(Retained Message)标志。保留消息会驻留在消息服务器,后来的订阅者订阅主题时可以接收到最新一条(注意,是只有最近的一条)保留消息。

1.6.7、遗嘱消息(Will Message)

MQTT 客户端向服务端发送 CONNECT 请求时,可以携带遗嘱消息。MQTT 客户端异常下线时(客户端断开前未向服务器发送 DISCONNECT 消息),MQTT 消息服务器会发布遗嘱消息。

在连接的时候通过调用 MqttConnectOptions 实例的 setWill 方法来设定。任何订阅了下面的主题的客户端都可以收到该遗嘱消息。

//方法1MqttConnectOptions.setWill(MqttTopic topic, byte[] payload, int qos, boolean retained)
//方法2MqttConnectOptions.setWill(java.lang.String topic, byte[] payload, int qos, boolean retained)

以下情况下会发送 Will Message:

服务端发生了I/O 错误或者网络失败;
客户端在定义的心跳时期失联;
客户端在发送下线包( DISCONNECT)之前关闭网络连接;
服务端在收到下线包之前关闭网络连接。
总结:发送遗嘱信息可以理解为,创建客户端连接的时候,告诉服务器(mqtt服务器)我挂了之后,给哪些主题发这些消息。当订阅到遗嘱消息之后,他就知道监听端挂了,我不能给他发消息了,遗嘱消息在客户端正常调用 disconnect 方法之后并不会被发送。

高级使用场景:
这里介绍一下如何将 Retained(保留) 消息与Will (遗嘱)消息结合起来进行使用。

客户端 A 遗嘱消息设定为”offline“,该遗嘱主题与一个普通发送状态的主题设定成同一个 A/status;
当客户端 A 连接时,向主题 A/status 发送 “online” 的 Retained 消息,其它客户端订阅主题 A/status的时候,获取 Retained 消息为 “online” ;
当客户端 A 异常断开时,系统自动向主题 A/status 发送”offline“的消息,其它订阅了此主题的客户端会马上收到”offline“消息;如果遗嘱消息被设定了 Retained 的话,这时有新的订阅A/status主题的客户端上线的时候,获取到的消息为“offline”。

1.7. Docker 中安装 EMQ

在 Docker 中安装 EMQ(Erlang MQTT Broker,Erlang 版本的 MQTT 代理程序)是一个简单且方便的方式,以下是安装 EMQ 的基本步骤:
(1)拉取 EMQ 镜像:在命令行中执行以下命令拉取 EMQ 官方 Docker 镜像:

docker pull emqx/emqx

(2)运行 EMQ 容器:运行以下命令来创建并启动 EMQ 容器:

docker run -d --name emqx -p 18083:18083 -p 1883:1883 -p 4369:4369 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 8884:8884 -p 8081:8081 emqx/emqx

(3)验证 EMQ 安装:可以通过浏览器访问 http://localhost:18083 来查看 EMQ 的 Web 管理界面,以验证 EMQ 是否成功安装和运行。默认的用户名和密码为 admin、public。

2、EMQ X Cloud

2.1、EMQ X Cloud简介

通过开放标准的物联网协议 MQTT、MQTT over WebSocket、CoAP/LwM2M 将数以亿计的物联网设备可靠地连接到 EMQ X Cloud。通过 TLS/SSL 和基于 X.509 证书的认证确保安全的双向通信。
在这里插入图片描述

在该模型中,EMQ X Cloud 提供的 MQTT 服务不仅为设备与设备、设备与应用间架起桥梁,同时可将需要的数据进行持久化,以便非实时应用在后续对获取的数据加以利用。

2.2、EMQ X Cloud优势

2.2.1、协议支持完整

支持 MQTT v3.1,v3.1.1 与 v5.0 协议版本,是全球首个支持 MQTT 5.0 的公有云服务,支持 MQTT WebSocket 服务,完整支持 QoS0, QoS1 与 QoS2 级别 MQTT 消息。

2.2.2、多种协议接入

支持包含 MQTT、MQTT-SN、CoAP、LwM2M、私有 TCP 协议在内的多种通信协议接入,覆盖各类行业应用;可根据您的特殊使用场景定制私有化功能,充分契合业务需求。

2.2.3、容量预估与伸缩

通过连接数与消息吞吐量自动预估容量,通过紧密的监控来制定伸缩计划,集群大小可随业务规模平滑调整。

2.3、EMQ X 和 RabbitMQ对比

EMQ X 是基于高并发的 Erlang/OTP 语言平台开发,支持百万级连接、分布式集群架构、发布订阅模式的开源 MQTT 消息服务器。开源至今,EMQ X 在全球物联网市场得到了广泛应用。在开源版基础上,还陆续发展了商业版和提供云版本(cloud-hosting)(https://www.emqx.com/zh/cloud)。EMQ X 支持很多插件,具有强大拓展能力,用户依靠插件可以实现更多的功能。

RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ 服务器也是基于 Erlang 语言开发的,现在可以通过插件配置的形式,使其支持 MQTT 协议。

2.3.1、测试场景

以下的测试均使用了 QoS 1 的消息。当发送 QoS 1 的消息时,这些消息每次都要作为可持久化的备份保存在硬盘上。所以队列空间的使用也尤为重要。

这次评测使用了一个云主机 M5 large 的实例,每个 MQTT 消息服务器集群由 3 个节点组成,每个节点的配置是双核,8GB 内存。需要强调的是,我们对于 EMQ X 和 RabbitMQ 的测试使用了完全一致的硬件资源以消除变量。

压力测试将会有两个场景,「多对一」 和 「一对多」。

多对一
许多设备作为发布者,如温度传感器或者是压力传感器,发送数据给一个服务器。服务器再将这些数据发送给一个控制器(即订阅者)处理这些数据。
在这里插入图片描述

一对多
一个控制器作为发布者将消息传送给服务器,再由服务器将这些消息传送给多个作为订阅者的设备。
在这里插入图片描述

在每个场景里,「多」的那一方的数量将会从 2000 个逐渐上升到 10000 个。每个场景里,每一秒会发送一条载荷为 256 字节的消息。这样的发布并不会造成过大的吞吐量。仅仅使用 256 字节载荷是为了展示出这两个服务器的工作原理,以及他们的集群模式如何对这些场景作出反应的。

2.3.2、测试结果

左侧Y轴是指 CPU 占用,底部X轴是指「多」侧的客户端数量变化。

多对一
从 「多对一」 的结果可以看出,EMQ X 和 RabbitMQ 相比并没有太大差别。

在这里插入图片描述

一对多
但是从「一对多」的结果来看,RabbitMQ 相比于 EMQ X 确实有很明显的差距。

在这里插入图片描述

2.3.3、测试总结

结果表明:在「多对一」 场景中,EMQ X 和 RabbitMQ 相比并没有太大差别;而在「一对多」场景中,RabbitMQ 则较 EMQ X 产生了较为明显的差距。相比较而言,rabbitmq使用MQTT协议,和EMQX使用MQTT协议存在着一定的差距。

2.3.4、注意

使用MQTT的发布-订阅模型不能满足使用要求。可以选择使用AMQP。

3、Eclipse Paho Java

Paho Java客户端是用Java编写的MQTT客户端库,用于开发在JVM或其他Java兼容平台(例如Android)上运行的应用程序。
Paho不仅可以对接EMQ X Broker,还可以对接满足符合MQTT协议规范的消息代理服务端,目前Paho可以支持到MQTT5.0以下版本。MQTT3.3.1协议版本基本能满足百分之九十多的接入场景。

4、SpringBoot整合Eclipse Paho Java

EMQX是消息服务器,而我们java想要发送消息,和订阅消息都是和服务器打交道,想要和服务器打交道就需要想办法连上他,这时候就需要用到了Eclipse Paho Java客户端,用来在java当中连接EMQX消息服务器。

下面案例是按照我的应用场景来写的,监听单独用了一个客户端存入了内存,使用了static变量,启动项目的时候初始化,发送客户端并没有存入内存,而是发送一条,创建一个客户端。这里有一点需要注意,客户端id一定不要重复,就是对于MQTT服务器来说,clientid一定要保持唯一。

4.1、导入依赖
我用的springboot版本是2.3.9.RELEASE

   <!-- mqtt --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><!--配置文件报错问题--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version><scope>provided</scope></dependency>

4.2、读取配置

在application.yml当中添加

这个配置只是作为客户端的
mqtt:hostUrl: tcp://192.168.56.103:1883username: adminpassword: publicclient-id: equipment_maincleanSession: truereconnect: truetimeout: 100keepAlive: 100defaultTopic: client:report:1isOpen: trueqos: 1
这个配置是作为客户端和服务器发送信息
mqtt:hostUrl: tcp://192.168.1.77:1883username: devpassword: devclient-id: MQTT-CLIENT-DEVcleanSession: truereconnect: truetimeout: 100keepAlive: 100
#  defaultTopic: client/dev/reportdefaultTopic: server/dev/reportserverTopic: server/dev/reportisOpen: trueqos: 0

通过这个文件来读取配置

@Component
@ConfigurationProperties("mqtt")
@Data
public class MqttProperties {/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接地址*/private String hostUrl;/*** 客户端Id,同一台服务器下,不允许出现重复的客户端id*/private String clientId;/*** 默认连接主题*/private String defaultTopic;/*** 超时时间*/private int timeout;/*** 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端* 发送个消息判断客户端是否在线,但这个方法并没有重连的机制*/private int keepAlive;/*** 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连* 接记录,这里设置为true表示每次连接到服务器都以新的身份连接*/private Boolean cleanSession;/*** 是否断线重连*/private Boolean reconnect;/*** 启动的时候是否关闭mqtt*/private Boolean isOpen;/*** 连接方式*/private Integer qos;
}

4.3、添加mqtt接受服务的客户端

@Component
public class MqttAcceptClient {private static final Logger logger = LoggerFactory.getLogger(MqttAcceptClient.class);@Autowiredprivate MqttAcceptCallback mqttAcceptCallback;@Autowiredprivate MqttProperties mqttProperties;public static MqttClient client;private static MqttClient getClient() {return client;}private static void setClient(MqttClient client) {MqttAcceptClient.client = client;}/*** 客户端连接*/public void connect() {MqttClient client;try {client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setConnectionTimeout(mqttProperties.getTimeout());options.setKeepAliveInterval(mqttProperties.getKeepAlive());options.setAutomaticReconnect(mqttProperties.getReconnect());options.setCleanSession(mqttProperties.getCleanSession());MqttAcceptClient.setClient(client);try {// 设置回调client.setCallback(mqttAcceptCallback);client.connect(options);} catch (Exception e) {e.printStackTrace();}} catch (Exception e) {e.printStackTrace();}}/*** 重新连接*/public void reconnection() {try {client.connect();} catch (MqttException e) {e.printStackTrace();}}/*** 订阅某个主题** @param topic 主题* @param qos   连接方式*/public void subscribe(String topic, int qos) {logger.info("==============开始订阅主题==============" + topic);try {client.subscribe(topic, qos);} catch (MqttException e) {e.printStackTrace();}}/*** 取消订阅某个主题** @param topic*/public void unsubscribe(String topic) {logger.info("==============开始取消订阅主题==============" + topic);try {client.unsubscribe(topic);} catch (MqttException e) {e.printStackTrace();}}
}

4.4、添加mqtt接受服务的回调类

@Component
public class MqttAcceptCallback implements MqttCallbackExtended {private static final Logger logger = LoggerFactory.getLogger(MqttAcceptCallback.class);@Autowiredprivate MqttAcceptClient mqttAcceptClient;/*** 客户端断开后触发** @param throwable*/@Overridepublic void connectionLost(Throwable throwable) {logger.info("连接断开,可以做重连");if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) {logger.info("emqx重新连接....................................................");mqttAcceptClient.reconnection();}}/*** 客户端收到消息触发** @param topic       主题* @param mqttMessage 消息*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {logger.info("接收消息主题 : " + topic);logger.info("接收消息Qos : " + mqttMessage.getQos());logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
//        int i = 1/0;}/*** 发布消息成功** @param token token*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {String[] topics = token.getTopics();for (String topic : topics) {logger.info("向主题:" + topic + "发送消息成功!");}try {MqttMessage message = token.getMessage();byte[] payload = message.getPayload();String s = new String(payload, "UTF-8");logger.info("消息的内容是:" + s);} catch (MqttException e) {e.printStackTrace();} catch (UnsupportedEncodingException e) {e.printStackTrace();}}/*** 连接emq服务器后触发** @param b* @param s*/@Overridepublic void connectComplete(boolean b, String s) {logger.info("--------------------ClientId:"+ MqttAcceptClient.client.getClientId() + "客户端连接成功!--------------------");// 以/#结尾表示订阅所有以test开头的主题// 订阅所有机构主题mqttAcceptClient.subscribe("client:report:1", 0);}
}

4.5、添加mqtt发送客户端

@Component
public class MqttSendClient {private static final Logger logger = LoggerFactory.getLogger(MqttSendClient.class);@Autowiredprivate MqttSendCallBack mqttSendCallBack;@Autowiredprivate MqttProperties mqttProperties;public MqttClient connect() {MqttClient client = null;try {String uuid = UUID.randomUUID().toString().replaceAll("-","");client = new MqttClient(mqttProperties.getHostUrl(),uuid , new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setConnectionTimeout(mqttProperties.getTimeout());options.setKeepAliveInterval(mqttProperties.getKeepAlive());options.setCleanSession(true);options.setAutomaticReconnect(false);try {// 设置回调client.setCallback(mqttSendCallBack);client.connect(options);} catch (Exception e) {e.printStackTrace();}} catch (Exception e) {e.printStackTrace();}return client;}/*** 发布消息* 主题格式: server:report:$orgCode(参数实际使用机构代码)** @param retained    是否保留* @param orgCode     orgId* @param pushMessage 消息体*/public void publish(boolean retained, String orgCode, String pushMessage) {MqttMessage message = new MqttMessage();message.setQos(mqttProperties.getQos());message.setRetained(retained);message.setPayload(pushMessage.getBytes());MqttDeliveryToken token;MqttClient mqttClient = connect();try {mqttClient.publish("server:report:" + orgCode, message);} catch (MqttException e) {e.printStackTrace();} finally {disconnect(mqttClient);close(mqttClient);}}/*** 关闭连接** @param mqttClient*/public static void disconnect(MqttClient mqttClient) {try {if (mqttClient != null) mqttClient.disconnect();} catch (MqttException e) {e.printStackTrace();}}/*** 释放资源** @param mqttClient*/public static void close(MqttClient mqttClient) {try {if (mqttClient != null) mqttClient.close();} catch (MqttException e) {e.printStackTrace();}}
}

4.6、添加mqtt发送客户端的回调类

@Component
public class MqttSendCallBack implements MqttCallbackExtended {private static final Logger logger = LoggerFactory.getLogger(MqttSendCallBack.class);/*** 客户端断开后触发** @param throwable*/@Overridepublic void connectionLost(Throwable throwable) {logger.info("连接断开,可以做重连");}/*** 客户端收到消息触发** @param topic       主题* @param mqttMessage 消息*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {logger.info("接收消息主题 : " + topic);logger.info("接收消息Qos : " + mqttMessage.getQos());logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));}/*** 发布消息成功** @param token token*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {String[] topics = token.getTopics();for (String topic : topics) {logger.info("向主题:" + topic + "发送消息成功!");}try {MqttMessage message = token.getMessage();byte[] payload = message.getPayload();String s = new String(payload, "UTF-8");logger.info("消息的内容是:" + s);} catch (MqttException e) {e.printStackTrace();} catch (UnsupportedEncodingException e) {e.printStackTrace();}}/*** 连接emq服务器后触发** @param b* @param s*/@Overridepublic void connectComplete(boolean b, String s) {logger.info("--------------------ClientId:"+ MqttAcceptClient.client.getClientId() + "客户端连接成功!--------------------");}
}

4.7、添加配置类
自定义配置,通过这个配置,来控制启动项目的时候是否启动mqtt

public class MqttCondition implements Condition {@Overridepublic boolean matches(ConditionContext context, AnnotatedTypeMetadata annotatedTypeMetadata) {//1、能获取到ioc使用的beanfactoryConfigurableListableBeanFactory beanFactory = context.getBeanFactory();//2、获取类加载器ClassLoader classLoader = context.getClassLoader();//3、获取当前环境信息Environment environment = context.getEnvironment();String isOpen = environment.getProperty("mqtt.isOpen");return Boolean.valueOf(isOpen);}
}

4.8、启动服务的时候开启监听客户端

@Configuration
public class MqttConfig {@Autowiredprivate MqttAcceptClient mqttAcceptClient;/*** 订阅mqtt** @return*/@Conditional(MqttCondition.class)@Beanpublic MqttAcceptClient getMqttPushClient() {mqttAcceptClient.connect();return mqttAcceptClient;}
}

4.9、测试类

@RestController
@RequestMapping("/mqtt")
public class MqttController {@Autowiredprivate MqttSendClient MqttSendClient;@GetMapping(value = "/publishTopic")public Object publishTopic(String sendMessage) {System.out.println("message:"+sendMessage);sendMessage=sendMessage+" : {\"name\":\"ljf\",\"age\":345}";MqttSendClient.publish(false,"client:report:2",sendMessage);return null;}}

5、发送和监听消息测试
在这里插入图片描述

测试监听:
http://192.168.1.77:18083/ 进入mq端 查看订阅客户端 账号:admin 密码: public
在这里插入图片描述

测试发送消息:
访问:http://localhost:8080/mqtt/publishTopic?sendMessage=测试mqtt数据通信&topic=main

我的测试是俩个客户端,下面是结果
在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/691130.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智慧城市驿站:智慧公厕升级版,打造现代化城市生活的便捷配套

随着城市化进程的加速&#xff0c;人们对城市生活质量的要求也越来越高。作为智慧城市建设的一项重要组成部分&#xff0c;多功能城市智慧驿站应运而生。它集合了信息技术、设计美学、结构工艺、系统集成、环保节能等多个亮点&#xff0c;将现代科技与城市生活相融合&#xff0…

qt for python创建UI界面

现在很多库都有用到python,又想使用QT creater创作界面&#xff0c;来使用。 1.使用的版本 使用虚拟机安装Ubuntu22.04&#xff0c;Ubuntu使用命令行安装qt,默认安装的是QT5&#xff0c;不用来回调了&#xff0c;就用系统默认的吧&#xff0c;不然安装工具都要费不少事情。pyt…

SimpleDateFormat为什么是线程不安全的?

目录 在日常开发中&#xff0c;Date工具类使用频率相对较高&#xff0c;大家通常都会这样写&#xff1a;这很简单啊&#xff0c;有什么争议吗&#xff1f;格式化后出现的时间错乱。看看Java 8是如何解决时区问题的&#xff1a;在处理带时区的国际化时间问题&#xff0c;推荐使用…

[力扣 Hot100]Day30 两两交换链表中的节点

题目描述 给你一个链表&#xff0c;两两交换其中相邻的节点&#xff0c;并返回交换后链表的头节点。你必须在不修改节点内部的值的情况下完成本题&#xff08;即&#xff0c;只能进行节点交换&#xff09;。 出处 思路 前两个结点先偷一手用交换val做&#xff0c;从链表第1…

算法面试八股文『 模型详解篇 』

说在前面 这是本系列的第二篇博客&#xff0c;主要是整理了一些经典模型的原理和结构&#xff0c;面试有时候也会问到这些模型的细节&#xff0c;因此都是需要十分熟悉的。光看原理还不够&#xff0c;最好是能用代码试着复现&#xff0c;可以看看李沐老师深度学习的教材&#…

沁恒CH32V30X学习笔记11---使用外部时钟模式2采集脉冲计数

使用外部时钟模式2采集脉冲计数 使用外部触发模式 2 能在外部时钟引脚输入的每一个上升沿或下降沿计数。将 ECE 位置位时,将使用外部时钟源模式 2。使用外部时钟源模式 2 时,ETRF 被选定为 CK_PSC。ETR 引脚经过可选的反相器(ETP),分频器(ETPS)后成为 ETRP,再经过滤波…

自动化上位机开发C#100例:如何用面向对象的方式封装雷赛运动控制卡EtherCAT总线卡(C#代码)

自动化上位机开发C#100例:雷赛运动控制卡EtherCAT总线卡C#封装类 文章目录 LTDMC.dll下载LTDMC.cs LTDMC.dll C#调用封装下载ICard.cs 运动控制卡接口Card.cs 运动控制卡抽象类CardLTDMC.cs 雷赛运动控制卡EtherCAT总线卡实现类CardList.cs 总线卡列表封装 LTDMC.dll下载 最新…

人工智能|机器学习——基于机器学习的舌苔检测

代码下载&#xff1a; 基于深度学习的舌苔检测毕设留档.zip资源-CSDN文库 1 研究背景 1.1.研究背景与意义 目前随着人们生活水平的不断提高&#xff0c;对于中医主张的理念越来越认可&#xff0c;对中医的需求也越来越多。在诊断中&#xff0c;中医通过观察人的舌头的舌质、苔…

基于STM32F407的coreJSON使用教程

目录 概述 工程建立 代码集成 函数介绍 使用示例 概述 coreJSON是FreeRTOS中的一个组件库&#xff0c;支持key查找的解析器&#xff0c;他只是一个解析器&#xff0c;不能生成json数据。同时严格执行 ECMA-404 JSON 标准。该库用 C 语言编写&#xff0c;设计符合 ISO C90…

基于Java SSM框架实现生鲜食品o2o商城系统项目【项目源码+论文说明】计算机毕业设计

基于java的SSM框架实现生鲜食品o2o商城系统演示 摘要 随着社会的发展&#xff0c;社会的各行各业都在利用信息化时代的优势。计算机的优势和普及使得各种信息系统的开发成为必需。 生鲜食品o2o商城系统&#xff0c;主要的模块包括查看管理员&#xff1b;首页、个人中心、用户…

数学建模:BP神经网络(含python实现)

原理 BP 神经网络&#xff0c;也称为多层感知机&#xff08;Multilayer Perceptron&#xff0c;MLP&#xff09;&#xff0c;是一种常见的神经网络模型&#xff0c;用于解决各种机器学习问题&#xff0c;包括分类和回归。BP 代表“反向传播”&#xff08;Backpropagation&#…

领域驱动设计(Domain Driven Design)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、场景和要求二、领域模型关键词1.领域2.子域3.通用语言4.限界上下文5.领域模型6.实体和值对象7.聚合根8.领域服务9.领域事件 总结 前言 Domain Driven Desi…

备战蓝桥杯---动态规划(应用1)

话不多说&#xff0c;直接看题&#xff1a; 首先我们考虑暴力&#xff0c;用二维前缀和即可&#xff0c;复杂度为o(n^4). 其实&#xff0c;我们不妨枚举任意2行&#xff0c;枚举以这个为边界的最大矩阵。 我们把其中的每一列前缀和维护出来&#xff0c;相当于把一个矩阵压缩成…

1902_野火FreeRTOS教程内核在STM32中用到的2个中断PENDSV和SYSTICK

1902_野火FreeRTOS教程内核在STM32中用到的2个中断PENDSV和SYSTICK 全部学习汇总&#xff1a; g_FreeRTOS: FreeRTOS学习笔记 (gitee.com) 上面是涉及到的源代码&#xff0c;而这次需要分析的就是78、79行的两个中断。首先&#xff0c;需要确认NVIC_SYSPRI2寄存器的作用。 进一…

PostgreSQL使用session_exec和file_fdw实现失败次数锁定用户策略

使用session_exec 、file_fdw以及自定义函数实现该功能。 缺陷&#xff1a;实测发现锁用户后&#xff0c;进去解锁特定用户。只能允许一次登陆&#xff0c;应该再次登陆的时候&#xff0c;触发函数&#xff0c;把之前的日志里的错误登陆的信息也计算到登录次数里了。而且foreig…

macOS上使用VScode编译配置C++语言开发环境

本文介绍macOS上使用VScode编译配置C语言开发环境 1.准备工作 安装C/C插件 2.配置c_cpp_properties.json文件 [⇧⌘P]打开命令模式&#xff0c;选择[C/Cpp: Edit Configurations(JSON)]命令&#xff0c;回车后会自动生成一个.vscode目录&#xff0c;目录下有一个c_cpp_prope…

数学在现代经济学研究中的作用

数学在现代经济学研究中的作用 The Role of Mathematics in Modern Economic Research 经济学&#xff0c;作为一门研究人类如何在资源有限的情况下做出选择的社会科学&#xff0c;历来都与数学有着紧密的联系。随着科技的发展&#xff0c;特别是在信息时代数据量的爆炸性增长&…

【漏洞复现】H3C 路由器多系列信息泄露漏洞

Nx01 产品简介 H3C路由器是一款高性能的路由器产品&#xff0c;具有稳定的性能和丰富的功能。它采用了先进的路由技术和安全机制&#xff0c;可以满足不同用户的需求&#xff0c;广泛应用于企业、运营商和数据中心等领域。 Nx02 漏洞描述 H3C路由器多系列存在信息泄露漏洞&…

林浩然与杨凌芸的Java奇遇记:Map世界的恋爱攻略

林浩然与杨凌芸的Java奇遇记&#xff1a;Map世界的恋爱攻略 The Java Adventure of Lin Haoran and Yang Lingyun: Love Strategy in the Map World 在一个充满代码香气的世界里&#xff0c;男主角林浩然&#xff0c;一个热衷于Java编程的程序员大侠&#xff0c;以其深厚的内功…

K8s进阶之路-核心概念/架构:

架构&#xff1a;Master/Node Master组件--主控节点{ 负责集群管理&#xff08;接收用户事件转化成任务分散到node节点上&#xff09;} Apiserver&#xff1a; 资源操作的唯一入口&#xff0c;提供认证、授权、API注册和发现等机制 Scheduler &#xff1a; 负责集群资源调度&am…