Pulsar客户端如何控制内存使用

Pulsar客户端如何控制内存使用

一、使用场景

在实际应用中,Pulsar客户端的内存使用控制是一个重要的性能优化点。假设有一个搜索类业务需要记录用户搜索请求,以便后续分析搜索热点和优化搜索效果。以下是一个简化的代码示例:

PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<byte[]> producer = pulsarClient.newProducer().topic("search-activities").create();
try {MessageId messageId = producer.send(/* message payload here */);log.debug("Search activity messageId={}", messageId);
} catch (Exception e) {log.error("Failed to record search activity", e);
}

在这个场景中,pulsarClientproducer 支持复用,推荐这么做,这里只是为了演示写到了一起。producer.send 是阻塞方式发送消息,线程会卡在这里等待发送结果返回。在现实中,根据消息在实际业务中的需要,可以选择阻塞和非阻塞两种方式。例如,业务上对搜索请求事件并无强依赖,因此使用阻塞方式发消息不太适合,从性能上考虑会加长整体的搜索延迟,从稳定性上考虑会增加搜索执行过程中的不确定性。因此,可以优化为非阻塞方式,将记录搜索事件放到其他线程中完成:

producer.sendAsync(/* message payload here */).whenComplete((msgId, ex) -> {if (ex != null) {log.error("Failed to record search activity", ex);} else {log.debug("Search activity messageId={}", msgId);}
});

在高TPS(例如单实例超过1000QPS)和大消息内容(例如100KB甚至1MB)的情况下,上述代码可能会遇到 MemoryBufferIsFullError 异常:

org.apache.pulsar.client.api.PulsarClientException$MemoryBufferIsFullError: Client memory buffer is full

此外,如果服务与Pulsar的broker之间出现网络波动,或者Pulsar服务内部组件之间出现网络波动,导致整体producer写入延迟升高,亦或是短时间出现大量写入,还可能会遇到 ProducerQueueIsFullError 异常:

org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full

二、Producer的内存控制

1. 配置项分析

在构建Producer时,ProducerBuilder 中与内存使用有关的配置项包括:

  • maxPendingMessages(int maxPendingMessages):控制producer内部队列中正在发送但还没有接收到broker确认的消息数量。若队列大小超出这个限制,默认行为是抛出 ProducerQueueIsFullError 异常。可以通过设置 blockIfQueueFull=true 调整为阻塞等待队列中空出新的空间。
  • maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions):控制整个topic所有分区的总pending消息数量。最终到各个分区内部producer取 maxPendingMessages 和 maxPendingMessagesAcrossPartitions / partitions 的较小值。

2. 内存限制配置

在现实应用场景中,不同业务的消息大小差异很大,单纯基于消息数量控制内存使用是不切实际的。因此,在 PIP-74 中,ClientBuilder 提供了一个面向整个client实例统一的内存限制配置:

ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);

当客户端所有producer中所有pending的消息大小总和超过这个限制时,默认会抛出 MemoryBufferIsFullError 异常。若同时配置了 blockIfQueueFull=true,则当前线程会阻塞等待前面pending的消息发送完成。

3. blockIfQueueFull 配置的使用

blockIfQueueFull 配置是为了限制客户端producer内存使用的同时,让开发者简化处理队列或者内存buffer满了的情况可以继续发送消息。然而,一旦配置为 true,不论是应用发送消息调用的是阻塞的 Producer.send 方法还是非阻塞的 Producer.sendAsync 方法都会出现阻塞等待,这可能会阻塞当前线程,对于某些业务场景是不可接受的。

4. 默认配置

PIP-120 对 2.10.0 以及之后版本的客户端中,默认启用了 memoryLimit 配置,其默认值为 64MB,同时默认禁用了 maxPendingMessagesmaxPendingMessagesAcrossPartitions 配置(默认值修改为0),并将 maxPendingMessagesAcrossPartitions 配置标记为 Deprecated

三、Consumer的内存控制

1. 配置项分析

在构造一个Consumer时,ConsumerBuilder 提供的与内存使用有关的选项包括:

  • receiverQueueSize(int receiverQueueSize):控制每个分区consumer的接收队列大小。
  • maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions):控制所有分区consumer和parent consumer的接收队列总大小。

Pulsar客户端通过预接收队列临时存放broker推送过来的消息,以便应用程序调用 Consumer#receive 或者 Consumer#receiveAsync 方法时直接从内存中返回消息,这是出于消费吞吐的考虑,本质上是一种以空间换取时间的策略。

2. 自动扩展接收队列

在 PIP-74 中提出了一个新的控制Consumer内存使用的方案,即 autoScaledReceiverQueueSizeEnabled

ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled);

