Pulsar消息路由深入剖析

一、概述

大数据背景下,分区应该是所有组件必备的基本条件,否则面对海量数据时无论是计算还是存储都容易遇到瓶颈。跟其他消息系统一样,Pulsar通过Topic将消息数据进行业务层面划分管理,同时也支持Topic分区,通过将多个分区分布在多台Broker/机器上从而带来性能上的巨大提升以及无限的横向拓展能力。而一旦有了分区之后就会面临一个问题,但一条数据请求时应该将其发往哪个分区?目前Pulsar跟其他消息系统一样支持以下三种路由模式。

  1. 轮询路由
    生产者会按将消息按批为单位轮询发送到不同的分区,这是一种常见的路由策略,具有简单的优势,由于它不需要过多的配置以及考虑但却可以表现不错的性能。如果消息带有key的话会根据key进行哈希运算后再对分区进行取模来决定消息投放的目标分区。
  2. 单分区路由
    单分区路由提供一种更简单的机制,它会将所有消息路由到同一个分区。这种模式类似非分区Topic,如果消息提供key的话将恢复到轮询哈希路由方式
  3. 自定义分区路由
    自定义分区路由支持你通过实现MessageRouter接口来自定义路由逻辑,例如将特定key的消息发到指定的分区等

二、实战

消息路由发生在生产者端,在创建生产者是通过 .messageRoutingMode() 进行指定,下面就分别实战对比下这三种的路由效果

1. 轮询路由

先试试轮询路由的策略,这是最常见也是默认的路由策略,通过 .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) 进行指定,然后往里面通过同步方式往分区Topic里面写入数据

        String serverUrl = "http://localhost:8080";PulsarClient pulsarClient =PulsarClient.builder().serviceUrl(serverUrl).build();Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic("sherlock-api-tenant-1/sherlock-namespace-1/partition_partition_topic_2").messageRoutingMode(MessageRoutingMode.RoundRobinPartition)//.messageRoutingMode(MessageRoutingMode.SinglePartition).create();for (int i = 0; i < 20000; i++) {producer.send("hello java API pulsar:"+i+", 当前时间为:"+new Date());}

通过管理页面可以看到数据基本均匀的落在各个分区,从这个结果是能够反向验证数据是符合轮询发送后的效果
在这里插入图片描述

2. 单分区路由

现在试试单分区路由的策略,通过 .messageRoutingMode(MessageRoutingMode.SinglePartition) 进行指定,并往分区Topic里面写入一批数据

        String serverUrl = "http://localhost:8080";PulsarClient pulsarClient =PulsarClient.builder().serviceUrl(serverUrl).build();Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic("sherlock-api-tenant-1/sherlock-namespace-1/partition_partition_topic_2").messageRoutingMode(MessageRoutingMode.SinglePartition).create();for (int i = 0; i < 20000; i++) {producer.send("hello java API pulsar:"+i+", 当前时间为:"+new Date());}

通过管理页面可以看到数据都落在第一个分区,说明这也符合官网中对单分区路由的描述。同时经过反复试验多次发现,生产者会随机选择一个分区并将所有数据发送到这个分区。
在这里插入图片描述

3. 自定义路由

在有些业务场景,我们需要将自己的业务逻辑“融入”路由策略,因此像Pulsar、Kafka等消息中间件都是支持用户进行路由规则的自定义的。这里为了好玩,咱们尝试将数据按照 1:2:3:4 等比例分别落在四个分区如何?说干就干,自定义路由也是比较简单的,只需要实现Pulsar MessageRouter接口的choosePartition方法即可,实现逻辑如下

public class SherlockRouter implements MessageRouter {Long count = 0L;public int choosePartition(Message<?> msg, TopicMetadata metadata) {count++;count = count % 10;if (count == 0) return 0;if (count < 3) return 1;if (count < 6) return 2;return 3;}
}

通过上面代码可以看到,参数msg就是生产者中国呢发送的消息对象,metadata是这条消息的元数据如租户、命名空间等等,而返回值其实就是这个Topic分区的下标,这里需要注意的是不要超过Topic的分区数,同时一些比较复杂的数据处理逻辑代码尽量不要写在这里影响消息发送性能以及不规范。

写完后通过 .messageRouter() 方法进行指定即可使用

    public static void customRoundSchemaProducer() throws Exception {String serverUrl = "http://localhost:8080";PulsarClient pulsarClient =PulsarClient.builder().serviceUrl(serverUrl).build();Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic("sherlock-api-tenant-1/sherlock-namespace-1/partition_partition_topic_3").messageRouter(new SherlockRouter()).create();for (int i = 0; i < 20000; i++) {producer.send("hello java API pulsar:"+i+", 当前时间为:"+new Date());}producer.close();pulsarClient.close();}

