RocketMQ源码剖析之createUniqID方法

目录

版本信息:

写在前面:

源码剖析:

总计:


版本信息:

RocketMQ-5.1.3

源码地址:https://github.com/apache/rocketmq

写在前面:

首先,笔者先吐槽一下RocketMQ的官方,源码中啥注释都没有,虽然文档给的多,但是很多都是版本过时不及时更新,阅读者只能靠自己的强硬的技术去理解~

回归正题,如今互联网的技术离不开微服务、分布式的体系,所以在分布式的体系中如何创建一个全局唯一的ID是大家所面对的问题。现大厂都提出了解决方案:Twitter的雪花算法(Snowflake)、美团的Leaf算法、以及Mysql、Redis 这种自带原子性操作的中间件。

当然RocketMQ为分布式而生的消息队列中间件肯定也需要有他的分布式ID解决方案(虽然笔者不知道该如何称呼,源码中也没有给出)~ 

源码剖析:

createUniqID 方法是本文章所论述的点,此方法在生产者往Broker 发送消息时,给发送的消息创建一个唯一KEY时调用。

public static void setUniqID(final Message msg) {// 如果用户自定义了唯一key,RocketMQ就不提供默认实现// 否则RocketMQ调用createUniqID 方法提供默认的实现if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());}
}

在看createUniqID之前,我们先需要看一些变量的初始化作为看createUniqID 方法的铺垫~

org.apache.rocketmq.common.message MessageClientIDSetter类中。

public class MessageClientIDSetter {private static final int LEN;                   // 原有长度private static final char[] FIX_STRING;         // 变化后的char字符数组(其实就是字符串)private static final AtomicInteger COUNTER;     // 原子变量private static long startTime;                  // 记录开始时间private static long nextStartTime;              // 记录最后时间(用于更新)static {byte[] ip;try {// 获取到本机的IP地址。// 一共占用4个字节。ip = UtilAll.getIP();} catch (Exception e) {ip = createFakeIP();}// 4(ip) + 2(pid进程id) + 4(类加载器的HashCode) + 4(时间差值) + 2(自增位) LEN = ip.length + 2 + 4 + 4 + 2;// 拼接处理分布式体系的10字节// 处理 本机IP + JVM进程PID + HashCodeByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4);tempBuffer.put(ip);tempBuffer.putShort((short) UtilAll.getPid());tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());// 把10字节中的内容 作为索引值 转换成16进制的字符串表示// 简单来说,这一步就是编码,因为ID不可能用负数或者二进制01表示。FIX_STRING = UtilAll.bytes2string(tempBuffer.array()).toCharArray();// 设置当前启动的时间(用来4字节的计算时间差值)// 并且设置末尾时间,末尾时间用来更新时间// 如果有小伙伴看过雪花算法,就明白,雪花算法的时间差值是41位,限制只能用多少年,而这里做了优化,动态更新时间。// 这里的起始时间是本月的1号。// 末尾时间是下月的1号。setStartTime(System.currentTimeMillis());// 原子性自增,用于最后2位的自增位。COUNTER = new AtomicInteger(0);}
}

这里是核心所在,所以在提供的源码中笔者有非常详细的注释,并且这里做一个总结:

  1. RocketMQ的分布式ID算法核心就在这里,用了16字节表示:4(本机IP) + 2(进程的PID) + 4(类加载器的HashCode) + 4(时间差值) + 2(自增位)
  2. 本机IP + 进程PID + 类加载器HashCode 解决了分布式环境下集群的重复可能性
  3. 最后2位的自增位,用于处理本机RocketMQ的并发重复可能性
  4. 时间差值用于解码时获得创建的时间

看到这里,有读者会问,那源码中FIX_STRING 变量是干啥的,这很简单,如上图所示总共16字节,因为byte用10进制可能会有负数,作为分布式ID总不能是一串负数或者二进制01表示把。所以RocketMQ用16字节的Byte数组转换成 16进制的字符串表示,存储在FIX_STRING中。

这里需要注意,在上文的初始化代码中,只对 本机IP + JVM进程PID + HashCode做了处理,后续的时间差值和自增位在createUniqID方法中做处理。

以上的铺垫已做完,直接看到org.apache.rocketmq.common.message MessageClientIDSetter类中createUniqID方法

