Kafka 深入服务端 — 时间轮

Kafka中存在大量的延迟操作,比如延时生产、延时拉取和延时删除等。Kafka基于时间轮概念自定义实现了一个用于延时功能的定时器,来完成这些延迟操作。

1 时间轮

Kafka没有使用基于JDK自带的Timer或DelayQueue来实现延迟功能,因为它们的插入和删除操作的时间复杂度为logn,这不能满足Kafka高性能要求。

1.1 Timer 和 DelayQueue

它们都使用了一个优先级队列(通常基于堆实现)来管理任务。

1.1.1 Timer

用于计划在特定时间后执行的任务,这些任务可以只执行一次或定期重复执行。其有以下特点:

  1. 运行在单线程中,无法满足多个任务同时执行的需求。如果前置任务耗时长,可能会阻塞后置任务。
  2. 如果任务执行过程中抛出异常,Timer会被异常中断停止。
public class TimerTest {public static void main(String[] args) {Timer timer = new Timer();Date startTime = new Date();TimerTask task1 = new TimerTask() {@Overridepublic void run() {long dis = (new Date().getTime() - startTime.getTime()) / 1_000;System.out.println("task1执行,距离开始时间:" + dis + "s");}};TimerTask task2 = new TimerTask() {@Overridepublic void run() {long dis = (new Date().getTime() - startTime.getTime()) / 1_000;System.out.println("task2执行,距离开始时间:" + dis + "s,休眠5s。");try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("task2休眠结束");}};TimerTask task3 = new TimerTask() {@Overridepublic void run() {long dis = (new Date().getTime() - startTime.getTime()) / 1_000;System.out.println("task3执行,距离开始时间:" + dis + "s");}};timer.schedule(task1,1000); // 1s后执行timer.schedule(task2,2000); // 2s后执行timer.schedule(task3,3000); // 3s后执行}
//    执行结果:
//    task1执行,距离开始时间:1s
//    task2执行,距离开始时间:2s,休眠5s。
//    task2休眠结束
//    task3执行,距离开始时间:7s
}

1.1.2 DelayQueue

是一个无界阻塞队列,用于存储实现了Delayed接口的元素,这些元素只有在它们的延迟期满时才会被取出。其有以下特点:

