EMQX 实践

MQTT 核心概念

发布订阅

MQTT 基于发布订阅模式,它解耦了消息的发送方(发布者)和接收方(订阅者),引入了一个中间代理的角色来完成消息的路由和分发。发布者和订阅者不需要知道彼此的存在,他们之间唯一的联系就是对消息的一致约定,例如消息将使用什么主题、消息将包含哪些字段等等。这让 MQTT 的通信更加灵活,因为我们可以随时动态地增加或减少订阅者和发布者。通过发布订阅,我们可以轻易地实现消息的广播、组播和单播。

服务端

在发布消息的客户端和订阅的客户端之间充当中介,将所有接收到的消息转发到匹配的订阅客户端。所以有时我们也会直接将服务端称为 Broker。

客户端

使用 MQTT 协议连接到 MQTT 服务端的设备或应用程序。它既可以是发布者,也可以是订阅者,也可以具备这两种身份。

主题

主题被用来标识和区分不同的消息,它是 MQTT 消息路由的基础。发布者可以在发布时指定消息的主题,订阅者则可以选择订阅自己感兴趣的主题来接收相关的消息。

通配符

订阅者可以在订阅的主题中使用通配符来达到一次订阅多个主题的目的。MQTT 提供了单层通配符和多层通配符两种主题通配符,以满足不同的订阅需要。

QoS

MQTT 定义了三种 QoS 等级,来分别提供不同的消息可靠性保证。每条消息都可以在发布时独立设置自己的 QoS。QoS 0 最多交付一次,消息可能丢失;QoS 1 至少交付一次,消息可以保证到达,但是可能重复;QoS 2 只交付一次,消息保证到达,并且不会重复。QoS 越大,消息的传输复杂程度也越高,我们需要根据实际场景来选择合适的 QoS。

会话

QoS 只是设计了消息可靠到达的理论机制,而会话则确保了 QoS 1、2 的协议流程得以真正实现。会话是客户端与服务端之间的有状态交互,它可以仅持续和网络连接一样长的时间,也可以跨越多个网络连接存在,我们通常将后者称为持久会话。我们可以选择让连接从已存在的会话中恢复,也可以选择从一个全新的会话开始。

保留消息

与普通消息不同,保留消息可以保留在 MQTT 服务器中。任何新的订阅者订阅与该保留消息中的主题匹配的主题时,都会立即接收到该消息,即使这个消息是在它们订阅主题之前发布的。这使订阅者在上线后可以立即获得数据更新,而不必等待发布者再次发布消息。在某种程度上,我们可以把保留消息当作是一个消息 “云盘” 来使用:随时上传消息到 “云盘”,然后在任意时刻从 “云盘” 获取消息。当然,这个 “云盘” 还有一个主题下只能存储一条最新的保留消息的限制。

遗嘱消息

发布订阅模式的特性决定了,除了服务器以外没有客户端能够感知到某个客户端从通信网络中离开。而遗嘱消息则为连接意外断开的客户端提供了向其他客户端发出通知的能力。客户端可以在连接时向服务器设置自己的遗嘱消息,服务器将在客户端异常断开后立即或延迟一段时间后发布这个遗嘱消息。而订阅了对应遗嘱主题的客户端,将收到这个遗嘱消息,并且采取相应的措施,例如更新该客户端的在线状态等等。

共享订阅

默认情况下,消息会被转发给所有匹配的订阅者。但有时,我们可能希望多个客户端协同处理接收到的消息,以便以水平扩展的方式来提高负载能力。又或者,我们希望为客户端增加一个备份客户端,当主客户端离线时,能够无缝切换到备份客户端继续接收消息,以确保高可用性。而 MQTT 的共享订阅特性,则提供了这一能力。我们可以将客户端划分为多个订阅组,消息仍然会被转发给所有订阅组,但每个订阅组内每次只会有一个客户端收到消息。


MQTT选型

MQTT BROKER 技术选型


EMQX安装

本地开发环境