public static String createUniqID() {// 在Java中byte占用一个字节,char占用2个字节// 所以这里需要创建LEN * 2 的char数组来存放完 16字节的数据。char[] sb = new char[LEN * 2];// 在上文的初始化中把 IP + PID + HashCode 16进制字符串放入到FIX_STRING// 这里把FIX_STRING拷贝到sb中。System.arraycopy(FIX_STRING, 0, sb, 0, FIX_STRING.length);long current = System.currentTimeMillis();// 是否需要更新时间。if (current >= nextStartTime) {setStartTime(current);}// 计算出运行时间差值。int diff = (int)(current - startTime);if (diff < 0 && diff > -1000_000) {diff = 0;}// 获取到长度,这个长度作为索引。int pos = FIX_STRING.length;// 这里填充了4字节的时间差值UtilAll.writeInt(sb, pos, diff);pos += 8;// 这里填充了2字节的自增位。UtilAll.writeShort(sb, pos, COUNTER.getAndIncrement());// char数组转换成字符串。return new String(sb);}
  1. 获取到初始化中初始的FIX_STRING字段,此字段已经处理了本机IP + JVM进程PID + HashCode,后续的时间差值 和 自增位还没做处理,下文会对其做处理
  2. 获取到当前时间,判断是否需要更新时间(没个月月初更新)
  3. 得到时间差值赋值给diff变量,并且转换成16进制的字符表示
  4. 获取到自增值,并且转换成16进制的字符表示
  5. 最终把16进制的 char数组转换成String对象
  6. 整个分布式的ID 创建过程完毕。

总计:

只需要记住三部分

  1. 第一部分用于处理分布式的重复可能性(IP + PID + HashCode)
  2. 第二部分用于记录创建时间
  3. 第三部分用于处理机器的并发创建ID的重复可能性(原子变量解决)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/186538.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

attention中Q,K,V的理解

第一种 1.首先定义三个线性变换矩阵&#xff0c;query&#xff0c;key&#xff0c;value&#xff1a; class BertSelfAttention(nn.Module):self.query nn.Linear(config.hidden_size, self.all_head_size) # 输入768&#xff0c; 输出768self.key nn.Linear(config.hidde…

上海线下活动 | LLM 时代的 AI 编译器实践与创新

今年 3 月份&#xff0c; 2023 Meet TVM 系列首次线下活动从上海出发&#xff0c;跨越多个城市&#xff0c;致力于为各地关注 AI 编译器的工程师提供一个学习、交流的平台。 12 月 16 日 2023 Meet TVM 年终聚会将重返上海&#xff0c;这一次我们不仅邀请了 4 位资深的 AI 编…

自动伸缩:解密HPA、VPA、CA和CPA智能调整应用大小和数量

关注【云原生百宝箱】公众号&#xff0c;快速掌握云原生 Kubernetes提供了多种自动伸缩机制&#xff0c;例如HPA&#xff08;Horizontal Pod Autoscaling&#xff09;&#xff0c;可以根据不同情况动态调整Pod副本数量。此功能使 Pod 能够有效地处理当前流量&#xff0c;而无需…

pytorch中的激活函数详解

1 激活函数介绍 1.1 什么是激活函数 激活函数是神经网络中引入的非线性函数&#xff0c;用于捕获数据中的复杂关系。它来自动物界的灵感&#xff0c;动物的神经元会接受来自对它有作用的其他神经元的信号&#xff0c;当然这些信号对该神经元的作用大小不同&#xff08;即具有不…

Jmeter+ant+jenkins实现持续集成看这一篇就搞定了!

jmeterantjenkins持续集成 一、下载并配置jmeter 首先下载jmeter工具&#xff0c;并配置好环境变量&#xff1b;参考&#xff1a;https://www.cnblogs.com/YouJeffrey/p/16029894.html jmeter默认保存的是.jtl格式的文件&#xff0c;要设置一下bin/jmeter.properties,文件内容…

如何提高3D建模技能?

无论是制作影视动画还是视频游戏&#xff0c;提高3D建模技能对于你的工作都至关重要的。那么如何能创建出精美的3D模型呢&#xff1f;本文给大家一些3D建模技能方面的建议。 3D建模通过专门的软件完成&#xff0c;涉及制作三维对象。这项技能在视频游戏开发、建筑、动画和产品…

【Java Web学习笔记】0 - 技术体系的说明

B/S软件开发架构简述 B/S架构 1.B/S框架&#xff0c;意思是前端(Browser浏览器)和服务器端( Server )组成的系统的框架结构。 2. B/S架构也可理解为web架构&#xff0c;包含前端、后端、数据库三大组成部分。 3.示意图 ●前端 前端开发技术工具包括三要素: HTML、CSS和Jav…

1-3、DOSBox环境搭建

语雀原文链接 文章目录 1、安装DOSBox2、Debug进入Debugrdeautq 1、安装DOSBox 官网下载下载地址&#xff1a;https://www.dosbox.com/download.php?main1此处直接下载这个附件&#xff08;内部有8086的DEBUG.EXE环境&#xff09;8086汇编工作环境.rar执行安装DOSBox0.74-wi…

解决CentOS下PHP system命令unoconv转PDF提示“Unable to connect or start own listener“

centos系统下&#xff0c;用php的system命令unoconv把word转pdf时提示Unable to connect or start own listene的解决办法 unoconv -o /foo/bar/public_html/upload/ -f pdf /foo/bar/public_html/upload/test.docx 2>&1 上面这个命令在shell 终端能执行成功&#xff0c…

基于GAN的多尺度门合并多模态MRI图像合成

Multi-Modal MRI Image Synthesis via GAN With Multi-Scale Gate Mergence 基于GAN的多尺度门合并多模态MRI图像合成背景贡献实验方法生成器gate mergence (GM) strategy&#xff08;门控融合策略&#xff09;判别器 损失函数Thinking 基于GAN的多尺度门合并多模态MRI图像合成…

深入了解接口测试:揭秘网络分层和数据处理!

网络分层和数据 上一小节中介绍了接口测试中一些必要重要的定义&#xff0c;这一节我们来讨论一下在学习接口测试过程中我们要关注的最重要的东西&#xff1a;网络分层和数据。 首先&#xff0c;我们来尝试理解一下&#xff0c;为什么网络是要分层的呢&#xff1f; 我们可以…

python文件读取

相对路径 读文件 打印txt文件 fopen(".\data.txt","r",encoding"utf-8") contentf.read() print(content) f.close()with open(".\data.txt","r",encoding"utf-8") as f:contentf.read()print(content)contentf…

Ilya Sutskever:师从Hinton,“驱逐”奥特曼,一个改变AI世界的天才科学

ChatGPT 已经在全球爆火&#xff0c;但大众在两周之前似乎更熟悉Sam Altman&#xff0c;而对另一位创始人 Ilya Sutskever 却了解不多。 直到前几天因为OpenA眼花缭乱的政权争夺大戏&#xff0c;OpenAI 的首席科学家Ilya Sutskever的名字逐渐被世人所知。 Ilya Sutskever在科…

STM32 CUBEIDE Outline is disabled due to scalability mode

项目场景&#xff1a; 问题描述 Outline is disabled due to scalability mode 看不到函数 解决方案&#xff1a;

继承中的析构函数的权限的深入了解

如果一个父类中的析构函数如果设置为 private 权限 &#xff0c;一个子类public继承了这个父类&#xff0c;那么 这个父类可以创建对象吗&#xff1f; 答案是 不可以 看看下面的代码 class A { public:private:~A() {} };class B :public A {A a; // 这个地方编译不报错&…

【CAN通信】CanIf模块详细介绍

目录 1.内容简介 2.CanIf详细设计 2.1 CanIf功能简介 2.2 一些关键概念 2.3依赖的上下层模块 2.4 功能详细设计 2.4.1 Hardware object handles 2.4.2 Static L-PDUs 2.4.3 Dynamic L-PDUs 2.4.4 Dynamic Transmit L-PDUs 2.4.5 Dynamic receive L-PDUs 2.4.6Physi…

PostGIS学习教程八:空间关系

PostGIS学习教程八&#xff1a;空间关系 到目前为止&#xff0c;我们只使用了测量&#xff08;ST_Area、ST_Length&#xff09;、序列化&#xff08;ST_GeomFromText&#xff09;或者反序列化&#xff08;ST_AsGML&#xff09;几何图形&#xff08;geometry&#xff09;的空间…

【MATLAB】异常数据识别

基于分位数的异常点识别 首先&#xff0c;给定了一个原始数据序列x。然后&#xff0c;计算了序列x的上四分位数和下四分位数&#xff0c;并根据这两个值计算了异常点的阈值。上四分位数减去1.5倍的四分位数范围得到异常值下界&#xff0c;下四分位数加上1.5倍的四分位数范围得…

运行新vue3项目

一&#xff0c;下载node并安装 官网&#xff1a;https://nodejs.org/en/ 查看版本&#xff1a; node -v二&#xff0c;cd进入到vue3项目目录 cd D:\Program-space\HBuilderXProject\Vue3project三&#xff0c;npm install npm install四&#xff0c;查看安装 npm list五&a…

解析生效探测方法

linux dig命令 1.最常用的查询命令 dig baidu.com2 . 根据记录类型进行查询&#xff0c;比如MX&#xff0c;CNAME&#xff0c;NS&#xff0c;PTR等&#xff0c;只需将类型加在命令后面即可。 dig a.shifen.com ns3 . 指定域名DNS服务器测试解析是否生效的命令&#xff0c;以…