Pulsar客户端如何控制内存使用

Pulsar客户端如何控制内存使用

一、使用场景

在实际应用中,Pulsar客户端的内存使用控制是一个重要的性能优化点。假设有一个搜索类业务需要记录用户搜索请求,以便后续分析搜索热点和优化搜索效果。以下是一个简化的代码示例:

PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<byte[]> producer = pulsarClient.newProducer().topic("search-activities").create();
try {MessageId messageId = producer.send(/* message payload here */);log.debug("Search activity messageId={}", messageId);
} catch (Exception e) {log.error("Failed to record search activity", e);
}

在这个场景中,pulsarClientproducer 支持复用,推荐这么做,这里只是为了演示写到了一起。producer.send 是阻塞方式发送消息,线程会卡在这里等待发送结果返回。在现实中,根据消息在实际业务中的需要,可以选择阻塞和非阻塞两种方式。例如,业务上对搜索请求事件并无强依赖,因此使用阻塞方式发消息不太适合,从性能上考虑会加长整体的搜索延迟,从稳定性上考虑会增加搜索执行过程中的不确定性。因此,可以优化为非阻塞方式,将记录搜索事件放到其他线程中完成:

producer.sendAsync(/* message payload here */).whenComplete((msgId, ex) -> {if (ex != null) {log.error("Failed to record search activity", ex);} else {log.debug("Search activity messageId={}", msgId);}
});

在高TPS(例如单实例超过1000QPS)和大消息内容(例如100KB甚至1MB)的情况下,上述代码可能会遇到 MemoryBufferIsFullError 异常:

org.apache.pulsar.client.api.PulsarClientException$MemoryBufferIsFullError: Client memory buffer is full

此外,如果服务与Pulsar的broker之间出现网络波动,或者Pulsar服务内部组件之间出现网络波动,导致整体producer写入延迟升高,亦或是短时间出现大量写入,还可能会遇到 ProducerQueueIsFullError 异常:

org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full

二、Producer的内存控制

1. 配置项分析

在构建Producer时,ProducerBuilder 中与内存使用有关的配置项包括:

  • maxPendingMessages(int maxPendingMessages):控制producer内部队列中正在发送但还没有接收到broker确认的消息数量。若队列大小超出这个限制,默认行为是抛出 ProducerQueueIsFullError 异常。可以通过设置 blockIfQueueFull=true 调整为阻塞等待队列中空出新的空间。
  • maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions):控制整个topic所有分区的总pending消息数量。最终到各个分区内部producer取 maxPendingMessages 和 maxPendingMessagesAcrossPartitions / partitions 的较小值。

2. 内存限制配置

在现实应用场景中,不同业务的消息大小差异很大,单纯基于消息数量控制内存使用是不切实际的。因此,在 PIP-74 中,ClientBuilder 提供了一个面向整个client实例统一的内存限制配置:

ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);

当客户端所有producer中所有pending的消息大小总和超过这个限制时,默认会抛出 MemoryBufferIsFullError 异常。若同时配置了 blockIfQueueFull=true,则当前线程会阻塞等待前面pending的消息发送完成。

3. blockIfQueueFull 配置的使用

blockIfQueueFull 配置是为了限制客户端producer内存使用的同时,让开发者简化处理队列或者内存buffer满了的情况可以继续发送消息。然而,一旦配置为 true,不论是应用发送消息调用的是阻塞的 Producer.send 方法还是非阻塞的 Producer.sendAsync 方法都会出现阻塞等待,这可能会阻塞当前线程,对于某些业务场景是不可接受的。

4. 默认配置

PIP-120 对 2.10.0 以及之后版本的客户端中,默认启用了 memoryLimit 配置,其默认值为 64MB,同时默认禁用了 maxPendingMessagesmaxPendingMessagesAcrossPartitions 配置(默认值修改为0),并将 maxPendingMessagesAcrossPartitions 配置标记为 Deprecated

三、Consumer的内存控制

1. 配置项分析

在构造一个Consumer时,ConsumerBuilder 提供的与内存使用有关的选项包括:

  • receiverQueueSize(int receiverQueueSize):控制每个分区consumer的接收队列大小。
  • maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions):控制所有分区consumer和parent consumer的接收队列总大小。

Pulsar客户端通过预接收队列临时存放broker推送过来的消息,以便应用程序调用 Consumer#receive 或者 Consumer#receiveAsync 方法时直接从内存中返回消息,这是出于消费吞吐的考虑,本质上是一种以空间换取时间的策略。

2. 自动扩展接收队列

在 PIP-74 中提出了一个新的控制Consumer内存使用的方案,即 autoScaledReceiverQueueSizeEnabled

ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled);

当启用这个特性后,receiverQueueSize 会从1开始呈2的指数倍增长,直至达到 receiverQueueSize 的限制或达到client的 memoryLimit 限制,其目标是在有限制的内存使用下,达到最大的吞吐效率。

