背景:
对kafka消息进行监听,生产者发了消息,但是消费端没有接到消息,监听代码
消费端,kafka配置
spring.kafka.bootstrap-servers=kafka.cestc.dmp:9591
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="Kafka#Cestc2021";
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
#=============== provider =======================
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=dq
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
@KafkaListener(groupId = "${spring.kafka.consumer.group-id:dq}",topics = {"t_dq_rwzt_topic"}) public ReturnT<String> listenKafka2(String records, Acknowledgment ack) {
}
offset explorer发现生产者发送了消息,offset是0
问题解决:
后来查看生产者kafka配置,发现他们的enable-auto-commit是false:
spring.kafka.consumer.enable-auto-commit=false
修改kafka配置
spring.kafka.consumer.enable-auto-commit=false
# 在侦听器容器中运行的线程数
spring.kafka.listener.concurrency=5
# listner负责ack,每调用commit方法,立即向服务器提交
spring.kafka.listener.ack-mode=manual_immediate