ThingsBoard MQTT 连接认证过程 源码分析+图例

整个连接过程如图所示:

 高清图片链接

1、环境准备

  • thingsboard3.5.1 源码启动。(不懂怎么启动的,大家可以看我的博文ThingsBoard3.5.1源码启动)
  • MQTTX 客户端(用来连接 thingsboard MQTT)
  • 默认配置。queue.type=in-memory,cache.type=caffeine

因为我们的目的,是快速了解 thingsboard 的启动过程,所以所有的配置全部采用默认的方式。默认消息队列采用内存队列ConcurrentHashMap,缓存也采用内存缓存caffeine。

使用 customerA 用户账号密码登录,使用设备A1 AccessToken 连接。

2、源码分析

2.1 连接消息生产

2.1.1 入口

大家知道MQTT是基于TCP协议之上的轻量级通信协议,而TCP协议是面向连接、请求响应的通信协议。所以在 thingsboard 这一侧必然有一个服务器实现,用来等待客户端的连接。这个实现就是MqttTransportService

thingsboard 采用 netty 来实现一个MQTT server。

org.thingsboard.server.transport.mqtt.MqttTransportService@PostConstructpublic void init() throws Exception {log.info("Setting resource leak detector level to {}", leakDetectorLevel);ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));log.info("Starting MQTT transport...");bossGroup = new NioEventLoopGroup(bossGroupThreadCount);workerGroup = new NioEventLoopGroup(workerGroupThreadCount);ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MqttTransportServerInitializer(context, false)).childOption(ChannelOption.SO_KEEPALIVE, keepAlive);serverChannel = b.bind(host, port).sync().channel();if (sslEnabled) {b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MqttTransportServerInitializer(context, true)).childOption(ChannelOption.SO_KEEPALIVE, keepAlive);sslServerChannel = b.bind(sslHost, sslPort).sync().channel();}log.info("Mqtt transport started!");}

其中,关系到 netty server 性能的 bossGroupThreadCount,workerGroupThreadCount

thingsboard 提取出两个参数变量 

NETTY_BOSS_GROUP_THREADS

NETTY_WORKER_GROUP_THREADS

方便用户根据自己的设备台数、部署架构,来优化自己的 netty 性能。

 netty server 的请求处理过程如下图所示,圆圈为具体实现类,方框为方法。

在 MqttTransportHandler#processMqttMsg 方法中,因为我们的消息类型是连接,所以我们会进入 processConnect 方法。

org.thingsboard.server.transport.mqtt.MqttTransportHandlervoid processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {if (msg.fixedHeader() == null) {log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());ctx.close();return;}deviceSessionCtx.setChannel(ctx);if (CONNECT.equals(msg.fixedHeader().messageType())) {processConnect(ctx, (MqttConnectMessage) msg);} else if (deviceSessionCtx.isProvisionOnly()) {processProvisionSessionMsg(ctx, msg);} else {enqueueRegularSessionMsg(ctx, msg);}}

 在 MqttTransportHandler#processConnect 方法中,由于采用 AccessToken 的授权方式,所以会进入 processAuthTokenConnect

MqttTransportHandler#processAuthTokenConnect 方法中,获取我们在MQTTX填的用户名、密码,然后委托给 DefaultTransportService#process 处理

