Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。
提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。
从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。
自动提交
自动提交默认全部为同步提交
自动提交相关参数
- enable.auto.commit (bool) – 如果为True,将自动定时提交消费者offset。默认为True。
- auto.commit.interval.ms(int) – 自动提交offset之间的间隔毫秒数。如果enable_auto_commit 为true,默认值为: 5000。
当设置 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。
网上有说
自动提交位移的一个问题在于,它可能会出现重复消费。
如果设置 enable.auto.commit 为 true,Consumer 按照 auto.commit.interval.ms设置的值(默认5秒)自动提交一次位移。我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。
在实际测试中,未发现上述情况(kafka 版本 2.13), 而是会等待所有消费者消费完当前消息,或者等待消费者超时(等待过程中会报如下 warning), 之后才会 reblance。
手动提交
手动提交可以自己选择是同步提交(commitSync)还是异步提交(commitAsync )
commitAsync 不能够替代 commitSync。commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。
手动提交,我们需要将 commitSync 和 commitAsync 组合使用才能到达最理想的效果,原因有两个:
- 我们可以利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。我们不希望程序总处于阻塞状态,影响 TPS。
- 我们不希望程序总处于阻塞状态,影响 TPS。
同时使用 commitSync() 和 commitAsync()
对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后,我们既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性.
手动提交和自动提交中的 reblance 问题
- 如果设置为手动提交,当集群满足 reblance 的条件时,集群会直接 reblance,不会等待所有消息被消费完,这会导致所有未被确认的消息会重新被消费,会出现重复消费的问题
- 如果设置为自动提交,当集群满足 reblance 的条件时,集群不会马上 reblance,而是会等待所有消费者消费完当前消息,或者等待消费者超时(等待过程中会报如下 warning), 之后才会 reblance。
python kafka-python 输出信息如下:
[WARNING]Heartbeat failed for group scan_result because it is rebalancing
kafka 中加入消费者时,kafka 会输出如下信息