Apache Pulsar 技术系列 - PulsarClient 实现解析

导语

Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案,支持多租户、低延时、读写分离、跨地域复制(GEO replication)、快速扩容、灵活容错等特性。同时为了达到高性能,低延时、高可用,Pulsar 在客户端也做了很多的优化,本文主要讲述 PulsarClient 基本原理和实现。

PulsarClient 简介

Pulsar 客户端 API 设计优雅简洁,使用 PulsarClient 作为客户端的总入口,方便用户记忆和构建出具体的客户端,例如:

  • Producer: 生产者用来发送消息到指定 Topic。

  • Consumer: 消费者通过订阅关联到指定 Topic 并接收消息。

  • Reader: 手动管理 Cursors 的消费者。(内部使用 Consumer 实现)。

PulsarClient 还统一管理客户端系统资源,为具体的客户端提供了部分通用化处理,包括连接管理、线程管理、内存管理等。接下来让我们了解一下 PulsarClient 是如何实现的。

PulsarClient 有哪些功能

作为客户端的统一入口,下面代码片段不难看出 PulsarClient 主要功能是构建、销毁 PulsarClient 实例,以及构建各种具体 Client 和事务实例。

public interface PulsarClient extends Closeable {ProducerBuilder<byte[]> newProducer();<T> ProducerBuilder<T> newProducer(Schema<T> schema);ConsumerBuilder<byte[]> newConsumer();<T> ConsumerBuilder<T> newConsumer(Schema<T> schema);ReaderBuilder<byte[]> newReader();<T> ReaderBuilder<T> newReader(Schema<T> schema);void updateServiceUrl(String serviceUrl) throws PulsarClientException;CompletableFuture<List<String>> getPartitionsForTopic(String topic);CompletableFuture<Void> closeAsync();void shutdown() throws PulsarClientException;boolean isClosed();TransactionBuilder newTransaction() throws PulsarClientException;
}

实现原理

初始化过程

PulsarClient 可以使用以下代码来实例化。

PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://broker:6650").build();

PulsarClient 以及具体客户端都使用 Builder 模式构建,每种客户端都有对应的 ConfigurationData 来管理配置,PulsarClient 核心配置如下:

public class ClientConfigurationData implements Serializable, Cloneable {private String serviceUrl;// 用来在运行时外部改变urlprivate transient ServiceUrlProvider serviceUrlProvider;private long operationTimeoutMs = 30000;private long statsIntervalSeconds = 60;private int numIoThreads = 1;private int numListenerThreads = 1;private int connectionsPerBroker = 1;private boolean useTcpNoDelay = true;private int concurrentLookupRequest = 5000;private int maxLookupRequest = 50000;private int maxLookupRedirects = 20;private int maxNumberOfRejectedRequestPerConnection = 50;private int keepAliveIntervalSeconds = 30;private int connectionTimeoutMs = 10000;private int requestTimeoutMs = 60000;private long initialBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(60);private boolean enableBusyWait = false;private String listenerName;// 全局内存限制(producer使用)private long memoryLimitBytes = 0;private String proxyServiceUrl;private ProxyProtocol proxyProtocol;long tickDuration = 1;// transactionprivate boolean enableTransaction = false;
}

PulsarClient 的初始化过程比较简单,逐个初始化内部模块,以下代码片段展示了 Client 内部主要的模块。

public class PulsarClientImpl implements PulsarClient {// 配置protected final ClientConfigurationData conf;// 本地元数据管理器,主要负责topic分区个数、topic对应的owner节点以及schema信息private LookupService lookup;// 共享连接池 双层map结构private final ConnectionPool cnxPool;// 时间轮private final Timer timer;// 执行外部逻辑线程组(主要消费使用)private final ExecutorProvider externalExecutorProvider;// 执行内部逻辑线程组(主要消费使用)private final ExecutorProvider internalExecutorService;private final AtomicReference<State> state = new AtomicReference<>();//producer集合private final Set<ProducerBase<?>> producers;//consumer集合private final Set<ConsumerBase<?>> consumers;//producer自增Idprivate final AtomicLong producerIdGenerator = new AtomicLong();//consumer自增Idprivate final AtomicLong consumerIdGenerator = new AtomicLong();// 请求自增Idprivate final AtomicLong requestIdGenerator = new AtomicLong();// netty 线程组protected final EventLoopGroup eventLoopGroup;// 生产本地buffer内存限制器private final MemoryLimitController memoryLimitController;...
}

