kafka原理之springboot 集成批量消费

前言

由于 Kafka 的写性能非常高,因此项目经常会碰到 Kafka 消息队列拥堵的情况。遇到这种情况,我们可以通过并发消费、批量消费的方法进行解决。

一、新建一个maven工程,添加kafka依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

二、yaml配置文件

spring:kafka:bootstrap-servers: 127.0.0.1:9002producer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerconsumer:group-id: test-consumer-group# 当 Broker 端没有 offset(如第一次消费或 offset 超过7天过期)时如何初始化 offset,当收到 OFFSET_OUT_OF_RANGE 错误时,如何重置 Offset# earliest:表示自动重置到 partition 的最小 offset# latest:默认为 latest,表示自动重置到 partition 的最大 offset# none:不自动进行 offset 重置,抛auto-offset-reset: latest# 是否在消费消息后将 offset 同步到 Broker,当 Consumer 失败后就能从 Broker 获取最新的 offsetenable-auto-commit: false## 当 auto.commit.enable=true 时,自动提交 Offset 的时间间隔,建议设置至少1000auto-commit-interval: 2000max-poll-records: 30heartbeat-interval: 3000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:# 使用 Kafka 消费分组机制时,消费者超时时间。当 Broker 在该时间内没有收到消费者的心跳时,认为该消费者故障失败,Broker 发起重新 Rebalance 过程。目前该值的配置必须在 Broker 配置group.min.session.timeout.ms=6000和group.max.session.timeout.ms=300000 之间session.timeout.ms: 60000# 使用 Kafka 消费分组机制时,消费者发送心跳的间隔。这个值必须小于 session.timeout.ms,一般小于它的三分之一heartbeat.interval.ms: 3000# 使用 Kafka 消费分组机制时,再次调用 poll 允许的最大间隔。如果在该时间内没有再次调用 poll,则认为该消费者已经失败,Broker 会重新发起 Rebalance 把分配给它的 partition 分配给其他消费者max.poll.interval.ms: 300000request.timeout.ms: 600000listener:# 在侦听器容器中运行的线程数。concurrency: 2type: batchmax-poll-records: 50#当 auto.commit.enable 设置为false时,表示kafak的offset由customer手动维护,#spring-kafka提供了通过ackMode的值表示不同的手动提交方式#手动调用Acknowledgment.acknowledge()后立即提交ack-mode: manual_immediate# 消费者监听的topic不存在时,项目会报错,设置为falsemissing-topics-fatal: false

三、消息消费

手动提交非批量消费

  •   String 类型接入

     @KafkaListener(topics = {"test-topic"}, groupId = "test-consumer-group")public void onMessage(String message, Consumer consumer) {System.out.println("接收到的消息:" + message);consumer.commitSync();}

  • 使用注解方式获取消息头、消息体

         /*** 处理消息*/@KafkaListener(topics = "test-topic", groupId = "test-consumer-group")public void onMessage(@Payload String message,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) String key,@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts,Acknowledgment ack) {try {ack.acknowledge();log.info("Consumer>>>>>>>>>>>>>end");} catch (Exception e) {log.error("Consumer.onMessage#error . message={}", message, e);throw new BizException("事件消息消费失败", e);}} 

 

手动提交批量消费

想要批量消费,首先要开启批量消费,通过listener.type属性设置为batch即可开启,看下代码吧:

spring:kafka:consumer:group-id: test-consumer-groupbootstrap-servers: 127.0.0.1:9092max-poll-records: 50 # 一次 poll 最多返回的记录数listener:type: batch # 开启批量消费

 