在管理页面可以看到,数据是按照咱们预期的逻辑 1:2:3:4等比落在分区里面,嘿嘿~
在这里插入图片描述

三、源码分析

1. 接口以及父类

Pulsar中所有路由规则都是基于MessageRouter接口进行实现的,这个接口主要提供了choosePartition方法,只要重写这个方法即可自定义任意自己预期的逻辑

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface MessageRouter extends Serializable {/**** @param msg*            Message object* @return The index of the partition to use for the message* @deprecated since 1.22.0. Please use {@link #choosePartition(Message, TopicMetadata)} instead.*/@Deprecateddefault int choosePartition(Message<?> msg) {throw new UnsupportedOperationException("Use #choosePartition(Message, TopicMetadata) instead");}/*** Choose a partition based on msg and the topic metadata.** @param msg message to route* @param metadata topic metadata* @return the partition to route the message.* @since 1.22.0*/default int choosePartition(Message<?> msg, TopicMetadata metadata) {return choosePartition(msg);}}

MessageRouterBase是路由策略的抽象类,主要定义了消息有key时的哈希算法,像上面提的轮询路由和单分区路由继承了这个抽象类。JavaStringHash和Murmur3Hash32两个都是Pulsar提供的哈希算法的实现类,两者的差异后面再单独进行分析

public abstract class MessageRouterBase implements MessageRouter {private static final long serialVersionUID = 1L;protected final Hash hash;MessageRouterBase(HashingScheme hashingScheme) {switch (hashingScheme) {case JavaStringHash:this.hash = JavaStringHash.getInstance();break;case Murmur3_32Hash:default:this.hash = Murmur3Hash32.getInstance();}}
}

2. 轮询路由的实现

主要看choosePartition 方法的逻辑,首先如果消息带有key则针对key进行哈希然后取模,这样可以保证相同key的消息落在同一个分区。然后就是判断消息是否按批次进行发送的,如果是单条消息发送的话则通过一个累加计数器进行轮询分区,即可达到消息按照分区顺序逐个发送的效果;如果是按批次发送的话,则是根据时间戳进行取模,这样达到的效果就是每批数据都会随机发送到某一个分区

