ThingsBoard MQTT 连接认证过程 源码分析+图例

整个连接过程如图所示:

 高清图片链接

1、环境准备

  • thingsboard3.5.1 源码启动。(不懂怎么启动的,大家可以看我的博文ThingsBoard3.5.1源码启动)
  • MQTTX 客户端(用来连接 thingsboard MQTT)
  • 默认配置。queue.type=in-memory,cache.type=caffeine

因为我们的目的,是快速了解 thingsboard 的启动过程,所以所有的配置全部采用默认的方式。默认消息队列采用内存队列ConcurrentHashMap,缓存也采用内存缓存caffeine。

使用 customerA 用户账号密码登录,使用设备A1 AccessToken 连接。

2、源码分析

2.1 连接消息生产

2.1.1 入口

大家知道MQTT是基于TCP协议之上的轻量级通信协议,而TCP协议是面向连接、请求响应的通信协议。所以在 thingsboard 这一侧必然有一个服务器实现,用来等待客户端的连接。这个实现就是MqttTransportService

thingsboard 采用 netty 来实现一个MQTT server。

org.thingsboard.server.transport.mqtt.MqttTransportService@PostConstructpublic void init() throws Exception {log.info("Setting resource leak detector level to {}", leakDetectorLevel);ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));log.info("Starting MQTT transport...");bossGroup = new NioEventLoopGroup(bossGroupThreadCount);workerGroup = new NioEventLoopGroup(workerGroupThreadCount);ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MqttTransportServerInitializer(context, false)).childOption(ChannelOption.SO_KEEPALIVE, keepAlive);serverChannel = b.bind(host, port).sync().channel();if (sslEnabled) {b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MqttTransportServerInitializer(context, true)).childOption(ChannelOption.SO_KEEPALIVE, keepAlive);sslServerChannel = b.bind(sslHost, sslPort).sync().channel();}log.info("Mqtt transport started!");}

其中,关系到 netty server 性能的 bossGroupThreadCount,workerGroupThreadCount

thingsboard 提取出两个参数变量 

NETTY_BOSS_GROUP_THREADS

NETTY_WORKER_GROUP_THREADS

方便用户根据自己的设备台数、部署架构,来优化自己的 netty 性能。

 netty server 的请求处理过程如下图所示,圆圈为具体实现类,方框为方法。

在 MqttTransportHandler#processMqttMsg 方法中,因为我们的消息类型是连接,所以我们会进入 processConnect 方法。

org.thingsboard.server.transport.mqtt.MqttTransportHandlervoid processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {if (msg.fixedHeader() == null) {log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());ctx.close();return;}deviceSessionCtx.setChannel(ctx);if (CONNECT.equals(msg.fixedHeader().messageType())) {processConnect(ctx, (MqttConnectMessage) msg);} else if (deviceSessionCtx.isProvisionOnly()) {processProvisionSessionMsg(ctx, msg);} else {enqueueRegularSessionMsg(ctx, msg);}}

 在 MqttTransportHandler#processConnect 方法中,由于采用 AccessToken 的授权方式,所以会进入 processAuthTokenConnect

MqttTransportHandler#processAuthTokenConnect 方法中,获取我们在MQTTX填的用户名、密码,然后委托给 DefaultTransportService#process 处理

