文章目录
- 术语
- 消息
- 主题和分区
- 集群和分区副本
- 消费者组
- 重新平衡组/分区再均衡
- 消费者的分区策略
- 群组协调者Coordinator 和群组领导者 Group Leader
- 流程
- 初始化流程
- 消费流程
- Consumer重平衡
- 消费者核心配置
- 示例代码
- 高级
- 提交偏移量的几种方式
- 自动提交
- 手动同步提交
- 手动异步提交
- 提交特定偏移量
- 一次读取多条消息
- 参考
Kafka 可视化:https://softwaremill.com/kafka-visualisation/
模拟数据如何流经复制的 Kafka 主题,以更好地了解消息处理模型。
术语
消息
消息(根据用例也可以称为“记录”或“事件”)是 Kafka 处理的基本数据单位。其有效负载可以是任何二进制格式,也可以是纯文本、Avro、XML 或 JSON 等文本格式。
每个生产者都必须指定一个序列化器,将消息对象转换为二进制有效负载格式。每个消费者都必须指定一个相应的反序列化器,将有效负载格式转换回其 JVM 中的对象。
-
键,也可以是任何二进制格式。如果我们使用键,我们也需要序列化和反序列化。Kafka 使用键进行分区。
-
时间戳表示消息的生成时间。Kafka 使用时间戳来对消息进行排序或实施保留策略。
-
应用headers头部来将元数据与有效负载关联起来。例如,Spring 默认添加了用于序列化和反序列化的类型头部。
主题和分区
默认情况下,主题的保留策略是7天,即在7天后,Kafka 会自动删除消息,无论是否已传递给消费者。如果有必要,我们可以进行配置。
主题由分区(至少一个)组成。确切地说,消息存储在主题的一个分区中。在一个分区中,消息会获得一个顺序编号(偏移量)。这可以确保消息以存储在分区中的顺序传递给消费者。通过存储消费者群组已接收的偏移量,Kafka 保证了仅一次传递。
通过处理多个分区,我们可以确定 Kafka 可以在消费者进程池中既提供顺序保证又进行负载平衡。
当消费者订阅主题时,会将一个消费者分配给一个分区。
集群和分区副本
Kafka 使用主题分区来实现并行消息传递和消费者负载平衡。但 Kafka 本身必须具有可扩展性和容错性。因此,我们通常不使用单个 Kafka Broker,而是使用由多个 Broker 组成的集群。这些 Broker 的行为并不完全相同,但每个 Broker 都分配有特殊任务,如果一个 Broker 发生故障,集群的其余部分可以接管这些任务。
Broker1是分区 1 的领导者,而 Broker 4 是分区 2 的领导者。因此,每个客户端在从这些分区发送或轮询消息时都会连接到这些代理。为了获取有关分区领导者和其他可用代理(元数据)的信息,有一个特殊的引导机制。总之,我们可以说每个代理都可以提供集群的元数据,因此客户端可以初始化与每个代理的连接,然后重定向到分区领导者。这就是我们可以指定多个代理作为引导服务器的原因。
如果一个分区领导者代理发生故障,Kafka 将声明其中一个仍在工作的代理为新的分区领导者。然后,所有客户端都必须连接到新的领导者。在我们的示例中,如果代理 1 发生故障,代理 2 将成为分区 1 的新领导者。然后,连接到代理 1 的客户端必须切换到代理 2。
消费者组
在 Kafka 中,消费者通常属于某个消费者组。当多个组读取同一个主题时,它们之间不会互相干扰。Kafka 引入消费者组是因为消费者经常需要执行耗时操作,如将数据写入数据库或进行复杂计算。单个消费者处理不过来时,可以增加更多消费者,让它们分担任务,每个消费者处理一部分消息。这样,Kafka 就可以实现更好的扩展性。
可以通过在消费者的属性文件中设置 group.id 来为消费者设置组。如果您没有指定 group.id,那么消费者组将被随机生成。
- 同一个分区只能被同一个群组中的一个消费者读取,不会有多个消费者同时读取同一个分区
- 消费者组Group1中的Consumer4没有任务,也不会读取任何分区的数据。这提醒我们在使用时应该适当设置消费者数量,避免造成不必要的空闲和额外开销。
- 消费者组Group2中的消费者数量小于主题的分区数时,就会出现一个消费者Consumer2处理多个分区的情况,这会导致消费效率降低。因此,在使用时应确保消费者数量能够覆盖所有的分区,以提高消费效率。
重新平衡组/分区再均衡
随着新成员加入和老成员离开,分区会重新分配,以使每个成员获得对分区的比例份额。这被称为重新平衡组。
一个broker被指定为组的协调者 - coordinator ,负责管理组成员及其分区分配。
每个组的协调者 - coordinator是从内部偏移量主题__consumer_offsets
的领导者中选择的,该主题用于存储提交的偏移量。
基本上,组的ID被哈希到该主题的一个分区,该分区的领导者被选为协调者。通过这种方式,消费者组的管理大致均匀分布在集群中的所有代理中,这允许通过增加代理的数量来扩展组的数量。
当消费者启动时,它会找到其组的协调者,并发送请求加入该组。然后,协调者开始进行组重新平衡,以便为新成员分配其公平份额的分区。每次重新平衡都会导致组的新一代。
组中的每个成员必须向协调者发送心跳,以保持组的成员身份。如果在配置的会话超时之前未收到心跳,则协调者将踢出该成员,并将其分区重新分配给另一个成员。
在重新平衡期间,消费者无法消费消息。
消费者的分区策略
Kafka的Topic是由分区组成的,并且还可以配置分区的冗余度。一个分区在多个Broker中选举出一个Leader,消费者只访问这个Leader的分区副本。
消费者组订阅Topic,意味着该Topic下的所有分区都会被消费者组中的消费者消费,如果按照从属关系来说,Topic下的每个分区只属于消费者组中的一个消费者,不可能出现组中的两个消费者负责同一个分区。
Kafka通过配置消费者分区分配策略来决定分区中的消息被哪一个消费者消费。消费者分区的分配策略都应该实现org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor接口。通过实现这个接口,用户可以自定义分区分配策略。Kafka提供了3种实现的方式,可以通过参数partition.assignment.strategy进行指定。
(1)RangeAssignor
默认的分区分配策略。这种分配策略是根据Kafka Consumer端的总数和Topic中的分区总数来获取一个范围的,然后将分区按照范围进行平均分配,以保证分区尽可能均匀地分配给所有消费者。
(2)RoundRobinAssignor
这种分区分配策略对应的partition.assignment.strategy参数值为:org.apache.kafka. clients.consumer.RoundRobinAssignor。这种方式将Consumer Group中的所有消费者及其订阅Topic的分区按照字典序列排序,然后通过轮询的方式逐个将分区分配给每个消费者。
(3)StickyAssignor
这种分区分配策略采用黏性分配策略,该策略从Kafka 0.11版本引入。所谓黏性分配策略,既要保证分区的分配要尽可能均匀,又要保证每次分区的分配尽可能与上次分配的保持相同,就像进行粘贴一样。如果这两点发生冲突,优先考虑第一点,即分区的分配要尽可能均匀。
群组协调者Coordinator 和群组领导者 Group Leader
当我们实例化一个消费者群组时,Kafka 也会创建群组协调者。群组协调者定期从消费者接收称为心跳的请求。如果消费者停止发送心跳,协调者会认为该消费者已离开群组或崩溃。这是分区重新平衡的一个可能触发因素。
第一个请求加入群组协调者的消费者成为群组领导者。当由于任何原因发生重新平衡时,群组领导者会从群组协调者那里接收到群组成员列表。然后,群组领导者使用 partition.assignment.strategy
配置中设置的可定制策略,在列表中的消费者之间重新分配分区。
受支持的分区分配策略,当使用组管理时,客户端将使用这些策略在消费者实例之间分配分区所有权。可用选项包括:
org.apache.kafka.clients.consumer.RangeAssignor
:根据每个主题分配分区。org.apache.kafka.clients.consumer.RoundRobinAssignor
:以循环方式将分区分配给消费者。org.apache.kafka.clients.consumer.StickyAssignor
:保证最大程度平衡的分配,同时尽可能多地保留现有分区分配。org.apache.kafka.clients.consumer.CooperativeStickyAssignor
:遵循相同的 StickyAssignor 逻辑,但允许合作重新平衡。
默认分配器是 [RangeAssignor, CooperativeStickyAssignor],默认情况下将使用 RangeAssignor,但允许通过一次滚动反弹升级到 CooperativeStickyAssignor,从而将 RangeAssignor 从列表中删除。
流程
初始化流程
1.消费者与kafka
Broker交互
2.寻找自己所在组对应的Coordinator
3.消费者会向对应的Coordinator
发送加入消费者组请求
4.Coordinator在收到消费者加入消费者组请求后,会从同一个消费者组中选择一个消费者作为leader
,其余消费者作为`flower
5.如果某一消费者被Coordinator
选为leader
后,那么就需要负责制定该分组分区分配方案,该方法先寻找对应的分区分配器,根据分配器来给消费者组中的消费者分配分区
6.消费者在收到加入消费者组的响应后,如果被选为leader
,那么该消费者负责制定分区分配方案,其它消费者不需要,之后所有消费者再次向Coordinator
发送同步组请求
7.Coordinator
在收到消费者leader
制定的分区分配方案后,会将该方案通知到各个消费者,告诉每个消费者应该消费哪些分区
8.消费者反序列化分区信息并在本地设置分区消息。
每个Broker都会有一个
coordinator
,他的作用是辅助实现消费者组的初始化的分区的分配,coordinator节点选择就等于group id的hashcode值 % 50,这个50是__consumer_offsets的分区数量,是默认的。
消费流程
- Consumer采取的pull拉取的方式,首先会初始化一个
ConsumerNetworkClient
,这个是消费者网络连接客户端- 消费者组的消费者想要拉取数据首先给
ConsumerNetworkClient
发送一个sendFetches
请求- 收到请求之后,
ConsumerNetworkClient
就向broker发送请求了(send)- send有一个回调方法
onSuccess
,如果成功就把数据放入completedFetches
队列- Consumer在这个队列拉取数据,首先经过反序列化、然后拦截器最后就可以处理数据了
1.消费者有了分区信息后就可以拉取该分区存储的消息记录,在拉取消息记录之前,必须要明确从什么位置开始拉取,因此需要初始化分区偏移量。
2.**消费者向Coordinator
询问对应的offset,得到偏移量后进行本地初始化。**获取所有初始化状态分区列表,向Coordinator
询问对应的offset,得到偏移量后进行本地初始化
3.发送拉取请求,Consumer 在调用poll方法时,如果本地缓存区中(completedFeches) 存在未消费的消息,则直接从本地缓存区中拉取消息,否则会调用client#send
方法进行异步多线程并行发送拉取请求
4.拉取消息记录,每次拉取完消息后,消费者会将分区本地offset设置为最后一条消息对应offset + 1
5.消费消息,在拉取到消息后,就需要对消息记录进行消费
6.消费的过程也就是遍历消息记录,然后调用对应被@KafkaListener
注解标注的方法
Consumer重平衡
作用
让消费者分组内消费者消费哪些主题分区达成一致。重平衡需要借助Kafka Broker
端的Coordinator
组件,在Coordinator
的帮助下完成消费者组的分区重新分配。
触发时机
-
消费组的成员数量发生变化,比如有新成员加入,有成员退出。
- 正常的新成员加入和退出。
- 消费者故障了,也会导致消费组的成员数量发生变化,比如消费者hang了,在规定的时间范围内,消费者没有向群组协调器发送任何心跳,消费者被群组协调器认为是
DEAD
状态,这也会导致重平衡的发生。
-
消费者组订阅的主题数量发生变化,比如通过正则表达式进行订阅,有新的主题满足表达式,主题数量增多。
-
消费者组订阅的主题的分区数量发生变化,比如broker节点宕机,导致分区数减少。
无论是增减分区数,还是增加消费者数量,这都是主动控制。而消费者故障是最容易发生的,也是最常见引发重平衡的地方。
怎么避免消费者发生故障?
崩溃或者故障离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,因为后者是主动发起的离组,协调者能马上感知并处理。但崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数 session.timeout.ms 控制的。
这也是没法避免的,我们的做的是保证不要让kafka错误判断消费者发生故障。这主要与三个参数有关系:
-
session.timeout.ms
:控制心跳超时时间,一般来说,超时时间应该是心跳间隔的 3 倍时间。 -
heartbeat.interval.ms
:控制发送心跳的频率,频率越高越不容易被误判,但是消耗更多资源。 -
max.poll.intervel.ms
:控制poll间隔。消费者poll数据后,进行业务处理,然后再次拉取数据。如果两次poll的间隔大于这个参数值,就会被提出消费者组,默认是5分钟 -
max.poll.records
: 每次poll的最大消息数。获取的消息条数越多,需要处理的时间越长。所以每次拉取的消息数不能太多,需要保证在 max.poll.interval.ms 设置的时间内能消费完,否则会发生 rebalance。这个是最最容易出问题的,如果消费者的消费速度过慢,就会导致这个问题。
简单来说,会导致崩溃的几个点是:
- 消费者心跳超时,导致 rebalance。
- 消费者处理时间过长,导致 rebalance。
较为合理的配置是:
session.timeout.ms = 6000
heartbeat.interval.ms = 2000
max.poll.interval.ms = 消费者处理消息最长耗时+1分钟
max.poll.records
= 500(根据自己的业务场景适当调整,减少每次处理的消息数会帮助解决消费处理超时问题)
重平衡流程
重平衡是通过消费者端的心跳线程通知其他的消费实例发生重平衡。重平衡开启后,borker
通过维护一套消费者组状态机来协调完成整个重平衡机制。
重平衡带来的问题
在重平衡过程中,消费者是不能从kafka消费消息的
,这就对系统性能造成影响。如果kafka节点较多,比如数百个,那么重平衡所需要的时间可能数分钟,甚至数小时,这段期间kafka几乎是不可用。所以,在实际环境中,我们要尽量避免重平衡的发生
消费者核心配置
-
bootstrap.servers:指定了Kafka集群的连接字符串,它的用途与在KafkaProducer中的用途是一样的。
-
key.deserializer和value.deserializer:指定消息的键和值的反序列化器
-
group.id:表示消费者组名称,如果group.id相同则表示属于一个消费者组中的成员。
-
enable.auto.commit 默认启用。消费者将按照 auto.commit.interval.ms 设置的时间间隔定期自动提交偏移量,默认为 5 秒。 只有当消息提交以后,该消息才不会被再次接收到,也可以通过consumer.commitSync()的方式实现手动提交。
-
auto.offset.reset 控制消费者在尝试读取没有上次读取偏移量的分区时的行为方式。有两种设置:最新和最早。最新的将从最新的可用消息开始,最早的将从最早的可用偏移量开始。
auto.offset.reset=latest情况下,新的消费者将会从其他消费者最后消费的offset处开始消费Topic下的消息
auto.offset.reset= earliest情况下,新的消费者会从该topic最早的消息开始消费
auto.offset.reset=none情况下,新的消费者加入以后,由于之前不存在offset,则会直接抛出异常。
-
fetch.max.bytes: 用于配置每个消费者拉取请求的最大字节数。
-
key.deserializer 和 value.deserializer: 分别用于指定消费者解析消息键和值的反序列化器。
-
isolation.level: 设置事务隔离级别,可选 “read_uncommitted” 和 “read_committed”。
-
client.id: Kafka 集群中用于标识客户端。它用于将请求与发出请求的客户端关联起来。通常建议同一组中的所有使用者具有相同的客户端 ID,以强制执行单个组的客户端配额。
-
session.timeout.ms 默认为 10 秒。该值指定代理在将消费者标记为死亡之前至少需要从客户端获取一次心跳的时间量。
用于检测工作器故障的超时时间。工作线程定期发送心跳以向代理表明其活跃度。如果在此会话超时到期之前代理没有收到心跳,则代理将从组中删除该工作线程并启动重新平衡。请注意,该值必须在代理配置中
group.min.session.timeout.ms
和所配置的允许范围内group.max.session.timeout.ms
。 -
heartbeat.interval.ms 默认为 3 秒。该值指定消费者发送心跳信号的频率。
使用 Kafka 的组管理工具时,向组协调器发出心跳的预期时间间隔。心跳用于确保工作人员的会话保持活动状态,并在新成员加入或离开组时促进重新平衡。该值必须设置为低于
session.timeout.ms
,但通常不应高于该值的 1/3。它可以调整得更低,以控制正常重新平衡的预期时间。 -
max.poll.interval.ms 默认 300 秒。指定代理在调用 poll 方法之间等待的时间,导致消费者在将消费者标记为死亡之前尝试获取更多消息。
使用消费者组管理时,调用 poll() 之间的最大延迟。这对消费者在获取更多记录之前可以空闲的时间设置了上限。如果在此超时到期之前未调用 poll(),则消费者将被视为失败,并且组将重新平衡,以便将分区重新分配给另一个成员。对于使用非空的消费者
group.instance.id
达到此超时,分区不会立即重新分配。相反,消费者将停止发送心跳,并且分区将在 过期后重新分配session.timeout.ms
。这反映了已关闭的静态消费者的行为。 -
fetch.min.bytes 消费者将从代理获取的最小数据量。如果代理的可用数据少于此值,消费者将等待,直到有更多可用数据。这可用于最大限度地减少消费者和代理之间的来回(可能提高吞吐量),但以延迟为代价)。
- 该参数用来配置Kafka消费者在一次拉取请求中能从Kafka中拉取的最小数据量,即调用poll()方法时,每次拉取的数据量,其默认值为1字节。
- 消费者在拉取数据时,如果Kafka服务器端返回给消费者的数据量小于这个参数值的设定,那么消费者就需要进行等待,直到数据量满足这个参数的配置大小。因此在实际运行环境中,可以适当调大这个参数的值以提高一定的吞吐量。另外,增大这个参数值也会造成额外的延迟,因此增大该参数不适合敏感的应用。
-
fetch.max.wait.ms 从代理获取消息之前等待的最长时间。与 fetch.min.bytes 结合使用,可以设置消费者等待时间的上限。如果超过此值,消费者将从代理获取消息
- 该参数也和fetch.min.bytes参数有关。前面提到,如果Kafka服务器端返回给消费者的数据量小于fetch.min.bytes参数值的设定,消费者就需要等待,直到数据量满足这个参数的配置大小。然而有可能会一直等待而无法将消息发送给消费者,显然这是不合理的。fetch.max.wait.ms参数用于指定Kafka的等待时间,默认值为500ms。当Kafka满足不了fetch.min.bytes参数值的设定时,Kafka集群也会根据fetch.max.wait.ms参数值的设定,默认等待5s,然后将消息数据返回给消费者。综合来看,fetch.min.bytes和fetch.max.wait.ms都有可能造成消息的延迟处理。如果业务应用对延迟敏感,那么可以适当调小这些参数。
-
max.poll.records 在单个轮询调用中要获取的最大记录数。对于控制每个消费者内的吞吐量很有用。
- 该参数用来配置Kafka消费者在一次拉取请求中拉取的最大消息数,其默认值为500条。如果消息数都比较小,则可以适当调大这个参数值来提升消费速度
-
max.partition.fetch.bytes 每个分区获取的最大字节数。对于限制客户端获取消息所需的内存上限很有用。当设置此值时,请记住 max.poll.interval 设置,因为该值较大可能会导致传递的数据多于轮询间隔窗口中可能获取的数据。
- 该参数用来配置从每个分区里返回给消费者的最大数据量,其默认值为1 048 576字节,即1MB。这个参数与fetch.max.bytes参数相似,只不过max.partition.fetch.bytes用来限制一次拉取中每个分区消息的字节数,而fetch.max.bytes用来限制一次拉取中整体消息的字节数。同样,如果这个参数设定的值比消息字节数小,那么也不会造成无法消费。
-
partition.assignment.strategy 分区分配策略控制 Group Leader 消费者如何在消费者组中的消费者之间划分分区。
-
connections.max.idle.ms: 用来指定在多长时间之后,关闭闲置的Kafka消费者连接,默认值是540 000ms,即9min。
示例代码
Properties properties = new Properties();
// 连接 bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx:9092, xxx:9092 ");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
// 配置消费者组 id
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
// 手动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 从 Kafka 主题的开头读取
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 1.创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = newKafkaConsumer<>(properties);
// 2.订阅主题 first
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
// 3.轮训消费数据
while (true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : consumerRecords) {log.info("Key: " + record.key() + ", Value: " + record.value());log.info("Partition: " + record.partition() + ", Offset:" + record.offset());}// 手动提交 offset// kafkaConsumer.commitSync();kafkaConsumer.commitAsync();
}
高级
提交偏移量的几种方式
在Kafka中,消费者从分区读取消息。在读取消息时,需要考虑一些问题,例如确定从分区读取哪些消息,在发生故障时防止重复读取消息或丢失消息。解决这些问题的方法是使用偏移量。
自动提交
默认情况下,Kafka 使用自动提交 -每五秒提交poll()方法返回的最大偏移量。poll()返回一组消息,超时时间为10秒。
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<Long, String> message : messages) {// processed message
}
自动提交的问题在于,如果应用程序发生故障,数据丢失的几率非常高。当[poll返回消息时,Kafka可能会在处理消息之前提交最大的偏移量。
假设poll()返回 100 条消息,而消费者在自动提交时处理了 60 条消息。然后,由于某些故障,消费者崩溃了。当新的消费者上线读取消息时,它从偏移量 101 开始读取,导致 61 到 100 之间的消息丢失。
因此,我们需要其他方法来避免这种缺点。答案是手动提交。
手动同步提交
在手动提交中,无论是同步还是异步,都需要通过将默认属性(enabled.auto.commit属性)设置为false来禁用自动提交:
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));//process the messages
consumer.commitSync();
此方法仅在处理完消息后才提交偏移量,从而防止数据丢失。但是,如果消费者在提交偏移量之前崩溃,则无法防止重复读取。除此之外,它还会影响应用程序性能。
commitSync ()
会阻止代码,直到完成为止。此外,如果出现错误,它会继续重试。这会降低应用程序的吞吐量,这是我们不希望看到的。因此,Kafka 提供了另一种解决方案,即异步提交,可以解决这些缺点。
手动异步提交
Kafka 提供commitAsync()来异步提交偏移量。它通过在不同线程中提交偏移量来克服手动同步提交的性能开销。让我们实现一个异步提交来理解这一点:
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));//process the messages
consumer.commitAsync();
异步提交的问题在于,如果失败,它不会重试。它依赖于commitAsync()的下一次调用,该调用将提交最新的偏移量。
假设 300 是我们想要提交的最大偏移量,但我们的commitAsync ()由于某些问题而失败。在重试之前,另一个 commitAsync() 调用可能会提交最大偏移量 400,因为它是异步的。当失败的commitAsync()重试时,如果它成功提交了偏移量 300,它将覆盖之前的 400 提交,从而导致重复读取。这就是commitAsync()不重试的原因。
提交特定偏移量
有时,我们需要对偏移量进行更多控制。假设我们正在小批量处理消息,并希望在处理消息后立即提交偏移量。我们可以使用commitSync()和commitAsync()的重载方法,它们采用 map 参数来提交特定偏移量:
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int messageProcessed = 0;while (true) {ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));for (ConsumerRecord<Long, String> message : messages) {// processed one messagemessageProcessed++;currentOffsets.put(new TopicPartition(message.topic(), message.partition()),new OffsetAndMetadata(message.offset() + 1));if (messageProcessed%50==0){consumer.commitSync(currentOffsets);}}}
管理一个 currentOffsets 映射,该映射以TopicPartition为键,以OffsetAndMetadata为值。我们将消息处理过程中已处理消息的TopicPartition和OffsetAndMetadata插入到currentOffsets映射中。当已处理消息数达到 50 条时,我们使用 currentOffsets 映射调用commitSync ()以将这些消息标记为已提交。
这种方式的行为与同步和异步提交相同。唯一的区别是,这里我们决定要提交的偏移量,而不是 Kafka。
一次读取多条消息
Kafka消费者以可配置大小的批次从给定的分区中获取记录。我们不能配置每批次获取的确切记录数,但可以配置这些批次的大小,以字节为单位。
Kafka中的max.partition.fetch.bytes属性决定了消费者在单次请求中可以从单个分区获取的最大数据量。因此,即使是少量的短消息,通过更改此属性,我们也可以强制监听器以多个批次获取记录。
批次中的记录数量取决于这些记录及其元数据的大小。
还允许通过fetch.min.bytes属性自定义最小获取大小。我们可以更改此属性,以指定Broker需要响应的最小数据量。如果未达到此最小值,代理会等待更长时间再响应消费者的获取请求。
可以通过将“ fetch.min.bytes ”值调整为更大的大小来强制消费者等待更多数据的积累。
Kafka消费者默认会在有新记录时获取数据,但如果新数据超过1,048,576 字节1 MB(兆字节),会拆分成多个批次。通过自定义“fetch.min.bytes”和“max.partition.fetch.bytes”属性,可以调整Kafka的行为以满足特定需求。
参考
-
《Kafka进阶》 作者:赵渝强
-
https://www.baeldung.com/apache-kafka