public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase {@SuppressWarnings("unused")private volatile int partitionIndex = 0;private final int startPtnIdx;private final boolean isBatchingEnabled;private final long partitionSwitchMs;....@Overridepublic int choosePartition(Message<?> msg, TopicMetadata topicMetadata) {// If the message has a key, it supersedes the round robin routing policyif (msg.hasKey()) {return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions());}if (isBatchingEnabled) { // if batching is enabled, choose partition on `partitionSwitchMs` boundary.long currentMs = clock.millis();return signSafeMod(currentMs / partitionSwitchMs + startPtnIdx, topicMetadata.numPartitions());} else {return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), topicMetadata.numPartitions());}}}

3. 单分区路由

可以看到单分区的逻辑是比较简单且清晰的,如果有key就进行哈希取模,否则就发送到partitionIndex这个成员变量指定的分区去,那么这个partitionIndex指定的是哪个分区呢?通过代码能看到是从构造函数里面传进来的,因此跟踪下代码看看

public class SinglePartitionMessageRouterImpl extends MessageRouterBase {private final int partitionIndex;public SinglePartitionMessageRouterImpl(int partitionIndex, HashingScheme hashingScheme) {super(hashingScheme);this.partitionIndex = partitionIndex;}@Overridepublic int choosePartition(Message<?> msg, TopicMetadata metadata) {// If the message has a key, it supersedes the single partition routing policyif (msg.hasKey()) {return signSafeMod(hash.makeHash(msg.getKey()), metadata.numPartitions());}return partitionIndex;}}

通过跟踪可以看到是在PartitionedProducerImpl类的getMessageRouter方法中进行SinglePartitionMessageRouterImpl类的初始化,同时是通过ThreadLocalRandom.current().nextInt(topicMetadata.numPartitions()) 来生成一个小于分区数的随机数,因此单分区路由的分区是随机指定的一个,这个结果跟咱们实战中测试的效果是吻合的。除此之外,咱们还看到 getMessageRouter方法中会根据咱们在创建生产者时 .messageRoutingMode 方法指定的路由模式来创建对应的路由实现类,在这里可以明确的看到没有指定的话默认就是采用的轮询路由规则

private MessageRouter getMessageRouter() {MessageRouter messageRouter;MessageRoutingMode messageRouteMode = conf.getMessageRoutingMode();switch (messageRouteMode) {case CustomPartition:messageRouter = Objects.requireNonNull(conf.getCustomMessageRouter());break;case SinglePartition:messageRouter = new SinglePartitionMessageRouterImpl(ThreadLocalRandom.current().nextInt(topicMetadata.numPartitions()), conf.getHashingScheme());break;case RoundRobinPartition:default:messageRouter = new RoundRobinPartitionMessageRouterImpl(conf.getHashingScheme(),ThreadLocalRandom.current().nextInt(topicMetadata.numPartitions()),conf.isBatchingEnabled(),TimeUnit.MICROSECONDS.toMillis(conf.batchingPartitionSwitchFrequencyIntervalMicros()));}return messageRouter;}

四、总结

通过以上内容相信你对Pulsar的路由规则有一定的了解了,如果想进一步了解可以尝试按照自己喜好实现下路由规则并观测是否按照预期运行,同时也可以跟踪Pulsar的源码看看实现是否符合预期。如果想彻底掌握Pulsar,最好自己跟踪下Pulsar的一些核心逻辑,这样不仅了解其底层是如何运作的,也能加深你对一些设计/特性的印象。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/742096.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024年信息技术与计算机工程国际学术会议(ICITCEI 2024)

2024年信息技术与计算机工程国际学术会议&#xff08;ICITCEI 2024&#xff09; 2024 International Conference on Information Technology and Computer Engineering ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 大会主题&#xff1a; 信息系统和技术…

软件应用实例分享,茶楼计时收费管理系统软件,佳易王茶社吧台计时收费软件试用版教程

软件应用实例分享&#xff0c;茶楼计时收费管理系统软件&#xff0c;佳易王茶社吧台计时收费软件试用版教程 一、前言 以下软件操作教程以 佳易王茶室计时计费软件V17.9为例说明 软件文件下载可以点击最下方官网卡片——软件下载——试用版软件下载 【茶楼计时计费软件&…

解决Java项目运行时错误:“Command line is too long”

在开发Java应用的过程中&#xff0c;你可能偶尔会遇到“Error running ‘Application’: Command line is too long”的问题。这是因为Java虚拟机&#xff08;JVM&#xff09;在启动时&#xff0c;如果传递给它的类路径&#xff08;classpath&#xff09;过长&#xff0c;超过了…

基于Java (spring-boot)的人才招聘系统

一、项目介绍 公司&#xff1a; IT公司的注册与管理 招聘要求的发布与维护 站内私信 求职者&#xff1a; 招聘需求浏览 招聘需求筛选&#xff08;按岗位、薪酬、城市、地区等&#xff09; 简历编辑&#xff0c;建立投递等 站内私信 管理员&#xff1a; 用户信息维护 岗…

ChatGPTPLUS、Poe、Claude介绍,以及如何订阅

我使用了FOmepay的556150卡段升级了ChatGPTPLUS、POE、Claude3 一、ChatGPT Plus 是什么&#xff1f; ChatGPT Plus 是基于 ChatGPT 的月订阅升级方案&#xff0c;它可以提供更快的回应速度、更高的可用性以及优先使用到新功能的权限。 ChatGPT Plus 和原版 ChatGPT 的差异 …

【黑马程序员】python数据容器

文章目录 python数据容器认识数据容器列表列表定义list下标索引列表的常用操作方法列表的查询功能列表的修改功能插入元素追加元素删除元素删除某元素在列表中的第一个匹配项清空列表统计某元素在列表中的数量统计列表中元素个数总结 列表遍历 元组为什么需要元组定义元组下标索…

跟着GPT学设计模式之桥接模式

说明 桥接模式&#xff0c;也叫作桥梁模式&#xff0c;英文是 Bridge Design Pattern。在 GoF 的《设计模式》一书中&#xff0c;桥接模式是这么定义的&#xff1a;“Decouple an abstraction from its implementation so that the two can vary independently。”翻译成中文就…

LeetCode[题解] 1261. 在受污染的二叉树中查找元素

首先我们看原题 给出一个满足下述规则的二叉树&#xff1a; root.val 0如果 treeNode.val x 且 treeNode.left ! null&#xff0c;那么 treeNode.left.val 2 * x 1如果 treeNode.val x 且 treeNode.right ! null&#xff0c;那么 treeNode.right.val 2 * x 2 现在这个…

通过Cpp + Lua 解析副本与服务器的逻辑1

进入副本流程 读publicTables&#xff0c;OnOpenCopySceneOK()发包 private void OnOpenCopySceneOK(){GameManager.PlayerDataPool.CurSelectTier m_curSelTier;CG_OPEN_COPYSCENE_PAK pak new CG_OPEN_COPYSCENE_PAK();pak.data.SceneID (int)SCENE_DEFINE.SCENE_TDBK;p…

[Java、Android面试]_02_HashMap的原理

本人今年参加了很多面试&#xff0c;也有幸拿到了一些大厂的offer&#xff0c;整理了众多面试资料&#xff0c;后续还会分享众多面试资料&#xff0c;感兴趣的朋友可收藏关注。由于时间有限&#xff0c;只能每天整理一点&#xff0c;分享一点儿&#xff01; 现分享如下&#xf…

【系统架构设计师】系统工程与信息系统基础 01

系统架构设计师 - 系列文章目录 01 系统工程与信息系统基础 文章目录 系列文章目录 前言 一、系统工程 ★ 二、信息系统生命周期 ★ 信息系统建设原则 三、信息系统开发方法 ★★ 四、信息系统的分类 ★★★ 1.业务处理系统【TPS】 2.管理信息系统【MIS】 3.决策支持系统…

VMware安装Ubuntu虚拟机

1. 安装VMware VMware中国官网&#xff1a;VMware - Delivering a Digital Foundation For Businesses VMware Workstation Player&#xff08;官方个人免费版&#xff09;&#xff1a;VMware Workstation Player | VMware VMware Workstation Pro&#xff08;商用收费版&am…

Glusterfs 常用命令

1. 启动/关闭/查看glusterd服务 #启动&#xff1a; systemctl daemon-reload systemctl start glusterd#开机自动启动glusterd服务&#xff1a; systemctl enable glusterd#关闭&#xff1a; systemctl stop glusterd#查看状态&#xff1a; systemctl status glusterd 2. 为存…

CTP-API开发系列之九:行情登录及订阅代码

CTP-API开发系列之九&#xff1a;行情登录及订阅代码 前情回顾全局配置参数行情初始化代码行情登录行情订阅行情接收注意事项 前情回顾 CTP-API开发系列之一&#xff1a;各版本更新说明&#xff08;持续更新&#xff09; CTP-API开发系列之二&#xff1a;问题汇总&#xff08;…

tigramite教程(六)使用TIGRAMITE 进行因果发现

使用TIGRAMITE 进行因果发现 基本用法简单玩玩万年不变的第一步:画出来调查数据依赖性和滞后函数PCMCI 因果发现错误发现率控制进一步相关的方法学教程 画图整合专家对链条的假设基准测试和验证因果效应估计数据集挑战滑动窗口分析 TIGRAMITE 是一个时间序列数据分析的python包…

【Python】新手进阶学习:os.sep---跨平台路径分隔符

【Python】新手进阶学习&#xff1a;os.sep—跨平台路径分隔符 &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程&#x1f448; 希望得到您…

挑战杯 多目标跟踪算法 实时检测 - opencv 深度学习 机器视觉

文章目录 0 前言2 先上成果3 多目标跟踪的两种方法3.1 方法13.2 方法2 4 Tracking By Detecting的跟踪过程4.1 存在的问题4.2 基于轨迹预测的跟踪方式 5 训练代码6 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 深度学习多目标跟踪 …

MPP数据架构设计的缺点

目录 一、MPP架构 二、批处理架构和MPP架构 三、MPP架构的OLAP引擎 一、MPP架构 随着分布式、并行化技术成熟应用&#xff0c;MPP引擎逐渐表现出强大的高吞吐、低延时计算能力&#xff0c;有很多采用MPP架构的引擎都能达到“亿级秒开”。例如Impala、ClickHouse、Druid、Dor…

基于SpringBoot+MYSQL的旅游网站

目录 1、前言介绍 2、主要技术 3、系统流程分析 1、登录流程图如下&#xff1a; 2、管理员后台管理流程图如下&#xff1a; 3. 修改密码流程图如下&#xff1a; 4、系统设计 4.1、系统结构设计 4.2 数据库概述 4.2.1 数据库概念设计 4.2.2 数据库逻辑设计 5、运行截…

多线程多进程处理服务器并发(多进程处理如何解决僵死进程)

目录 1.可循环发送数据的代码 2.改成循环之后每次发现只能处理一个客户端 3.服务器端处理并发问题 3.1 思路 3.2 利用多线程实现并发 ​编辑 3.3 利用多进程实现并发 3.3.1 多进程并发产生的僵死进程问题 ​3.3.2 解决僵死进程问题 1.可循环发送数据的代码 服务器代…