org.thingsboard.server.transport.mqtt.MqttTransportHandlervoid processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {log.debug("[{}][{}] Processing connect msg for client: {}!", address, sessionId, msg.payload().clientIdentifier());String userName = msg.payload().userName();String clientId = msg.payload().clientIdentifier();deviceSessionCtx.setMqttVersion(getMqttVersion(msg.variableHeader().version()));if (DataConstants.PROVISION.equals(userName) || DataConstants.PROVISION.equals(clientId)) {deviceSessionCtx.setProvisionOnly(true);ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SUCCESS, msg));} else {X509Certificate cert;if (sslHandler != null && (cert = getX509Certificate()) != null) {processX509CertConnect(ctx, cert, msg);} else {processAuthTokenConnect(ctx, msg);}}}private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {String userName = connectMessage.payload().userName();log.debug("[{}][{}] Processing connect msg for client with user name: {}!", address, sessionId, userName);TransportProtos.ValidateBasicMqttCredRequestMsg.Builder request = TransportProtos.ValidateBasicMqttCredRequestMsg.newBuilder().setClientId(connectMessage.payload().clientIdentifier());if (userName != null) {request.setUserName(userName);}byte[] passwordBytes = connectMessage.payload().passwordInBytes();if (passwordBytes != null) {String password = new String(passwordBytes, CharsetUtil.UTF_8);request.setPassword(password);}transportService.process(DeviceTransportType.MQTT, request.build(),new TransportServiceCallback<>() {@Overridepublic void onSuccess(ValidateDeviceCredentialsResponse msg) {onValidateDeviceResponse(msg, ctx, connectMessage);}@Overridepublic void onError(Throwable e) {log.trace("[{}] Failed to process credentials: {}", address, userName, e);ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage));ctx.close();}});}

2.1.2 DefaultTransportService

 一路跟下去

DefaultTbQueueRequestTemplate#sendToRequestTemplate 方法会调用 TbQueueProducer接口 send 方法,往主题 tb_transport.api.requests 发送消息。TbQueueProducer实现类是InMemoryTbQueueProducer