  1. 线程安全,可以在多线程环境中使用。
  2. 无界队列,可以存储任意数量的元素,直到系统内存耗尽。
  3. 延迟精度依赖与系统时钟。
public class DelayQueueTest {private static class DelayQueueTask implements Delayed {private final String taskName;private final long delayTime;private DelayQueueTask(String taskName, long delayTime) {this.taskName = taskName;this.delayTime = delayTime + System.currentTimeMillis();}@Overridepublic long getDelay(TimeUnit unit) { // 返回剩余延迟long diff = delayTime - System.currentTimeMillis();return unit.convert(diff,TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {if (this.delayTime < ((DelayQueueTask) o).delayTime) {return -1;}if (this.delayTime > ((DelayQueueTask) o).delayTime) {return 1;}return 0;}@Overridepublic String toString() {return "DelayQueueTask{" +"taskName='" + taskName + '\'' +", delayTime=" + delayTime +'}';}}public static void main(String[] args) throws InterruptedException {DelayQueue<DelayQueueTask> delayQueue = new DelayQueue<>();delayQueue.offer(new DelayQueueTask("task1",5000));delayQueue.offer(new DelayQueueTask("task2",2000));delayQueue.offer(new DelayQueueTask("task3",4000));System.out.println("开始执行delayQueue任务");while (!delayQueue.isEmpty()) {DelayQueueTask task = delayQueue.take();System.out.println("任务:" + task);}System.out.println("delayQueue任务 任务执行完毕");}
}

1.2 时间轮结构

Kafka 在任务的插入与删除采用了时间轮结构,其时间复杂度为O(1),而在时间推进上,还是依赖JDK提供的DelayQueue。

图 时间轮(TimingWheel)结构

Kafka的时间轮是一个存储定时任务的环形队列,每个元素(时间格)相当于一个桶(bucket),来存储一个定时任务列表TimerTaskList。TimerTaskList是一个环形的双向链表,链表中的每一项都是定时任务项TimerTaskEntry,其中封装了真正的定时任务TimerTask。

Kafka将TimerTaskList插入到DelayQueue(队列)中,使其成为其中的一个元素。它的过期时间为TimerTaskList的TimerTaskEntry中最快过期的时间。

1.2.1 时间格与时间跨度

时间轮由多个时间格组成(上面示意图中,每一层有10个时间格),每个时间格代表时间轮的基本时间跨度(tick),时间格数(wheelSize)是固定的,那么时间轮的总体跨度interval = tick * wheelSize。

时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间。currentTime是tick的整数倍,currentTime将整个时间轮划分为到期部分和未到期部分。当前指向的时间格也属于到期部分,此时需要处理此时间格所对应的TimerTaskList中的所有任务。

上面示意图中,tick = 1s,此时currentTime指向第2个时间格,需要处理这个时间格存储的所有任务。假设,此时插入了一个3s后的任务,则把该任务插入第5个时间格中的bucket。

1.2.2 时间轮层级

当currentTime 指向第2个时间格时,需要插入一个33s后的任务,此时时间超过了第一层的跨度(1s * 10 = 10s)。Kafka引入层级时间轮的概念,当到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层的时间轮中。

33s 的任务会被插入到第二层的第(33 / 10 = 3) 3 个时间格中。

第一层的开始时间(第0格)startMs 是当前系统时间。其余高层时间轮的起始时间都设置为创建此层时前面第一轮的currentTime。

每一层时间轮都会有指向更高一层的引用。

1.2.3 任务处理及时间轮降级

图 时钟

时间轮类似于时钟,当每一层走完一圈时,上一层就会走一格。例如当第1层的currentTime 指向第2格时,此时需要插入两个任务,分别是33s及39s后。它们都会被插入到第2层的第3格中的bucket(TimerTaskList)。

假设经过33s(第1层指向第5格,第2层指向第3格)后,此bucket还是只有这两个任务。Kafka会把它们所在的TimerTaskList从第2层的第3格中取出,将33s的任务执行并从TimerTaskList中删除。此时,39s的任务还剩6s,Kafka会把这个任务“降级”,插入到第1层第1((5+6)% 10)格中。

1.2.4 时间推进与DelayQueue

如果按照时间格一格格推进时间,这样消耗会比较大,而且可能好多时间格没有存储任务。Kafka借助DelayQueue来推进时间。

将时间格bucket的TimerTaskList封装成Delayed,其剩余时间取TimerTaskList中TimerTaskEntry最快达到的时间。然后将这些Delayed插入到DelayQueue中。DelayQueue会将这些Delayed排序,最快到达的排在队列头部。当到达时刻时,将表头的TimerTaskEntry取出,对它的TaskEntry执行任务执行或降级等操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/893987.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据分析系列--②RapidMiner导入数据和存储过程

一、下载数据 点击下载AssociationAnalysisData.xlsx数据集 二、导入数据 1. 在本地计算机中创建3个文件夹 2. 从本地选择.csv或.xlsx 三、界面说明 四、存储过程 将刚刚新建的过程存储到本地 Congratulations, you are done.

AIP-133 标准方法:Create

编号133原文链接AIP-133: Standard methods: Create状态批准创建日期2019-01-23更新日期2019-01-23 在REST API中&#xff0c;通常向集合URI&#xff08;如 /v1/publishers/{publisher}/books &#xff09;发出POST请求&#xff0c;在集合中创建新资源。 面向资源设计&#x…

HarmonyOS简介:HarmonyOS核心技术理念

核心理念 一次开发、多端部署可分可合、自由流转统一生态、原生智能 一次开发、多端部署 可分可合 自由流转 自由流转可分为跨端迁移和多端协同两种情况 统一生态 支持业界主流跨平台开发框架&#xff0c;通过多层次的开放能力提供统一接入标准&#xff0c;实现三方框架快速…

【论文投稿-第八届智能制造与自动化学术会议(IMA 2025)】HTML, CSS, JavaScript:三者的联系与区别

大会官网&#xff1a;www.icamima.org 目录 前言 一、HTML&#xff08;超文本标记语言&#xff09;&#xff1a;网页的骨架 HTML 的作用&#xff1a; 例子&#xff1a; 总结&#xff1a; 二、CSS&#xff08;层叠样式表&#xff09;&#xff1a;网页的外观设计 CSS 的…

ES6语法

一、Let、const、var变量定义 1.let 声明的变量有严格局部作用域 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"&g…

微前端架构在前端开发中的实践与挑战

随着单页面应用&#xff08;SPA&#xff09;和前端框架如 React、Vue、Angular 的快速发展&#xff0c;现代前端应用的复杂度日益提升。尤其是当应用规模逐渐增大时&#xff0c;单一的代码库往往难以应对不同团队的协作和版本管理问题。为了应对这一挑战&#xff0c;微前端架构…

书生大模型实战营3

文章目录 L0——入门岛git基础Git 是什么&#xff1f;Git 中的一些基本概念工作区、暂存区和 Git 仓库区文件状态分支主要功能 Git 平台介绍GitHubGitLabGitee Git 下载配置验证下载 Git配置 Git验证 Git配置 Git常用操作Git简易入门四部曲Git其他指令 闯关任务任务1: 破冰活动…

【美】Day 1 CPT申请步骤及信息获取指南(Day1 CPT)

参考文章&#xff1a;【美】境外申请Day 1 CPT完整流程&#xff08;境外Day 1 CPT&#xff09; 文章目录 **一、申请前准备&#xff1a;了解Day 1 CPT基本要求****二、选择Day 1 CPT学校****1. 搜索学校****2. 筛选标准** **三、申请流程****1. 提交学校申请****2. 转SEVIS记录…

大一计算机的自学总结:位运算的应用及位图

前言 不仅异或运算有很多骚操作&#xff0c;位运算本身也有很多骚操作。&#xff08;尤其后几个题&#xff0c;太逆天了&#xff09; 一、2 的幂 class Solution { public:bool isPowerOfTwo(int n) {return n>0&&n(n&-n);} }; 根据二进制表示数的原理&#…

Next.js 14 TS 中使用jwt 和 App Router 进行管理

jwt是一个很基础的工作。但是因为架构不一样&#xff0c;就算是相同的架构&#xff0c;版本不一样&#xff0c;加jwt都会有一定的差别。现在我们的项目是Next.js 14 TS 的 App Router项目&#xff08;就是没有pages那种&#xff09;&#xff0c;添加jwt的步骤&#xff1a; 1、…

前端——js高级25.1.27

复习&#xff1a;对象 问题一&#xff1a; 多个数据的封装提 一个对象对应现实中的一个事物 问题二&#xff1a; 统一管理多个数据 问题三&#xff1a; 属性&#xff1a;组成&#xff1a;属性名属性值 &#xff08;属性名为字符串&#xff0c;属性值任意&#xff09; 方…

【Django教程】用户管理系统

Get Started With Django User Management 开始使用Django用户管理 By the end of this tutorial, you’ll understand that: 在本教程结束时&#xff0c;您将了解&#xff1a; Django’s user authentication is a built-in authentication system that comes with pre-conf…

[创业之路-270]:《向流程设计要效率》-2-企业流程架构模式 POS架构(规划、业务运营、支撑)、OES架构(业务运营、使能、支撑)

目录 一、POS架构 二、OES架构 三、POS架构与OES架构的差异 四、各自的典型示例 POS架构典型示例 OES架构典型示例 示例分析 五、各自的典型企业 POS架构典型企业 OES架构典型企业 分析 六、各自典型的流程 POS架构的典型流程 OES架构的典型流程 企业流程架构模式…

计算机的错误计算(二百二十二)

摘要 利用大模型化简计算 实验表明&#xff0c;虽然结果正确&#xff0c;但是&#xff0c;大模型既绕了弯路&#xff0c;又有数值计算错误。 与前面相同&#xff0c;再利用同一个算式看看另外一个大模型的化简与计算能力。 例1. 化简计算摘要中算式。 下面是与一个大模型的…

【现代深度学习技术】深度学习计算 | 参数管理

【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈PyTorch深度学习 ⌋ ⌋ ⌋ 深度学习 (DL, Deep Learning) 特指基于深层神经网络模型和方法的机器学习。它是在统计机器学习、人工神经网络等算法模型基础上&#xff0c;结合当代大数据和大算力的发展而发展出来的。深度学习最重…

C语言,无法正常释放char*的空间

问题描述 #include <stdio.h> #include <stdio.h>const int STRSIZR 10;int main() {char *str (char *)malloc(STRSIZR*sizeof(char));str "string";printf("%s\n", str);free(str); } 乍一看&#xff0c;这块代码没有什么问题。直接书写…

2025春招 SpringCloud 面试题汇总

大家好&#xff0c;我是 V 哥。SpringCloud 在面试中属于重灾区&#xff0c;不仅是基础概念、组件细节&#xff0c;还有高级特性、性能优化&#xff0c;关键是项目实践经验的解决方案&#xff0c;都是需要掌握的内容&#xff0c;正所谓打有准备的仗&#xff0c;秒杀面试官&…

C#面试常考随笔6:ArrayList和 List的主要区别?

在 C# 中&#xff0c;ArrayList和List<T>&#xff08;泛型列表&#xff09;都可用于存储一组对象。推荐优先使用List<T>&#xff0c;因为它具有更好的类型安全性、性能和语法简洁性&#xff0c;并且提供了更丰富的功能。只有在需要与旧代码兼容或存储不同类型对象的…

用C++编写一个2048的小游戏

以下是一个简单的2048游戏的实现。这个实现使用了控制台输入和输出&#xff0c;适合在终端或命令行环境中运行。 2048游戏的实现 1.游戏逻辑 2048游戏的核心逻辑包括&#xff1a; • 初始化一个4x4的网格。 • 随机生成2或4。 • 处理玩家的移动操作&#xff08;上、下、左、…

Julia Distributed(分布式计算)详解

分布式计算是现代计算机科学中日益重要的一个分支&#xff0c;特别是在处理大规模数据和计算密集型应用时变得尤为关键。Julia 语言作为一种高性能的开源编程语言&#xff0c;其内置的分布式计算功能强大且易于使用。本篇博客将深入探讨 Julia Distributed 的基础概念、使用方法…