springboot kafka 提高拉取数量

文章目录

  • 背景
  • 问题复现
  • 解决问题
  • 原理分析
    • fetch.min.bytes
    • fetch.max.wait.ms
    • 源码分析
      • ReplicaManager#fetchMessages

背景

开发过程中,使用kafka批量消费,发现拉取数量一直为1,如何提高批量拉取数量,记录下踩坑记录。

问题复现

  • kafka maven依赖
		<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version></dependency>
  • 配置消费者
@Configuration
public class KafkaBlukConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.max-poll-records:30}")private Integer maxPollRecords;@Value("${spring.kafka.consumer.groupId:group1}")private String group;/*** 消费者配置信息*/@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.GROUP_ID_CONFIG, group);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}/*** 消费者批量⼯程*/@Beanpublic KafkaListenerContainerFactory<?> batchFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));//设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIGfactory.setBatchListener(true);return factory;}
}
  • 消费端代码

@Component
public class KafkaBatchConsumer {private static final Logger log = LoggerFactory.getLogger(KafkaBatchConsumer.class);@KafkaListener(id = "consumer1", topics = "topic2", containerFactory = "batchFactory")public void consume(List<ConsumerRecord<String, String>> record) throws Exception {log.info("KafkaBatchConsumer recode size : {} ", record.size());}}
  • 使用yml配置生产者
spring:kafka:bootstrap-servers: 192.168.56.112:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
  • 使用生产者发送消息
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaProducer {// 自定义的主题名称public static final String TOPIC_NAME = "topic2";private KafkaTemplate<String, String> kafkaTemplate;/*** http://localhost:8080/kafka/send?msg=a* @param msg*/@RequestMapping("/send")public String send(@RequestParam("msg") String msg) {log.info("准备发送消息为:{}", msg);// 1.发送消息ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, msg);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable throwable) {// 2.发送失败的处理log.error("生产者 发送消息失败:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, String> stringObjectSendResult) {// 3.发送成功的处理log.info("生产者 发送消息成功:" + stringObjectSendResult.toString());}});return "接口调用成功";}
}
  • 发送消息,观察消费者批量消费情况
http://localhost:9999/kafka/send?msg=a

多次调用发现如下:

在这里插入图片描述
发现拉取消息的大小始终为1

解决问题

  • 添加下面两行代码
@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.GROUP_ID_CONFIG, group);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);################ 添加下面两行 ###########props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024);props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 3000);######################################props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}
  • 再次发送消息,观察消费情况

在这里插入图片描述
可以看到批量消费成功。

原理分析

fetch.min.bytes

消费者从服务器获取记录的最小字节数,broker 收到消费者拉取数据的请求的时候,如果可用数据量小于设置的值,那么 broker 将会等待有足够可用的数据的时候才返回给消费者,这样可以降低消费者和 broker 的工作负载。如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。

fetch.max.wait.ms

如果 Kafka 仅仅参考 fetch.min.bytes 参数的要求,那么有可能会因为获取不到足够大小的消息而一直阻塞等待,从而无法发送响应给 Consumer,显然这是不合理的。fetch.max.wait.ms 参数用于指定 等待 FetchResponse 的最长时间,服务端根据此时间决定何时进行响应,默认值为 500(ms)。如果 Kafka 中没有足够多的消息而满足不了 fetch.min.bytes 参数的要求,那么最终会等待 500ms 再响应消费者请求。这个参数的设定需要参考 Consumer 与 Kafka 之间的延迟大小,如果业务应用对延迟敏感,那么可以适当调小这个参数。

源码分析

ReplicaManager#fetchMessages