PulsarClient 初始化时主要创建了 Netty 客户端,连接池、时间轮等对象,只是准备好资源,并没有和服务端建立连接进行任何交互。只有在创建具体的客户端时,才会和服务端有交互。

Producer 创建

Pulsar 是以 Topic 粒度对外提供服务,多分区 Topic 等同于多个不同数字后缀的 Topic 集合。下文提到的 Topic-Partition 包含了单分区 Topic 和多分区 Topic 中的一个  Partition。Pulsar 客户端的实现 Topic-Partition 之间是相互独立的,SDK 内部会为每个 Topic-Partition 单独创建一个具体的客户端。我们在这里只介绍 Producer 的初始化流程(Consumer 类似)。

可以用以下代码构建 Producer。

Producer<byte[]> producer = client.newProducer().topic("my-topic").create();

当 My-topic 为 Non-partitioned Topic,会实例化一个 ProducerImpl 对象并返回,当 My-topic 分区数量大于0时,则会创建 PartitionedProducerImpl 对象。PartitionedProducerImpl 对象内包含了 List。可以理解为 PulsarClient 创建 Producer 时,最终会创建和分区数量一致的 ProducerImpl 对象,每个 ProducerImpl 都独立工作,互不影响(Consumer 类似)。

在创建 Producer 时客户端与服务端命令字交互如下:

  1. PulsarClient 通过用户指定的 ServiceUrl 挑选一个 url 来连接服务端,并做认证相关操作。

  2. 使用 LookupService 发送 PARTITIONED_METADATA 命令字查询给定 Topic 的分区数。

  3. 根据 Metadata 返回结果中的分区数循环创建 ProducerImpl 对象。

    3.1 ProducerImpl 对象初始化时会使用 LookupService 发送 LOOKUP 请求查询对应的分区的 Owner 节点 Lookup 过程可参考https://km.woa.com/articles/show/555638。

    3.2 根据 LOOKUP 响应连接到 Owner 节点,并发送 PRODUCER 请求向服务端创建 Producer。

    到这里 Producer 就已经创建完毕,可以正式使用来发送消息了。

ps: 如果创建好 Producer 后,分区数量有变化了,比如服务端扩容了,客户端可以感知到并增加 ProducerImpl 对象数量吗。答案是可以的,默认会定时1分钟发起一次检测,有分区变化会做相应处理。

连接管理

与大部分组件一样,客户端和服务端使用长连接通信。Pulsar 协议设计上不是传统的应答模式,可以同时支持多个客户端使用同一个连接并行发送接收请求(服务端会串行处理单个 Topic-partition 上的请求来保证消息顺序性)。

得益于连接共享,客户端消耗的连接数是很少的,PulsarClient 会为每台 Broker 创建一个连接池,默认连接数为1, 用户可以使用 ConnectionsPerBroker 配置来设置每台 Broker 最大连接数。ProducerImpl、ConsumerImpl 在初始化时,会随机从连接池中获取一个连接用来和服务端通信。

下图中 maxConnectionsPerHosts=2, 连接池中为每个 Broker 创建2个连接,6个客户端会在对应 T opic owner 节点里随机挑选一个连接绑定。

连接健康管理

