如何更好地使用Kafka? - 事先预防篇

要确保Kafka在使用过程中的稳定性,需要从kafka在业务中的使用周期进行依次保障。主要可以分为:事先预防(通过规范的使用、开发,预防问题产生)、运行时监控(保障集群稳定,出问题能及时发现)、故障时解决(有完整的应急预案)这三阶段。

 另外的篇幅请参考

如何更好地使用Kafka? - 事先预防篇-CSDN博客

如何更好地使用Kafka? - 故障时解决-CSDN博客

1. 事先预防原则

事先预防即通过规范的使用、开发,预防问题产生。主要包含集群/生产端/消费端的一些最佳实践、上线前测试以及一些针对紧急情况(如消息积压等)的临时开关功能。

Kafka调优原则:

  1. 确定优化目标,并且定量给出目标(Kafka 常见的优化目标是吞吐量、延时、持久性和可用性);

  2. 确定了目标之后,需要明确优化的维度:

  • 通用性优化:操作系统、JVM 等;

  • 针对性优化:优化 Kafka 的 TPS、处理速度、延时等。

2. 生产端最佳实践

2.1 参数调优

  • 使用 Java 版的 Client;

  • 使用 kafka-producer-perf-test.sh 测试你的环境;

  • 设置内存、CPU、batch 压缩;

  • batch.size:该值设置越大,吞吐越大,但延迟也会越大;

  • linger.ms:表示 batch 的超时时间,该值越大,吞吐越大、但延迟也会越大;

  • max.in.flight.requests.per.connection:默认为5,表示 client 在 blocking 之前向单个连接(broker)发送的未确认请求的最大数,超过1时,将会影响数据的顺序性;

  • compression.type:压缩设置,会提高吞吐量;

  • acks:数据 durability 的设置;

  • 避免大消息(占用过多内存、降低broker处理速度);

  • broker调整:增加 num.replica.fetchers,提升 Follower 同步 TPS,避免 Broker Full GC 等;

  • 当吞吐量小于网络带宽时:增加线程、提高 batch.size、增加更多 producer 实例、增加 partition 数;

  • 设置 acks=-1 时,如果延迟增大:可以增大 num.replica.fetchers(follower 同步数据的线程数)来调解;

  • 跨数据中心的传输:增加 socket 缓冲区设置以及 OS tcp 缓冲区设置。

2.2 开发实践

(1) 做好Topic隔离

根据具体场景(是否允许一定延迟、实时消息、定时周期任务等)区分kafka topic,避免挤占或阻塞实时业务消息的处理。

(2) 做好消息流控

如果下游消息消费存在瓶颈或者集群负载过高等,需要在生产端(或消息网关)实施流量生产速率的控制或者延时/暂定消息发送等策略,避免短时间内发送大量消息。

(3) 做好消息补推

手动去查询丢失的那部分数据,然后将消息重新发送到mq里面,把丢失的数据重新补回来。

(4) 做好消息顺序性保障

如果需要在保证Kafka在分区内严格有序的话(即需要保证两个消息是有严格的先后顺序),需要设置key,让某类消息根据指定规则路由到同一个topic的同一个分区中(能解决大部分消费顺序的问题)。
但是,需要避免分区内消息倾斜的问题(例如,按照店铺Id进行路由,容易导致消息不均衡的问题)。

  1. 生产端:消息发送指定key,确保相同key的消息发送到同一个partition。

  2. 消费端:单线程消费或者写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue ;

(5) 适当提高消息发送效率
  • 批量发送:kafka先将消息缓存在内存中的双端队列(buffer)中,当消息量达到batch size指定大小时进行批量发送,减少了网络传输频次,提高了传输效率;

  • 端到端压缩消息: 将一批消息打包后进行压缩,发送给 Broker 服务器后,但频繁的压缩和解压也会降低性能,最终还是以压缩的方式传递到消费者的手上,在 Consumer 端进行解压;

  • 异步发送:将生产者改造为异步的方式,可以提升发送效率,但是如果消息异步产生过快,会导致挂起线程过多,内存不足,最终导致消息丢失;

  • 索引分区并行消费:当一个时间相对长的任务在执行时,它会占用该消息所在索引分区被锁定,后面的任务不能及时派发给空闲的客户端处理,若服务端如果启用索引分区并行消费的特性,就可以及时的把后面的任务派发给其他的客户端去执行,同时也不需要调整索引的分区数(但此类消息仅适用于无需保证消息顺序关系的消息)