org.thingsboard.server.transport.mqtt.MqttTransportHandlervoid processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {log.debug("[{}][{}] Processing connect msg for client: {}!", address, sessionId, msg.payload().clientIdentifier());String userName = msg.payload().userName();String clientId = msg.payload().clientIdentifier();deviceSessionCtx.setMqttVersion(getMqttVersion(msg.variableHeader().version()));if (DataConstants.PROVISION.equals(userName) || DataConstants.PROVISION.equals(clientId)) {deviceSessionCtx.setProvisionOnly(true);ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SUCCESS, msg));} else {X509Certificate cert;if (sslHandler != null && (cert = getX509Certificate()) != null) {processX509CertConnect(ctx, cert, msg);} else {processAuthTokenConnect(ctx, msg);}}}private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {String userName = connectMessage.payload().userName();log.debug("[{}][{}] Processing connect msg for client with user name: {}!", address, sessionId, userName);TransportProtos.ValidateBasicMqttCredRequestMsg.Builder request = TransportProtos.ValidateBasicMqttCredRequestMsg.newBuilder().setClientId(connectMessage.payload().clientIdentifier());if (userName != null) {request.setUserName(userName);}byte[] passwordBytes = connectMessage.payload().passwordInBytes();if (passwordBytes != null) {String password = new String(passwordBytes, CharsetUtil.UTF_8);request.setPassword(password);}transportService.process(DeviceTransportType.MQTT, request.build(),new TransportServiceCallback<>() {@Overridepublic void onSuccess(ValidateDeviceCredentialsResponse msg) {onValidateDeviceResponse(msg, ctx, connectMessage);}@Overridepublic void onError(Throwable e) {log.trace("[{}] Failed to process credentials: {}", address, userName, e);ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage));ctx.close();}});}

2.1.2 DefaultTransportService

 一路跟下去

DefaultTbQueueRequestTemplate#sendToRequestTemplate 方法会调用 TbQueueProducer接口 send 方法,往主题 tb_transport.api.requests 发送消息。TbQueueProducer实现类是InMemoryTbQueueProducer