四、番外:ackTimeout 和 ackTimeoutTickTime 的配置

除了Producer和Consumer在生产和消费过程中的内存使用之外,还有一个容易被忽视的点是创建Consumer时 ackTimeoutackTimeoutTickTime 的配置如果不匹配,会消耗较多堆内内存。

ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit);

若Consumer配置了 ackTimeout 并且配置了较大的时间窗口(例如1小时或者更长),应适当调大 ackTimeoutTickTime,这是因为Consumer内部使用了一个简单时间轮的算法对消息的处理时间计时,若 ackTimeout 时间窗口很大,ackTimeoutTickTime 仍然使用其默认值 1s,时间轮本身将会占用大量堆内存空间。具体细节可参考客户端源码 UnAckedMessageTracker.java

五、总结

  1. 使用 sendAsync 非阻塞方法要注意其不能保证消息一定发送成功,特别是开启了 blockIfQueueFull 之后,它会在特定情况下演变成阻塞方法。
  2. 对于同时使用到了Producer和Consumer的应用,推荐创建两个client,分别用来创建Producer和Consumer,做读写分离,避免由于共用 memoryLimit 导致相互影响

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/66860.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Unity高级】一文了解Unity 中的条件编译(附所有指令)

一、Unity中的条件编译 Unity 对 C# 语言的支持包括使用指令&#xff0c;这些指令允许您根据是否定义了某些脚本符号&#xff0c;选择性地包含或排除代码的编译。有关这些指令在 C# 中如何工作的更多信息&#xff0c;请参阅微软关于 C# 预处理器指令 的文档。 &#xff08;一…

苍穹外卖08——(涉及接收日期格式数据、ApachePOI导出报表、sql获取top10菜品数据)

营业额统计 service层 在需要处理空值、与数据库交互或使用集合时&#xff0c;Integer 、Double是更好的选择。 // 导入string工具类 import org.apache.commons.lang.StringUtils; Service // 标记该类为Spring的服务组件 Slf4j // 引入日志功能 public class Repor…

微信小程序订阅消息提醒-云函数

微信小程序消息订阅分2种&#xff1a; 1.一次性订阅&#xff1a;用户订阅一次就可以推送一次&#xff0c;如果需要多次提醒需要多次订阅。 2.长期订阅&#xff1a;只有公共服务领域&#xff0c;如政务、医疗、交通、金融和教育等。‌在用户订阅后&#xff0c;在很长一段时间内…

代码随想录算法训练营第 4 天(链表 2)| 24. 两两交换链表中的节点19.删除链表的倒数第N个节点 -

一、24. 两两交换链表中的节点 题目&#xff1a;24. 两两交换链表中的节点 - 力扣&#xff08;LeetCode&#xff09; 视频&#xff1a;帮你把链表细节学清楚&#xff01; | LeetCode&#xff1a;24. 两两交换链表中的节点_哔哩哔哩_bilibili 讲解&#xff1a;代码随想录 dummy-…

pycharm-pyspark 环境安装

1、环境准备&#xff1a;java、scala、pyspark、python-anaconda、pycharm vi ~/.bash_profile export SCALA_HOME/Users/xunyongsun/Documents/scala-2.13.0 export PATH P A T H : PATH: PATH:SCALA_HOME/bin export SPARK_HOME/Users/xunyongsun/Documents/spark-3.5.4-bin…

【大模型入门指南 07】量化技术浅析

【大模型入门指南】系列文章&#xff1a; 【大模型入门指南 01】深度学习入门【大模型入门指南 02】LLM大模型基础知识【大模型入门指南 03】提示词工程【大模型入门指南 04】Transformer结构【大模型入门指南 05】LLM技术选型【大模型入门指南 06】LLM数据预处理【大模型入门…

3DGabor滤波器实现人脸特征提取

import cv2 import numpy as np# 定义 Gabor 滤波器的参数 kSize 31 # 滤波器核的大小 g_sigma 3.0 # 高斯包络的标准差 g_theta np.pi / 4 # Gabor 函数的方向 g_lambda 10.0 # 正弦波的波长 g_gamma 0.5 # 空间纵横比 g_psi np.pi / 2 # 相位偏移# 生成 Gabor 滤…

【Linux】4.Linux常见指令以及权限理解(2)

文章目录 3. Linux指令3.1 ls指令和rm指令补充3.2 man指令&#xff08;重要&#xff09;3.3cp指令&#xff08;重要&#xff09;输出重定向3.3.1ubuntu20.04如何安装tree 3.4 mv指令&#xff08;重要&#xff09;mv指令更改文件名mv指令更改目录名 如何看待指令指令的重命名3.5…

Vue3初学之Element-plus