(6) 保证消息发送可靠性
  • Producer:如果对数据可靠性要求很高的话,在发送消息的时候,需要选择带有 callBack 的api进行发送,并设置 acks、retries、factor等等些参数来保证Producer发送的消息不丢失。

  • Broker:kafka为了得到更高的性能和吞吐量,将数据异步批量的存储在磁盘中,并采用了批量刷盘的做法,如果对数据可靠性要求很高的话,可以修改为同步刷盘的方式提高消息的可靠性。

3. 消费端最佳实践

3.1 参数调优

  • 吞吐量:调整partition 数、OS page cache(分配足够的内存来缓存数据);

  • offset topic(__consumer_offsets):offsets.topic.replication.factor(默认为3)、offsets.retention.minutes(默认为1440,即 1day);

  • offset commit较慢:异步 commit 或 手动 commit

  • fetch.min.bytes 、fetch.max.wait.ms

  • max.poll.interval.ms:调用 poll() 之后延迟的最大时间,超过这个时间没有调用 poll() 的话,就会认为这个 consumer 挂掉了,将会进行 rebalance

  • max.poll.records:当调用 poll() 之后返回最大的 record 数,默认为500

  • session.timeout.ms

  • Consumer Rebalance:check timeouts、check processing times/logic、GC Issues

  • 网络配置

3.2 开发实践

(1) 做好消息消费幂等

消息消费的幂等主要根据业务逻辑做调整。

以处理订单消息为例:

  1. 由订单编号+订单状态唯一的幂等key,并存入redis;

  2. 在处理之前,首先会去查Redis是否存在该Key,如果存在,则说明已经处理过了,直接丢掉;

  3. 如果Redis没处理过,则将处理过的数据插入到业务DB上,再到最后把幂等Key插入到Redis上;

简而言之,即通过Redis做前置处理 + DB唯一索引做最终保证来实现幂等性。

(2) 做好Consumer隔离

在消息量非常大的情况下,实时和离线消费者同时消费一个集群,离线数据繁重的磁盘 IO 操作会直接影响实时业务的实时性和集群的稳定性。

根据消费的实时性可以将消息消费者行为划分两类:实时消费者和离线消费者。

  • 实时消费者:对数据实时性要求较高;在实时消费的场景下,Kafka 会利用系统的 page cache 缓存,直接从内存转发给实时消费者(热读),磁盘压力为零,适合广告、推荐等业务场景。

  • 离线消费者(定时周期性消费者):通常是消费数分钟前或是数小时前的消息,这类消息通常存储在磁盘中,消费时会触发磁盘的 IO 操作(冷读),适合报表计算、批量计算等周期性执行的业务场景。

(3) 避免消息消费堆积
  • 延迟处理、控制速度,时间范围内分摊消息(针对实时性不高的消息);

  • 生产速度大于消费速度,这样可以适当增加分区,增加consumer数量,提升消费TPS;

  • 避免很重的消费逻辑,优化consumer TPS:

    • 是否有大量DB操作;

    • 下游/外部服务接口调用超时;

    • 是否有lock操作(导致线程阻塞);

    • 需要特别关注kafka异步链路中的涉及消息放大的逻辑;

  • 如果有较重的消费逻辑,需要调整xx参数,避免消息没消费完时,消费组退出,造成reblance等问题

  • 确保consumer端没有因为异常而导致消费hang住;

  • 如果使用的是消费者组,确保没有频繁地发生rebalance

  • 多线程消费,批量拉取处理;

