SpringKafka消息消费:@KafkaListener与消费组配置

在这里插入图片描述

文章目录

    • 引言
    • 一、Spring Kafka消费者基础配置
    • 二、@KafkaListener注解使用
    • 三、消费组配置与负载均衡
    • 四、手动提交偏移量
    • 五、错误处理与重试机制
    • 总结

引言

Apache Kafka作为高吞吐量的分布式消息系统,在大数据处理和微服务架构中扮演着关键角色。Spring Kafka为Java开发者提供了简洁易用的Kafka消费者API,特别是通过@KafkaListener注解,极大地简化了消息消费的实现过程。本文将深入探讨Spring Kafka的消息消费机制,重点关注@KafkaListener注解的使用方法和消费组配置策略,帮助开发者构建高效稳定的消息消费系统。

一、Spring Kafka消费者基础配置

使用Spring Kafka进行消息消费的第一步是配置消费者工厂和监听器容器工厂。这些配置定义了消费者的基本行为,包括服务器地址、消息反序列化方式等。

@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Beanpublic ConsumerFactory<String, Object> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 使JsonDeserializer信任所有包props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");return new DefaultKafkaConsumerFactory<>(props);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}
}

二、@KafkaListener注解使用

@KafkaListener是Spring Kafka提供的核心注解,用于将方法标记为Kafka消息监听器。通过简单的注解配置,就能实现消息的自动消费和处理。

@Service
public class KafkaConsumerService {// 基本用法:监听单个主题@KafkaListener(topics = "test-topic", groupId = "test-group")public void listen(String message) {System.out.println("接收到消息:" + message);}// 监听多个主题@KafkaListener(topics = {"topic1", "topic2"}, groupId = "multi-topic-group")public void listenMultipleTopics(String message) {System.out.println("从多个主题接收到消息:" + message);}// 指定分区监听@KafkaListener(topicPartitions = {@TopicPartition(topic = "partitioned-topic", partitions = {"0", "1"})}, groupId = "partitioned-group")public void listenPartitions(String message) {System.out.println("从特定分区接收到消息:" + message);}// 使用ConsumerRecord获取消息元数据@KafkaListener(topics = "metadata-topic", groupId = "metadata-group")public void listenWithMetadata(ConsumerRecord<String, String> record) {System.out.println("主题:" + record.topic() + ",分区:" + record.partition() +",偏移量:" + record.offset() +",键:" + record.key() +",值:" + record.value());}// 批量消费@KafkaListener(topics = "batch-topic", groupId = "batch-group", containerFactory = "batchListenerFactory")public void listenBatch(List<String> messages) {System.out.println("接收到批量消息,数量:" + messages.size());messages.forEach(message -> System.out.println("批量消息:" + message));}
}

配置批量消费需要额外的批处理监听器容器工厂:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> batchListenerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setBatchListener(true);  // 启用批量监听factory.getContainerProperties().setPollTimeout(3000);  // 轮询超时时间return factory;
}

三、消费组配置与负载均衡

Kafka的消费组机制是实现消息消费负载均衡的关键。同一组内的多个消费者实例会自动分配主题分区,确保每个分区只被一个消费者处理,实现并行消费。

// 配置消费组属性
@Bean
public ConsumerFactory<String, Object> consumerFactory() {Map<String, Object> props = new HashMap<>();// 基本配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);// 消费组配置props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-application-group");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // 禁用自动提交props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);  // 单次轮询最大记录数props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);  // 会话超时时间props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);  // 心跳间隔return new DefaultKafkaConsumerFactory<>(props);
}

多个消费者可以通过配置相同的组ID来实现负载均衡:

// 消费者1
@KafkaListener(topics = "shared-topic", groupId = "shared-group")
public void consumer1(String message) {System.out.println("消费者1接收到消息:" + message);
}// 消费者2
@KafkaListener(topics = "shared-topic", groupId = "shared-group")
public void consumer2(String message) {System.out.println("消费者2接收到消息:" + message);
}

当这两个消费者同时运行时,Kafka会自动将主题分区分配给它们,每个消费者只处理分配给它的分区中的消息。

四、手动提交偏移量

在某些场景下,自动提交偏移量可能无法满足需求,此时可以配置手动提交。手动提交允许更精确地控制消息消费的确认时机,确保在消息完全处理后才提交偏移量。