Pulsar keepAlive 检测是双向的,连接创建成功后,客户端和服务端都会定时30s(KeepAliveIntervalSeconds 配置可修改)发送 Ping 请求到对端,接收到 Ping 请求后会回应 Pong 来标识存活。在以下几种情况下,客户端、服务端都会主动断开连接:

  • 超时时间内没有完成握手动作。

  • 发送 Ping 或者 Pong 命令时,Netty 回调发送失败。

  • 连接 isAutoRead 打开并且超时时间内没有收到任何请求(包含 Ping、Pong)。

连接断开后,会通知绑定在该连接上的所有客户端,这些客户端会重新从连接池中获取健康的连接。Pulsar 中空闲连接不会自动回收。

线程模型

PulsarClient 使用 Netty 作为网络通信框架, 是标准的 Netty 客户端。协议处理和事件驱动都是依托于 Netty。核心处理类直接继承于 Netty Handler。

所以线程模型也主要围绕于 Netty 的 EventLoopGroup。上文提到,客户端资源管理都收敛于 PulsarClient,也就是使用同一个 PulsarClient 创建出来的具体客户端都共享该 PulsarClient 中的线程等资源,比如使用 ClientA 对象分别创建一个或多个 Producer、Consuemer、Reader 客户端,这些客户端都共享 Client 中的线程资源。

PulsarClient 线程、线程组如下:

图中实线表示客户端会从线程池中挑选一个线程绑定运行。

  • Pulsar-client-io: io 线程( Netty 内部线程),负责网络连接和读写。NumIoThreads 参数配置,默认值为1。客户端不直接绑定 IO 线程,而是由其内部的连接来绑定 IO 线程,所以 IO 线程数配置最好小于或者等于总连接数,否则有些线程不会使用到。

  • Pulsar-client-internal: 主要用于 Consumer 内部处理,比如接收到消息后放置到接收队列等。也是通过 NumIoThreads 参数配置,默认值为1。

  • Pulsar-external-listener: 主要用于 Consumer 外部处理,比如用户消费逻辑回调。NumListenerThreads 参数配置,默认值为1。

  • Pulsar-timer: 时间轮内部线程,负责所有定时操作,比如连接重连,发送超时检测等。一个 PulsarClient 对应一个线程。

简单描述一下生产消费时线程是如何交互:

  • 生产: 用户线程创建消息并放置到本地缓存,IO 线程负责把消息发送到服务端。

  • 消费: IO 线程接收到服务端的消息推送,使用  Pulsar-client-internal 线程把消息放在本地缓存队列,然后使用 Pulsar-external-listener 线程执行用户消息处理逻辑。

总结和思考

本文介绍了 Pulsar 整体客户端架构,讲解了 PulsarClient、Producer 初始化过程以及客户端的连接管理和线程模型。并没有涉及到详细的生产消费过程。大家不难发现 Pulsar 客户端和其他组件客户端相比,较大的区别就是会给每个 Topic-partition 创建  Producer/consumer。如果客户端关联的 Topic-partition 数量很大,Producer/consumer 数量会急剧膨胀,从而导致客户端需要消耗更多的资源。也正是因为  Producer/consumer 数量可能较大,连接和线程等资源不可能做到独立,只能是 Producer/consumer 共享。而资源共享就不可避免出现客户端之间会相互影响,比如限流是控制在连接维度,但是由于连接是共享的,某些 Topic 的限流就会影响到该连接上的全部客户端。建议用户客户端关联的 Topic-partition 数量较大时,可以适当调大连接池和线程池大小来缓解影响,或者使用不同的 PulsarClient 来做客户端隔离。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/235441.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

快速从图中提取曲线坐标数据的在线工具WebPlotDigitizer

快速从图中提取曲线坐标数据的在线工具WebPlotDigitizer 1 介绍2 WebPlotDigitizer在线版的使用2.1 上传图像2.2 点击横纵坐标点&#xff1a;2.3 选择曲线 3 查看数据参考 1 介绍 写论文时要对比别人曲线图、点图、柱形图的数据&#xff0c;但是只有图没有原始数据怎么办&…

最新国内可用使用GPT4.0,GPT语音对话,Midjourney绘画,DALL-E3文生图

