八、Kafka时间轮与常见问题

Kafka与时间轮

Kafka中存在大量的延时操作。
1、发送消息-超时+重试机制

2、ACKS 用于指定分区中必须要有多少副本收到这条消息,生产者才认为写入成功(延时 等)

Kafka并没有使用JDK自带的Timer或者DelayQueue来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer)

JDK的Timer和DelayQueue插入和删除操作的平均时间复杂度为O(log(n)),并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)。

时间轮的应用并非Kafka独有,其应用场景还有很多,在Netty、Akka、Quartz、Zookeeper等组件中都存在时间轮的踪影。

时间轮

Java中任务调度

要回答这个问题,我们先从Java中最原始的任务调度的方法说起。

给你一批任务(假设有1000个任务),都是不同的时间执行的,时间精确到秒,你怎么实现对所有的任务的调度?

第一种思路是启动一个线程,每秒钟对所有的任务进行遍历,找出执行时间跟当前时间匹配的,执行它。如果任务数量太大,遍历和比较所有任务会比较浪费时间。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Cu92Jn2L-1690353345189)(file:///C:/Users/root/AppData/Local/Temp/ksohtml10964/wps14.jpg)]image.png

第二个思路,把这些任务进行排序,执行时间近(先触发)的放在前面。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GfOAIem8-1690353345192)(file:///C:/Users/root/AppData/Local/Temp/ksohtml10964/wps15.jpg)]image.png

如果是数组的时间的话,这里会涉及到大量的元素移动(新加入任务,任务执行–删除任务之类,都需要重新排序)

那么在Java代码怎么实现呢?

JDK包里面自带了一个Timer工具类(java.util包下),可以实现延时任务(例如30分钟以后触发),也可以实现周期性任务(例如每1小时触发一次)。

它的本质是一个优先队列(TaskQueue),和一个执行任务的线程(TimerThread)。

(普通的队列是一种先进先出的数据结构,元素在队列尾追加,而从队列头删除。在优先队列中,元素被赋予优先级。当访问元素时,具有最高优先级的元素最先删除。优先队列具有最高级先出 (first in, largest out)的行为特征。通常采用堆数据结构来实现。)

image.png

image.png

在这个优先队列中,最先需要执行的任务排在优先队列的第一个。然后 TimerThread 不断地拿第一个任务的执行时间和当前时间做对比。如果时间到了先看看这个任务是不是周期性执行的任务,如果是则修改当前任务时间为下次执行的时间,如果不是周期性任务则将任务从优先队列中移除。最后执行任务。

但是Timer是单线程的,在很多场景下不能满足业务需求。

在JDK1.5之后,引入了一个支持多线程的任务调度工具ScheduledThreadPoolExecutor用来替代TImer,它是几种常用的线程池之一。看看构造函数,里面是一个延迟队列DelayedWorkQueue,也是一个优先队列。

image.png

DelayedWorkQueue的最小堆实现

优先队列的使用的是最小堆实现。

最小堆的含义: 一种完全二叉树, 父结点的值小于或等于它的左子节点和右子节点

比如插入以下的数据 [1,2,3,7,17,19,25,36,100]

最小堆就长成这个样子。

image.png

优先队列的插入和删除的时间复杂度是O(logn),当数据量大的时候,频繁的入堆出堆性能不是很好。

比如要插入0,过程如下:

1、插入末尾元素

image.png

2、0比19小,所以要向上移动且互换。

image.png

3、0比2小,所以要向上移动且互换。

image.png

4、0比2小,所以要向上移动且互换。

image.png

算法复杂度

N个数据的最小堆, 共有logN层, 最坏的情况下, 需要移动logN次

时间轮

这里我们先考虑对所有的任务进行分组,把相同执行时刻的任务放在一起。比如这里,数组里面的一个下标就代表1秒钟。它就会变成一个数组加链表的数据结构。分组以后遍历和比较的时间会减少一些。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dpk4Jicj-1690353345195)(file:///C:/Users/root/AppData/Local/Temp/ksohtml10964/wps16.jpg)]image.png

但是还是有问题,如果任务数量非常大,而且时间都不一样,或者有执行时间非常遥远的任务,那这个数组长度是不是要非常地长?比如有个任务2个月之后执行,从现在开始计算,它的下标是5253120。

所以长度肯定不能是无限的,只能是固定长度的。比如固定长度是8,一个格子代表1秒(现在叫做一个bucket槽),一圈可以表示8秒。遍历的线程只要一个格子一个格子的获取任务,并且执行就OK了。

固定长度的数组怎么用来表示超出最大长度的时间呢?可以用循环数组。

比如一个循环数组长度8,可以表示8秒。8秒以后执行的任务怎么放进去?只要除以8,用得到的余数,放到对应的格子就OK了。比如10%8=2,它放在第2个格子。这里就有了轮次的概念,第10秒的任务是第二轮的时候才执行。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qCMzQcjf-1690353345196)(file:///C:/Users/root/AppData/Local/Temp/ksohtml10964/wps17.jpg)]image.png

这时候,时间轮的概念已经出来了。

如果任务数量太多,相同时刻执行的任务很多,会导致链表变得非常长。这里我们可以进一步对这个时间轮做一个改造,做一个多层的时间轮。

比如:最内层8个格子,每个格子1秒;外层8个格子,每个格子8*8=64秒;最内层走一圈,外层走一格。这时候时间轮就跟时钟更像了。随着时间流动,任务会降级,外层的任务会慢慢地向内层移动。

image.png

时间轮任务插入和删除时间复杂度都为O(1),应用范围非常广泛,更适合任务数很大的延时场景。Dubbo、Netty、Kafka中都有实现。

Kafka中时间轮实现

Kafka里面TimingWheel的数据结构

image.png

kafka会启动一个线程,去推动时间轮的指针转动。其实现原理其实就是通过queue.poll()取出放在最前面的槽的TimerTaskList

image.png

image.png

添加新的延迟任务

image.png

往时间轮添加新的任务

image.png

时间轮指针的推进

image.png

第二层时间轮的创建代码如下

image.png

Kafka性能问题

1、kafka如何确保消息的可靠性传输

这个问题需要从以下3个方面分析和解决

(1)消费端弄丢了数据

唯一可能导致消费者弄丢数据的情况,就是说,你那个消费到了这个消息,然后消费者那边自动提交了offset,让kafka以为你已经消费好了这个消息,其实你刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。

大家都知道kafka会自动提交offset,那么只要关闭自动提交offset,在处理完之后自己手动提交offset,就可以保证数据不会丢。但是此时确实还是会重复消费,比如你刚处理完,还没提交offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。

生产环境碰到的一个问题,就是说我们的kafka消费者消费到了数据之后是写到一个内存的queue里先缓冲一下,结果有的时候,你刚把消息写入内存queue,然后消费者会自动提交offset。

然后此时我们重启了系统,就会导致内存queue里还没来得及处理的数据就丢失了

(2)kafka弄丢了数据

这块比较常见的一个场景,就是kafka某个broker宕机,然后重新选举partiton的leader时。大家想想,要是此时其他的follower刚好还有些数据没有同步,结果此时leader挂了,然后选举某个follower成leader之后,他不就少了一些数据?这就丢了一些数据啊。

所以此时一般是要求起码设置如下4个参数:

给这个topic设置replication.factor参数:这个值必须大于1,要求每个partition必须有至少2个副本。

在kafka服务端设置min.insync.replicas参数:这个值必须大于1,这个是要求一个leader至少感知到有至少一个follower还跟自己保持联系,没掉队,这样才能确保leader挂了还有一个follower吧。

在producer端设置acks=all:这个是要求每条数据,必须是写入所有replica之后,才能认为是写成功了。

在producer端设置retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。

(3)生产者会不会弄丢数据

如果按照上述的思路设置了ack=all,一定不会丢,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。

2、如何实现Kafka的高性能?

1、宏观架构层面利用Partition实现并行处理

Kafka中每个Topic都包含一个或多个Partition,不同Partition可位于不同节点。同时Partition在物理上对应一个本地文件夹,每个Partition包含一个或多个Segment,每个Segment包含一个数据文件和一个与之对应的索引文件。在逻辑上,可以把一个Partition当作一个非常长的数组,可通过这个“数组”的索引(offset)去访问其数据。

一方面,由于不同Partition可位于不同机器,因此可以充分利用集群优势,实现机器间的并行处理。另一方面,由于Partition在物理上对应一个文件夹,即使多个Partition位于同一个节点,也可通过配置让同一节点上的不同Partition置于不同的disk drive上,从而实现磁盘间的并行处理,充分发挥多磁盘的优势。

利用多磁盘的具体方法是,将不同磁盘mount到不同目录,然后在server.properties中,将log.dirs设置为多目录(用逗号分隔)。Kafka会自动将所有Partition尽可能均匀分配到不同目录也即不同目录(也即不同disk)上。

Partition是最小并发粒度,Partition个数决定了可能的最大并行度。

2、充分利用PageCache

Page Cache,又称pcache,其中文名称为页高速缓冲存储器,简称页高缓。page cache的大小为一页,通常为4K。在linux读写文件时,它用于缓存文件的逻辑内容,从而加快对磁盘上映像和数据的访问。 是Linux操作系统的一个特色。

image.png

1、读Cache

当内核发起一个读请求时(例如进程发起read()请求),首先会检查请求的数据是否缓存到了Page Cache中。

如果有,那么直接从内存中读取,不需要访问磁盘,这被称为cache命中(cache hit);

如果cache中没有请求的数据,即cache未命中(cache miss),就必须从磁盘中读取数据。然后内核将读取的数据缓存到cache中,这样后续的读请求就可以命中cache了。

page可以只缓存一个文件部分的内容,不需要把整个文件都缓存进来。

2、写Cache

当内核发起一个写请求时(例如进程发起write()请求),同样是直接往cache中写入,后备存储中的内容不会直接更新(当服务器出现断电关机时,存在数据丢失风险)。

内核会将被写入的page标记为dirty,并将其加入dirty list中。内核会周期性地将dirty list中的page写回到磁盘上,从而使磁盘上的数据和内存中缓存的数据一致。

当满足以下两个条件之一将触发脏数据刷新到磁盘操作:

数据存在的时间超过了dirty_expire_centisecs(默认300厘秒,即30秒)时间;

脏数据所占内存 > dirty_background_ratio,也就是说当脏数据所占用的内存占总内存的比例超过dirty_background_ratio(默认10,即系统内存的10%)的时候会触发pdflush刷新脏数据。

如何查看Page Cache参数

执行命令 sysctl -a|grep dirty

如何调整内核参数来优化IO性能?

(1)vm.dirty_background_ratio参数优化

这个参数指定了当文件系统缓存脏页数量达到系统内存百分之多少时(如5%)就会触发后台回写进程运行,将一定缓存的脏页异步地刷入磁盘;

当cached中缓存当数据占总内存的比例达到这个参数设定的值时将触发刷磁盘操作。

把这个参数适当调小,这样可以把原来一个大的IO刷盘操作变为多个小的IO刷盘操作,从而把IO写峰值削平。

对于内存很大和磁盘性能比较差的服务器,应该把这个值设置的小一点。

(2)vm.dirty_ratio参数优化

这个参数则指定了当文件系统缓存脏页数量达到系统内存百分之多少时(如10%),系统不得不开始处理缓存脏页(因为此时脏页数量已经比较多,为了避免数据丢失需要将一定脏页刷入外存);在此过程中很多应用进程可能会因为系统转而处理文件IO而阻塞。

对于写压力特别大的,建议把这个参数适当调大;对于写压力小的可以适当调小;如果cached的数据所占比例(这里是占总内存的比例)超过这个设置,

系统会停止所有的应用层的IO写操作,等待刷完数据后恢复IO。所以万一触发了系统的这个操作,对于用户来说影响非常大的。

(3)vm.dirty_expire_centisecs参数优化

这个参数会和参数vm.dirty_background_ratio一起来作用,一个表示大小比例,一个表示时间;即满足其中任何一个的条件都达到刷盘的条件。

为什么要这么设计呢?我们来试想一下以下场景:

如果只有参数 vm.dirty_background_ratio ,也就是说cache中的数据需要超过这个阀值才会满足刷磁盘的条件;

如果数据一直没有达到这个阀值,那相当于cache中的数据就永远无法持久化到磁盘,这种情况下,一旦服务器重启,那么cache中的数据必然丢失。

结合以上情况,所以添加了一个数据过期时间参数。当数据量没有达到阀值,但是达到了我们设定的过期时间,同样可以实现数据刷盘。

这样可以有效的解决上述存在的问题,其实这种设计在绝大部分框架中都有。

(4)vm.dirty_writeback_centisecs参数优化

理论上调小这个参数,可以提高刷磁盘的频率,从而尽快把脏数据刷新到磁盘上。但一定要保证间隔时间内一定可以让数据刷盘完成。

(5)vm.swappiness参数优化

禁用swap空间,设置vm.swappiness=0

3、减少网络开销批处理

批处理是一种常用的用于提高I/O性能的方式。对Kafka而言,批处理既减少了网络传输的Overhead,又提高了写磁盘的效率。

Kafka 的send方法并非立即将消息发送出去,而是通过batch.size和linger.ms控制实际发送频率,从而实现批量发送。

由于每次网络传输,除了传输消息本身以外,还要传输非常多的网络协议本身的一些内容(称为Overhead),所以将多条消息合并到一起传输,可有效减少网络传输的Overhead,进而提高了传输效率。

4、数据压缩降低网络负载

Kafka支持将数据压缩后再传输给Broker。除了可以将每条消息单独压缩然后传输外,Kafka还支持在批量发送时,将整个Batch的消息一起压缩后传输。数据压缩的一个基本原理是,重复数据越多压缩效果越好。因此将整个Batch的数据一起压缩能更大幅度减小数据量,从而更大程度提高网络传输效率。

Broker接收消息后,并不直接解压缩,而是直接将消息以压缩后的形式持久化到磁盘。Consumer Fetch到数据后再解压缩。因此Kafka的压缩不仅减少了Producer到Broker的网络传输负载,同时也降低了Broker磁盘操作的负载,也降低了Consumer与Broker间的网络传输量,从而极大得提高了传输效率,提高了吞吐量。

5、高效的序列化方式

Kafka消息的Key和Value的类型可自定义,只需同时提供相应的序列化器和反序列化器即可。

因此用户可以通过使用快速且紧凑的序列化-反序列化方式(如Avro,Protocal Buffer)来减少实际网络传输和磁盘存储的数据规模,从而提高吞吐率。这里要注意,如果使用的序列化方法太慢,即使压缩比非常高,最终的效率也不一定高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/12485.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

石子合并(区间dp模板)

题目描述&#xff1a; dp分析&#xff1a; 解题代码&#xff1a; #include<iostream> using namespace std;const int N1e36;int f[N][N]; int a[N]; int s[N];int main(){int n;cin>>n;for(int i1;i<n;i){scanf("%d",&s[i]);s[i]s[i-1];//前缀和…

谈谈你对Synchronized关键字的理解及使用

synchronized关键字最主要的三种使用方式的总结 修饰实例方法&#xff0c;作用于当前对象实例加锁&#xff0c;进入同步代码前要获得当前对象实例的锁修饰静态方法&#xff0c;作用于当前类对象加锁&#xff0c;进入同步代码前要获得当前类对象的锁 。也就是给当前类加锁&…

centos7安装tomcat

安装tomcat 必须依赖 JDK 环境&#xff0c;一定要提前装好JDK保证可以使用 一、下载安装包 到官网下载 上传到linux 服务器 二、安装tomcat 创建tomcat 文件夹 mkdir -p /usr/local/tomcat设置文件夹权限 chmod 757 tomcat将安装包上传至 新建文件夹 解压安装包 tar zx…

自动驾驶之轨迹规划8——Apollo参考线和轨迹

1. abstract 本文主要讲解routing和planning模块中的reference line&#xff0c;我之前一直搞不明白这个reference line是如何生成的&#xff0c;有什么作用&#xff0c;和routing以及planning的关系。现在有了一些心得打算梳理一下&#xff1a; 决策规划模块负责生成车辆的行…

浅谈3D隐式表示(SDF,Occupancy field,NeRF)

本篇文章介绍了符号距离函数Signed Distance Funciton(SDF)&#xff0c;占用场Occupancy Field&#xff0c;神经辐射场Neural Radiance Field&#xff08;NeRF&#xff09;的概念、联系与区别。 显式表示与隐式表示 三维空间的表示形式可以分为显式和隐式。 比较常用的显式表…

STM32 串口基础知识学习

串行/并行通信 串行通信&#xff1a;数据逐位按顺序依次传输。 并行通信&#xff1a;数据各位通过多条线同时传输。 对比 传输速率&#xff1a;串行通信较低&#xff0c;并行通信较高。抗干扰能力&#xff1a;串行通信较强&#xff0c;并行通信较弱。通信距离&#xff1a;串…

区间预测 | MATLAB实现QRLSTM长短期记忆神经网络分位数回归多输入单输出区间预测

区间预测 | MATLAB实现QRLSTM长短期记忆神经网络分位数回归多输入单输出区间预测 目录 区间预测 | MATLAB实现QRLSTM长短期记忆神经网络分位数回归多输入单输出区间预测效果一览基本介绍模型描述程序设计参考资料 效果一览 基本介绍 MATLAB实现QRLSTM长短期记忆神经网络分位数回…

计算机科学cs/电子信息ei面试准备——数学基础/线性代数复习

1. 中值定理 中值定理是反映函数与导数之间联系的重要定理&#xff0c;也是微积分学的理论基础&#xff0c;在许多方面它都有重要的作用&#xff0c;在进行一些公式推导与定理证明中都有很多应用。中值定理是由众多定理共同构建的&#xff0c;其中拉格朗日中值定理是核心&…

【Linux命令200例】less强大的文件内容查看工具

&#x1f3c6;作者简介&#xff0c;黑夜开发者&#xff0c;全栈领域新星创作者✌&#xff0c;2023年6月csdn上海赛道top4。 &#x1f3c6;本文已收录于专栏&#xff1a;Linux命令大全。 &#x1f3c6;本专栏我们会通过具体的系统的命令讲解加上鲜活的实操案例对各个命令进行深入…

最优除法(力扣)数学 JAVA

给定一正整数数组 nums&#xff0c;nums 中的相邻整数将进行浮点除法。例如&#xff0c; [2,3,4] -> 2 / 3 / 4 。 例如&#xff0c;nums [2,3,4]&#xff0c;我们将求表达式的值 “2/3/4”。 但是&#xff0c;你可以在任意位置添加任意数目的括号&#xff0c;来改变算数的…

苍穹外卖day07——缓存菜品套餐+购物车功能实现

缓存菜品——需求设计与分析 问题说明 用户访问量过大带来的一个直接效果就是响应速度慢&#xff0c;使用体验下降。 实现思路 使用redis缓存菜品数据&#xff0c;减少数据库查询操作。 页面展示上基本就是同一个分类在同一页&#xff0c;所以key-value结构可以使用不同的分…

【Git】版本回退与撤销修改案例

目录 一、版本回退 二、撤销修改案例 案例1&#xff1a;仅在工作区进行了修改还未进行add操作 案例2&#xff1a;仅进行了add 操作还未进行commit操作 案例3&#xff1a;进行了add与commit操作无其他操作 三、版本库中删除文件 一、版本回退 在进行版本回退之前我们需要…

Excel的使用

1.EXCEL诞生的意义 1.1 找到想要的数据 1.2 提升输入速度 2.数据分析与可视化操作 目的是提升数据的价值和意义 3.EXCEL使用的内在意义和外在形式 4.EXCEL的价值 4.1 解读及挖掘数据价值 4.2 协作板块 4.3 展示专业度 4.4 共享文档内容 5.人的需求》》软件功能

【UE4】局域网多人联机 Demo

效果 亲测可以打包后在两个电脑上联机运行&#xff08;前提是在同一个局域网内&#xff0c;互相能ping通&#xff09; 步骤 1. 首先新建一个第三人称角色模板工程 2. 在多玩家选项中&#xff0c;设置玩家数量为2 选择在新建编辑器窗口中运行 3. 新建一个父类为Character的蓝…

wangeditor 表格问题总结及适配方案

一、导出编辑器内容&#xff0c;表格无边框样式 1、通过 let article this.editor.getHtml(); // editor.getHtml() 获取 HTML 内容&#xff1b; 2、处理文本字符串&#xff1a;&#xff08;手动为 table 加上 css样式&#xff09;&#xff1b; article article.replace…

虎年现货黄金投资布局图

参与现货黄金交易的主要目的&#xff0c;是为了根据行情走势的变动&#xff0c;把握一些较佳的获利机会&#xff0c;在这样的一个过程中&#xff0c;如果投资者能够提前把布局的图表画好&#xff0c;那么就可能获得事半功倍的效果&#xff0c;而本文将为大家简单的介绍&#xf…

【主成分分析(PCA)- 鸢尾花】

主成分分析&#xff08;PCA&#xff09; 摘要 在现代数据科学中&#xff0c;维度灾难常常是数据处理与分析的一大难题。主成分分析&#xff08;PCA&#xff09;是一种广泛使用的数据降维技术&#xff0c;它通过将原始数据转换为新的低维空间&#xff0c;保留最重要的信息&…

15.Netty源码之EventLoop

highlight: arduino-light Netty配置主从Reactor模式 通过将NioServerSocketChannel绑定到了bossGroup。 将NioServerSocketChannel接收到请求创建的SocketChannel放入workerGroup。 将2个不同的SocketChannel绑定到2个不同的Group完成了主从 Reactor 模式。 分配NIOEventLoop的…

论文精读之BERT

目录 1.摘要&#xff08;Abstract&#xff09; 2.引言&#xff08;Introduction&#xff09;&#xff1a; 3.结论&#xff08;Conlusion&#xff09;&#xff1a; 4.BERT模型算法: 5.总结 1.摘要&#xff08;Abstract&#xff09; 与别的文章的区别是什么:BERT是用来设计去…

springboot创建并配置环境(三) - 配置扩展属性(上集)

文章目录 一、介绍二、配置文件application.yml 一、介绍 在上一篇文章&#xff1a;springboot创建并配置环境(二) - 配置基础环境中&#xff0c;我们介绍了springboot如何配置基础环境变量。本篇文章讨论如何处理配置文件。即来自不同位置的配置属性&#xff0c;如&#xff1…