@Configuration
public class ManualCommitConfig {@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> manualCommitFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);return factory;}
}@Service
public class ManualCommitService {@KafkaListener(topics = "manual-commit-topic", groupId = "manual-group",containerFactory = "manualCommitFactory")public void listenWithManualCommit(String message, Acknowledgment ack) {try {System.out.println("处理消息:" + message);// 处理消息的业务逻辑// ...// 成功处理后确认消息ack.acknowledge();} catch (Exception e) {// 异常处理,可以选择不确认System.err.println("消息处理失败:" + e.getMessage());}}
}

五、错误处理与重试机制

消息消费过程中可能会遇到各种异常,Spring Kafka提供了全面的错误处理机制,包括重试、死信队列等。

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> retryListenerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());// 配置重试factory.setRetryTemplate(retryTemplate());// 配置恢复回调factory.setRecoveryCallback(context -> {ConsumerRecord<String, String> record = (ConsumerRecord<String, String>) context.getAttribute("record");System.err.println("重试失败,发送到死信队列:" + record.value());// 可以将消息发送到死信主题// kafkaTemplate.send("dead-letter-topic", record.value());return null;});return factory;
}private RetryTemplate retryTemplate() {RetryTemplate template = new RetryTemplate();// 固定间隔重试策略FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();backOffPolicy.setBackOffPeriod(1000);  // 1秒重试间隔template.setBackOffPolicy(backOffPolicy);// 简单重试策略SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();retryPolicy.setMaxAttempts(3);  // 最大重试次数template.setRetryPolicy(retryPolicy);return template;
}@KafkaListener(topics = "retry-topic", groupId = "retry-group", containerFactory = "retryListenerFactory")
public void listenWithRetry(String message) {System.out.println("接收到需要重试处理的消息:" + message);// 模拟处理失败if (message.contains("error")) {throw new RuntimeException("处理失败,将重试");}System.out.println("消息处理成功");
}

总结

Spring Kafka通过@KafkaListener注解和灵活的消费组配置,为开发者提供了强大的消息消费能力。本文介绍了基本配置、@KafkaListener的使用方法、消费组机制、手动提交偏移量以及错误处理策略。在实际应用中,开发者应根据业务需求选择合适的消费模式和配置策略,以实现高效可靠的消息处理。合理利用消费组可以实现负载均衡和水平扩展,而手动提交偏移量和错误处理机制则能提升系统的健壮性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/74626.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VMware 虚报化Ubuntu 卡成一B,如何接招?

故事背景 Win10 专业版 安装VMware pro ,虚拟化出一个Window10&#xff0c;另一个是UBuntu.自从使用起来去不去就卡死。开始是以为驱动或者升级造成的&#xff0c;重新安装一段时间问题照旧。更气人的这种现象具有不定期性&#xff0c;说不定什么时候就来这么一出。 直接解决方…

cloud项目批量修改主机号

当clone了一个cloud项目后&#xff0c;要把别人的主机号全部改成自己的&#xff0c;非常麻烦 在项目根目录下&#xff0c;启动 Git Bash。在 Git Bash 终端中使用原始的 Unix 命令&#xff1a; find . -type f -exec sed -i s/127\.0\.0\.1/132.168.190.163/g {} 其中127.0.…

微信小程序使用 Vant Weapp 组件库教程

在微信小程序项目中使用 Vant 组件库&#xff08;Vant Weapp&#xff09;主要包括以下几个步骤&#xff1a; 1. 初始化项目并安装 Vant Weapp 初始化 npm 在项目根目录下运行以下命令&#xff0c;生成 package.json&#xff1a; npm init -y安装 Vant Weapp 执行以下命令安装 V…

FPGA状态机思想实现流水灯及HDLBits学习

目录 第一章 在DE2-115上用状态机思想实现LED流水灯1.1 状态机设计思路1.2 Verilog代码实现1.3. 仿真测试代码1.4 编译代码与仿真 第二章 CPLD和FPGA芯片的主要技术区别是什么&#xff1f;它们各适用于什么场合&#xff1f;2.1 主要技术区别2.2 适用场合 第三章 HDLBits学习3.1…

与总社团联合会合作啦

2025.4.2日&#xff0c;我社团向总社团联合会与暮光社团发起合作研究“浔川代码编辑器v2.0”。至3日&#xff0c;我社团收到回复&#xff1a; 总社团联合会&#xff1a; 总社团联合会已收到浔川社团官方联合会的申请&#xff0c;经考虑&#xff0c;我们同意与浔川社团官方联合…

Shiro学习(三):shiro整合springboot

一、Shiro整合到Springboot步骤 1、准备SpringBoot 环境&#xff0c;这一步省略 2、引入Shiro 依赖 因为是Web 项目&#xff0c;所以需要引入web 相关依赖 shiro-spring-boot-web-starter&#xff0c;如下所示&#xff1a; 3、准备Realm 因为实例化 ShiroFilterFactoryBean 时…

【图形API】片段着色器自动计算LOD

片段着色器中的自动 LOD 计算详解 在图形渲染中&#xff0c;Level of Detail (LOD) 用于优化纹理采样的性能和视觉质量。片段着色器&#xff08;Fragment Shader&#xff09;能够自动计算 LOD&#xff0c;而顶点着色器&#xff08;Vertex Shader&#xff09;则不行。以下是详细…

24、 Python Socket编程:从协议解析到多线程实战