一、前言 ChatGPT3.5、GPT4.0、GPT语音对话、Midjourney绘画&#xff0c;相信对大家应该不感到陌生吧&#xff1f;简单来说&#xff0c;GPT-4技术比之前的GPT-3.5相对来说更加智能&#xff0c;会根据用户的要求生成多种内容甚至也可以和用户进行创作交流。 然而&#xff0c;GP…

【优化】XXLJOB修改为使用虚拟线程

【优化】XXLJOB修改为使用虚拟线程 新建这几个目录 类&#xff0c; 去找项目对应的xxljob的源码 主要是将 new Thread 改为 虚拟线程 Thread.ofVirtual().name("VT").unstarted 以下代码是 xxljob 2.3.0版本 举一反三 去修改对应版本的代码 <!-- 定…

计算机基础以及实施运维工程师的介绍

目录 什么是实施、运维工程师 实施工程师 实施工程师的职责 什么是运维工程师 运维功工程师的职责 需要的技术 计算机的介绍 CPU 存储器 IO 系统总线 主板 BIOS 什么是实施、运维工程师 实施工程师 纯实施工程师是指在工程项目实施阶段专门负责实施工作的工程师。与其他…

大模型赋能“AI+电商”,景联文科技提供高质量电商场景数据

据新闻报道&#xff0c;阿里巴巴旗下淘天集团和国际数字商业集团都已建立完整的AI团队。 淘天集团已经推出模特图智能生成、官方客服机器人、万相台无界版等AI工具&#xff0c;训练出了自己的大模型产品 “星辰”&#xff1b; 阿里国际商业集团已成立AI Business&#xff0c;…

Gazebo GUI模型编辑器

模型编辑器 现在我们将构建我们的简单机器人。我们将制作一个轮式车辆&#xff0c;并添加一个传感器&#xff0c;使我们能够让机器人跟随一个斑点&#xff08;人&#xff09;。 模型编辑器允许我们直接在图形用户界面 &#xff08;GUI&#xff09; 中构建简单的模型。对于更复…

在使用mapstruct,想忽略掉List<DTO>字段里面的,`data` 字段的映射, 如何写ignore: 使用@IterableMapping

