kafka实现异步发送_Kafka Producer 异步发送消息居然也会阻塞?

Kafka 一直以来都以高吞吐量的特性而家喻户晓,就在上周,在一个性能监控项目中,需要使用到 Kafka 传输海量消息,在这过程中遇到了一个 Kafka Producer 异步发送消息会被阻塞的问题,导致生产端发送耗时很大。

是的,你没听错,Kafka Producer 异步发送消息也会发生阻塞现象,那究竟是怎么回事呢?

在新版的 Kafka Producer 中,设计了一个消息缓冲池,客户端发送的消息都会被存储到缓冲池中,同时 Producer 启动后还会开启一个 Sender 线程,不断地从缓冲池获取消息并将其发送到 Broker,如下图所示:

这么看来,Kafka 的所有发送,都可以看作是异步发送了,因此在新版的 Kafka Producer 中废弃掉异步发送的方法了,仅保留了一个 send 方法,同时返回一个 Futrue 对象,需要同步等待发送结果,就使用 Futrue#get 方法阻塞获取发送结果。而我在项目中直接调用 send 方法,为何还会发送阻塞呢?

我们在构建 Kafka Producer 时,会有一个自定义缓冲池大小的参数 buffer.memory,默认大小为 32M,因此缓冲池的大小是有限制的,我们不妨想一下,缓冲池内存资源耗尽了会怎么样?

Kafka 源码的注释是非常详细的,RecordAccumulator 类是 Kafka Producer 缓冲池的核心类,而 RecordAccumulator 类就有那么一段注释:

The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless this behavior is explicitly disabled.

大概的意思是:

当缓冲池的内存块用完后,消息追加调用将会被阻塞,直到有空闲的内存块。

由于性能监控项目每分钟需要发送几百万条消息,只要 Kafka 集群负载很高或者网络稍有波动,Sender 线程从缓冲池捞取消息的速度赶不上客户端发送的速度,就会造成客户端发送被阻塞。

我写个例子让大家直观感受一下被阻塞的现象:

public static void main(String[] args){

Properties properties = new Properties();

properties.put(ProducerConfig.ACKS_CONFIG, "0");

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");

properties.put(ProducerConfig.LINGER_MS_CONFIG, 1000);

properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 1024);

properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5242880);

properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

KafkaProducer producer = new KafkaProducer<>(properties);

String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";

List bytesList = new ArrayList<>();

Random random = new Random();

for (int j = 0; j 

int i1 = random.nextInt(10);

if (i1 == 0) {

i1 = 1;

}

byte[] bytes = new byte[1024 * i1];

for (int i = 0; i 

bytes[i] = (byte) str.charAt(random.nextInt(62));

}

bytesList.add(bytes);

}

while (true) {

long start = System.currentTimeMillis();

producer.send(new ProducerRecord<>("test_topic", bytesList.get(random.nextInt(1023))));

long end = System.currentTimeMillis() - start;

if (end > 100) {

System.out.println("发送耗时:" + end);

}

// Thread.sleep(10);

}

}

以上例子构建了一个  Kafka Producer 对象,同时使用死循环不断地发送消息,这时如果把 Thread.sleep(10);注释掉,则会出现发送耗时很长的现象:

使用 JProfiler 可以查看到分配内存的地方出现了阻塞:

跟踪到源码:

发现在 org.apache.kafka.clients.producer.internals.BufferPool#allocate 方法中,如果判断缓冲池没有空闲的内存了,则会阻塞内存分配,直到有空闲内存为止。

如果不注释 Thread.sleep(10);这段代码则不会发生阻塞现象,打断点到阻塞的地方,也不会被 Debug 到,从现象能够得知,Thread.sleep(10);使得发送消息的频率变低了,此时 Sender 线程发送的速度超过了客户端的发送速度,缓冲池一直处于未满状态,因此不会产生阻塞现象。

除了以上缓冲池内存满了会发生阻塞之外,Kafka Produer 其它情况都不会发生阻塞了吗?非也,其实还有一个地方,也会发生阻塞!

Kafka Producer 通常在第一次发送消息之前,需要获取该主题的元数据 Metadata,Metadata 内容包括了主题相关分区 Leader 所在节点信息、副本所在节点信息、ISR 列表等,Kafka Producer 获取 Metadata 后,便会根据 Metadata 内容将消息发送到指定的分区 Leader 上,整个获取流程大致如下:

如上图所示,Kafka Producer 在发送消息之前,会检查主题的 Metadata 是否需要更新,如果需要更新,则会唤醒 Sender 线程并发送 Metatadata 更新请求,此时 Kafka Producer 主线程则会阻塞等待 Metadata 的更新。

如果 Metadata 一直无法更新,则会导致客户端一直阻塞在那里。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/512632.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

adb zip linux 安装教程,centos下安装adb环境

1.安装adb&#xff0c;步骤如下 wget https://dl.google.com/android/repository/sdk-tools-linux-3859397.zip unzip sdk-tools-linux-3859397.zip mkdir -p /opt/android/sdk/ mv tools /opt/android/sdk/ cd /opt/android/sdk touch ~/.android/repositories.cfg #若不创建此…

SmartNews:基于 Flink 加速 Hive 日表生产的实践

简介&#xff1a; 将 Flink 无缝地集成到以 Airflow 和 Hive 为主的批处理系统的技术挑战和应对方案。 本文介绍了 SmartNews 利用 Flink 加速 Hive 日表的生产&#xff0c;将 Flink 无缝地集成到以 Airflow 和 Hive 为主的批处理系统的实践。详细介绍过程中遇到的技术挑战和应…

今天来聊聊 Redis 的主从复制

作者 | 阿Q来源 | 阿Q说代码今天我们就从配置文件、设计原理、面试真题三个方面来聊一聊 Redis 的主从复制。在 Redis 复制的基础上&#xff0c;使用和配置主从复制非常简单&#xff0c;能使得从 Redis 服务器&#xff08;下文称 replica&#xff09;能精确的复制主 Redis 服务…

基于英特尔® 优化分析包(OAP)的 Spark 性能优化方案

简介&#xff1a; Spark SQL 作为 Spark 用来处理结构化数据的一个基本模块&#xff0c;已经成为多数企业构建大数据应用的重要选择。但是&#xff0c;在大规模连接&#xff08;Join&#xff09;、聚合&#xff08;Aggregate&#xff09;等工作负载下&#xff0c;Spark 性能会面…

表格长度_知道你的成绩单是怎么打印的吗?超长Excel表格1页打印,拯救A4纸

中小学的成绩单&#xff0c;红色的一张榜真实的魔鬼&#xff01;每次都得瞄半小时才找得到自己的全部科目成绩&#xff0c;不知道是不是为了节省A4纸~到了大学我才知道A4纸的珍贵&#xff0c;字小算什么&#xff0c;打印论文恨不得双面打印。要是能八号字打印更好了~到了工作的…

苹果电脑上使用linux环境变量,mac系统下修改环境变量

苹果电脑使用率越来越高&#xff0c;在mac系统下研发&#xff0c;性能要比在windows下快不少&#xff0c;既然要开发&#xff0c;免不了要配置环境变量.下面是学习啦小编收集整理的mac系统下修改环境变量&#xff0c;希望对大家有帮助~~mac系统下修改环境变量的方法工具/原料os…

提升代码质量的方法:领域模型、设计原则、设计模式

简介&#xff1a; 我们可以列举出非常多质量差的代码的表现现象&#xff0c;其中最影响代码质量的两个表现是命名名不副实、逻辑可扩展性差&#xff0c;当一个新人阅读代码时&#xff0c;有时发现方法命名与实际逻辑对不上&#xff0c;这就让人感到非常疑惑&#xff0c;这种现象…

SphereEx 完成近千万美元 Pre-A 轮融资,连接企业数据与应用,构建新一代数据库生态引擎

2022年1月4日&#xff0c;创新型数据库基础软件提供商 SphereEx 宣布完成近千万美元 Pre-A 轮融资&#xff0c;本轮融资由嘉御资本领投&#xff0c;红杉中国种子基金、初心资本、指数创投跟投。指数资本担任独家财务顾问。这是继 2021 年 5 月份以来&#xff0c;SphereEx 完成的…

python大列表分割成小列表_Python有什么方法将列表分割成大小均匀的块?求使用实例...

Python有什么方法将列表分割成大小均匀的块&#xff1f;求使用实例。我有一个任意长度的列表&#xff0c;我需要把它分成大小相等的块并对它进行操作。有一些很明显的方法可以做到这一点&#xff0c;比如保留一个计数器和两个列表&#xff0c;当第二个列表被填满时&#xff0c;…

40年技术发展变革,物联网行业的趋势、现状与挑战

