转载自 springboot手动提交kafka offset
enable.auto.commit参数设置成了false
但是测试发现enable.auto.commit参数设置成了false,kafka的offset依然提交了(也没有进行人工提交offset)。
查看源码
如果我们enable.auto.commit设置为false,那么就会走标红的if语句。而且下面有个stopInvokerAndCommitManualAcks()方法,看名字就知道是人工提交的意思。那么我们进去stopInvokerAndCommitManualAcks()方法瞅瞅。
如上图所示有个processCommits()方法,那么继续追进去:
单单看标红的方法是不是就知道这方法里面是更新offset和提交offset的方法。那么我们继续追进去:
结论:如果我们把enable.auto.commit参数设置成true。那么offset交给kafka来管理,offset进行默认的提交模式。
enable.auto.commit参数设置成false。那么就是Spring来替为我们做人工提交,从而简化了人工提交的方式。
所以kafka和springboot结合中的enable.auto.commit为false为spring的人工提交模式。enable.auto.commit为true是采用kafka的默认提交模式。
手动提交
spring.kafka.consumer.enable-auto-commit设置为false,设置AckMode的值
/*** The offset commit behavior enumeration.*/public enum AckMode {/*** Commit after each record is processed by the listener.*/RECORD,/*** Commit whatever has already been processed before the next poll.*/BATCH,/*** Commit pending updates after* {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.*/TIME,/*** Commit pending updates after* {@link ContainerProperties#setAckCount(int) ackCount} has been* exceeded.*/COUNT,/*** Commit pending updates after* {@link ContainerProperties#setAckCount(int) ackCount} has been* exceeded or after {@link ContainerProperties#setAckTime(long)* ackTime} has elapsed.*/COUNT_TIME,/*** User takes responsibility for acks using an* {@link AcknowledgingMessageListener}.*/MANUAL,/*** User takes responsibility for acks using an* {@link AcknowledgingMessageListener}. The consumer is woken to
- RECORD
每处理一条commit一次 - BATCH(
默认
)
每次poll的时候批量提交一次,频率取决于每次poll的调用频率 - TIME
每次间隔ackTime的时间去commit - COUNT
累积达到ackCount次的ack去commit - COUNT_TIME
ackTime或ackCount哪个条件先满足,就commit - MANUAL
listener负责ack,但是背后也是批量上去 - MANUAL_IMMEDIATE
listner负责ack,每调用一次,就立即commit
manual commit
@KafkaListener(topics = "k010")public void listen(ConsumerRecord<?, ?> cr,Acknowledgment ack) throws Exception {LOGGER.info(cr.toString());ack.acknowledge();}
方法参数里头传递Acknowledgment,然后手工ack
如果只添加上面语句会报错:
the listener container must have a MANUAL Ackmode to populate the Acknowledgment
我们要配置AckMode为MANUAL Ackmode
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);