在使用mapstruct,想忽略掉List字段里面的,data 字段的映射, 如何写ignore 代码如下: public interface AssigmentFileMapper {AssigmentFileDTO assigmentFileToAssigmentFileDTO(AssigmentFile assigmentFile);AssigmentFile assigmentFileDTOToAssigmentFile(Assigment…

用全志R128复刻自平衡赛车机器人,还实现了三种不同的操控方式

经常翻车的朋友们都知道&#xff0c;能在翻车后快速摆正车身的车才是好车。 就像动画《四驱兄弟》中展现的那样&#xff0c;在比赛中需要跟着赛车一起跑圈&#xff0c;而且赛车如果被撞翻还需要重新用手扶正&#xff0c;所浪费的时间非常影响比赛结果。 如果小豪和小烈可以拥有…

flutter自定义地图Marker完美展示图片

世人都说雪景美 寒风冻脚无人疼 只道是一身正气 结论 参考Flutter集成高德地图并添加自定义Maker先实现自定义Marker。如果自定义Marker中用到了图片&#xff0c;那么会碰到图片没有被绘制到Marker的问题&#xff0c;此时需要通过precacheImage来预加载图片&#xff0c;从而解…

智能化创作与艺术:发展、问题、未来趋势

导言 随着人工智能技术的不断进步&#xff0c;智能化创作在艺术领域逐渐崭露头角。本文将深入研究智能化创作与艺术的发展过程、遇到的问题、解决的过程&#xff0c;探讨未来的可用范围&#xff0c;并分析在各国的应用和未来的研究趋势。最后&#xff0c;探讨在哪些方面能取胜&…

一. 模块之间的依赖 ------ 详细解析官网购物应用优秀案例(鸿蒙开发)

一. 项目目录简介 ├──**common** // 公共能力层 │ ├──components │ │ ├──CommodityList.ets // 商品列表组件 │ │ ├──CounterProduct.ets // 数量加减组件 │ │ └──EmptyComponent.ets /…

使用阿里云性能测试工具 JMeter 场景压测 RocketMQ 最佳实践

作者&#xff1a;森元 需求背景 新业务上线前&#xff0c;我们通常需要对系统的不同中间件进行压测&#xff0c;找到当前配置下中间件承受流量的上限&#xff0c;从而确定上游链路的限流规则&#xff0c;保护系统不因突发流量而崩溃。阿里云 PTS 的 JMeter 压测可以支持用户上…

【C++初阶】第一站:C++入门基础(下)

前言&#xff1a; 紧接着上两篇文章&#xff0c;c入门基础(上)&#xff1a;C入门基础(上) c入门基础(中)&#xff1a;C入门基础(中) 继续补充完c初阶入门基础的知识点&#xff0c;本章知识点包括&#xff1a; 引用和指针的区别、内联函数、auto关键字(C11)、基于范围的for循环…

Android 13 - Media框架(24)- OMXNodeInstance(一)

为了了解 ACodec 是如何与 OpenMAX 组件进行 buffer 流转的&#xff0c;我们有必要先来学习 OMXNodeInstance&#xff0c;在前面的章节中&#xff0c;我们已经了解了 media.codec 进程包含的内容&#xff0c;以及 OpenMAX 框架中的一些内容。这一节我们将来学习 OMXNode 与 med…

快猫视频模板源码定制开发 苹果CMS 可打包成双端APP

苹果CMS快猫视频网站模板源码&#xff0c;可用于开发双端APP&#xff0c;后台支持自定义参数&#xff0c;包括会员升级页面、视频、演员、专题、收藏和会员系统等完整模块。还可以直接指定某个分类下的视频为免费专区&#xff0c;具备完善的卡密支付体系&#xff0c;无需人工管…

听GPT 讲Rust源代码--src/tools(17)

File: rust/src/tools/rust-analyzer/crates/profile/src/hprof.rs 在Rust源代码中&#xff0c;rust/src/tools/rust-analyzer/crates/profile/src/hprof.rs文件是rust-analyzer中的性能分析模块&#xff0c;用于代码运行时的性能统计和分析。下面将详细介绍每个结构体的作用&a…

【操作系统】什么是进程?

文章目录 进程进程的属性进程的状态挂起 进程 进程是一个可并发执行的具有独立功能的程序关于某个数据集合的执行过程&#xff0c;也是操作系统进行资源分配和保护的基本单位。 进程的属性 结构性&#xff1a; 共享性&#xff1a;同一程序运行于不同数据集合上构成不同的进程…

Flink Table API 与 SQL 编程整理

Flink API总共分为4层这里主要整理Table API的使用 Table API是流处理和批处理通用的关系型API&#xff0c;Table API可以基于流输入或者批输入来运行而不需要进行任何修改。Table API是SQL语言的超集并专门为Apache Flink设计的&#xff0c;Table API是Scala和Java语言集成式…

【漏洞复现】奥威亚 教学视频应用服务平台任意文件上传漏洞

漏洞描述 AVA 教学视频应用服务平台是由广州市奥威亚电子科技有限公司基于当前教育视频资源建设的背景及用户需求的调研,开发出来能够适应时代发展和满足学校需求,具有实效性、多功能、特点鲜明的平台。 该平台存在任意文件上传漏洞,通过此漏洞攻击者可上传webshell木马,…

OpenCV-9颜色空间的转换

颜色转换API&#xff1a;cvtColor&#xff08;img&#xff0c;colorsapce&#xff09; cvt含义为转换 convesion(转换) 下面为示例代码&#xff1a; import cv2# callback中至少有一个参数 def callback(value):passcv2.namedWindow("color", cv2.WINDOW_NORMAL) …