void sendToRequestTemplate(Request request, UUID requestId, SettableFuture<Response> future, ResponseMetaData<Response> responseMetaData) {log.trace("[{}] Sending request, key [{}], expTime [{}], request {}", requestId, request.getKey(), responseMetaData.expTime, request);if (messagesStats != null) {messagesStats.incrementTotal();}// 将消息发送给消息队列topic是tb_transport.api.requestsrequestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {@Overridepublic void onSuccess(TbQueueMsgMetadata metadata) {if (messagesStats != null) {messagesStats.incrementSuccessful();}log.trace("[{}] Request sent: {}, request {}", requestId, metadata, request);}@Overridepublic void onFailure(Throwable t) {if (messagesStats != null) {messagesStats.incrementFailed();}pendingRequests.remove(requestId);future.setException(t);}});}
1、TbQueueProducer 接口的实现类有很多个,具体发送消息的实现类是哪一个呢?
因为我们使用内存队列方式启动,所以实现类是 InMemoryTbQueueProducer

2、怎么确定发送的主题是 tb_transport.api.requests

主题是通过 requestTemplate获取的

而 requestTemplate又是 DefaultTbQueueRequestTemplate的一个属性,通过 Builder 构建器注入进来的。

对于DefaultTbQueueRequestTemplate的初始化,thingsboard 提供了很多基于不同种消息队列的实现方式。我们现在所用的是内存队列,所以进入InMemoryTbTransportQueueFactory

InMemoryTbTransportQueueFactory中,对于DefaultTbQueueRequestTemplate.requestTemplate

的初始化,使用的是TbQueueTransportApiSettings的配置。

requestsTopic 读取的,就是 tb_transport.api.requests 这一主题。

3、更进一步

认真分析初始化过程,得出下面请求主题的初始化图。

2.1.3 InMemoryTbQueueProducer

InMemoryTbQueueProducer#send 调用 DefaultInMemoryStorage#put 方法 

DefaultInMemoryStorage 往自己持有的 ConcurrentHashMap 中存放消息,

key 是主题 tb_transport.api.requests,value 是存放有消息的 LinkedBlockingQueue 内存队列

2.1.4 一个更抽象的发送模型

TbQueueProducer 往队列 queue 发送消息,主题 tb_transport.api.requests,而不管这个消息的实现是内存队列、kafka、RabbitMQ、ServiceBus 等。TbQueueConsumer 从queue中消费消息。至此,生产连接请求消息的过程结束。

2.2 消费消息

2.2.1 InMemoryTbQueueConsumer

我们知道现在消息生产者接口 TbQueueProducer 的实现类是 InMemoryTbQueueProducer,则它必然有一个消息消费者实现接口 TbQueueConsumer,消费者实现类是 InMemoryTbQueueConsumer

 InMemoryTbQueueConsumer 中对于消息的消费只有把消息从 ConcurrentHashMap 拉取出来的逻辑,而没有具体处理的逻辑,则处理的逻辑,是存在于调用这个 poll 方法的地方。

org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer@Overridepublic List<T> poll(long durationInMillis) {if (subscribed) {@SuppressWarnings("unchecked")List<T> messages = partitions.stream().map(tpi -> {try {return storage.get(tpi.getFullTopicName());} catch (InterruptedException e) {if (!stopped) {log.error("Queue was interrupted.", e);}return Collections.emptyList();}}).flatMap(List::stream).map(msg -> (T) msg).collect(Collectors.toList());if (messages.size() > 0) {return messages;}try {Thread.sleep(durationInMillis);} catch (InterruptedException e) {if (!stopped) {log.error("Failed to sleep.", e);}}}return Collections.emptyList();}

poll 方法的调用端,全局是搜不到的。

 我们可以探究一下它的构造方法,看看谁初始化了它,则谁就有可能调用它的 poll 方法。排除掉它自己,有两个类初始化了 InMemoryTbQueueConsumer,分别是 InMemoryMonolithQueueFactory 和 InMemoryTbTransportQueueFactory

InMemoryTbTransportQueueFactory 订阅的主题,是 tb_transport.api.responses 不是我们要找的 tb_transport.api.requests,忽略。

2.2.2 InMemoryMonolithQueueFactory

我们先来看一下 InMemoryMonolithQueueFactoryInMemoryMonolithQueueFactory 里面有一个方法,传入的 TbQueueTransportApiSettings,刚好就是我们请求消息的主题配置类。

org.thingsboard.server.queue.provider.InMemoryMonolithQueueFactory
@Overridepublic TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> createTransportApiRequestConsumer() {return new InMemoryTbQueueConsumer<>(storage, transportApiSettings.getRequestsTopic());}org.thingsboard.server.queue.settings.TbQueueTransportApiSettings 
@Lazy
@Data
@Component
public class TbQueueTransportApiSettings {// tb_transport.api.requests@Value("${queue.transport_api.requests_topic}")private String requestsTopic;
}

查看对方法 createTransportApiRequestConsumer 的调用,找到一个非具体队列实现的调用类TbCoreTransportApiService

2.2.3 TbCoreTransportApiService

TbCoreTransportApiService 初始化 init 方法,会创建 TbQueueConsumer——也就是具体的实现类 InMemoryTbQueueConsumer 注入到 DefaultTbQueueResponseTemplate.requestTemplate,然后执行 DefaultTbQueueResponseTemplate#init() 方法。

 2.2.4 DefaultTbQueueResponseTemplate

至此,我们找到了 InMemoryTbQueueConsumer#poll 调用的地方。

继续往下,看看对于消息 requests,是怎么消费的。 

 2.2.5 DefaultTransportApiService

 通过 AccessToken 查找到设备的授权 DeviceCredentials (即device_credentials表记录)然后构造 DeviceInfo 返回给设备端。

org.thingsboard.server.service.transport.DefaultTransportApiService// credentialsId 就是 AccessToken
private ListenableFuture<TransportApiResponseMsg> validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);if (credentials != null && credentials.getCredentialsType() == credentialsType) {return getDeviceInfo(credentials);} else {return getEmptyTransportApiResponseFuture();}}

 

2.2.6 消费消息流程图

3、总结

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/21248.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

7-15 位模式(dump_bits)---PTA实验C++