/*** 能够立即返回给客户端的4种情况* 1. fetch请求没有大于0的wait时间,参考fetch.max.wait.ms设置* 2. fetch请求要拉取的分区为空* 3. 根据fetch.min.bytes的设置,有足够的数据返回* 4. 出现异常*/if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {// fetchPartitionData是一个TopicPartition -> FetchPartitionData 的map集合val fetchPartitionData = logReadResults.map { case (tp, result) =>tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,result.lastStableOffset, result.info.abortedTransactions)}// 调用响应回调函数responseCallback(fetchPartitionData)}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/20118.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

攻防对抗少丢分,爱加密帮您筑起第二防线

应用程序通常处理和存储大量的敏感数据&#xff0c;如用户个人信息、财务信息、商业数据、国家数据等&#xff0c;用户量越大的应用程序&#xff0c;其需要存储和保护的用户数据越多。因此应用层长期是攻击方的核心目标&#xff0c;传统应用安全依靠防火墙(FireWall)、入侵检测…

1.7 协议层次和服务模型

协议层次 网络是一个复杂的系统!  网络功能繁杂&#xff1a;数字信号的物理信 号承载、点到点、路由、rdt、进程区分、应用等 现实来看&#xff0c;网络的许多构成元素和设备:  主机  路由器  各种媒体的链路  应用  协议  硬件, 软件 Q:如何组织和实现这个…

Linux上实现ssh免密通讯

Linux上实现ssh免密通讯 1.SSH互信原理2.SSH所需的RPM包3.两台机器实现互信4.常见问题及处理 1.SSH互信原理 SSH&#xff08;Secure Shell&#xff09;是一种安全的传输协议&#xff0c;它能让Linux系统中的服务器和客户端之间进行安全可靠的通讯。 SSH使用加密的传输方式&…

iOS组件化 方案 实现

iOS组件化 组件化的原因现在流行的组件化方案方案一、url-block &#xff08;基于 URL Router&#xff09;方案二、protocol调用方式解读 方案三、target-action调用方式解读 gitHub代码链接参考 组件化的原因 模块间解耦模块重用提高团队协作开发效率单元测试 当项目App处于…

网络原理-四

一、续 当窗口大小为0,意味着缓冲区满了,此时发送方,就因该暂停发送,发送方会周期性的除法 " 窗口探测包 " ,并不携带载荷,这样的包对于业务不产生影响,只是为了触发ACK,一旦查询出来的结果是非0,缓冲区右有空间了,发送方就可以继续发送. 二、拥塞控制 要限制发送方…

一步一步写线程之十三队列间的消息通知

一、线程和分布式的通信 随着技术的不断发展&#xff0c;多线程和分布式通信愈发的普及。那么在这种场景下的如何进行数据的通信&#xff0c;便成为了一个非常典型的问题。无论是多线程还是分布式&#xff0c;其实其抽象出来的通信机制都是类似的。或者说换句话&#xff0c;多…

java检测字符串是否包含数字和字母

在Java中&#xff0c;要检测一个字符串是否同时包含数字和字母&#xff0c;我们可以使用正则表达式&#xff08;regex&#xff09;或者通过遍历字符串并检查每个字符来实现。以下是两种方法的详细代码示例&#xff1a; 1.方法一&#xff1a;使用正则表达式 import java.util.…

【AI+知识库问答】沉浸式体验了解 AI知识库问答fastGPT

之前写过一篇文章 【AI本地知识库】个人整理的几种常见本地知识库技术方案 &#xff0c; 由于当时主要是针对AI本地知识库&#xff0c; 所以没列fastGPT。 最近经常刷到fastGPT&#xff0c;这里单独水一篇。 FastGPT 是一个基于 LLM 大语言模型的知识库问答系统&#xff0c;…

Github 2024-06-01 开源项目日报Top10

根据Github Trendings的统计,今日(2024-06-01统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Python项目5Jupyter Notebook项目2TypeScript项目1Go项目1Shell项目1Lua项目1Kong:云原生API网关与AI能力 创建周期:3482 天开发语言:Lua协议…

如何确保绩效目标执行到位?

很多企业在实施绩效过程中&#xff0c;尽管制定好了绩效目标&#xff0c;但是没有执行下去&#xff0c;管理者将原因归咎于“员工低效”、“体制机制”等问题&#xff0c;那么在人力资源管理方面&#xff0c;企业应该如何确保制定的绩效目标执行到位&#xff1f;如何提高低效能…

云原生架构相关技术_4.服务网格

1.技术特点 服务网格&#xff08;ServiceMesh&#xff09;是分布式应用在微服务软件架构之上发展起来的新技术&#xff0c;旨在将那些微服务间的连接、安全、流量控制和可观测等通用功能下沉为平台基础设施&#xff0c;实现应用与平台基础设施的解耦。这个解耦意味着开发者无需…

React@16.x(14)context 举例 - Form 表单

目录 1&#xff0c;目标2&#xff0c;实现2.1&#xff0c;index.js2.2&#xff0c;context.js2.2&#xff0c;Form.Input2.3&#xff0c;Form.Button 3&#xff0c;使用 1&#xff0c;目标 上篇文章说到&#xff0c;context 上下文一般用于第3方组件库&#xff0c;因为使用场景…

Chisel入门——在windows下vscode搭建|部署Scala2.13.3开发环境|用Chisel点亮FPGA小灯等实验

文章目录 前言一、vscode搭建scala开发环境1.1 安装Scala官方插件1.2 创建hello_world.scala文件1.3 确认java的版本(博主使用的是1.8)1.4 下载Scala Windows版本的二进制文件1.5 配置环境变量1.6 交互模式测试一下1.7 vscode运行scala 二、windows安装sbt2.1 下载sbt2.2 设置环…

函数递归及具体例子(持续更新)

递归就是函数自己调用自己 求n的阶乘 n! n * (n - 1)! 直到n为1或者0的时候为止 举个例子 int Fun(int n) {if (n < 0){return 1;}else{return n * Fun(n - 1);} }int main() {int n 0;scanf("%d", &n);int ret Fun(n);printf("%d\n", ret…

安装Kubernetes v3 ----以docker的方式部署

以docker的方式部署 docker run -d \ --restartunless-stopped \ --namekuboard \ -p 80:80/tcp \ -p 10081:10081/tcp \ -e KUBOARD_ENDPOINT"http://192.168.136.55:80" \ -e KUBOARD_AGENT_SERVER_TCP_PORT"10081" \ -v /root/kuboard-data:/data \ e…

springboot中抽象类无法注入到ioc容器

1、背景 在写代码时&#xff0c;发现service接口有两个实现类&#xff0c;并且两个实现类中没有对类名重命名&#xff0c;属性注入的时候也没有使用byName或Qualifier&#xff0c;正确情况下会发生多实现报错的问题&#xff0c;以前对这个问题进行解析过。 2、调试过程 我想…

【设计模式】创建型-建造者模式

前言 在面向对象的软件开发中&#xff0c;构建复杂对象时经常会遇到许多挑战。一种常见的解决方案是使用设计模式&#xff0c;其中建造者模式是一个强大而灵活的选择。本文将深入探讨建造者模式的原理、结构、优点以及如何在实际项目中应用它。 一、复杂的对象 public class…

飞凌嵌入式FET3568/3568J-C核心板现已适配OpenHarmony4.1

近日&#xff0c;飞凌嵌入式为FET3568/3568J-C核心板适配了OpenHarmony4.1系统&#xff0c;新系统的加持使核心板在兼容性、稳定性与安全性等方面都得到进一步提升&#xff0c;不仅为FET3568/3568J-C核心板赋予了更强大的功能&#xff0c;也为开发者们提供了更加广阔的创新空间…

每日一练编程题:今天是【接口,多态】

设计程序 : 电脑类的属性USB接口数组 : 有3个usb插口电脑类的功能 : 通过接口插入外设 (u盘,麦克风,键盘等) addUSB(USB usb) { }开机 要求: 电脑开机前,先启动外设关机 要求: 电脑关机前,先关闭外设 外设类(u盘,麦克风,键盘等) 功能 : 启动 关闭 USB接口 定义usb设备的统一…

python多种方式 保留小数点位数(附Demo)

目录 前言1. 字符串格式2. round函数3. Decimal模块4. numpy库5. Demo 前言 在Python中&#xff0c;保留小数点后特定位数可以通过多种方式实现 以下是几种常见的方法&#xff0c;并附上相应的代码示例&#xff1a; 使用字符串格式化&#xff08;String Formatting&#xff…