当启用这个特性后,receiverQueueSize 会从1开始呈2的指数倍增长,直至达到 receiverQueueSize 的限制或达到client的 memoryLimit 限制,其目标是在有限制的内存使用下,达到最大的吞吐效率。

四、番外:ackTimeout 和 ackTimeoutTickTime 的配置

除了Producer和Consumer在生产和消费过程中的内存使用之外,还有一个容易被忽视的点是创建Consumer时 ackTimeoutackTimeoutTickTime 的配置如果不匹配,会消耗较多堆内内存。

ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit);

若Consumer配置了 ackTimeout 并且配置了较大的时间窗口(例如1小时或者更长),应适当调大 ackTimeoutTickTime,这是因为Consumer内部使用了一个简单时间轮的算法对消息的处理时间计时,若 ackTimeout 时间窗口很大,ackTimeoutTickTime 仍然使用其默认值 1s,时间轮本身将会占用大量堆内存空间。具体细节可参考客户端源码 UnAckedMessageTracker.java

五、总结

  1. 使用 sendAsync 非阻塞方法要注意其不能保证消息一定发送成功,特别是开启了 blockIfQueueFull 之后,它会在特定情况下演变成阻塞方法。
  2. 对于同时使用到了Producer和Consumer的应用,推荐创建两个client,分别用来创建Producer和Consumer,做读写分离,避免由于共用 memoryLimit 导致相互影响

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/66860.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Unity高级】一文了解Unity 中的条件编译(附所有指令)

一、Unity中的条件编译 Unity 对 C# 语言的支持包括使用指令&#xff0c;这些指令允许您根据是否定义了某些脚本符号&#xff0c;选择性地包含或排除代码的编译。有关这些指令在 C# 中如何工作的更多信息&#xff0c;请参阅微软关于 C# 预处理器指令 的文档。 &#xff08;一…

苍穹外卖08——(涉及接收日期格式数据、ApachePOI导出报表、sql获取top10菜品数据)

营业额统计 service层 在需要处理空值、与数据库交互或使用集合时&#xff0c;Integer 、Double是更好的选择。 // 导入string工具类 import org.apache.commons.lang.StringUtils; Service // 标记该类为Spring的服务组件 Slf4j // 引入日志功能 public class Repor…

微信小程序订阅消息提醒-云函数

微信小程序消息订阅分2种&#xff1a; 1.一次性订阅&#xff1a;用户订阅一次就可以推送一次&#xff0c;如果需要多次提醒需要多次订阅。 2.长期订阅&#xff1a;只有公共服务领域&#xff0c;如政务、医疗、交通、金融和教育等。‌在用户订阅后&#xff0c;在很长一段时间内…

青少年编程与数学 02-006 前端开发框架VUE 22课题、状态管理

青少年编程与数学 02-006 前端开发框架VUE 22课题、状态管理 一、状态管理二、Vuex1. 安装Vuex2. 创建Vuex Store3. 在Vue应用中使用Store4. 在组件中使用状态5. 模块化Store 三、Vuex应用示例1. 创建项目2. 安装Vuex3. 设置Vuex Store4. 在主项目中使用Store5. 创建组件6. 更新…

代码随想录算法训练营第 4 天(链表 2)| 24. 两两交换链表中的节点19.删除链表的倒数第N个节点 -

一、24. 两两交换链表中的节点 题目&#xff1a;24. 两两交换链表中的节点 - 力扣&#xff08;LeetCode&#xff09; 视频&#xff1a;帮你把链表细节学清楚&#xff01; | LeetCode&#xff1a;24. 两两交换链表中的节点_哔哩哔哩_bilibili 讲解&#xff1a;代码随想录 dummy-…

pycharm-pyspark 环境安装

1、环境准备&#xff1a;java、scala、pyspark、python-anaconda、pycharm vi ~/.bash_profile export SCALA_HOME/Users/xunyongsun/Documents/scala-2.13.0 export PATH P A T H : PATH: PATH:SCALA_HOME/bin export SPARK_HOME/Users/xunyongsun/Documents/spark-3.5.4-bin…

数据结构与算法之链表: LeetCode 146. LRU 缓存 (Ts版)

LRU 缓存 https://leetcode.cn/problems/lru-cache/description/ 描述 请你设计并实现一个满足 LRU (最近最少使用) 缓存 约束的数据结构 实现 LRUCache 类&#xff1a; LRUCache(int capacity) 以 正整数 作为容量 capacity 初始化 LRU 缓存int get(int key) 如果关键字 ke…

Three.js 渲染技术:打造逼真3D体验的幕后功臣

文章目录 前言一、着色器&#xff08;Shaders&#xff09;二、后处理&#xff08;Post-processing&#xff09;三、抗锯齿&#xff08;Anti-aliasing&#xff09;四、实时渲染与离线渲染五、光照模型与材质优化六、环境映射&#xff08;Environment Mapping&#xff09;七、纹理…