可以选择安装Windows版本
Windows安装EMQ X
官方Windows部署
安装成功后,直接访问http://localhost:18083/
账号/密码:admin/public
image.png

生产环境

EMQX 本身支持分布式集群架构,能够在保证高可用性、容错性和可扩展性的同时,处理大量的客户端和消息。通过使用 EMQX 集群,您可以在一个或多个节点发生故障时仍然保持集群运行,从而享受到容错和高可用性的好处。
相比与之前版本,EMQX 5.0 集群采用了新的 Mria 集群架构,单节点能支持 500 万 MQTT 设备连接,集群可扩展至 1 亿并发 MQTT 连接。官方集群部署
image.png


安全指南

网络与 TLS
介绍了 EMQX 如何支持端对端加密通信,包括如何启用 SSL/TLS 连接和获取 SSL/TLS 证书。
认证
身份认证是物联网应用的重要组成部分,可以帮助有效阻止非法客户端的连接。为了提供更好的安全保障,EMQX 支持多种认证机制,如 X.509 证书认证、密码认证、JWT 认证、基于 MQTT 5.0 协议的增强认证以及 PSK 认证。本节介绍了这些认证机制的工作方式和配置方法。
授权
在 EMQX 中,授权是指对 MQTT 客户端的发布和订阅操作进行权限控制。本节将介绍如何通过内置数据库、文件、或通过集成 MySQL、PostgreSQL、MongoDB 和 Redis 进行授权相关操作。
黑名单
EMQX 为用户提供了黑名单功能,用户可以通过 Dashboard 和 HTTP API 将指定客户端加入黑名单以拒绝该客户端访问,除了客户端标识符以外,还支持直接封禁用户名甚至 IP 地址。
连接抖动检测
EMQX 支持自动封禁那些被检测到短时间内频繁登录的客户端,并且在一段时间内拒绝这些客户端的登录,以避免此类客户端过多占用服务器资源而影响其他客户端的正常使用。

认证

EMQX Dashboard 提供了开箱即用的认证与权限管理功能,用户仅通过用户界面,就可以快速实现客户端认证授权机制的配置,无需编写代码或手动编辑配置文件,即可对接各类数据源与认证服务,实现各个级别与各类场景下的安全配置,以更高的开发效率获得更安全的保障。

创建认证

在认证页面下的右上角,点击 创建 按钮,即可进入到创建认证的页面。创建一个认证需要选择一种认证方式,选择完成后需要选择一个存储或获取认证信息的数据源(JWT 认证方式除外),认证数据可以从这些数据源包括数据库或 HTTP 服务中获取,最后再配置连接到该数据源的连接信息即可。
认证方式:Password-Based,使用客户端 ID 或用户名加密码的认证方式;
image.png
数据源选择:redis
image.png
选择加密方式及加盐方式:加密方式md5 ,加盐方式prefix
image.png
初始化数据到redis:
HMSET “mqtt_user:username” “password_hash” “66ace8890090c2a50e729318d45fe53b” “salt” “abc”

验证

image.png

http签名配置

创建API秘钥

image.png
image.png

记录秘钥

appId: *************
appSecret: *************

MQTT通用组件开发

源码地址

目录

├─component-mqtt-client
└─component-mqtt-client-starter

component-mqtt-client

mqtt上下文

image.png