Python Socket编程&#xff1a;从协议解析到多线程实战 一、文章概述 本文深入讲解Python网络编程核心技术&#xff0c;涵盖TCP/UDP协议底层原理、Socket API全流程解析、高并发服务端开发实践&#xff0c;以及网络通信中的典型问题解决方案。通过3个递进式代码案例和协议设计…

LabVIEW 中数字转字符串常用汇总

在 LabVIEW 编程环境里&#xff0c;数字与字符串之间的转换是一项极为基础且重要的操作&#xff0c;广泛应用于数据处理、显示、存储以及设备通信等多个方面。熟练掌握数字转字符串的方法和技巧&#xff0c;对编写高效、稳定的程序起着关键作用。接下来&#xff0c;我们将全面深…

轨迹速度聚类 实战

根据轨迹把速度聚类为3个类别,速度快的那部分不用平滑,速度慢的部分需要平滑。 速度聚类3个类别: kmeans++ import numpy as np import cv2 from sklearn.cluster import KMeans from matplotlib.colors import hsv_to_rgb from scipy.ndimage import gaussian_filter1d# …

vulkanscenegraph显示倾斜模型(5.6)-vsg::RenderGraph的创建

前言 上一章深入分析了vsg::CommandGraph的创建过程及其通过子场景遍历实现Vulkan命令录制的机制。本章将在该基础上&#xff0c;进一步探讨Vulkan命令录制中的核心封装——vsg::RenderGraph。作为渲染流程的关键组件&#xff0c;RenderGraph封装了vkCmdBeginRenderPass和vkCmd…

第二十八章:Python可视化图表扩展-和弦图、旭日图、六边形箱图、桑基图和主题流图

一、引言 在数据可视化领域&#xff0c;除了常见的折线图、柱状图和散点图&#xff0c;还有一些高级图表类型可以帮助我们更直观地展示复杂数据关系。本文将介绍五种扩展图表&#xff1a;和弦图、旭日图、六边形箱图、桑基图和主题流图。这些图表在展示数据关系、层次结构和流量…

大模型-爬虫prompt

爬虫怎么写prompt 以下基于deepseek r1 总结&#xff1a; 以下是为大模型设计的结构化Prompt模板&#xff0c;用于生成专业级网络爬虫Python脚本。此Prompt包含技术约束、反检测策略和数据处理要求&#xff0c;可根据具体需求调整参数&#xff1a; 爬虫脚本生成Prompt模板1 …

Vue中将pdf文件转为图片

平时开发中,我们经常遇到的场景应该是调用后端接口返回给前端pdf格式的文件流,然后我们可以通过URL.createObjectURL的方式转为object url临时路径然后可以通过window.open的方式来打开一个新的浏览器页签来进行预览,效果如下图: 但有时候这样满足不了的需求,它不想这样预…

物联网安全技术:守护智能世界的防线

最近研学过程中发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击链接跳转到网站人工智能及编程语言学习教程。读者们可以通过里面的文章详细了解一下人工智能及其编程等教程和学习方法。下面开始对正文内容的…

kubernetes安装部署k8s

kubernetes https://github.com/kubernetes/kubernetes.git go mod tidy go mod vendor go build -o .\bin -v ./… //手动创建bin文件夹 使用 minikube&#xff1a;https://gitee.com/mirrors/minikube.git 使用minikube启动本地化的集群服务 minikube start 启动集群&…

JT/T 1078 协议基本介绍与解析

文章目录 一、JT/T 1078 协议基本介绍二、JT/T 1078 与 JT808 的关系三、JT1078 协议核心功能四、JT1078 数据结构概览4.1、消息结构&#xff1a;4.2、消息类型&#xff08;部分&#xff09;&#xff1a; 五、Java 中如何解析 JT1078 协议数据&#xff1f;5.1、JT1078 消息 ID …

手机为电脑提供移动互联网络的3种方式

写作目的 在当今数字化时代,电脑已成为人们日常工作和生活中不可或缺的工具,而网络连接更是其核心功能之一。无论是处理工作任务、进行在线学习、还是享受娱乐资源,稳定的网络环境都是保障这些活动顺利开展的关键。然而,在实际使用过程中,电脑网络驱动故障时有发生,这可…

Linux的 /etc/sysctl.conf 笔记250404

Linux的 /etc/sysctl.conf 笔记250404 /etc/sysctl.conf 是 Linux 系统中用于 永久修改内核运行时参数 的核心配置文件。它通过 sysctl 工具实现参数的持久化存储&#xff0c;确保系统重启后配置依然生效。以下是其详细说明&#xff1a; &#x1f4c2; 备份/etc/sysctl.conf t…

deepseek v3-0324 Markdown 编辑器 HTML

Markdown 编辑器 HTML 以下是一个美观的 Markdown 编辑器 HTML 页面&#xff0c;支持多种主题切换和实时预览功能&#xff1a; <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport&q…