Kafka源码分析(五) - Server端 - 基于时间轮的延时组件

系列文章目录

Kafka源码分析-目录

一. 背景

Kafka内部涉及大量的"延时"操作,比如收到PRODUCE请求后可为副本等待一个timeout的时间后再响应客户端。

那我们讨论一个问题:Kafka为什么自己实现了一个延时任务组件,而不直接使用java的java.util.concurrent.DelayQueue?

可以按如下两点来分析这个事:

  1. DelayQueue提供了哪些能力?

  2. Kafka所面对的场景是怎样的,为什么DelayQueue不适用?

1.1 DelayQueue的能力

DelayQueue相关的接口/类如下图:
在这里插入图片描述
对应地,DelayQueue所提供的能力如下:

  1. 为任务指定延时时间
    DelayQueue所维护的任务都要实现Delayed接口;其中的getDelay方法返回了距离该任务所设定执行时间有多远;

  2. 任务存取的时间复杂度为O(log n)
    DelayQueue内部使用"优先级队列"来保存任务,该数据结构存取的时间复杂度为对数复杂度;

1.2 Kafka的业务场景

Kafka的业务背景有如下特点:

  1. 延时任务不一定非得等到超时后才执行,有一些事件会触发其提前执行;
    比如PRODUCE请求处理过程中,若副本的响应比较快,那收到了预期数量的副本响应后就可以开始给客户端发响应,不一定非得等满一个timeout的时间;

  2. 延时任务的"入队"操作QPS很高;
    Kafka就是为高QPS而生,内部会产生大量的延时任务;

对应地,Kafka对延时任务组件有如下两点要求:

  1. 要求有提前执行任务的能力;

  2. 要求尽可能降低延时任务入队操作的时间复杂度;
    (借助一个名为"时间轮"的数据结构,Kafka将时间复杂度实际降到了O(1))

这两点要求都无法通过直接应用DelayQueue的方式得到满足。

二. 组件接口

我们来看看Kafka的延时任务组件对外提供的接口,进而搞清楚其提供的能力和使用方式。

如下图:
在这里插入图片描述
左边两个类定义了"延时操作":

  1. TimerTask
    描述了一个最基本的延时Task;定义了超时时间(delayMs属性)、提供了一个取消操作(cancel方法);

  2. DelayedOperation
    描述了一个可被外部事件提前触发的TimerTask;其在TimerTask基础上提供了检查是否满足提前触发条件的方法(tryComplete)、并定义被外部事件提前触发后的行为(onComplete)和无事件触发直到超时后的行为(onExpiration);这几个方法都需要DelayedOperation的子类进行实现;

右边的DelayedOperationPurgatory类定义了一个维护DelayOperaton的容器,其核心操作如下:

  1. tryCompleteElseWatch
    添加延时任务;该方法有两个入参:operation和watchkeys;前者是要添加的延时任务,后者是该任务要监听的事件类型(可以同时监听多个事件类型);可以在kafka.server.DelayedOperationKey所在scala文件中查看目前可选的watchkeys类型;一个watchkey对应一个延时任务队列;

  2. checkAndComplete
    检查各任务是否以满足提前触发的条件;该方法在某个事件发生之后,逻辑是遍历watchkey对应的任务队列,逐个判断是否满足了触发条件;

  3. cancelForKey
    取消watchkey下的所有延时任务;

三. 实现

下文主要关注"延时"的实现方式。

3.1 业务模型

时间轮延时组件的思路如下:

  1. 仍然使用java的DelayQueue存储延时任务;
    但为了降低延时任务入队的事件复杂度,Kafka并不直接将单个延时任务直接存储于DelayQueue,而是先将延时任务列表(TimerTaskList)加入DelayQueue,然后再向对应的列表中添加延时任务 (TimerTaskList也有超时时间的概念,等于其中最快超时的任务的超时时间)。这样任务的入队就由向优先级队列添加元素变为了向TimerTaskList中添加元素;前者时间复杂度为O(logn),后者时间复杂度为O(1)。

  2. 如何判断一个新的延时任务应该加到哪个TimerTaskList中?时间轮(TimingWheel)!
    delayQueue中存储了多个TimerTaskList,当拿到一个新的延时任务时,Kafka会使用TimingWheel来计算该延时任务应插入到哪个TimerTaskList。其实,TimingWheel的本质就是TimerTask和TimerTaskList之间的映射函数。

接下来通过一个具体的例子来说明这种映射逻辑:
在这里插入图片描述
先关注上图中①号时间轮。圆环中每一个单元格表示一个TimerTaskList。单元格有其关联的时间跨度;下方的"1s x 12"表示时间轮上共12个单元格,每个单元格的时间跨度为1秒。有一个指针指向了"当前时间"所对应的单元格。顺时针方向为时间流动方向。