注:批量拉取处理时,需注意下kafka版本,spring-kafka 2.2.11.RELEASE版本以下,如果配置kafka.batchListener=true,但是将消息接收的元素设置为单个元素(非批量List),可能会导致kafka在拉取一批消息后,仅仅消费了头部的第一个消息。

(4) 避免Rebalance问题

A. 触发条件:

  • 消费者数量变化: 新消费者加入、消费者下线(未能及时发送心跳,被“踢出”Group)、消费者主动退出消费组(Consumer 消费时间过长导致)

  • 消费组内订阅的主题或者主题的分区数量发生变化;

  • 消费组对应的 GroupCoorinator 节点发生变化

B. 如何避免非必要rebalance(消费者下线、消费者主动退出消费组导致的reblance):

  1. 需要仔细地设置session.timeout.ms(决定了 Consumer 存活性的时间间隔)和heartbeat.interval.ms(控制发送心跳请求频率的参数) 的值。

  2. max.poll.interval.ms参数配置:控制 Consumer 实际消费能力对 Rebalance 的影响,限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。默认值是 5 分钟,表示 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。具体可以统计下历史的时间花费,把最长的时间为参考进行设置。

(5) 保证消息消费可靠性

一般情况下,还是client 消费 broker 丢消息的场景比较多,想client端消费数据不能丢,肯定是不能使用autoCommit的,所以必须是手动提交的。

Consumer自动提交的机制是根据一定的时间间隔,将收到的消息进行commit。commit过程和消费消息的过程是异步的。也就是说,可能存在消费过程未成功(比如抛出异常),commit消息已经提交了,则此时消息就丢失了。

(6) 保证消息消费顺序性
  1. 不同topic(乱序消息):如果支付与订单生成对应不同的topic,只能在consumer层面去处理了。

  2. 同一个topic(乱序消息):一个topic可以对应多个分区,分别对应了多个consumer,与“不同topic”没什么本质上的差别。(可以理解为我们的服务有多个pod,生产者顺序发送消息,但被路由到不同分区,就可能变得乱序了,服务消费的就是无序的消息)

  3. 同一个topic,同一个分区(顺序消息):Kafka的消息在分区内是严格有序的,例如把同一笔订单的所有消息,按照生成的顺序一个个发送到同一个topic的同一个分区。

针对乱序消息:

例如:订单和支付分别封装了各自的消息,但是消费端的业务场景需要按订单消息->支付消息的顺序依次消费消息。

  • 宽表(业务主题相关的指标、维度、属性关联在一起的一张数据库表):消费消息时,只更新对应的字段就好,消息只会存在短暂的状态不一致问题,但是状态最终是一致的。例如订单,支付有自己的状态字段,订单有自己的状态字段,售后有自己的状态字段,就不需要保证支付、订单、售后消息的有序,即使消息无序,也只会更新自己的状态字段,不会影响到其他状态;

  • 消息补偿机制:将消息与DB进行对比,如果发现数据不一致,再重新发送消息至主进程处理,保证最终一致性;

  • MQ队列:一个中间方(比如redis的队列)来维护MQ的顺序;

  • 业务保证:通过业务逻辑保障消费顺序;

针对顺序消息:

两者都是通过将消息绑定到定向的分区或者队列来保证顺序性,通过增加分区或者线程来提升消费能力。

A. Consumer单线程顺序消费

生产者在发送消息时,已保证消息在分区内有序,一个分区对应了一个消费者,保证了消息消费的顺序性。

B. Consumer多线程顺序消费(具体策略在后面章节)

单线程顺序消费的扩展能力很差。为了提升消费者的处理速度,除了横向扩展分区数,增加消费者外,还可以使用多线程顺序消费。

将接收到的kafka数据进行hash取模(注意:如果kafka分区接受消息已经是取模的了,这里一定要对id做一次hash再取模)发送到不同的队列,然后开启多个线程去消费对应队列里面的数据。

此外,这里通过配置中心进行开关、动态扩容/缩容线程池。

