springboot kafka 提高拉取数量

文章目录

  • 背景
  • 问题复现
  • 解决问题
  • 原理分析
    • fetch.min.bytes
    • fetch.max.wait.ms
    • 源码分析
      • ReplicaManager#fetchMessages

背景

开发过程中,使用kafka批量消费,发现拉取数量一直为1,如何提高批量拉取数量,记录下踩坑记录。

问题复现

  • kafka maven依赖
		<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version></dependency>
  • 配置消费者
@Configuration
public class KafkaBlukConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.max-poll-records:30}")private Integer maxPollRecords;@Value("${spring.kafka.consumer.groupId:group1}")private String group;/*** 消费者配置信息*/@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.GROUP_ID_CONFIG, group);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}/*** 消费者批量⼯程*/@Beanpublic KafkaListenerContainerFactory<?> batchFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));//设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIGfactory.setBatchListener(true);return factory;}
}
  • 消费端代码

@Component
public class KafkaBatchConsumer {private static final Logger log = LoggerFactory.getLogger(KafkaBatchConsumer.class);@KafkaListener(id = "consumer1", topics = "topic2", containerFactory = "batchFactory")public void consume(List<ConsumerRecord<String, String>> record) throws Exception {log.info("KafkaBatchConsumer recode size : {} ", record.size());}}
  • 使用yml配置生产者
spring:kafka:bootstrap-servers: 192.168.56.112:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
  • 使用生产者发送消息
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaProducer {// 自定义的主题名称public static final String TOPIC_NAME = "topic2";private KafkaTemplate<String, String> kafkaTemplate;/*** http://localhost:8080/kafka/send?msg=a* @param msg*/@RequestMapping("/send")public String send(@RequestParam("msg") String msg) {log.info("准备发送消息为:{}", msg);// 1.发送消息ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, msg);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable throwable) {// 2.发送失败的处理log.error("生产者 发送消息失败:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, String> stringObjectSendResult) {// 3.发送成功的处理log.info("生产者 发送消息成功:" + stringObjectSendResult.toString());}});return "接口调用成功";}
}
  • 发送消息,观察消费者批量消费情况
http://localhost:9999/kafka/send?msg=a

多次调用发现如下:

在这里插入图片描述
发现拉取消息的大小始终为1

解决问题

  • 添加下面两行代码
@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.GROUP_ID_CONFIG, group);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);################ 添加下面两行 ###########props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024);props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 3000);######################################props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}
  • 再次发送消息,观察消费情况

在这里插入图片描述
可以看到批量消费成功。

原理分析

fetch.min.bytes

消费者从服务器获取记录的最小字节数,broker 收到消费者拉取数据的请求的时候,如果可用数据量小于设置的值,那么 broker 将会等待有足够可用的数据的时候才返回给消费者,这样可以降低消费者和 broker 的工作负载。如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。

fetch.max.wait.ms

如果 Kafka 仅仅参考 fetch.min.bytes 参数的要求,那么有可能会因为获取不到足够大小的消息而一直阻塞等待,从而无法发送响应给 Consumer,显然这是不合理的。fetch.max.wait.ms 参数用于指定 等待 FetchResponse 的最长时间,服务端根据此时间决定何时进行响应,默认值为 500(ms)。如果 Kafka 中没有足够多的消息而满足不了 fetch.min.bytes 参数的要求,那么最终会等待 500ms 再响应消费者请求。这个参数的设定需要参考 Consumer 与 Kafka 之间的延迟大小,如果业务应用对延迟敏感,那么可以适当调小这个参数。

源码分析

ReplicaManager#fetchMessages