如上设置了启用批量消费和批量消费每次最多消费记录数。这里设置 max-poll-records是50,并不是说如果没有达到50条消息,我们就一直等待。而是说一次poll最多返回的记录数为50

  • ConsumerRecord类接收
        /*** kafka的批量消费监听器*/@KafkaListener(topics = "test-topic", groupId = "test-consumer-group")public void onMessage(List<ConsumerRecord<String, String>> records, Consumer consumer) {try {log.info("Consumer.batch#size={}", records == null ? 0 : records.size());if (CollectionUtil.isEmpty(records)) {//分别是commitSync(同步提交)和commitAsync(异步提交)consumer.commitSync();return;}for (ConsumerRecord<String, String> record : records) {String message = record.value();if (StringUtils.isBlank(message)) {continue;}//处理业务数据//doBuiness();}consumer.commitSync();log.info("Consumer>>>>>>>>>>>>>end");} catch (Exception e) {log.error("Consumer.onMessage#error .", e);throw new BizException("事件消息消费失败", e);}}

  • String类接收
     @KafkaListener(topics = {"test-topic"}, groupId = "test-consumer-group")public void onMessage(List<String> message, Consumer consumer) {System.out.println("接收到的消息:" + message);consumer.commitSync();}

  • 使用注解方式获取消息头、消息体,则也是使用 List 来接收:

    @Component
    public class KafkaConsumer {// 消费监听@KafkaListener(topics = {"test-topic"})public void listen2(@Payload List<String> data,@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<String> keys,@Header(KafkaHeaders.RECEIVED_TIMESTAMP) List<Long> tss) {System.out.println("收到"+ data.size() + "条消息:");System.out.println(data);System.out.println(topics);System.out.println(partitions);System.out.println(keys);System.out.println(tss);}
    }

  • 并发消费 

    再来看下并发消费,为了加快消费,我们可以提高并发数,比如下面配置我们将并发设置为 3。注意:并发量根据实际分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态

spring:kafka:consumer:group-id: test-consumer-groupbootstrap-servers: 127.0.0.1:9092max-poll-records: 50 # 一次 poll 最多返回的记录数listener:type: batch # 开启批量监听concurrency: 3 # 设置并发数

 

我们设置concurrency为3,也就是将会启动3条线程进行监听,而要监听的topic有5个partition,意味着将有2条线程都是分配到2个partition,还有1条线程分配到1个partition

配置类方式

通过自定义配置类的方式也是可以的,但是相对yml配置来说还是有点麻烦的(不提倡)

/*** 消费者配置*/
@Configuration
public class KafkaConsumerConfig {/*** 消费者配置* @return*/public Map<String,Object> consumerConfigs(){Map<String,Object> props = new HashMap<>();props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9002");props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> batchFactory() {ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));//并发数量factory.setConcurrency(3);//开启批量监听factory.setBatchListener(true);return factory;}
}

同时监听器通过@KafkaListener注解的containerFactory 配置指定批量消费的工厂即可,如下:

同时监听器通过@KafkaListener注解的containerFactory 配置指定批量消费的工厂即可,如下:

四、Kafka参数调优

一、Consumer参数说明


1、enable.auto.commit

该属性指定了消费者是否自动提交偏移量,默认值是true。
为了尽量避免出现重复数据(假如,某个消费者poll消息后,应用正在处理消息,在3秒后kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费)和数据丢失,可以把它设为 false,由自己控制何时提交偏移量。
如果把它设为true,还可以通过配置 auto.commit.interval.ms 属性来控制提交的频率。
 

2、auto.commit.interval.ms
自动提交间隔。范围:[0,Integer.MAX],默认值是 5000 (5 s)

3、手动提交:commitSync/commitAsync
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。

相同点:都会将本次poll的一批数据最大的偏移量提交。
不同点:commitSync会阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败,导致重复消费。

4、max.poll.records
Consumer每次调用poll()时取到的records的最大数。


二、Kafka消息积压、消费能力不足怎么解决?

如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,同时相应的增加消费者实例,消费者数=分区数(二者缺一不可)。
如果是下游的数据处理不及时,则可以提高每批次拉取的数量,通过max.poll.records这个参数可以调整。
单个消费者实例的消费能力提升,可以用多线程/线程池的方式并发消费提高单机的消费能力。


三、Kafka消费者如何进行流控?

将自动提交改成手动提交(enable.auto.commit=false),每次消费完再手动异步提交offset,之后消费者再去Broker拉取新消息,这样可以做到按照消费能力拉取消息,减轻消费者的压力。
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/54877.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【提升接口响应能力的最佳实践】常规操作篇

文章目录 1. 并行处理简要说明CompletableFuture是银弹吗&#xff1f;测试案例测试结论半异步&#xff0c;半同步总结 2. 最小化事务范围简要说明编程式事务模板 3. 缓存简要说明 4. 合理使用线程池简要说明使用场景线程池的创建参数的配置建议 线程池的监控线程池的资源隔离 5…

【jvm】双亲委派机制

目录 一、说明二、工作原理三、优势四、图示 一、说明 1.java虚拟机对class文件采用的是按需加载的方式&#xff0c;当需要使用该类时才会将它的class文件加载到内存生成class对象 2.加载某个类的class文件时&#xff0c;java虚拟机采用双亲委派模式&#xff0c;即把请求交给由…

curl --resolve参数的作用

之所以会有这样的操作&#xff0c;是因为域名一般对应的都是一个反向代理&#xff0c;直接请求域名&#xff0c;反向代理会将流量随机选一台机器打过去&#xff0c;而无法确保所有的机器都可用。所以直接用ip。 在 curl 命令中&#xff0c;--resolve 参数用于指定自定义的主机名…

想解锁禁用的iPhone?除了可以使用电脑之外,这里还有不需要电脑的方法!

多次输入错误的密码后,iPhone将显示“iPhone已禁用”。这种情况看起来很棘手,因为你现在不能用iPhone做任何事情。对于这种情况,我们提供了几种有效的方法来帮助你在最棘手的问题中解锁禁用的iPhone。你可以选择使用或不使用电脑来解锁禁用的iPhone。 一、为什么你的iPhone…

基于FPGA的FIR低通滤波器实现(附工程源码),matlab+vivado19.2+simulation

基于FPGA的FIR低通滤波器实现(附工程源码) 文章目录 基于FPGA的FIR低通滤波器实现(附工程源码)前言一、matlab设计FIR滤波器&#xff0c;生成正弦波1.设计FIR滤波器1.生成正弦波.coe 二、vivado1.fir滤波器IP核2.正弦波生成IP核3.时钟IP核设置4.顶层文件/测试文件代码 三.simul…

【30天熟悉Go语言】11 数组的全方位使用与解析

作者&#xff1a;秃秃爱健身&#xff0c;多平台博客专家&#xff0c;某大厂后端开发&#xff0c;个人IP起于源码分析文章 &#x1f60b;。 源码系列专栏&#xff1a;Spring MVC源码系列、Spring Boot源码系列、SpringCloud源码系列&#xff08;含&#xff1a;Ribbon、Feign&…

OLED透明屏水波纹效果:打造独特的显示体验

OLED透明屏水波纹效果是一种独特的显示技术&#xff0c;通过模拟水波纹的视觉效果&#xff0c;为用户带来更加生动逼真的观感。 根据市场调研报告显示&#xff0c;OLED透明屏水波纹效果已经在广告、游戏和商业领域得到广泛应用&#xff0c;为品牌提供了新的展示方式&#xff0…

根据源码,模拟实现 RabbitMQ - 网络通讯设计,自定义应用层协议,实现 BrokerServer (8)

目录 一、网络通讯协议设计 1.1、交互模型 1.2、自定义应用层协议 1.2.1、请求和响应格式约定 ​编辑 1.2.2、参数说明 1.2.3、具体例子 1.2.4、特殊栗子 1.3、实现 BrokerServer 1.3.1、属性和构造 1.3.2、启动 BrokerServer 1.3.3、停止 BrokerServer 1.3.4、处…

力扣:75. 颜色分类(Python3)

题目&#xff1a; 给定一个包含红色、白色和蓝色、共 n 个元素的数组 nums &#xff0c;原地对它们进行排序&#xff0c;使得相同颜色的元素相邻&#xff0c;并按照红色、白色、蓝色顺序排列。 我们使用整数 0、 1 和 2 分别表示红色、白色和蓝色。 必须在不使用库内置的 sort …

【数据分析】波士顿矩阵

波士顿矩阵是一种用于分析市场定位和企业发展战略的管理工具。由美国波士顿咨询集团&#xff08;Boston Consulting Group&#xff09;于1970年提出&#xff0c;并以该集团命名。 波士顿矩阵主要基于产品生命周期和市场份额两个维度&#xff0c;将企业的产品或业务分为四个象限…

MySQL集群(mysql-cluster)

环境 系统环境&#xff1a;centos7 管理节点&#xff1a;192.168.127.128 数据节点&#xff1a;192.168.127.145&#xff0c;192.168.127.146 sql节点&#xff1a;192.168.127.145&#xff0c;192.168.127.146 约定 创建mysql用户&#xff0c;将程序部署到/home/mysql 目…

LAMP架构详解+构建LAMP平台之Discuz论坛

L A M P 一、LAMP架构简介1.1 LAMP架构的组成1.2 LAMP各组件的主要作用1.3 LAMP工作过程1.4 CGI和fastcgi 二、搭建Discuz论坛的思路三、编译安装Apache httpd3.1 前置准备3.2 移动apr包 apr-util包到安装目录中&#xff0c;并切换到 httpd-2.4.29目录中3.3 编译安装3.4 建立软…

dvwa xss通关

反射型XSS通关 low难度 选择难度&#xff1a; 直接用下面JS代码尝试&#xff1a; <script>alert(/xss/)</script>通关成功&#xff1a; medium难度 直接下面代码尝试后失败 <script>alert(/xss/)</script>发现这段代码直接被输出&#xff1a; 尝试…

开始MySQL之路——外键关联和多表联合查询详细概述

多表查询和外键关联 实际开发中&#xff0c;一个项目通常需要很多张表才能完成。例如&#xff0c;一个商城项目就需要分类表&#xff0c;商品表&#xff0c;订单表等多张表。且这些表的数据之间存在一定的关系&#xff0c;接下来我们将在单表的基础上&#xff0c;一起学习多表…

Spring、SpringMVC、SpringBoot三者的区别

目录 Spring是什么&#xff1f; SpringMVC是什么&#xff1f; SpringBoot是什么&#xff1f; Spring、SpringMVC、SpringBoot三者之间的关系 Spring是什么&#xff1f; Spring是一个开源的应用程序框架&#xff0c;它提供了一种简易的开发方式&#xff0c;通过依赖注入和面…

微服务项目容器编排docker-compose.yml、Dockerfile文件模板、相关配置文件、shell脚本

nacos Dockerfile&#xff08;不需要特殊处理&#xff0c;使用docker conpose可以不写&#xff09; # 基础镜像 FROM nacos/nacos-server # author MAINTAINER jianglifeng<jlifengfoxmail.com> RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \ &&a…

Qt --- 自定义提示框 类似QMessagebox

QMessageBox::information(NULL, QString("title"), QString("I am information")); 以下是自定义提示框的代码&#xff0c;有图有真相&#xff01;提示框大部分都采用模态的形式&#xff0c;关于模态也不再多提&#xff01;所以父类为QDialog&#xff0c;…

若依移动端Ruoyi-App 项目的后端项目入门

后端项目运行 运行报错 Error creating bean with name sysConfigServiceImpl: Invocation of init method failed 数据库创建了。 代码连接数据库地方了也匹配上了。但是还是报错。 分析 &#xff1a; 想起来我电脑从来没有安装过redis 下载安装redis到windows 链接&…

【Mac】编译Spring 源码和Idea导入

今天我们开始Spring源码的阅读之旅。阅读Spring的源码的第一步当然是编译Spring源码。首先我们要去GitHub上将spring源码给clone下来。 笔者编译环境如下&#xff1a; Spring版本&#xff1a;5.28 https://github.com/spring-projects/spring-framework/tree/v5.2.8.RELEASE …

人工智能项目集合推荐(数据集 模型训练 C++和Android部署)

人工智能项目集合推荐(数据集 模型训练 C和Android部署) 目录 人工智能项目集合推荐(数据集 模型训练 C和Android部署) 1.三维重建项目集合 ★双目三维重建 ★结构光三维重建 2.AI CV项目集合 ★人脸检测和人体检测 ★人体姿态估计(人体关键点检测) ★头部朝向估计 …