一、题目描述 为方便调试位运算相关程序&#xff0c;先做个展现位模式的小工具。 建议参照以下接口实现&#xff1a; // 利用函数重载特性&#xff1a;string dump_bits(char x);string dump_bits(short x);string dump_bits(int x);string dump_bits(long long x);// 或用函…

JVM类加载过程

在Java虚拟机规范中&#xff0c;把描述类的数据从class文件加载到内存&#xff0c;并对数据进行校验、转换解析和初始化&#xff0c;最终形成可以被虚拟机直接使用的java.lang.Class对象&#xff0c;这个过程被称作类加载过程。一个类在整个虚拟机周期内会经历如下图的阶段&…

DIYP对接骆驼后台IPTV管理,退出菜单中显示用户名已经网络信息,MAC,剩余天数,套餐名称等

演示&#xff1a;https://url03.ctfile.com/f/1779803-1042599473-4dc000?p8976 (访问密码: 8976) 后台加上EPG&#xff0c;增加一些播放源的动态端口替换。 前台app上&#xff0c;退出菜单中显示用户名已经网络信息&#xff0c;MAC&#xff0c;剩余天数&#xff0c;套餐名称…

Python知识点17---包

提前说一点&#xff1a;如果你是专注于Python开发&#xff0c;那么本系列知识点只是带你入个门再详细的开发点就要去看其他资料了&#xff0c;而如果你和作者一样只是操作其他技术的Python API那就足够了。 Python的包&#xff0c;你可以把它看成是一个大的模块&#xff0c;它…

JAVA基础|多线程

什么是线程&#xff1f; 线程&#xff08;Thread&#xff09;是一个程序内部的一条执行流程。 多线程是什么&#xff1f; 多线程是指从软硬件上实现的多条执行流程的技术&#xff08;多条线程由CPU负责调度执行&#xff09; 一. 如何在程序中创建出多条线程&#xff1f; Ja…

Windows10系统中安装与配置PyTorch(无GPU版本)

文章目录 1. 什么是PyTorch2. PyTorch的安装与配置&#xff08;无GPU&#xff09;2.1 创建环境2.2 安装pytorch库&#xff08;无GPU&#xff09;2.3 验证安装结果 1. 什么是PyTorch PyTorch 是一种用于构建深度学习模型且功能完备的开源框架&#xff0c;通常用于处理图像识别和…

OpenCV 的几种查找图像中轮廓边缘的方法

原始图片&#xff1a; 1、Sobel() Sobel 算子结合了高斯平滑和微分&#xff0c;用于计算图像的梯度&#xff0c;从而突出显示边缘。 import cv2# 读取图像 image cv2.imread(image.png, cv2.IMREAD_GRAYSCALE)# 使用 Sobel 算子查找水平和垂直边缘 sobel_x cv2.Sobel(image…

建筑企业有闲置资质怎么办?

如果建筑企业拥有闲置资质&#xff0c;可以考虑以下几种方式来充分利用这些资质&#xff1a; 1. 租赁或转让资质&#xff1a; 将闲置的建筑资质租赁给其他企业或个人使用&#xff0c;或者通过转让的方式将资质出售给有需要的企业或个人。 2. 提供咨询服务&#xff1a; 利用建…

OneForall工具的下载安装和使用(Windows和Linux)

目录 OneForall的介绍 OneForall的下载 OneForall的安装 安装要求 安装步骤&#xff08;git 版&#xff09; 安装&#xff08;kali&#xff09; OneForall的使用命令 在Windows 在Linux&#xff08;kali&#xff09; OneForall的结果说明 免责声明 本文所提供的文字和…

车辆前向碰撞预警系统性能要求和测试规程

前言 本文整理《GB/T 33577-2017 智能运输系统-车辆前向碰撞预警系统性能要求和测试规程》国标文件关键信息,FCW系统性能和测试右给深层次的认识。 术语和定义 车辆前向碰撞预警系统 forward vehicle collision warning system自车 subject vehicle(SV)目标车辆 target ve…

