引言
- 同事在公司内部分享了关于 kafka 技术一些相关的内容,所以有了这篇文章;
- 部分图片选自网络摘抄;
1 Kafka概述
1.1 定义
Kafka传统定义:kafka是一个分布式的基于发布/订阅模式的消息队列。
Kafka最新定义:kafka用于构建实时数据处理系统,它具有横向扩展、高可用,速度极快等特点,已经被很多公司使用。
1.2 应用场景
- 消息系统:解耦、削峰、缓存消息、异步通信等。
- 日志收集:可以用kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer。
- 用户活动跟踪:kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后消费者通过订阅这些topic来做实时的监控分析,亦可保存到数据库。
运营指标:kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
大数据实时计算:kafka被应用到大数据处理,如与hadoop、spark、storm等整合。
1.3 主流MQ对比
什么是MQ,谈谈你的理解?
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
单机吞吐量 | 万级 | 万级 | 十万级,支撑高吞吐 | 十万级,高吞吐 |
性能的稳定性 | 老牌消息队列,性能一般、资源消耗比较大。 | 消息堆积时,性能不稳定、明显下降 | 队列较多、消息堆积时性能稳定 | 队列/分区多时性能不稳定、明显下降;消息堆积时性能稳定 |
时效性 | ms级 | 微秒级 | ms级 | 延迟在ms级以内 |
可用性 | 高 | 高 | 非常高,分布式架构 | 非常高,分布式,多个副本 |
消息可靠性 | 有较低的概率丢失数据 | 基本不丢失 | 经过参数优化配置,可以做到0丢失 | 同RocketMQ |
技术栈 | Java | Erlang | Java | Java、Scala |
1.4 初识Kafka(kafka-2.12-3.2.0)
(Kafka之所以受到越来越多的青睐,与它所“扮演 ”的三大角色是分不开的 :)
- 消息系统: Kafka和传统的消息系统(也称作消息中间件)都具备系统解稿、冗余存储、流量削峰、缓冲、异步通信、扩展性、 可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
- 存储系统: Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险 。 也正是得益于Kafka的消息持久化功能和多副本机制,我们可以把Kafka作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。
- 流式处理平台: Kafka不仅为每个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作。
1.5 基本概念
- 消息:消息就是kafka中的一条数据,类似于数据库中的一行。
- Producer:生产者,向kafka主题发布消息的应用程序。
- Consumer:消费者,从kafka主题订阅消息的应用程序。
- Consumer Group:消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
- Broker:kafka服务器,一个broker可以容纳多个topic。
- Topic:主题是承载消息的逻辑容器,在实际使用中用来区分具体业务。
- Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker上,一个topic可以分为多个partition,每个partition是一个有序的队列。如果一个topic中的partition有5个,那么topic的并发度为5。
- Offset:每个Consumer 消费的信息都会有自己的序号,我们称作当前队列的offset。即消费点位标识消费到的位置。
- Replica:副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。
- Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。
- Follower:每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。 leader发生故障时,某个follower会成为新的 leader。
2 Kafka生产者
2.1生产者消息发送流程
2.1.1 整体架构
2.1.2 名词释义
- 消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由batch.size参数来指定,默认值为16KB。
- RecordAccumulator 缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数 buffer.memory 配置,默认值为32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60秒。
- 主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator 的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即 Queque<ProducerBatch>。将较小的ProducerRecord拼凑成一个较大的ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。
- Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式转变成<Node,List< ProducerBatch>的形式,其中Node表示Kafka集群的broker节点。然后进一步封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node。
- 请求在从Sender线程发往Kafka之前还会保存到 InFlightRequests中,InFlightRequests保存对象的具体形式为 Map<NodeId,Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。通过配置参数max.in.flight.requests.per.connection,默认值为 5,即每个连接最多只能缓存 5 个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较Deque<Request>的size与这个参数的大小来判断对应的Node中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个Node 节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。
2.2 重要的生产者参数
2.2.1 acks
- acks=0, 生产者在成功写入消息之前不会等待任何来自服务器的相应。如果出现问题生产者是感知不到的,消息就丢失了。不过因为生产者不需要等待服务器响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
- acks=1,只要集群的leader节点收到消息,生产这就会收到一个来自服务器的成功响应。如果消息无法达到leader节点(比如leader节点崩溃,新的leader还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。但是,这样还有可能会导致数据丢失,如果收到写成功通知,此时leader节点还没来的及同步数据到follower节点,leader节点崩溃,就会导致数据丢失。
- acks=-1(all), 只有当所有参与复制的节点都收到消息时,生产这会收到一个来自服务器的成功响应,这种模式是最安全的,它可以保证不止一个服务器收到消息。
2.2.2 buffer.memory
该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速率比写入kafka的速度要快,会导致生产者空间不足。这个时候,send()方法调用要么被阻塞,要么抛出异常,取决于如何设置max.block.ms,表示在抛出异常之前可以阻塞一段时间。)
2.2.3 compression.type
默认情况下,消息发送时不会被压缩。该参数可以设置成snappy,gzip或lz4,它指定了消息发送给broker之前使用哪一种压缩算法。snappy占用较少的CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种压缩算法。gzip压缩算法一般会占用比较多的CPU,但会提供更高的压缩,如果网络带宽有限,可以使用这种算法。使用压缩可以降低网络传输开销和存储开销,而这往往是向kafka发送消息的瓶颈所在。
2.2.4 max.request.size
该参数用来限制生产者客户端能发送的消息的最大值,默认为1MB。
2.2.5 retries和retry.backoff.ms
retries参数用来配置生产者重试的次数,默认值为0,即在发生异常的时候不进行任何重试动作。retry.backoff.ms有关,这个参数的默认值为100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。
2.2.6 batch.size
当有多个消息要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算,而不是消息个数。当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也可能被发送。所以就算把batch.size设置的很大,也不会造成延迟,只会占用更多的内存而已,如果设置的太小,生产者会因为频繁发送消息而增加一些额外的开销。
2.2.7 linger.ms
该参数用来指定生产者发送ProducerBatch之前等待更多消息(ProducerRecord)加入ProducerBatch的时间,默认值为0。
3 Kafka Broker
3.1 ZK存储的Kafka信息
- /kafka/brokers/ids 记录kafka集群服务器
- /kafka/brokers/topics/{topic}/partitions/0/state 记录谁是Leader,有哪些服务器可用
- consumers 以前是用来保存offset信息,0.9版本之后offset存储在kafka主题中
- controller用来辅助选举Leader
3.2 Broker整体工作流程
3.3 Kafka副本
- Kafka副本作用:提高数据可靠性。
- Kafka默认副本1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网路上数据传输,降低效率。
- Kafka中副本分为:Leader和Follewer。Kafka生产者只会把数据发往Leader,然后Follower找Leader进行数据同步。
- Kafka分区中的所有副本统称为AR,AR=ISR+OSR
ISR:表示和Leader保持同步的Follower集合(也包含Leader)。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认为30S。Leader发生故障之后,就会从ISR中选举新的Leader。
OSR:表示Follower与Leader副本同步时,延迟过多的副本集合。
3.4 Leader和Follower故障处理细节
3.4.1 Leader故障处理细节:
3.4.2 Follower故障处理细节:
3.5 Kafka的数据存储
3.5 Kafka文件清理策略
(Kafka中默认的日志保存时间为7天,可以通过调整如下参数修改保存时间)**
- log.retention.hours,小时(168)最低优先级,默认7天。
- log.retention.minutes,分钟。
- log.retention.ms,毫秒 最高优先级。
- log.retention.check.interval.ms,负责设置检查周期,默认5分钟。
日志超过设置时间,Kafka中提供了两种日志清理策略:
- delete 日志删除:将过期数据删除,参数设置:log.cleanup.policy = delete(a.基于时间,b.基于大小 log.retention.bytes,默认等于-1,表示无穷大)。
- compact 日志压缩:参数设置:log.cleanup.policy = compact,对于相同key的不同value值,只保留最后一个版本。
3.6 数据高效读写
- kafka支持分布式集群,可以采用分区技术,并行度高。
- 采用稀疏索引,可以快速定位要消费的数据。
- 顺序读写磁盘。(顺序写可以达到600M/s,随机写只有100K/s)
- 页缓存+零拷贝技术
3.6.1 高效读写技术
页缓存:kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache中;当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存当做了磁盘缓存来使用。
DMA:DMA(Direct Memory Access,直接内存访问)拷贝是计算机系统中常用的一种数据传输方式。在进行数据传输时,CPU通常需要参与数据的读取和写入过程,这样会占用CPU的大量时间和资源。为了减轻CPU的负担,提高系统整体性能,引入了DMA技术。
- DMA拷贝的基本原理是通过直接访问系统内存,绕过CPU的直接控制,实现数据的传输。具体来说,DMA控制器可以直接从设备(如硬盘、网卡等)读取数据,并将数据直接写入系统内存,或者直接从内存读取数据并发送给设备,而无需CPU的参与。这样一来,CPU可以继续执行其他任务,而数据传输过程由DMA控制器完成,从而提高了系统的并发性和整体性能。
- 在实际应用中,DMA拷贝常用于大规模数据传输,例如文件读写、网络数据传输等场景。通过使用DMA技术,可以显著减少CPU的负载,提高数据传输效率,从而改善系统的响应速度和性能表现。
sendfile():sendfile() 是一个在网络编程中常用的系统调用函数,它用于在两个文件描述符之间直接传输数据,而无需在用户空间进行数据拷贝,避免了数据在用户空间和内核空间之间来回复制的过程。在Linux系统中,sendfile() 函数可以在内核空间和内核缓冲区之间直接传输数据,提高了数据传输的效率和性能。
3.6.2 各种读写流程对比
- 常规读取数据流程:
- sendfile()方式读取数据流程:
- sendfile + DMA gather copy:
4 Kafka 消费者
4.1 消费方式
4.2 消费者整体工作流程
4.3 消费者组原理
4.3.1 Consumer Group
(消费者组由多个consumer组成,形成一个消费者组的条件是,所有消费者的groupid相同)
- 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
- 消费者组之间互不影响,消费者组是逻辑上的一个订阅者。
4.3.2 消费者组初始化流程
4.3.3 消费者组消费流程
4.4 分区的分配以及再平衡
Kafka有四种主流的分区分配策略:Range、RoundRobin、Sticky、CooperativeSticky。可以通过配置参数partition.assignment.strategy修改分区分配策略,默认策略是Range+CooperativeSticky,Kafka可以同时使用多个分区分配策略。
4.4.1 Range策略
4.4.2 RoundRobin策略
4.4.3 Sticky策略
该策略旨在尽可能地保持分配的稳定性,即消费者与分区的绑定关系尽量保持不变,这有助于提高消费者的性能和效率。(粘性分区)
Sticky 分配策略的工作原理如下:
- 初始分配:当一个新的消费者加入消费者组时,初始分配会按照其他常规的分配策略(如 Round-robin 等)将分区分配给消费者。
- Sticky 机制:一旦分配完成后,Sticky 分配策略会尽可能地保持分配的稳定性。即在后续的重新分配中,尽量保持每个消费者与其分配到的分区的绑定关系不变。
- 重平衡触发:当消费者组发生变化(如有新的消费者加入、有消费者退出等)或者分区发生变化时,会触发重平衡操作。
- 重平衡操作:在重平衡操作中,根据 Sticky 分配策略,系统会尽量保持消费者与分区的绑定关系不变。这意味着在重新分配分区时,会优先考虑让消费者仍然消费之前分配到的分区。
(通过 Sticky 分配策略,Kafka 可以有效地减少在消费者组内发生重平衡时的数据移动,避免不必要的分区再分配,提高整体的性能和吞吐量。)
4.4.4 CooperativeSticky策略:
CooperativeSticky 分配策略是 Kafka 新版本中引入的一种消费者分区分配策略,旨在提高消费者的负载均衡和性能。
有以下几个特点:
- 合作式(Cooperative):CooperativeSticky 策略是一种合作式的分配策略,消费者可以共同合作以优化分区的分配。
- 粘性(Sticky):与 Sticky 策略类似,CooperativeSticky 策略也具有粘性的特点,即消费者在某个分区上可以保持相对长时间的粘性。
- 动态调整:CooperativeSticky 策略会根据消费者组中各个消费者的处理情况动态调整分区的分配,以实现更好的负载均衡。
工作原理
- 首次分配:当消费者组第一次订阅主题时,CooperativeSticky 策略会根据消费者的订阅情况和消费能力来为每个消费者分配初始的分区。
- 协作调整:随着消费者组内消费者的工作情况变化,比如消费者加入或退出消费者组、消费速度不均等情况,CooperativeSticky 策略会协作调整分区的分配情况。
- 动态优化:策略会根据消费者的消费速度和处理能力,选择最优的分区分配方式,以使得消费者组内各个消费者的负载尽可能均衡,避免出现某些消费者负责过多分区而导致负载不均衡的情况。
- 保持粘性:同时,CooperativeSticky 策略也会保持一定程度的粘性,确保消费者在一段时间内负责相同的分区,以提高局部性和消费效率。
总结:通过合作式和动态调整的特点,CooperativeSticky 分配策略能够更好地适应消费者组的动态变化,提高消费者组的负载均衡性和整体性能,从而更有效地处理 Kafka 主题的消息消费。
4.5 Offset
Kafka 中的 Offset(偏移量)是指消费者在一个特定分区中的消息消费进度,即消费者已经读取并处理到哪一条消息。
4.5.1 Offset 的作用
- 标识消费位置:Offset 标识了消费者在特定分区中的消费位置,记录了消费者已经处理过的消息。
- 实现消息的顺序消费:通过管理 Offset,消费者可以确保消息被按照正确的顺序消费,避免重复消费或漏掉消息。
- 支持消费者组协调:Offset 的管理可以帮助消费者组内的各个消费者进行协调,避免重复消费,实现负载均衡。
4.5.2 Offset 的类型
- Commit Offset:消费者将已经处理完的消息的 Offset 提交给 Kafka,表示这些消息已经成功消费。Kafka 会记录 Commit Offset,并在下次消费时从提交的 Offset 开始继续。
- Current Offset:当前待消费的消息的 Offset,即消费者下一次将要读取的消息的位置。
- Group Offset:消费者组中每个消费者在每个分区中的消费位置,由消费者组协调器统一管理。
4.5.3 Offset 的存储方式
- 内部存储:Kafka 内部使用 __consumer_offsets 主题来存储消费者组的 Offset 信息,每个消费者组都有一个特定的 Consumer Group ID。
- 外部存储:消费者也可以选择自行管理 Offset 的存储,比如在数据库中记录每个分区的最新 Offset,以便在重新启动消费者时恢复消费位置。
4.5.4 Offset 的管理
- 自动提交:消费者可以选择自动提交 Offset,Kafka 会周期性地自动将消费者消费的 Offset 提交给服务器。但存在风险,可能会导致消息重复消费或漏消费。
- 手动提交:消费者可以手动控制 Offset 的提交时机,确保消息被正确处理。手动提交能够提供更精确的控制,避免不必要的重复消费。
- Seek 操作:消费者可以通过 Seek 操作来调整消费位置,使得消费者可以从指定的 Offset 开始消费消息。
总结:Offset 在 Kafka 中扮演着非常重要的角色,帮助消费者管理消息消费的位置和进度,确保消息被正确、顺序地消费。消费者可以根据业务需求选择合适的Offset策略来进行消息消费。
4.6 消费者事务
4.6.1 关于重复消费与漏消费:
总结:如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交Offset过程做原子性绑定,我们需要将Kafka的offset保存到支持事务的自定义介质中,比如Mysql。