建立连接
public MqttClientApp connect() {countDownLatch = new CountDownLatch(1);Vertx.vertx().deployVerticle(this);return this;
}
接收消息
 @Overridepublic void start() {if (Objects.isNull(this.mqttClient)) {this.mqttClient = MqttClient.create(vertx, createMqttClientOptions());}//接收服务端消息处理handlermqttClient.publishHandler(pub -> {Buffer buffer = pub.payload();String topicName = pub.topicName();String[] split = topicName.split("/");String string = buffer.toString(StandardCharsets.UTF_8);UpMessage upRawMessage = new UpMessage();HashMap<String, Object> headers = Maps.newHashMap();headers.put("topic",topicName);headers.put("qos",pub.qosLevel().value());upRawMessage.setHeaders(headers);upRawMessage.setMessageContent(string);upRawMessage.setProductKey(split[0]);upRawMessage.setDeviceId(split[1]);mqttListenerList.forEach(f -> {String topic = f.getTopic();String[] listenerTopic = topic.split("/");boolean flag = true;for (int i = 0; i < split.length; i++) {if (allWildcard.equals(listenerTopic[i])) {break;}if (singleWildcard.equals(listenerTopic[i])) {continue;}if (!split[i].equals(listenerTopic[i])) {flag = false;break;}}if (flag){f.onMessage(upRawMessage);}});});mqttClient.closeHandler(unused -> getVertx().setTimer(RECONNECT_INTERVAL, h -> start()));mqttClient.connect(mqttConfig.getListenerInfos().getPort(), mqttConfig.getListenerInfos().getHost(),s -> {if (s.succeeded()) {log.info("MqttClient connect success.");subscribe();countDownLatch.countDown();} else {log.error("MqttClient connect fail: ", s.cause());if (s.cause() != null) {vertx.setTimer(RECONNECT_INTERVAL, handler -> this.start());}}});}
长连接推送消息
public MqttResp publish(MqttReq request) {
MqttResp response = new MqttResp();
Buffer payload = Buffer.buffer(request.getMessageContent());
mqttClient.publish(request.getTopic(), payload, MqttQoS.valueOf(request.getQos()), false, false, s -> {if (s.succeeded()) {log.info("===>MqttClient publish success[{}]", s.result());} else {log.error("===>MqttClient publish fail.", s.cause());}
});
response.setCode(200);
return response;
}
http推送消息
public Map<String, ?> callHttp(MqttReq params) {
String path = "";
String url = config().getAddress() + path;
log.debug("http url[{}] requestBodyStr[{}]", url, params.getMessageContent());Dict dict = Dict.create();
dict.set("topic", params.getTopic());              //订阅主题
dict.set("payload", params.getMessageContent());   //内容
dict.set("qos", 0);                                //质量
dict.set("retain",false);                          //是否保存
String requestBodyStr = JSON.toJSONString(dict);RequestBody requestBody = RequestBody.create(HTTP_MEDIA_TYPE_JSON_UTF8, requestBodyStr);
Request request = new Request.Builder()
.url(url)
.post(requestBody)
.header("Content-Type", "application/json")
.header("Authorization", Credentials.basic(config().getAppId(), config().getAppSecret()))
.build();try (Response response = getHttpClientInstance().newCall(request).execute()) {log.debug("Call http success. url[{}] response[{}]", url, response);if (response.code() == 404) {return ImmutableMap.of("code", 404, "Message", "404 Not Found");} else if (!response.isSuccessful()) {return ImmutableMap.of("code", response.code(), "Message", "Server Error");}// 输出响应内容assert response.body() != null;String string = response.body().string();return JSON.parseObject(string);
} catch (IOException e) {log.warn("Call http failed, {}. url[{}] requestBodyStr[{}]", e.getMessage(), url, requestBodyStr);
}return Collections.emptyMap();
}
Mqtt配置信息

image.png

public class MqttConfig {private String appId;private String appSecret;private String address;private String username;private String password;private ListenerInfo listenerInfos;@Data@NoArgsConstructor@AllArgsConstructorpublic static class ListenerInfo {private String host;private int port;private boolean ssl;//订阅的topicprivate List<String> subscribeTopics;}}
监听信息接口

image.png

public interface MqttListener {void setTopic(String topic);String getTopic();void onMessage(Message message);
}

component-mqtt-client-starter

MqttClientAutoConfiguration

image.png

META-INF

image.png

com.gitee.xmhzzz.component.mqtt.client.MqttClientAutoConfiguration

Mqtt服务实战demo

通过component-mqtt-client-starter快速构建mqtt-service服务
image.png

发生消息

public class MqttController {@Autowiredprivate IMqttApi IMqttApi;@PostMapping("/pub/tcp")public void pubTcp(){MqttReq mqttReq = new MqttReq();mqttReq.setTopic("topicA/001/in");Map<String, Object> map = Maps.newHashMap();map.put("1","o");mqttReq.setData(map);IMqttApi.tcpPub(mqttReq);}
}

监听消息

@Slf4j
@Component
public class AMqttListener implements MqttListener {private String topic;public AMqttListener() {this.topic = "topicA/+/msg";}@Overridepublic void setTopic(String topic) {}@Overridepublic String getTopic() {return this.topic;}@Overridepublic void onMessage(Message message) {log.info("a message[{}]", JSONObject.toJSONString(message));}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/751433.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mybatis实践篇(一)

日志&#xff08;logImpl&#xff09; StdOutImpl <setting name"logImpl" value"org.apache.ibatis.logging.stdout.StdOutImpl"/>Slf4jImpl <setting name"logImpl" value"org.apache.ibatis.logging.slf4j.Slf4jImpl"/&…

cannot find -xml2: No such file or directory的解决方法

一&#xff0c;问题现象 在编译库的时候出现如下图所示的报错&#xff1a;C:/msys64/mingw32/bin/…/lib/gcc/i686-w64-mingw32/13.2.0/…/…/…/…/i686-w64-mingw32/bin/ld.exe: ca nnot find -lxml2: No such file or directory collect2.exe: error: ld returned 1 exit s…

146 Linux 网络编程2 ,Socket编程,如何创建Linux 服务器 和linux 客户端

IPport 就是一个程序在网络上的身份证号码。 这意味着我们需要如果写一个服务器&#xff0c;至少需要将这台服务器的ip 和 端口号写到程序里面。 实际上更细化的说&#xff1a;应该是将这三都写进程序里面 &#xff1a; IP类型&#xff08;IPV4或者IPV6&#xff09;&#xff…

linux——进程(1)

目录 一、概念 1.1、认识进程 1.2、进程描述符&#xff08;PCB&#xff09; 1.3、进程的结构体&#xff08;task_struct&#xff09; 二、查看进程 三、获取进程的Pid和PPid 3.1、通过系统调用获取进程的PID和PPID 四、创建进程 4.1、fork() 4.2、用if进行分流 五、…

NCV1117ST50T3G线性稳压器芯片中文资料规格书PDF数据手册引脚图图片价格参数

产品概述&#xff1a; NCP1117系列为低压差&#xff08;LDO&#xff09;正向线性电压稳压器&#xff0c;能够提供超过1.0A的输出电流&#xff0c;800mA时温度范围内最大压差为1.2V。这一系列包括八个固定输出电压&#xff1a;1.5V、1.8V、2.0V、2.5V、2.85V、3.3V、5.0V 和 12…

2024/3/15 记录简版抖音部署遇到的问题

1、Centos连不上网 参考这一篇&#xff1a;虚拟机 CentOS 有线连接图标直接消失&#xff0c;网络连接不上&#xff0c;网络连接失败的解决方案&#xff08;亲测有效&#xff09;_centos网络图标不见了-CSDN博客 2、SQLyog连接不到docker中的mysql 原因是对密码有加密过程 &a…

STM32F407_多点电容触摸(GT911)驱动

目录标题 前言1、简单介绍2、触摸芯片与主机的硬件连接3、内部寄存器3.1、控制寄存器&#xff08;0X8040&#xff09;3.2、配置寄存器组&#xff08;0X8047~0X8100&#xff09;3.3、状态寄存器(0x814E)3.4、坐标寄存器(0x8150-0x8177) 4、初始化流程4.1、IIC地址选择4.2、更新G…

html--简历

文章目录 html html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"maximum-scale1.0,minimum-scale1.0,user-scalable0,widthdevice-width,initial-scale1.0&qu…

字母异位词分组【每日一题】

可以通过案例找到规律&#xff0c;每个词排序完后是同一个&#xff0c;所以通过hasmap存储排序过的值做key&#xff0c;值是存储单词集合。 package HasTable;import java.util.*;class Solution {static List<List<String>> groupAnagrams(String[] strs) {Map&l…

jupyter notebook 突然莫名奇妙的白屏

jupyter notebook 突然莫名奇妙的白屏 事件背景&#xff1a; 最近在折腾openai&#xff0c;哎&#xff0c;一言难尽&#xff0c;使用的是conda管理python版本的切换&#xff0c;使用jupyter notebook来运行python程序&#xff0c;其实PyCharm也行&#xff0c;但是&#xff0c;…

【递归搜索回溯专栏】专题二:二叉树中的深搜----二叉树剪枝

本专栏内容为&#xff1a;递归&#xff0c;搜索与回溯算法专栏。 通过本专栏的深入学习&#xff0c;你可以了解并掌握算法。 &#x1f493;博主csdn个人主页&#xff1a;小小unicorn ⏩专栏分类&#xff1a;递归搜索回溯专栏 &#x1f69a;代码仓库&#xff1a;小小unicorn的代…

redis发布订阅与stream类型

发布订阅 redis发布订阅(pub/sub)是一种消息通信模式&#xff1b;发送者(pub)发送消息&#xff0c;订阅者(sub)接收消息。redis客户端可以订阅任意数量的频道。 基础命令&#xff1a; 语法 redis publish命令基本语法如下&#xff1a; redis 127.0.0.1:6379> PUBLISH ch…

Matlab|考虑可再生能源消纳的电热综合能源系统日前经济调度模型

目录 1 主要内容 模型示意图 目标函数 程序亮点 2 部分程序 3 程序结果 4 下载链接 1 主要内容 本程序参考文献《考虑可再生能源消纳的建筑综合能源系统日前经济调度模型》模型&#xff0c;建立了电热综合能源系统优化调度模型&#xff0c;包括燃气轮机、燃气锅炉、余热…

Python基础(七)之数值类型集合

Python基础&#xff08;七&#xff09;之数值类型集合 1、简介 集合&#xff0c;英文set。 集合&#xff08;set&#xff09;是由一个或多个元素组成&#xff0c;是一个无序且不可重复的序列。 集合&#xff08;set&#xff09;只存储不可变的数据类型&#xff0c;如Number、…

修改yolov9的模型打印不出来Gflops的解决办法

正在修改yolov9的模块&#xff0c;发现修改后的模型没有GFlops这个参数 解决办法&#xff1a; 找到utils/torch_utils.py这个文件&#xff0c;有一个model_info函数 然后将其中的stride改为固定的640就可以打印了。 stride max(int(model.stride.max()), 32) if hasattr(mo…

telnet命令使用

window启用telnet telnet命令连接服务端 启动netty服务端后&#xff0c;使用如下cmd命令连接服务端&#xff0c;按enter&#xff0c;将连接到netty服务端 再按CTRL ]&#xff0c;进入命令交互界面 输入 help&#xff0c;查看命令介绍 发送消息&#xff0c;再断开连接&…

Linux:系统初始化,内核优化,性能优化(2)

优化ssh协议 Linux&#xff1a;ssh配置_ssh配置文件-CSDN博客https://blog.csdn.net/w14768855/article/details/131520745?ops_request_misc%257B%2522request%255Fid%2522%253A%2522171068202516800197044705%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fb…

拜占庭将军问题相关问题

1、拜占庭将军问题基本描述 问题 当我们讨论区块链共识时&#xff0c;为什么会讨论拜占庭将军问题&#xff1f; 区块链网络的本质是一个分布式系统&#xff0c;在存在恶意节点的情况下&#xff0c;希望 整个系统当中的善良节点能够对于重要的信息达成一致&#xff0c;这个机…

2024年3月18日 十二生肖 今日运势

小运播报&#xff1a;2024年3月18日&#xff0c;星期一&#xff0c;农历二月初九 &#xff08;甲辰年丁卯月辛巳日&#xff09;&#xff0c;法定工作日。 红榜生肖&#xff1a;牛、鸡、猴 需要注意&#xff1a;鼠、虎、猪 喜神方位&#xff1a;西南方 财神方位&#xff1a;…