(7) 处理Consumer的事务

通过事务消息,可以很好的保证一些业务场景的事务逻辑,不会因为网络不可用等原因出现系统之间状态不一致。

当更新任何一个服务出现故障时就抛出异常,事务消息不会被提交或回滚,消息服务器会回调发送端的事务查询接口,确定事务状态,发送端程序可以根据消息的内容对未做完的任务重新执行,然后告诉消息服务器该事务的状态。

4. 集群配置最佳实践

4.1 集群配置

  • Broker 评估:每个 Broker 的 Partition 数不应该超过2k、控制 partition 大小(不要超过25GB);

  • 集群评估(Broker 的数量根据以下条件配置):数据保留时间、集群的流量大小;

  • 集群扩容:磁盘使用率应该在 60% 以下、网络使用率应该在 75% 以下;

  • 集群监控:保持负载均衡、确保 topic 的 partition 均匀分布在所有 Broker 上、确保集群的阶段没有耗尽磁盘或带宽

4.2 Topic 评估

  • Partition 数:

    • Partition 数应该至少与最大 consumer group 中 consumer 线程数一致;

    • 对于使用频繁的 topic,应该设置更多的 partition;

    • 控制 partition 的大小(25GB 左右);

    • 考虑应用未来的增长(可以使用一种机制进行自动扩容);

  • 使用带 key 的 topic;

  • partition 扩容:当 partition 的数据量超过一个阈值时应该自动扩容(实际上还应该考虑网络流量)。

4.3 分区配置

设置多个分区在一定程度上是可以提高消费者消费的并发度,但是分区数量过多时可能会带来:句柄开销过大、生产端占用内存过大、可能增加端到端的延迟、影响系统可用性、故障恢复时间较长等问题。

根据吞吐量的要求设置 partition 数:

  1. 假设 Producer 单 partition 的吞吐量为 P

  2. consumer 消费一个 partition 的吞吐量为 C

  3. 而要求的吞吐量为 T

  4. 那么 partition 数至少应该大于 T/P、T/c 的最大值

5. 性能调优

调优目标:高吞吐量、低延时。

5.1 分层调优

自上而下分为应用程序层、框架层、JVM层和操作系统层,层级越靠上,调优的效果越明显。

调优类型

建议

操作系统

挂载文件系统时禁掉atime更新;选择ext4或XFS文件系统;swap空间的设置;页缓存大小

JVM(堆设置和GC收集器)

将JVM 堆大小设置成 6~8GB;建议使用 G1 收集器,方便省事,比 CMS 收集器的优化难度小

Broker端

保持服务器端和客户端版本一致

应用层

要频繁地创建Producer和Consumer对象实例;用完及时关闭;合理利用多线程来改善性能

5.2 吞吐量(TPS)调优


参数列表

Broker端

适当增加num.replica.fetchers参数值,但不超过CPU核数

调优GC参数以避免经常性的Full GC

Producer端

适当增加batch.size参数值,比如从默认的16KB增加到512KB或1MB

适当增加linger.ms参数值,比如10~100

设置compression.type=lz4或zstd

设置acks=0或1

设置retries=0

如果多线程共享同一个Producer实例,则增加buffer.memory参数值

Consumer端

采用多Consumer进程或线程同时消费数据

增加fetch.min.bytes参数值,比如设置成1KB或更大

5.3 延时调优


参数列表

Broker端

适当设置num.replica.fetchers值

Producer端

设置linger.ms=0

不启用压缩,即设置compression.type=none

设置ackes=1

Consumer端

设置fetch.min.bytes=1

6. 稳定性测试

kafka的稳定性测试主要在业务上线前针对Kafka实例/集群健康性、高可用性的测试。

6.1 健康性检查

(1) 检查实例:查看Kafka 实例对象中拿到所有的信息(例如 IP、端口等);

(2) 测试可用性:访问生产者和消费者,测试连接。

6.2 高可用测试

A. 单节点异常测试:重启Leader副本或Follower副本所在Pod