简介&#xff1a; 40年技术发展变革&#xff0c;物联网行业的趋势、现状与挑战 基础设施的完善&#xff0c;推动应用形态不断变迁 我们把过去四十年分为五个重要的技术发展阶段&#xff0c;从时间轴上我们把它切分为&#xff1a;1980 - 2000&#xff0c;2000 - 2005&#xff…

Linux 6安装kde桌面,CentOS 5/6 安装 GNOME 或 KDE 桌面

1、安装 XWindowyum -y groupinstall X Window System2.1、Centos 5.x 安装 GNOME 或 KDE (可择一安装)GNOMEyum -y groupinstall GNOME Desktop EnvironmentKDEyum -y groupinstall KDE (K Desktop Environment)2.2、Centos 6.x 安装 GNOME 或 KDE (可择一安装)GNOMEyum -y gr…

Javascript 机器学习的四个层次

简介&#xff1a; Atwood定律说&#xff0c;凡是可以用Javascript实现的应用&#xff0c;最终都会用Javascript实现掉。作为最热门的机器学习领域&#xff0c;服务端是Python的主场&#xff0c;但是到了手机端呢&#xff1f;Android和iOS里默认都没有Python。但是有浏览器的地方…

运维监控再添新品,F5联合智维数据推出应用质量主动拨测解决方案

一直以来&#xff0c;业务的稳定性和客户体验是企业各个部门都关注的重点&#xff0c;也是企业数字化转型的重要支撑。但在实际的运维环境中&#xff0c;如果只从服务端的视角进行监控&#xff0c;往往会出现数据中心内部监控体系显示设备状态正常&#xff0c;但却收到了客户端…

hikari如何切换数据源_如何使用Spring为HikariCP设置数据源?

你需要在bean配置上编写这个结构(这是你的数据源)&#xff1a;${dataSource.url}${dataSource.username}${dataSource.password}这是我的例子,它正在发挥作用.您只需要将属性放在hibernate.properties上并在之前设置它&#xff1a;classpath:hibernate.propertiesObs.&#xff…

排查指南 | 两个案例学会从埋点排查 iOS 离线包

简介&#xff1a; 首次打开离线包白屏以及报错“-1009”等该如何处理呢&#xff1f; 离线包原理 以一次启动离线包的流程为例&#xff0c;离线包的加载流程分为两种场景&#xff0c;第一种是离线包下载好的场景&#xff0c;流程如图1所示&#xff0c;第二种是离线包没下载好的…

linux下模糊搜索命令,linux命令当前文件夹下面模糊搜索文件

权限管理AppOpsManagerAppOps工具类 import android.annotation.TargetApi; import android.app.AppOpsManager; import android.cont ...Vijos P1062 迎春舞会之交谊舞题目链接:https://vijos.org/p/1062 题意:输入n(n < 1500)个女生左边有多少个男生.每个女生都和她左边最…

360数科 CTO 王继平:金融 IT 变革浪潮下,360数科的技术破局

据艾瑞咨询最新发布的《2021 年中国 Fintech 行业发展洞察报告》显示&#xff0c;2020年&#xff0c;银行、保险与证券机构的累计技术资金投入达 2691.9 亿元&#xff0c;预计至 2024 年将达到5754.5 亿元 目前我们观察到金融机构积极探索隐私计算等技术&#xff0c;推动智能在…

如何设计可靠的灰度方案

简介&#xff1a; 一个较大的业务或系统改动&#xff0c;往往会影响整个产品的用户体验或操作流程。为了控制影响面&#xff0c;可以选取一批特定用户、流程、单据等&#xff0c;只允许这一部分用户或数据按照变更后的新逻辑在系统中流转&#xff0c;而另一部分用户仍然执行变更…

php创建多级栏目_用PHP实现多级树型菜单

用PHP实现多级树型菜单更新时间&#xff1a;2006年10月09日 00:00:00 作者&#xff1a;//树型目录结构模板程序//菜单目录库字段说明&#xff1a;//menu_id 菜单项目 id//menu 菜单名称//menu_grade 菜单等级 1 为主菜单 2 为二级菜单 ........//menu_superior 上一级菜单 id…

linux如何设置mac快捷键,在Ubuntu上使用macOS的快捷键

因为常用机一台Mac&#xff0c;一台Linux&#xff0c;都频繁使用&#xff0c;两个系统不同的快捷键已经让人精神分裂了&#xff01;macOS几乎所有的快捷键都基于command键&#xff0c;全选(cmda)、复制(cmdc)、粘贴(cmdv)、开关标签页(cmdt/w)、切换窗口(cmdtab)、保存(cmds)、…