void sendToRequestTemplate(Request request, UUID requestId, SettableFuture<Response> future, ResponseMetaData<Response> responseMetaData) {log.trace("[{}] Sending request, key [{}], expTime [{}], request {}", requestId, request.getKey(), responseMetaData.expTime, request);if (messagesStats != null) {messagesStats.incrementTotal();}// 将消息发送给消息队列topic是tb_transport.api.requestsrequestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {@Overridepublic void onSuccess(TbQueueMsgMetadata metadata) {if (messagesStats != null) {messagesStats.incrementSuccessful();}log.trace("[{}] Request sent: {}, request {}", requestId, metadata, request);}@Overridepublic void onFailure(Throwable t) {if (messagesStats != null) {messagesStats.incrementFailed();}pendingRequests.remove(requestId);future.setException(t);}});}
1、TbQueueProducer 接口的实现类有很多个,具体发送消息的实现类是哪一个呢?
因为我们使用内存队列方式启动,所以实现类是 InMemoryTbQueueProducer

2、怎么确定发送的主题是 tb_transport.api.requests

主题是通过 requestTemplate获取的

而 requestTemplate又是 DefaultTbQueueRequestTemplate的一个属性,通过 Builder 构建器注入进来的。

对于DefaultTbQueueRequestTemplate的初始化,thingsboard 提供了很多基于不同种消息队列的实现方式。我们现在所用的是内存队列,所以进入InMemoryTbTransportQueueFactory

InMemoryTbTransportQueueFactory中,对于DefaultTbQueueRequestTemplate.requestTemplate

的初始化,使用的是TbQueueTransportApiSettings的配置。

requestsTopic 读取的,就是 tb_transport.api.requests 这一主题。

3、更进一步

认真分析初始化过程,得出下面请求主题的初始化图。

2.1.3 InMemoryTbQueueProducer

InMemoryTbQueueProducer#send 调用 DefaultInMemoryStorage#put 方法 

DefaultInMemoryStorage 往自己持有的 ConcurrentHashMap 中存放消息,

key 是主题 tb_transport.api.requests,value 是存放有消息的 LinkedBlockingQueue 内存队列

2.1.4 一个更抽象的发送模型

TbQueueProducer 往队列 queue 发送消息,主题 tb_transport.api.requests,而不管这个消息的实现是内存队列、kafka、RabbitMQ、ServiceBus 等。TbQueueConsumer 从queue中消费消息。至此,生产连接请求消息的过程结束。

2.2 消费消息

2.2.1 InMemoryTbQueueConsumer

我们知道现在消息生产者接口 TbQueueProducer 的实现类是 InMemoryTbQueueProducer,则它必然有一个消息消费者实现接口 TbQueueConsumer,消费者实现类是 InMemoryTbQueueConsumer

 InMemoryTbQueueConsumer 中对于消息的消费只有把消息从 ConcurrentHashMap 拉取出来的逻辑,而没有具体处理的逻辑,则处理的逻辑,是存在于调用这个 poll 方法的地方。

org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer@Overridepublic List<T> poll(long durationInMillis) {if (subscribed) {@SuppressWarnings("unchecked")List<T> messages = partitions.stream().map(tpi -> {try {return storage.get(tpi.getFullTopicName());} catch (InterruptedException e) {if (!stopped) {log.error("Queue was interrupted.", e);}return Collections.emptyList();}}).flatMap(List::stream).map(msg -> (T) msg).collect(Collectors.toList());if (messages.size() > 0) {return messages;}try {Thread.sleep(durationInMillis);} catch (InterruptedException e) {if (!stopped) {log.error("Failed to sleep.", e);}}}return Collections.emptyList();}

poll 方法的调用端,全局是搜不到的。

 我们可以探究一下它的构造方法,看看谁初始化了它,则谁就有可能调用它的 poll 方法。排除掉它自己,有两个类初始化了 InMemoryTbQueueConsumer,分别是 InMemoryMonolithQueueFactory 和 InMemoryTbTransportQueueFactory

InMemoryTbTransportQueueFactory 订阅的主题,是 tb_transport.api.responses 不是我们要找的 tb_transport.api.requests,忽略。

2.2.2 InMemoryMonolithQueueFactory

我们先来看一下 InMemoryMonolithQueueFactoryInMemoryMonolithQueueFactory 里面有一个方法,传入的 TbQueueTransportApiSettings,刚好就是我们请求消息的主题配置类。

org.thingsboard.server.queue.provider.InMemoryMonolithQueueFactory
@Overridepublic TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> createTransportApiRequestConsumer() {return new InMemoryTbQueueConsumer<>(storage, transportApiSettings.getRequestsTopic());}org.thingsboard.server.queue.settings.TbQueueTransportApiSettings 
@Lazy
@Data
@Component
public class TbQueueTransportApiSettings {// tb_transport.api.requests@Value("${queue.transport_api.requests_topic}")private String requestsTopic;
}

查看对方法 createTransportApiRequestConsumer 的调用,找到一个非具体队列实现的调用类TbCoreTransportApiService

2.2.3 TbCoreTransportApiService

TbCoreTransportApiService 初始化 init 方法,会创建 TbQueueConsumer——也就是具体的实现类 InMemoryTbQueueConsumer 注入到 DefaultTbQueueResponseTemplate.requestTemplate,然后执行 DefaultTbQueueResponseTemplate#init() 方法。

 2.2.4 DefaultTbQueueResponseTemplate

至此,我们找到了 InMemoryTbQueueConsumer#poll 调用的地方。

继续往下,看看对于消息 requests,是怎么消费的。 

 2.2.5 DefaultTransportApiService

 通过 AccessToken 查找到设备的授权 DeviceCredentials (即device_credentials表记录)然后构造 DeviceInfo 返回给设备端。

org.thingsboard.server.service.transport.DefaultTransportApiService// credentialsId 就是 AccessToken
private ListenableFuture<TransportApiResponseMsg> validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);if (credentials != null && credentials.getCredentialsType() == credentialsType) {return getDeviceInfo(credentials);} else {return getEmptyTransportApiResponseFuture();}}

 

2.2.6 消费消息流程图

3、总结

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/21248.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

7-15 位模式(dump_bits)---PTA实验C++

一、题目描述 为方便调试位运算相关程序&#xff0c;先做个展现位模式的小工具。 建议参照以下接口实现&#xff1a; // 利用函数重载特性&#xff1a;string dump_bits(char x);string dump_bits(short x);string dump_bits(int x);string dump_bits(long long x);// 或用函…

JVM类加载过程