当收到一个延迟时间在0-1s的TimerTask时,会将其追加到①号时间轮的橙色单元格中。当收到一个延时时间在3-4s的TimerTask时,会将其追加到①号时间轮的黄色单元格中。以此类推。

现在有个问题:①号时间轮能表示的最大延迟时间是12秒,那如果收到了延迟13秒的任务该怎么办?这时该用到②号时间轮了,我们称②号为①号的"溢出时间轮"。②号时间轮的特点如下:

  1. 一个单元格所标识的时间跨度为①号的总长度,即12秒;
  2. 单元格数量与①号相同;

如此,延迟时间在12-24s的TimerTask会被追加到②号的紫色单元格,延迟时间在36-48s的TimerTask会被追加到②号的绿色单元格中。③号时间轮同理。

刚刚是按①->②->③的顺序来分析时间轮的逻辑,反过来也可以得到有用的结论:想象手里有一个"放大镜",其实③号时间轮的蓝色单元格"放大"后是②号时间轮;②号时间轮的蓝色单元格"放大"后是①号时间轮;蓝色单元格并不实际存储TimerTask。

3.2 数据结构

在这里插入图片描述
DelayedOperationPurgatory有个Timer类型的timeoutTimer属性,用于维护延时任务。实际使用的是Timer的实现类:SystemTimer。该类用于维护延时任务的核心属性有两个:delayQueue和timingWheel。TimingWheel表示单个时间轮,接下来我们来看看其类图:
TimingWheel类图

各属性含义如下:

  1. buckets:TimerTaskList数组;其中一个元素对应一个时间轮单元格;

  2. tickMs:本时间轮一个单元格所表示的时间跨度(毫秒);

  3. intervel:本时间轮所能表示的时间总长度,其值为 b u c k e t s . l e n g t h ∗ t i c k M s buckets.length*tickMs buckets.lengthtickMs ;

  4. currentTime:当前时间;其是tickMs的整数倍,这样可以通过currentTime的值很方便地计算出"当前"对应的时间轮单元格;其初始值计算公式如下:
    currentTime = startMs - (startMs % tickMs );(其中startMs为时间轮创建时间)

  5. overflowWheel:本时间轮的"溢出时间轮";

3.3 算法

3.3.1 添加任务

添加任务的入口是 DelayedOperationPurgatory.tryCompleteElseWatch,其核心逻辑分如下两步:

def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {// 1. 尝试立即指定指定的任务...// 2. 加入到TimertimeoutTimer.add(operation)...
}

SystemTimer.add直接调用了addTimerTaskEntry方法,后者逻辑如下:

private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {// 尝试将任务加入时间轮if (!timingWheel.add(timerTaskEntry)) {// 若任务已过期且未被取消,直接提交执行if (!timerTaskEntry.cancelled)taskExecutor.submit(timerTaskEntry.timerTask)}
}

TimingWheel.add的逻辑也很清晰,分如下4种场景处理:

