kafka消费数据太慢了,给优化下

原代码

public class KafkaConsumerDemo {public static void main(String[] args) {int numConsumers = 5; // 增加消费者的数量for (int i = 0; i < numConsumers; i++) {new Thread(new KafkaConsumerThread()).start();}}static class KafkaConsumerThread implements Runnable {@Overridepublic void run() {// 配置消费者属性Properties props = new Properties();props.put("bootstrap.servers", "114.15.78.14:9092");props.put("group.id", "test-group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 调整消费者配置props.put("fetch.min.bytes", "1024"); // 最小获取1KB的数据props.put("fetch.max.wait.ms", "500"); // 最大等待500ms// 创建消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("test-topic"));// 消费消息; 如果消息处理逻辑允许,可以批量处理消息,而不是逐条处理while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));if (!records.isEmpty()) {for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}// 批量提交偏移量consumer.commitSync();}}}}
}  

在 Kafka 消费者代码中,有几个地方可以进行优化以提高消费速度:

增加消费者线程:你已经增加了消费者的数量,这是一个很好的开始。确保每个消费者线程都在不同的分区上消费,以实现并行处理。

调整 fetch.min.bytes 和 fetch.max.wait.ms:这两个参数控制了消费者从 Kafka
服务器获取数据的最小字节数和最大等待时间。如果你的消息较小,增加 fetch.min.bytes
可能会导致消费者等待更长时间以收集足够的数据,这可能会降低消费速度。如果你的消息较大,可以适当增加这个值。

异步提交偏移量:你当前使用的是 consumer.commitSync() 进行同步提交偏移量,这会阻塞当前线程直到提交完成。可以考虑使用
consumer.commitAsync() 异步提交,这样可以不阻塞消费线程。

增加 max.poll.records:poll
方法有一个参数可以设置一次拉取的最大记录数。如果设置得太低,可能会导致频繁的网络请求,增加延迟。

合理设置 auto.commit.interval.ms:自动提交偏移量的间隔设置得太短可能会导致不必要的性能开销,可以适当增加这个值。

使用多线程处理消息:如果你的消息处理逻辑是 CPU 密集型的,可以考虑使用线程池来并行处理消息。

监控和调优:使用 Kafka 的监控工具来观察消费者的行为,找出瓶颈并进行调优。

下面是根据上述建议优化后的代码示例:

public class KafkaConsumerDemo {public static void main(String[] args) {int numConsumers = 5; // 增加消费者的数量for (int i = 0; i < numConsumers; i++) {new Thread(new KafkaConsumerThread()).start();}}static class KafkaConsumerThread implements Runnable {@Overridepublic void run() {// 配置消费者属性Properties props = new Properties();props.put("bootstrap.servers", "114.15.78.14:9092");props.put("group.id", "test-group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "5000"); // 增加自动提交偏移量的间隔props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 调整消费者配置props.put("fetch.min.bytes", "1"); // 减少最小获取字节数props.put("fetch.max.wait.ms", "100"); // 减少最大等待时间props.put("max.poll.records", "500"); // 增加一次拉取的最大记录数// 创建消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("test-topic"));// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {processRecords(records); // 异步处理消息consumer.commitAsync(); // 异步提交偏移量}}}private void processRecords(ConsumerRecords<String, String> records) {// 异步处理消息的逻辑for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());// 这里可以添加消息处理逻辑,例如使用线程池并行处理}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/60767.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【linux】如何扩展磁盘容量(VMware虚拟机)-转载

如何扩展磁盘容量(VMware虚拟机) 一、前置准备工作 扩展虚拟机磁盘前&#xff0c;需要先把虚拟机关机才能进行扩展磁盘操作 1.选择虚拟机设置&#xff0c;如下图所示 2.输入你想扩展的磁盘容量&#xff0c;以本次实操为例&#xff0c;我这里输入的30G&#xff08;具体按照实…

Python sys模块介绍

在Python中&#xff0c; sys模块是一个非常重要的内置模块&#xff0c;它提供了一系列与Python解释器及其运行环境交互的函数和变量。这个模块对于理解和控制Python程序的运行方式至关重要。 一、sys模块的主要功能 1. 命令行参数处理 sys.argv是一个列表&#xff0c;包含了命…

SHA-256哈希函数

SHA-256哈希函数在许多其他领域也有广泛的应用。以下是一些常见的应用场景: 数据完整性验证 文件校验: 通过计算文件的SHA-256哈希值,可以验证文件在传输或存储过程中是否被篡改。 数字签名: 在数字签名中,哈希值用于生成和验证签名,确保数据的完整性和来源的可信性。安全认…

浅谈React的虚拟DOM

React的虚拟DOM&#xff1a;揭秘高效渲染的秘密 在React中&#xff0c;虚拟DOM&#xff08;Virtual DOM&#xff09;是一个核心概念&#xff0c;它是React能够提供高效渲染和更新的关键。虚拟DOM是一个轻量级的JavaScript对象&#xff0c;表示真实的DOM树。通过使用虚拟DOM&am…

Spark RDD中常用聚合算子源码层面的对比分析

在 Spark RDD 中&#xff0c;groupByKey、reduceByKey、foldByKey 和 aggregateByKey 是常用的聚合算子&#xff0c;适用于按键进行数据分组和聚合。它们的实现方式各不相同&#xff0c;涉及底层调用的函数也有区别。以下是对这些算子在源码层面的分析&#xff0c;以及每个算子…

黑盒测试案例设计方法的使用(1)

黑盒测试用例的设计是确保软件质量的关键步骤之一。 一、等价类划分法 定义&#xff1a;把所有可能的输入数据&#xff0c;即程序的输入域划分成若干部分&#xff08;子集&#xff09;&#xff0c;然后从每一个子集中选取少数具有代表性的数据作为测试用例。 步骤&#xff1a…

内网渗透-搭建域环境

声明 笔记的只是方便各位师傅学习知识&#xff0c;以下网站只涉及学习内容&#xff0c;其他的都与本人无关&#xff0c;切莫逾越法律红线&#xff0c;否则后果自负。 ✍&#x1f3fb;作者简介&#xff1a;致力于网络安全领域&#xff0c;目前作为一名学习者&#xff0c;很荣幸成…

【Qt】使用QString的toLocal8Bit()导致的问题

问题 使用Qt发送一个Http post请求的时候&#xff0c;服务一直返回错误和失败信息。同样的url以及post参数&#xff0c;复制黏贴到postman里就可以发送成功。就感觉很神奇。 原因 最后排查出原因是因为参数中含有汉字而导致的编码问题。 在拼接post参数时&#xff0c;使用了…

H.265流媒体播放器EasyPlayer.js H.264/H.265播放器chrome无法访问更私有的地址是什么原因

EasyPlayer.js H5播放器&#xff0c;是一款能够同时支持HTTP、HTTP-FLV、HLS&#xff08;m3u8&#xff09;、WS、WEBRTC、FMP4视频直播与视频点播等多种协议&#xff0c;支持H.264、H.265、AAC、G711A、MP3等多种音视频编码格式&#xff0c;支持MSE、WASM、WebCodec等多种解码方…

【HarmonyOS】Hdc server port XXXX has been used.Configure environment variable

【HarmonyOS】Hdc server port XXXX has been used.Configure environment variable 一、 问题背景&#xff1a; 无法调试debug应用&#xff0c;IDE右下角显示该弹窗&#xff1a; Hdc server port XXXX has been used.Configure environment variable ‘OHOS_HDC_SERVER_POR…

AdaBoost 二分类问题

代码功能 生成数据集&#xff1a; 使用 make_classification 创建一个模拟分类问题的数据集。 数据集包含 10 个特征&#xff0c;其中 5 个是有用特征&#xff0c;2 个是冗余特征。 数据集划分&#xff1a; 将数据分为训练集&#xff08;70%&#xff09;和测试集&#xff08;3…

java8之Stream流

文章目录 Stream流的定义和特性‌定义特性‌中间操作‌终结操作‌ 生成流forEachmapfilterlimitsorted并行&#xff08;parallel&#xff09;程序Collectors Stream流的定义和特性‌ 定义 Stream是Java 8 API添加的一个新的抽象&#xff0c;用于以声明性方式处理数据集合。它…

Java-Redisson分布式锁+自定义注解+AOP的方式来实现后台防止重复请求扩展

1. 添加依赖 首先,在项目的pom.xml文件中添加Redisson和Spring AOP的相关依赖: <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.16.8</version> </dependency> <dependency…

初识Linux · 信号保存

目录 前言&#xff1a; Block pending handler表 信号保存 前言&#xff1a; 前文我们已经介绍了信号产生&#xff0c;在时间的学习线上&#xff0c;信号的学习分为预备知识&#xff0c;信号产生&#xff0c;信号保存&#xff0c;信号处理&#xff0c;本文我们学习信号保存…

SystemVerilog学习——构造函数new

一、概述 在 SystemVerilog 中&#xff0c;new 是一个构造函数&#xff0c;用于创建类的实例&#xff08;即对象&#xff09;。它在面向对象编程&#xff08;OOP&#xff09;中起着重要作用&#xff0c;负责实例化一个对象并进行初始化。与传统编程语言&#xff08;如 C 或 Jav…

01 最舒适的python开发环境

0 前言 我自己经过尝试&#xff0c;总结出python3开发环境的最舒适方式。 python3安装创建虚拟环境 venvjupyter notebook 笔记本安装vscode插件(Python, Pylance, Jupyter) 1 python3安装 ubuntu系统下安装最新版本的python3 sudo apt update sudo apt install python32 …

vue3:computed

vue3:computed 扫码或者点击文字后台提问 computed 支持选项式写法 和 函数式写法 1.选项式写法 支持一个对象传入get函数以及set函数自定义操作 2.函数式写法 只能支持一个getter函数不允许修改值的 基础示例 <template><div><div>姓&#xff1a;<i…

【弱监督视频异常检测】2024-ESWA-基于扩散的弱监督视频异常检测常态预训练

2024-ESWA-Diffusion-based normality pre-training for weakly supervised video anomaly detection 基于扩散的弱监督视频异常检测常态预训练摘要1. 引言2. 相关工作3. 方法论3.1. 使用扩散自动编码器进行常态学习3.2. 全局-局部特征编码器3.2.1 局部块3.2.2 全局块3.2.3 协同…

124. 二叉树中的最大路径和【 力扣(LeetCode) 】

文章目录 零、原题链接一、题目描述二、测试用例三、解题思路四、参考代码 零、原题链接 124. 二叉树中的最大路径和 一、题目描述 二叉树中的 路径 被定义为一条节点序列&#xff0c;序列中每对相邻节点之间都存在一条边。同一个节点在一条路径序列中 至多出现一次 。该路径…

跳房子(弱化版)

题目描述 跳房子&#xff0c;也叫跳飞机&#xff0c;是一种世界性的儿童游戏&#xff0c;也是中国民间传统的体育游戏之一。 跳房子的游戏规则如下&#xff1a; 在地面上确定一个起点&#xff0c;然后在起点右侧画 n 个格子&#xff0c;这些格子都在同一条直线上。每个格子内…