EFCore HasDefaultValueSql (续1 ValueGeneratedOnAdd)

前情&#xff1a;EFCore HasDefaultValueSql 小伙伴在使用 HasDefaultValueSql 时&#xff0c;对相关的 ValueGeneratedOnAdd 也有了疑问&#xff1a; ValueGeneratedOnAdd 和 HasDefaultValueSql 是 Entity Framework Core 中用于管理字段默认值的两种不同配置方式&#xff0…

通过Apache、Nginx限制直接访问public下的静态文件

一、Apache 在public目录下的.htaccess文件中添加如下规则&#xff0c;来拒绝除了指定文件类型之外的所有请求 <FilesMatch "\.(?!(jpg|jpeg|png|gif|css|js|ico)$)[^.]$">Order Allow,DenyDeny from all </FilesMatch> 上述配置表示仅允许访问.jpg …

远程和本地文件的互相同步

文章目录 1、rsync实现类似git push pull功能1. 基础概念2. 示例操作3. 定制化和进阶用法4. 定时同步&#xff08;类似自动化&#xff09; 2 命令简化1. 动态传参的脚本2. Shell 函数支持动态路径3. 结合环境变量和参数&#xff08;更简洁&#xff09;4. Makefile 支持动态路径…

AIOps 平台

AIOps&#xff08;Artificial Intelligence for IT Operations&#xff09;平台是一种结合人工智能&#xff08;AI&#xff09;技术和IT运营管理的解决方案&#xff0c;旨在通过自动化、智能化的手段优化企业IT系统的运行与管理。以下是AIOps平台的核心功能、优势以及常见的技术…

【大模型入门指南 07】量化技术浅析

【大模型入门指南】系列文章&#xff1a; 【大模型入门指南 01】深度学习入门【大模型入门指南 02】LLM大模型基础知识【大模型入门指南 03】提示词工程【大模型入门指南 04】Transformer结构【大模型入门指南 05】LLM技术选型【大模型入门指南 06】LLM数据预处理【大模型入门…

3DGabor滤波器实现人脸特征提取

import cv2 import numpy as np# 定义 Gabor 滤波器的参数 kSize 31 # 滤波器核的大小 g_sigma 3.0 # 高斯包络的标准差 g_theta np.pi / 4 # Gabor 函数的方向 g_lambda 10.0 # 正弦波的波长 g_gamma 0.5 # 空间纵横比 g_psi np.pi / 2 # 相位偏移# 生成 Gabor 滤…

【Linux】4.Linux常见指令以及权限理解(2)

文章目录 3. Linux指令3.1 ls指令和rm指令补充3.2 man指令&#xff08;重要&#xff09;3.3cp指令&#xff08;重要&#xff09;输出重定向3.3.1ubuntu20.04如何安装tree 3.4 mv指令&#xff08;重要&#xff09;mv指令更改文件名mv指令更改目录名 如何看待指令指令的重命名3.5…

Vue3初学之Element-plus

用于快速的上手开发&#xff0c;以做项目为导向&#xff0c;所以借用element-plus插件 发现淘宝的镜像有时候也是很慢的&#xff0c;还可以换个 npm config set registry https://registry.npmmirror.com 安装element-plus npm install element-plus --save 查看安装是否成…

vue2新增删除

&#xff08;只是页面实现&#xff0c;不涉及数据库&#xff09; list组件&#xff1a; <button click"onAdd">新增</button><el-table:header-cell-style"{ textAlign: center }" :cell-style"{ textAlign: center }":data&quo…

实用操作系统学习笔记

第1章 操作系统概述 操作系统基本概念 【基础知识】 操作系统&#xff1a;控制和管理整个计算机系统的硬件和软件资源&#xff0c;合理地组织、调度计算机的工作与资源的分配&#xff0c;进而为用户和其他软件提供方便接口与环境的程序集合。操作系统是计算机系统中最基本的…

k8s部署rocketmq踩坑笔记

给团队部署一个rocketmq4.8.0. k8s上部署的broker&#xff0c;注册到nameserver上是自己的pod ip&#xff0c;导致本机连接到的broker的pod ip&#xff0c;这个ip k8s集群外的机器是无法联通的。 nameserver上注册的是这个pod ipv4 尝试将broker的配置brokerIP1修改为注册到na…

【机器学习:八、逻辑回归】

逻辑回归&#xff08;Logistic Regression&#xff09; 1. 逻辑回归的引出 在现实世界中&#xff0c;许多问题都涉及到分类任务。例如&#xff1a; 判断一封邮件是否为垃圾邮件&#xff1b;预测某人是否会患某种疾病&#xff1b;确定图片中是否包含某种特定物体。 这些问题…