def add(timerTaskEntry: TimerTaskEntry): Boolean = {val expiration = timerTaskEntry.expirationMsif (timerTaskEntry.cancelled) {// 1.如果任务已取消... ...false} else if (expiration < currentTime + tickMs) {// 2.若任务已过期, 即任务过期时间在"当前单元格"内... ...false} else if (expiration < currentTime + interval) {// 3.虽然过期时间不在"当前单元格"内,但仍在当前时间轮之内// 将任务正常加入到当前时间轮对应的单元格内 ... ...true} else {// 4.超出了当前时间轮所能表示的范围: 将任务加入到"溢出时间轮"if (overflowWheel == null) // 如果当前不存在上层时间轮, 那么生成一个addOverflowWheel()// 将任务加入到"溢出时间轮"overflowWheel.add(timerTaskEntry)}
}

3.3.2 尝试提前触发任务

入口是DelayedOperationPurgatory.checkAndComplete:

def checkAndComplete(key: Any): Int = {// 1. 根据事件key查找对应的任务队列(watchers就是一个队列的封装);val watchers = inReadLock(removeWatchersLock) { watchersForKey.get(key) }// 2. 遍历队列,尝试执行各任务;if(watchers == null)0elsewatchers.tryCompleteWatched()
}

接下来看Watchers.tryCompleteWatched方法的内容:

def tryCompleteWatched(): Int = {var completed = 0// 遍历队列val iter = operations.iterator()while (iter.hasNext) {// 逐个处理各任务val curr = iter.next()if (curr.isCompleted) {// 若任务已经处于完成态, 直接移除iter.remove()} else if (curr.maybeTryComplete()) {// 若任务在调用maybeTryComplete后被成功触发, 则移除iter.remove()completed += 1}}// 若队列已空, 移除当前队列if (operations.isEmpty)removeKeyIfEmpty(key, this)// 返回本次成功提前触发的任务数量completed
}

DelayedOperation.maybeTryComplete方法最终调用了DelayedOperation.tryComplete;

DelayedOperation的子类需要在后者中实现自己的"触发条件"检查逻辑;若满足了提前触发的条件,则调用forceComplete方法执行事件触发场景下的业务逻辑。

3.3.3 任务到期自动执行

DelayedOperationPurgatory中维护了一个expirationReaper线程,其职责就是循环调用kafka.utils.timer.SystemTimer#advanceClock来从事件轮中获取已超时的任务,并更新时间轮的"当前时间"指针。

四. 总结

才疏学浅,未能窥其十之一二,随时欢迎各位交流补充。若文章质量还算及格,可以点赞收藏加以鼓励,后续我继续更新。

另外也可以在目录中找到同系列的其他文章:
Kafka源码分析系列-目录(收藏关注不迷路)

感谢阅读。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/7375.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微信个人号开发api接口-视频号矩阵接口-VIdeosApi

友情链接&#xff1a;VIdeosApi 获取用户主页 接口地址&#xff1a; http://api.videosapi.com/finder/v2/api/finder/userPage 入参 { "appId": "{{appid}}", "lastBuffer": "", "toUserName": "v2_060000231003b2…

网络基础-华为VRP基础CLI操作

基本命令模式 华为设备的命令行模式包括用户视图和特权级模式。 用户视图&#xff08;User View&#xff09;&#xff1a;这是用户登录到华为设备时默认进入的模式。在用户视图下&#xff0c;用户可以执行一些基本的查看命令&#xff0c;但不能进行设备配置或管理。提示符通常…

Golang | Leetcode Golang题解之第72题编辑距离

题目&#xff1a; 题解&#xff1a; func minDistance(word1 string, word2 string) int {m, n : len(word1), len(word2)dp : make([][]int, m1)for i : range dp {dp[i] make([]int, n1)}for i : 0; i < m1; i {dp[i][0] i // word1[i] 变成 word2[0], 删掉 word1[i], …

U盘提示“被写保护”无法操作处理怎么办?

今天在使用U盘复制拷贝文件时&#xff0c;U盘出现“U盘被写保护”提示&#xff0c;导致U盘明明有空闲内存却无法复制的情况。这种情况很常见&#xff0c;很多人在插入U盘到电脑后&#xff0c;会出现"U盘被写保护"的提示&#xff0c;导致无法进行删除、保存、复制等操…

Junit 测试中如何对异常进行断言

本文对在 Junit 测试中如何对异常进行断言的几种方法进行说明。 使用 Junit 5 如果你使用 Junit 5 的话&#xff0c;你可以直接使用 assertThrows 方法来对异常进行断言。 代码如下&#xff1a; Exception exception assertThrows(NumberFormatException.class, () -> {n…

pycharm关闭代码补全

pycharm关闭代码补全 文件-设置 编辑器-常规-代码补全-键入时显示建议

pyecharts绘制世界动态轨迹图(v0.5.X与v1.X版本对比)

一、问题引入 pyecharts官网&#xff1a;https://pyecharts.org/#/zh-cn/intro 在使用Geo或者GeoLines绘制动态轨迹图时&#xff0c;如果所选地区是中国的省份或者城市&#xff0c;是能够匹配到对应的经纬度并且正常绘制的&#xff1b;如果所选地区涉及到其他国家或者国外城市&…

监控公司局域网电脑的软件|局域网电脑监控软件哪个好用

想要监控公司局域网电脑&#xff1f;没问题&#xff0c;市面上有一大堆选择等着你&#xff01;每个软件都有它的独门绝技和适用场合&#xff0c;接下来就让我带你看看哪些软件既好用又功能强大吧&#xff01; &#x1f389;OpManager&#xff1a; 这位大佬适合中大型企业&#…

C语言 | Leetcode C语言题解之第73题矩阵置零

题目&#xff1a; 题解&#xff1a; void setZeroes(int** matrix, int matrixSize, int* matrixColSize) {int m matrixSize;int n matrixColSize[0];int flag_col0 false;for (int i 0; i < m; i) {if (!matrix[i][0]) {flag_col0 true;}for (int j 1; j < n; j…

Unity 性能优化之遮挡剔除(Occlusion Culling)(六)

提示&#xff1a;仅供参考&#xff0c;有误之处&#xff0c;麻烦大佬指出&#xff0c;不胜感激&#xff01; 文章目录 前言一、遮挡剔除是什么&#xff1f;二、静态遮挡剔除的使用步骤1.标记为遮挡剔除对象2.创建Occlusion Area组件3.烘焙4.Occlusion窗口Bake的参数Smallest Oc…

后台启动HIVE的JDBC连接

后台启动HIVE的JDBC连接 生活就像一杯咖啡&#xff0c;有时苦涩&#xff0c;有时香甜&#xff0c;但都是值得品味的经历。无论遇到什么挑战&#xff0c;记住在每一天的开始&#xff0c;你都有机会给自己倒上一杯清新的力量&#xff0c;为心灵添一抹温暖。勇敢地面对生活的苦与甜…

专家解读 | NIST网络安全框架(1):框架概览

随 着信息技术的快速发展&#xff0c;组织面临着越来越严峻的网络安全挑战。NIST网络安全框架&#xff08;NIST Cybersecurity Framework&#xff0c;CSF&#xff09;是一个灵活的综合性指南&#xff0c;旨在协助各类组织建立、改进和管理网络安全策略&#xff0c;以加强网络安…

计算机网络学习记录Day1

你好,我是Qiuner. 为记录自己编程学习过程和帮助别人少走弯路而写博客 这是我的 github gitee 如果本篇文章帮到了你 不妨点个赞吧~ 我会很高兴的 &#x1f604; (^ ~ ^) 想看更多 那就点个关注吧 我会尽力带来有趣的内容 计算机网络学习记录Day1 本文基于1.1 计算机网络在信息…

python 中的数据结构

python 中的数据结构 1.1 序列 序列时有索引的数组 举例实现&#xff1a; a["北京","上海","广州","深圳","重庆","成都"] print(a[2]) print(a[-1] " " a[-2]) print(a[1:3]) # 运行结果 "&…

C++使用单链表实现一元多项式的加,乘操作

相邀再次喝酒 待 葡萄成熟透 但是命运入面 每个邂逅 一起走到了 某个路口 是敌与是友 各自也没有自由 位置变了 各有队友 首先&#xff0c;按照惯例&#xff0c;十分欢迎大家边听歌边观看本博客&#xff01;&#xff01; 最佳损友 - 陈奕迅 - 单曲 - 网易云音乐 (163.com) 一…

最新优质电商API接口,附带教程【多语言环境高并发】

给大家更新一波24年一月份的新接口吧。 01 接口信息 线路推荐: 多仓&#xff1a; 1.春盈&#xff1a; https://wds.ecsxs.com/230989.json 2.无意&#xff1a; http://www.wya6.cn/tv/yc.json 3.主流电商平台API数据采集 单仓&#xff1a; 1.饭太硬&#xff1a; http:/…

探无止境,云游未来 | “游戏出海云”发布

4月28日下午&#xff0c;2024中国移动算力网络大会之“游戏出海”分论坛在江苏省苏州金鸡湖国际会议中心圆满落幕。 此次论坛由中国移动海南公司主办&#xff0c;中国移动通信集团政企事业部、中国移动云能力中心、中国移动国际公司共同协办。海南省工业与信息化厅副厅长黄业晶…

爬虫学习(1)简易网页采集器

如何使用: (reques ts模块的编码流程) 指定url 发起请求 获取响应数据 持久化存储 import requests#UA:User-Agent (请求载体的身份标识) #UA伪装&#xff1a;门户网站的服务器会检查对应请求的载体身份标识 if __name__ "__main__":urlhttps://www.baidu.com/s#处理…

ESP8266做主机 手机网络助手为从机

ATCIFSR查看地址&#xff0c;一般ESP8266 为192.168.4.1 在手机上下载网络调试助手&#xff0c;打开TCP客户端 创建后192.168.4.1 端口8089然后连接ESP8266热点。 ESP向手机发数据前先发送要发几个数据ATCIPSEND0,8表示发8个&#xff0c;然后再发8个数 上面创建好热点后&…

(Arxiv,2024)Mind the Modality Gap:通过跨模态对齐建立遥感视觉语言模型

文章目录 相关资料摘要引言相关工作对比语言图像预训练遥感域专用 CLIP 模型遥感中的多模态 CLIP 启发模型 方法模型算法输入阶段&#xff1a;输出阶段&#xff1a;步骤说明&#xff1a; 第一阶段&#xff1a;通过权重插值修补CLIP将遥感图像模态与自然图像和文本对齐 实验 相关…