步骤:

  1. 查看topic的副本信息

  2. 删除相应pod

  3. 脚本检测Kafka的可用性

预期:对生产者和消费者的可用性均无影响。

B. 集群异常测试:重启所有pod

步骤:

  1. 删除所有pod

  2. 脚本检测Kafka的可用性

预期:所有broker ready后服务正常。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/7986.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Cargo - 构建 rust项目、管理依赖包

文章目录 关于 Cargo构建项目创建工程编译运行buildclean 管理依赖添加依赖updatecheck计时 manual rust 安装可参考:https://blog.csdn.net/lovechris00/article/details/124808034 关于 Cargo Cargo 官方文档 : https://doc.rust-lang.org/cargo/crat…

文本转图表的AI工具-Chart-GPT

Chart-GPT Chart-GPT一款基于 GPT 实现的开源工具,可在几秒内,将文本快速转换为各种图表。用户只需在输入字段中输入数据说明和所需的图表类型,Chart-GPT的后台生成器即可建出多种类型的图表,包括条形图、折线图、组合图、散点图、…

「Dasha and Photos」Solution

简述题意 给定一个 n m n \times m nm 的方格,每个格子里有一个小写英文字母。 现在你有 k k k 个 n m n \times m nm 的方格,这些方格都是给定方格的基础上将左上角为 ( a i , b i ) (a_i,b_i) (ai​,bi​),右下角为 ( c i , d i ) …

【LAMMPS学习】八、基础知识(5.11)磁自旋

8. 基础知识 此部分描述了如何使用 LAMMPS 为用户和开发人员执行各种任务。术语表页面还列出了 MD 术语,以及相应 LAMMPS 手册页的链接。 LAMMPS 源代码分发的 examples 目录中包含的示例输入脚本以及示例脚本页面上突出显示的示例输入脚本还展示了如何设置和运行各…

FFmpeg 音视频处理工具三剑客(ffmpeg、ffprobe、ffplay)

【导读】FFmpeg 是一个完整的跨平台音视频解决方案,它可以用于音频和视频的转码、转封装、转推流、录制、流化处理等应用场景。FFmpeg 在音视频领域享有盛誉,号称音视频界的瑞士军刀。同时,FFmpeg 有三大利器是我们应该清楚的,它们…

2024年第九届数维杯数学建模B题思路分享

文章目录 1 赛题思路2 比赛日期和时间3 竞赛信息4 建模常见问题类型4.1 分类问题4.2 优化问题4.3 预测问题4.4 评价问题 5 建模资料 1 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 2 比赛日期和时间 报名截止时间:2024…

Ansible-playbook剧本

目录 一、Ansible playbook简介 2.1 playbook格式 2.2 playbook组成部分 二、playbook示例 2.1 yaml文件编写 2.2 运行playbook 2.3 定义、引用变量 2.4 指定远程主机sudo切换用户 ​编辑 2.5 when条件判断 ​编辑​编辑 2.6 迭代 ​编辑 ​编辑 三、总结 Ansib…

2023黑马头条.微服务项目.跟学笔记(五)

2023黑马头条.微服务项目.跟学笔记 五 延迟任务精准发布文章 1.文章定时发布2.延迟任务概述 2.1 什么是延迟任务2.2 技术对比 2.2.1 DelayQueue2.2.2 RabbitMQ实现延迟任务2.2.3 redis实现3.redis实现延迟任务4.延迟任务服务实现 4.1 搭建heima-leadnews-schedule模块4.2 数据库…

简单了解泛型

基本数据类型和对应的包装类 在Java中, 基本数据类型不是继承自Object, 为了在泛型代码中可以支持基本类型, Java给每个基本类型都对应了一个包装类型. 简单来说就是让基本数据类型也能面向对象.基本数据类型可以使用很多方法, 这就必须让它变成类. 基本数据类型对定的包装类…

sql注入练习