用于快速的上手开发&#xff0c;以做项目为导向&#xff0c;所以借用element-plus插件 发现淘宝的镜像有时候也是很慢的&#xff0c;还可以换个 npm config set registry https://registry.npmmirror.com 安装element-plus npm install element-plus --save 查看安装是否成…

实用操作系统学习笔记

第1章 操作系统概述 操作系统基本概念 【基础知识】 操作系统&#xff1a;控制和管理整个计算机系统的硬件和软件资源&#xff0c;合理地组织、调度计算机的工作与资源的分配&#xff0c;进而为用户和其他软件提供方便接口与环境的程序集合。操作系统是计算机系统中最基本的…

k8s部署rocketmq踩坑笔记

给团队部署一个rocketmq4.8.0. k8s上部署的broker&#xff0c;注册到nameserver上是自己的pod ip&#xff0c;导致本机连接到的broker的pod ip&#xff0c;这个ip k8s集群外的机器是无法联通的。 nameserver上注册的是这个pod ipv4 尝试将broker的配置brokerIP1修改为注册到na…

UI自动化测试保姆级教程①

欢迎来到阿妮莫的学习小屋慢也好&#xff0c;步子小也好&#xff0c;在往前走就好 目录 自动化测试 简介 作用 分类 优缺点 优点 缺点(误区) UI自动化测试 自动化测试使用场景 自动化测试实现时间 Selenium框架 特点 Web自动化测试环境部署 Selenium包安装 浏览…

【2024年华为OD机试】 (A卷,100分)- 总最快检测效率(Java JS PythonC/C++)

一、问题描述 题目描述 在系统、网络均正常的情况下组织核酸采样员和志愿者对人群进行核酸检测筛查。 每名采样员的效率不同&#xff0c;采样效率为 N 人/小时。由于外界变化&#xff0c;采样员的效率会以 M 人/小时为粒度发生变化&#xff0c;M 为采样效率浮动粒度&#xf…

离线录制激光雷达数据进行建图

目前有一个2D激光雷达&#xff0c;自己控制小车运行一段时间&#xff0c;离线获取到激光雷达数据后运行如下代码进行离线建图。 roslaunch cartographer_ros demo_revo_lds.launch bag_filename:/home/firefly/AutoCar/data/rplidar_s2/2025-01-08-02-08-33.bag实际效果如下 d…

蓝桥杯嵌入式速通(1)

1.工程准备 创建一文件夹存放自己的代码&#xff0c;并在mdk中include上文件夹地址 把所有自身代码的头文件都放在headfile头文件中&#xff0c;之后只需要在新的文件中引用headfile即可 headfile中先提前可加入 #include "stdio.h" #include "string.h"…

QT跨平台应用程序开发框架(1)—— 环境搭建

目录 一&#xff0c;关于QT 二&#xff0c;关于应用程序框架 三&#xff0c;环境搭建 3.1 预备 3.2 下载Qt SDK 3.3 安装Qt SDK 3.4 配置环境变量 3.5 认识一些重要工具 四&#xff0c;Qt Creator 的基本使用 4.1 创建项目 4.2 代码解释 一&#xff0c;关于QT 互联网…

Open FPV VTX开源之第一次出图

Open FPV VTX开源之第一次出图 1. 源由2. 连线2.1 飞控2.2 调试 3. serial3.1 启动log - uboot3.2 登录版本 - linux3.3 获取有线IP 4. ssh - linux5. PixelPilot出图6. 总结7. 参考资料 1. 源由 在《Open FPV VTX开源之硬件规格及组成》章节中&#xff0c;已经基本介绍了产品…

基于高斯混合模型的数据分析及其延伸应用(具体代码分析)

一、代码分析 &#xff08;一&#xff09;清除工作区和命令行窗口 clear; clc;clear;&#xff1a;该命令用于清除 MATLAB 工作区中的所有变量&#xff0c;确保代码运行环境的清洁&#xff0c;避免之前遗留的变量对当前代码运行产生干扰。例如&#xff0c;如果之前运行的代码中…

PostgreSQL技术内幕22:vacuum full 和 vacuum

文章目录 0.简介1.概念及使用方式2.工作原理2.1 主要功能2.2 清理流程2.3 防止事务id环绕说明 3.使用建议 0.简介 在之前介绍MVCC文章中介绍过常见的MVCC实现的两种方式&#xff0c;一种是将旧数据放到回滚段&#xff0c;一种是直接生成一条新数据&#xff08;对于删除是不删除…

【面试】程序员 简历

一、简历整体结构 完整简历包含基本信息、教育背景、求职意向、工作经历、职业技能、项目经历、个人优势和个人荣誉八个部分。编写时&#xff0c;前几部分在保证真实的基础上可适当美化&#xff1b;个人优势和荣誉描述要突出难点亮点且避免夸张&#xff0c;可写入如马拉松参赛、…