在Java虚拟机规范中&#xff0c;把描述类的数据从class文件加载到内存&#xff0c;并对数据进行校验、转换解析和初始化&#xff0c;最终形成可以被虚拟机直接使用的java.lang.Class对象&#xff0c;这个过程被称作类加载过程。一个类在整个虚拟机周期内会经历如下图的阶段&…

C++编程法则365天一天一条(323)main函数执行之前和之后的动作

在C和C程序中&#xff0c;main 函数之前和之后执行的函数是由编译器、链接器和运行时环境共同决定的。以下是一些通常会在这些阶段执行的关键函数&#xff1a; 在 main 函数之前执行的函数 启动代码&#xff08;Start-up Code&#xff09;: 这是由编译器提供的一段代码&#…

DIYP对接骆驼后台IPTV管理,退出菜单中显示用户名已经网络信息,MAC,剩余天数,套餐名称等

演示&#xff1a;https://url03.ctfile.com/f/1779803-1042599473-4dc000?p8976 (访问密码: 8976) 后台加上EPG&#xff0c;增加一些播放源的动态端口替换。 前台app上&#xff0c;退出菜单中显示用户名已经网络信息&#xff0c;MAC&#xff0c;剩余天数&#xff0c;套餐名称…

Python知识点17---包

提前说一点&#xff1a;如果你是专注于Python开发&#xff0c;那么本系列知识点只是带你入个门再详细的开发点就要去看其他资料了&#xff0c;而如果你和作者一样只是操作其他技术的Python API那就足够了。 Python的包&#xff0c;你可以把它看成是一个大的模块&#xff0c;它…

JAVA基础|多线程

什么是线程&#xff1f; 线程&#xff08;Thread&#xff09;是一个程序内部的一条执行流程。 多线程是什么&#xff1f; 多线程是指从软硬件上实现的多条执行流程的技术&#xff08;多条线程由CPU负责调度执行&#xff09; 一. 如何在程序中创建出多条线程&#xff1f; Ja…

新接手业务的线上Bug特别多怎么办?

文章目录 接手&#xff1a;保证质量顺利过渡紧急质量审计临时增加测试频次灰度发布加强监控与预警建立快速反馈机制 打补丁&#xff1a;针对性解决质量问题Bug 分析与分类测试策略优化环境一致性 搞基建&#xff1a;全流程质量控制需求分析与评审设计阶段的评审与验证代码质量控…

Windows10系统中安装与配置PyTorch(无GPU版本)

文章目录 1. 什么是PyTorch2. PyTorch的安装与配置&#xff08;无GPU&#xff09;2.1 创建环境2.2 安装pytorch库&#xff08;无GPU&#xff09;2.3 验证安装结果 1. 什么是PyTorch PyTorch 是一种用于构建深度学习模型且功能完备的开源框架&#xff0c;通常用于处理图像识别和…

JVM学习-自定义类加载器

为什么要自定义类加载器 隔离加载类 在某些框架内进行中间件与应用的模块隔离&#xff0c;把类加载到不同的环境&#xff0c;如Tomcat这类Web应用服务器&#xff0c;内部自定义了好几种类加载器&#xff0c;用于隔离同一个Web应用服务器上的不同应用程序 修改类加载的方式 …

OpenCV 的几种查找图像中轮廓边缘的方法

原始图片&#xff1a; 1、Sobel() Sobel 算子结合了高斯平滑和微分&#xff0c;用于计算图像的梯度&#xff0c;从而突出显示边缘。 import cv2# 读取图像 image cv2.imread(image.png, cv2.IMREAD_GRAYSCALE)# 使用 Sobel 算子查找水平和垂直边缘 sobel_x cv2.Sobel(image…

建筑企业有闲置资质怎么办?

如果建筑企业拥有闲置资质&#xff0c;可以考虑以下几种方式来充分利用这些资质&#xff1a; 1. 租赁或转让资质&#xff1a; 将闲置的建筑资质租赁给其他企业或个人使用&#xff0c;或者通过转让的方式将资质出售给有需要的企业或个人。 2. 提供咨询服务&#xff1a; 利用建…