1.什么是SQL注入 SQL注入是比较常见的网络攻击方式之一,它不是利用操作系统的BUG来实现攻击,而是针对程序员编写时的疏忽,通过SQL语句,实现无账号登录,甚至篡改数据库 2.sql注入原理 攻击者注入一段包含注释符的SQL语…

能将图片转为WebP格式的WebP Server Go

本文完成于 2023 年 11 月 之前老苏介绍过 webp2jpg-online,可以将 webp 格式的图片,转为 jpg 等,今天介绍的 WebP Server Go 是将 jpg 等转为 webp 格式 文章传送门:多功能图片转换器webp2jpg-online 什么是 WebP ? WebP 它是由…

Vue 路由

单应用程序 SPA - Single Page Application 所有功能在一个html页面上实现 单页面应用 多用于 系统类网站/内部网站/文档类网站/移动端站点 多页面应用 多用于 公司官网/电商类网站 路由 单页面应用按需更新页面,需要明确访问路径和组件的对应关系 Vue中的路…

重学java 30.API 1.String字符串

于是,虚度的光阴换来了模糊 —— 24.5.8 一、String基础知识以及创建 1.String介绍 1.概述 String类代表字符串 2.特点 a.Java程序中的所有字符串字面值(如“abc”)都作为此类的实例(对象)实现 凡是带双引号的,都是String的对象 String s "abc&q…

修改ElTable组件的样式(element-plus)

效果展示 <div class"table_main"><ElTable:data"tableList":header-cell-style"{color: #ffffff,background: #6f7f93,}"class"table_border":highlight-current-row"false"><ElTableColumn type"inde…

CentOS 自建gitlab仓库:安装相关工具

所需环境 Node 安装项目依赖、项目打包运行Nginx 前端项目部署&#xff08;正向代理、反向代理、负载均衡等&#xff09;Git 自动化部署时 拉取代码使用GitLab 代码仓库GitLab-Runner GitLab的CI/CD执行器 一、安装Node 检测是否已安装 常用node -v 命令检测。 如果已安装&a…

为什么你的企业需要微信小程序?制作微信小程序有什么好处?

什么是小程序&#xff1f; WeChat小程序作为更大的WeChat生态系统中的子应用程序。它们就像更小、更基本的应用程序&#xff0c;在更大的应用程序&#xff08;WeChat&#xff09;中运行。这些程序为用户提供了额外的高级功能&#xff0c;以便在使用WeChat服务时加以利用。根据…

DeepSeek发布全新开源大模型,GPT-4级别能力 价格仅百分之一

最新国产开源MoE大模型&#xff0c;刚刚亮相就火了。 DeepSeek-V2性能达GPT-4级别&#xff0c;但开源、可免费商用、API价格仅为GPT-4-Turbo的百分之一。 因此一经发布&#xff0c;立马引发不小讨论。 从公布的性能指标来看&#xff0c;DeepSeek-V2的中文综合能力超越一众开源…

运维实施工程师之Linux服务器全套教程

一、Linux目录结构 1.1 基本介绍 Linux 的文件系统是采用级层式的树状目录结构&#xff0c;在此结构中的最上层是根目录“/”&#xff0c;然后在此目录下再创建其他的目录。 在 Linux 世界里&#xff0c;一切皆文件&#xff08;即使是一个硬件设备&#xff0c;也是使用文本来标…

校园论坛系统基于PHP的校园管理系统毕设校园好感度系统 校园文化建设系统APP小程序H5前后端源码交付支持二开,一次付款,终生使用

APP小程序H5前后端源码交付&#xff0c;支持二开&#xff0c;一次付款&#xff0c;终身使用&#xff0c;免费更新系统本身源码。 校园社交网络系统开发是一个复杂且综合性的项目&#xff0c;旨在为学生、教师和管理人员提供一个互动、分享和交流的平台。以下是一个关于校园社交…

Hive Bucketed Tables 分桶表

Hive Bucketed Tables 分桶表 1.分桶表概念 2.分桶规则 3.语法 4.分桶表的创建 5.分桶表的好处