/*** 能够立即返回给客户端的4种情况* 1. fetch请求没有大于0的wait时间,参考fetch.max.wait.ms设置* 2. fetch请求要拉取的分区为空* 3. 根据fetch.min.bytes的设置,有足够的数据返回* 4. 出现异常*/if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {// fetchPartitionData是一个TopicPartition -> FetchPartitionData 的map集合val fetchPartitionData = logReadResults.map { case (tp, result) =>tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,result.lastStableOffset, result.info.abortedTransactions)}// 调用响应回调函数responseCallback(fetchPartitionData)}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/20118.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

攻防对抗少丢分,爱加密帮您筑起第二防线

应用程序通常处理和存储大量的敏感数据&#xff0c;如用户个人信息、财务信息、商业数据、国家数据等&#xff0c;用户量越大的应用程序&#xff0c;其需要存储和保护的用户数据越多。因此应用层长期是攻击方的核心目标&#xff0c;传统应用安全依靠防火墙(FireWall)、入侵检测…

iOS组件化 方案 实现

iOS组件化 组件化的原因现在流行的组件化方案方案一、url-block &#xff08;基于 URL Router&#xff09;方案二、protocol调用方式解读 方案三、target-action调用方式解读 gitHub代码链接参考 组件化的原因 模块间解耦模块重用提高团队协作开发效率单元测试 当项目App处于…

网络原理-四

一、续 当窗口大小为0,意味着缓冲区满了,此时发送方,就因该暂停发送,发送方会周期性的除法 " 窗口探测包 " ,并不携带载荷,这样的包对于业务不产生影响,只是为了触发ACK,一旦查询出来的结果是非0,缓冲区右有空间了,发送方就可以继续发送. 二、拥塞控制 要限制发送方…

【AI+知识库问答】沉浸式体验了解 AI知识库问答fastGPT

之前写过一篇文章 【AI本地知识库】个人整理的几种常见本地知识库技术方案 &#xff0c; 由于当时主要是针对AI本地知识库&#xff0c; 所以没列fastGPT。 最近经常刷到fastGPT&#xff0c;这里单独水一篇。 FastGPT 是一个基于 LLM 大语言模型的知识库问答系统&#xff0c;…

Github 2024-06-01 开源项目日报Top10

根据Github Trendings的统计,今日(2024-06-01统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Python项目5Jupyter Notebook项目2TypeScript项目1Go项目1Shell项目1Lua项目1Kong:云原生API网关与AI能力 创建周期:3482 天开发语言:Lua协议…

云原生架构相关技术_4.服务网格

1.技术特点 服务网格&#xff08;ServiceMesh&#xff09;是分布式应用在微服务软件架构之上发展起来的新技术&#xff0c;旨在将那些微服务间的连接、安全、流量控制和可观测等通用功能下沉为平台基础设施&#xff0c;实现应用与平台基础设施的解耦。这个解耦意味着开发者无需…

Chisel入门——在windows下vscode搭建|部署Scala2.13.3开发环境|用Chisel点亮FPGA小灯等实验

文章目录 前言一、vscode搭建scala开发环境1.1 安装Scala官方插件1.2 创建hello_world.scala文件1.3 确认java的版本(博主使用的是1.8)1.4 下载Scala Windows版本的二进制文件1.5 配置环境变量1.6 交互模式测试一下1.7 vscode运行scala 二、windows安装sbt2.1 下载sbt2.2 设置环…

函数递归及具体例子(持续更新)

递归就是函数自己调用自己 求n的阶乘 n! n * (n - 1)! 直到n为1或者0的时候为止 举个例子 int Fun(int n) {if (n < 0){return 1;}else{return n * Fun(n - 1);} }int main() {int n 0;scanf("%d", &n);int ret Fun(n);printf("%d\n", ret…

安装Kubernetes v3 ----以docker的方式部署

以docker的方式部署 docker run -d \ --restartunless-stopped \ --namekuboard \ -p 80:80/tcp \ -p 10081:10081/tcp \ -e KUBOARD_ENDPOINT"http://192.168.136.55:80" \ -e KUBOARD_AGENT_SERVER_TCP_PORT"10081" \ -v /root/kuboard-data:/data \ e…

springboot中抽象类无法注入到ioc容器

1、背景 在写代码时&#xff0c;发现service接口有两个实现类&#xff0c;并且两个实现类中没有对类名重命名&#xff0c;属性注入的时候也没有使用byName或Qualifier&#xff0c;正确情况下会发生多实现报错的问题&#xff0c;以前对这个问题进行解析过。 2、调试过程 我想…

【设计模式】创建型-建造者模式

前言 在面向对象的软件开发中&#xff0c;构建复杂对象时经常会遇到许多挑战。一种常见的解决方案是使用设计模式&#xff0c;其中建造者模式是一个强大而灵活的选择。本文将深入探讨建造者模式的原理、结构、优点以及如何在实际项目中应用它。 一、复杂的对象 public class…

飞凌嵌入式FET3568/3568J-C核心板现已适配OpenHarmony4.1

近日&#xff0c;飞凌嵌入式为FET3568/3568J-C核心板适配了OpenHarmony4.1系统&#xff0c;新系统的加持使核心板在兼容性、稳定性与安全性等方面都得到进一步提升&#xff0c;不仅为FET3568/3568J-C核心板赋予了更强大的功能&#xff0c;也为开发者们提供了更加广阔的创新空间…

每日一练编程题:今天是【接口,多态】

设计程序 : 电脑类的属性USB接口数组 : 有3个usb插口电脑类的功能 : 通过接口插入外设 (u盘,麦克风,键盘等) addUSB(USB usb) { }开机 要求: 电脑开机前,先启动外设关机 要求: 电脑关机前,先关闭外设 外设类(u盘,麦克风,键盘等) 功能 : 启动 关闭 USB接口 定义usb设备的统一…

python多种方式 保留小数点位数(附Demo)

目录 前言1. 字符串格式2. round函数3. Decimal模块4. numpy库5. Demo 前言 在Python中&#xff0c;保留小数点后特定位数可以通过多种方式实现 以下是几种常见的方法&#xff0c;并附上相应的代码示例&#xff1a; 使用字符串格式化&#xff08;String Formatting&#xff…

Ubuntu20.04 Mysql基本操作知识

#Mysql基本知识 运行环境Ubuntu20.04 1.开启mysql服务 sytemctl start mysql不然&#xff0c;命令行进入myql交互行提交命令后&#xff0c;就会出现4200错误。 2.显示所有数据库 SHOW DATABASES;注意复数s&#xff0c;毕竟很多数据库 3.新建数据库test CREATE DATABASE …

【学习笔记】计算机组成原理(九+十)

控制单元的功能 文章目录 控制单元的功能9.1 微操作命令的分析9.1.1 取指周期9.1.2 间址周期9.1.3 执行周期9.1.4 中断周期 9.2 控制单元的功能9.2.1 控制单元的外特性9.2.2 控制信号举例9.2.3 多级时序系统9.2.4 控制方式 控制单元的设计10.1 组合逻辑设计10.1.1 组合逻辑控制…

LabVIEW与Simulink的通信及调用方式

LabVIEW和Simulink可以通过多种方式进行通信和集成&#xff0c;实现数据交互和功能调用。常见的通信方式包括TCP/IP、UDP、共享内存等&#xff0c;此外还可以利用MATLAB Script Node和S-Function等直接调用对方的功能。这些方法使得LabVIEW和Simulink能够协同工作&#xff0c;充…

[Algorithm][动态规划][子序列问题][最长递增子序列的个数][最长数对链]详细讲解

目录 1.最长递增子序列的个数1.题目链接2.算法原理详解3.代码实现 2.最长数对链1.题目链接2.算法原理详解3.代码实现 1.最长递增子序列的个数 1.题目链接 最长递增子序列的个数 2.算法原理详解 注意&#xff1a;本题思路和思维方式及用到的方法很值得考究&#xff0c;个人感…

dubbo复习:(18)服务端Filter

用来在服务响应返回到客户端之前进行额外处理。 一、定义Filter package cn.edu.tju.config;import org.apache.dubbo.rpc.Filter; import org.apache.dubbo.rpc.Result; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Invocation; import org.apache.du…

大量path计算优化方案

1.影响path基础属性数据做key缓存&#xff0c;缓存的path应去除坐标变换&#xff0c;归一化。基础属性应满足CAB, BC-A 2.高频path操作以&#xff08;keykey操作&#xff09;做新key缓存。 3.高频修改高级属性&#xff0c;以新key属性变更做新key缓存。 4.key与id做中转映射&am…