【Linux】查找和压缩

一、文件查找 1、命令查找 which 2、文件查找、依赖数据库 locate 3、文件查找 find 语法&#xff1a;find [path] [options] [expression] [action] ①按文件名 -name按名 -iname可不区分大小写 ②按文件大小 5M&#xff1a;5M以上文件 5M&#xff1a;5M文件 -…

高中数学:解三角形相关公式总结及用法总结

一、正弦定理 二、余弦定理 三、三角形面积公式 由正弦定理&#xff0c;可以推出三角形的面积公式&#xff1a; S*ab*sinC S*ac*sinB S*bc*sinA 四、使用方法总结 五、练习 例题1 解析 对条件等式进行变形&#xff0c;结合余弦定理&#xff0c;求出∠A的度数&#xff0c;从而…

Nginx01-HTTP简介与Nginx简介(安装、命令介绍、目录介绍、配置文件介绍)

目录 HTTP简介HTTP原理查看访问网站的详细流程curl -vwget --debug 查看网站访问量HTTP协议版本HTTP协议交互HTTP 请求请求报文起始行请求头 HTTP响应响应报文起始行响应头 Nginx常见的Web服务常见网站服务 安装NginxNginx目录结构Nginx启动管理Nginx常用命令 Nginx配置文件主配…

国内外主流大模型语言技术大比拼

国内外主流大模型语言技术对比 2024 自2017年起&#xff0c;美国深度布局人工智能&#xff0c;全面融入经济、文化与社会。至2023年&#xff0c;中国凭借自研技术平台崭露头角&#xff0c;ChatGPT及其技术成国家战略焦点&#xff0c;引领未来科技浪潮。中美竞逐&#xff0c;人工…

香橙派 AI pro:AI 加速初体验

香橙派 AI pro&#xff1a;AI 加速初体验 在AI领域&#xff0c;不断涌现的硬件产品为开发者提供了前所未有的便利和可能性。今天&#xff0c;我要介绍的这款产品——香橙派 AIpro&#xff0c;就是其中的佼佼者。在昇腾 AI 芯片的加持下&#xff0c;这款开发板有着出色的算力。…

961题库 北航计算机 操作系统 附答案 选择题形式

有题目和答案&#xff0c;没有解析&#xff0c;不懂的题问大模型即可&#xff0c;无偿分享。 第1组 习题 计算机系统的组成包括&#xff08; &#xff09; A、程序和数据 B、处理器和内存 C、计算机硬件和计算机软件 D、处理器、存储器和外围设备 财务软件是一种&#xff…

【Qt 学习笔记】Qt窗口 | 对话框 | Qt对话框的分类及介绍

博客主页&#xff1a;Duck Bro 博客主页系列专栏&#xff1a;Qt 专栏关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家点赞&#x1f44d;收藏⭐评论✍ Qt窗口 | 对话框 | 模态对话框 文章编号&#xff1a;Qt 学习笔记 / 51…

Java反序列化漏洞与URLDNS利用链分析

前言 前面学习过 Java 反序列化漏洞的部分知识&#xff0c;总结过几篇文章&#xff1a; 文章发布日期内容概括《渗透测试-JBoss 5.x/6.x反序列化漏洞》2020-07-08JBoss 反序列化漏洞 CVE-2017-12149 的简单复现&#xff0c;使用了 ysoserial 和 CC5 链&#xff0c;未分析漏洞…

easy-captcha生成验证码

引入依赖 <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-redis --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>…

机械设计手册第一册:公差

形位公差的标注&#xff1a; 形位公差框格中&#xff0c;不仅要表达形位公差的特征项目、基准代号和其他符号&#xff0c;还要正确给出公差带的大小、形状等内容。 1.形位公差框格&#xff1a; 形位公差框格由两个框格或多个格框组成&#xff0c;框格中的主要内容从左到右按…