git分布式版本控制系统(四)

目前世界上最先进的分布式版本控制系统 官方网址&#xff1a;https://git-scm.com 学习目标&#xff1a; 1 了解 git 前世今生 2 掌握 git 基础概念、基础操作 3 各种 git 问题处理 4 互联网常用 gitflow(工作流程规范) 5 git 代码提交规范 6 git 分支管理及命名规范 常见问…

OneForall工具的下载安装和使用(Windows和Linux)

目录 OneForall的介绍 OneForall的下载 OneForall的安装 安装要求 安装步骤&#xff08;git 版&#xff09; 安装&#xff08;kali&#xff09; OneForall的使用命令 在Windows 在Linux&#xff08;kali&#xff09; OneForall的结果说明 免责声明 本文所提供的文字和…

车辆前向碰撞预警系统性能要求和测试规程

前言 本文整理《GB/T 33577-2017 智能运输系统-车辆前向碰撞预警系统性能要求和测试规程》国标文件关键信息,FCW系统性能和测试右给深层次的认识。 术语和定义 车辆前向碰撞预警系统 forward vehicle collision warning system自车 subject vehicle(SV)目标车辆 target ve…

【Linux】查找和压缩

一、文件查找 1、命令查找 which 2、文件查找、依赖数据库 locate 3、文件查找 find 语法&#xff1a;find [path] [options] [expression] [action] ①按文件名 -name按名 -iname可不区分大小写 ②按文件大小 5M&#xff1a;5M以上文件 5M&#xff1a;5M文件 -…

高中数学:解三角形相关公式总结及用法总结

一、正弦定理 二、余弦定理 三、三角形面积公式 由正弦定理&#xff0c;可以推出三角形的面积公式&#xff1a; S*ab*sinC S*ac*sinB S*bc*sinA 四、使用方法总结 五、练习 例题1 解析 对条件等式进行变形&#xff0c;结合余弦定理&#xff0c;求出∠A的度数&#xff0c;从而…

【面经分享-CPP篇】[建议收藏!!] C++基础20问-01

&#x1f36d; 大家好这里是清隆学长 &#xff0c;一枚热爱算法的程序员 ✨ 本系列打算持续跟新c面试基础 &#x1f44f; 感谢大家的订阅➕ 和 喜欢&#x1f497; 文章目录 1.题目&#xff1a;解释C中的RAII机制。2.题目&#xff1a;解释C中的智能指针及其类型。3.题目&#xf…

从内存到sql的upsert

业务的upsert ​ 在写业务时&#xff0c;大家一开始都会以顺序流程的方式开始着手写代码&#xff0c;CR时再看代码&#xff0c;会有不一样的感觉。 1. 需求描述 ​ 现有一张数据库表&#xff0c;表字段结构如下&#xff1a; 字段名称类型描述uuidstring数据的唯一键datastrin…

代码随想录算法训练营第四十六天|KM52. 携带研究材料、518. 零钱兑换 II、377. 组合总和 Ⅳ

代码随想录算法训练营第四十六天 KM52. 携带研究材料 题目链接&#xff1a;KM52. 携带研究材料 确定dp数组以及下标的含义&#xff1a;j的含义是当前背包的最大容量&#xff0c;dp[j]背包内物品的总价值确定递推公式&#xff1a;背包最大容量固定为j&#xff0c;每个循环尝试…

Nginx01-HTTP简介与Nginx简介(安装、命令介绍、目录介绍、配置文件介绍)

目录 HTTP简介HTTP原理查看访问网站的详细流程curl -vwget --debug 查看网站访问量HTTP协议版本HTTP协议交互HTTP 请求请求报文起始行请求头 HTTP响应响应报文起始行响应头 Nginx常见的Web服务常见网站服务 安装NginxNginx目录结构Nginx启动管理Nginx常用命令 Nginx配置文件主配…