Apache Pulsar源码解析之Lookup机制

引言

在学习Pulsar一段时间后,相信大家也或多或少听说Lookup这个词,今天就一起来深入剖析下Pulsar是怎么设计的它吧

Lookup是什么

在客户端跟服务端建立TCP连接前有些信息需要提前获取,这个获取方式就是Lookup机制。所获取的信息有以下几种

  • 应该跟哪台Broker建立连接
  • Topic的Schema信息
  • Topic的分区信息

其中第一个是最重要的,因此今天就针对第一点进行深入剖析,大致流程如下图
在这里插入图片描述

  1. 在创建生产者/消费者时会触发Lookup,一般是通过HTTP请求Broker来获取目标Topic所归属的Broker节点信息,这样才知道跟哪台机器建立TCP连接进行数据交互
  2. Broker接收到Lookup命令,此时会进行限流检查、身份/权限认证、校验集群等检测动作后,根据请求中携带的Namespace信息获取对应的Namespace对象进行处理,这里Namespace会对Topic进行哈希运算并判断它落在数组的哪一个节点,算出来后就根据数组的信息来从Bundle数组中获得对应的Bundle,这个过程其实就是一致性哈希算法寻址过程。
  3. 在获得Bundle后会尝试从本机Cache中查询该Bundle所归属的Broker信息。
  4. 如果在Cache中没有命中,则会去Zookeeper中进行读取,如果发现该Bundle还未归属Broker则触发归属Broker的流程
  5. 获取到该Topic所归属的Broker信息后返回给客户端,客户端解析结果并跟所归属的Broker建立TCP连接,用于后续生产者往Broker节点进行消息写入

补充说明确定Bundle的归属,如果Broker的loadManager使用的是中心化策略,则需要Broker Leader来当裁判决定,否则当前Broker就可当作裁判。虽然Broker是无状态的,但会通过Zookeeper选举出一个Leader用于监控负载、为Bundle分配Broker等事情,裁判Broker通过loadManager查找负载最低的Broker并把Bundle分配给它。

客户端实现原理

Lookup机制是由客户端发起的,在创建生产者/消费者对象时会初始化网络连接,以生产者代码为例进行跟踪看看。无论是创建分区还是非分区生产者,最终都会走到ProducerImpl的构造函数,就从这里开始看吧

   public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema,ProducerInterceptors interceptors, Optional<String> overrideProducerName) {....//这里进去就是创建跟Broker的网络连接grabCnx();}void grabCnx() {//实际上是调用ConnectionHandler进行的this.connectionHandler.grabCnx();}protected void grabCnx(Optional<URI> hostURI) {....//这里是核心,相当于最终又调用回PulsarClientImpl类的getConnection方法cnxFuture = state.client.getConnection(state.topic, (state.redirectedClusterURI.toString()));....}public CompletableFuture<ClientCnx> getConnection(final String topic, final String url) {TopicName topicName = TopicName.get(topic);//看到方法名就知道到了Lookup的时候了,所以说好的命名远胜于注释return getLookup(url).getBroker(topicName).thenCompose(lookupResult -> getConnection(lookupResult.getLogicalAddress(),lookupResult.getPhysicalAddress(), cnxPool.genRandomKeyToSelectCon()));}public LookupService getLookup(String serviceUrl) {return urlLookupMap.computeIfAbsent(serviceUrl, url -> {try {//忽略其他的,直接跟这里进去return createLookup(serviceUrl);} catch (PulsarClientException e) {log.warn("Failed to update url to lookup service {}, {}", url, e.getMessage());throw new IllegalStateException("Failed to update url " + url);}});}public LookupService createLookup(String url) throws PulsarClientException {//这里可以看到如果咱们在配置客户端的地址是http开头就会通过http方式进行Loopup,否则走二进制协议进行查询if (url.startsWith("http")) {return new HttpLookupService(conf, eventLoopGroup);} else {return new BinaryProtoLookupService(this, url, conf.getListenerName(), conf.isUseTls(),externalExecutorProvider.getExecutor());}}public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)throws PulsarClientException {//进到可能会误会Pulsar是通过HttpClient工具包进行的HTTP通信,继续看HttpClient构造函数this.httpClient = new HttpClient(conf, eventLoopGroup);this.useTls = conf.isUseTls();this.listenerName = conf.getListenerName();}protected HttpClient(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {....//可以看到实际上最终是调用的AsyncHttpClient进行HTTP通信,这是一个封装Netty的async-http-client-2.12.1.jar的外部包httpClient = new DefaultAsyncHttpClient(config);....}

通过上面可以看到Lookup服务已经完成初始化,接下来就来看看客户端如何发起Lookup请求,回到PulsarClientImpl的getConnection方法,可以看到这里是链式调用,上面是从getLookup看到了其实是对Lookup进行初始化的过程,那么接下来就跟踪getBroker方法看看是怎么获取的服务端信息

    public CompletableFuture<ClientCnx> getConnection(final String topic, final String url) {TopicName topicName = TopicName.get(topic);return getLookup(url).getBroker(topicName).thenCompose(lookupResult -> getConnection(lookupResult.getLogicalAddress(),lookupResult.getPhysicalAddress(), cnxPool.genRandomKeyToSelectCon()));}public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {//判断访问哪个版本的接口String basePath = topicName.isV2() ? BasePathV2 : BasePathV1;String path = basePath + topicName.getLookupName();path = StringUtils.isBlank(listenerName) ? path : path + "?listenerName=" + Codec.encode(listenerName);//获取要访问的Broker地址return httpClient.get(path, LookupData.class).thenCompose(lookupData -> {URI uri = null;try {//解析服务端返回的数据,本质上就是返回的就是Topic所在Broker的节点IP+端口InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());//HTTP通过Lookup方式访问服务端绝对不会走代理return CompletableFuture.completedFuture(new LookupTopicResult(brokerAddress, brokerAddress,false /* HTTP lookups never use the proxy */));} catch (Exception e) {....}});}public class LookupTopicResult {//LookupTopicResult是查询Topic归属Broker的结果后包装的一层结果,可以看到这里其实就是Socket信息也就是IP+端口private final InetSocketAddress logicalAddress;private final InetSocketAddress physicalAddress;private final boolean isUseProxy;
}

客户端的流程走到这里基本就结束了,是否有些意犹未尽迫不及待的想知道服务端又是怎么处理的?那么就看看下一节

服务端实现原理

服务端的入口在TopicLookup类的lookupTopicAsync方法,服务端大致步骤是这样的:1. 获取Topic所归属的Bundle 2. 查询Bundle所归属的Broker 3. 返回该Broker的url

    public void lookupTopicAsync(@Suspended AsyncResponse asyncResponse,@PathParam("topic-domain") String topicDomain, @PathParam("tenant") String tenant,@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,@QueryParam("listenerName") String listenerName,@HeaderParam(LISTENERNAME_HEADER) String listenerNameHeader) {TopicName topicName = getTopicName(topicDomain, tenant, namespace, encodedTopic);if (StringUtils.isEmpty(listenerName) && StringUtils.isNotEmpty(listenerNameHeader)) {listenerName = listenerNameHeader;}//可以看得到这里是获取Lookup的,跟踪进去看看internalLookupTopicAsync(topicName, authoritative, listenerName).thenAccept(lookupData -> asyncResponse.resume(lookupData)).exceptionally(ex -> {....});}protected CompletableFuture<LookupData> internalLookupTopicAsync(final TopicName topicName, boolean authoritative, String listenerName) {
CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()//获得目标Broker地址, 继续从这里进去.getBrokerServiceUrlAsync(topicName,LookupOptions.builder().advertisedListenerName(listenerName).authoritative(authoritative).loadTopicsInBundle(false).build());}public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) {long startTime = System.nanoTime();// 获取这个Topic所归属的BundleCompletableFuture<Optional<LookupResult>> future = getBundleAsync(topic).thenCompose(bundle -> {//根据获得的bundle信息查询归属的Brokerreturn findRedirectLookupResultAsync(bundle).thenCompose(optResult -> {//如果findRedirectLookupResultAsync方式没查到则走这里进行查询return findBrokerServiceUrl(bundle, options); });});future.thenAccept(optResult -> {....}).exceptionally(ex -> {....});return future;}

先看看是怎么获取Topic所归属的Bundle的吧,就从getBundleAsync方法跟踪进去

    public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) {return bundleFactory.getBundlesAsync(topic.getNamespaceObject())//直接看findBundle,命名意思已经很清晰了.thenApply(bundles -> bundles.findBundle(topic));}public NamespaceBundle findBundle(TopicName topicName) {checkArgument(nsname.equals(topicName.getNamespaceObject()));//同理,继续跟踪进去return factory.getTopicBundleAssignmentStrategy().findBundle(topicName, this);}public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) {//计算Topic名称的哈希值long hashCode = Hashing.crc32().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong();//根据哈希值来获取所归属的bundle,一致性哈希的设计。跟进去看看是怎么计算的NamespaceBundle bundle = namespaceBundles.getBundle(hashCode);if (topicName.getDomain().equals(TopicDomain.non_persistent)) {bundle.setHasNonPersistentTopic(true);}return bundle;}protected NamespaceBundle getBundle(long hash) {//通过数组的二分查找进行计算,数组的元素个数跟存储Bundle的bundles的集合大小是一样的,能获取对应的Bundle//思路其实就是一致性哈希的查找方式,计算出哈希值处于哈希环所处的位置并查找其下一个节点的信息int idx = Arrays.binarySearch(partitions, hash);int lowerIdx = idx < 0 ? -(idx + 2) : idx;return bundles.get(lowerIdx);}

知道Bundle之后,下一步就是根据这个Bundle来查询其所归属的Broker节点,也就是上面的NamespaceService类的findRedirectLookupResultAsync方法,这里一路跟下去就是查询缓存中获取映射信息的地方了,感兴趣的伙伴可以继续跟下去

    private CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync(ServiceUnitId bundle) {if (isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {return CompletableFuture.completedFuture(Optional.empty());}return redirectManager.findRedirectLookupResultAsync();}

总结

以上就是Pulsar的Lookup机制的实现流程,在寻址的过程中,需要阅读的伙伴具备一致性哈希的知识,因为Pulsar的Topic归属就是引入了一致性哈希算法来实现的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/790679.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Flutter中setState函数的使用注意事项

文章目录 Flutter中setState函数的使用注意事项只能在具有State对象的类中使用不要在build方法中使用将状态更新逻辑放在setState方法内部避免频繁调用使用回调函数更新状态 Flutter中setState函数的使用注意事项 setState()函数是Flutter中非常重要的一个函数&#xff0c;它用…

C#使用Selenium驱动Chrome浏览器

1.Selenium库依赖安装 Selenium WebDriver是Selenium项目的一部分&#xff0c;用于模拟用户在Web应用程序中的交互操作。它支持多种浏览器&#xff0c;如Chrome、Firefox、IE等&#xff0c;且与各种编程语言&#xff08;如Java、Python、C#等&#xff09;兼容&#xff0c;具有…

2012年认证杯SPSSPRO杯数学建模B题(第二阶段)节能减排全过程文档及程序

2012年认证杯SPSSPRO杯数学建模 节能减排、抑制全球气候变暖 B题 白屋顶计划 原题再现&#xff1a; 第二阶段问题   虽然环境学家对地球环境温度的改变有许多种不同观点&#xff0c;但大多数科学家可以达成一个基本的共识&#xff1a;近年来人类的活动&#xff0c;尤指二氧…

云存储中常用的相同子策略的高效、安全的基于属性的访问控制的论文阅读

参考文献为2022年发表的Efficient and Secure Attribute-Based Access Control With Identical Sub-Policies Frequently Used in Cloud Storage 动机 ABE是实现在云存储中一种很好的访问控制手段&#xff0c;但是其本身的计算开销导致在实际场景中应用收到限制。本论文研究了…

ESP32学习---ESP-NOW(一)

ESP32学习---ESP-NOW&#xff08;一&#xff09; 官网简介arduino 官网简介 首先看官网的介绍&#xff1a;https://www.espressif.com.cn/zh-hans/solutions/low-power-solutions/esp-now ESP-NOW 是乐鑫定义的一种无线通信协议&#xff0c;能够在无路由器的情况下直接、快速…

【Java多线程(4)】案例:设计模式

目录 一、什么是设计模式&#xff1f; 二、单例模式 1. 饿汉模式 2. 懒汉模式 懒汉模式-第一次改进 懒汉模式-第二次改进 懒汉模式-第三次改进 一、什么是设计模式&#xff1f; 设计模式是针对软件设计中常见问题的通用解决方案。它们提供了一种被广泛接受的方法来解决…

5米分辨率数字高程模型(DEM)的制作

在现代科技的驱动下&#xff0c;地理信息系统&#xff08;GIS&#xff09;和遥感技术已经取得了惊人的进展。其中一项令人瞩目的技术就是5米分辨率数字高程模型&#xff08;DEM&#xff09;的制作&#xff0c;它是基于多颗高分辨率卫星数据为原始数据&#xff0c;借助智能立体模…

C语言编写Linux的Shell外壳

目录 一、输出命令行 1.1 了解环境变量 1.2 获取用户名、主机名、当前路径 1.3 缓冲区改进MakeCommandLine 二、获取用户命令 2.1 读取函数的选择 2.2 细节优化 2.3 返回值 三、指令和选项分割 3.1 strtok 函数 3.2 分割实现 四、执行命令 4.1 fork 方法 4.2 进…

0.17元的4位数码管驱动芯片AiP650,支持键盘,还是无锡国家集成电路设计中心某公司的

推荐原因&#xff1a;便宜的4位数码管驱动芯片 只要0.17元&#xff0c;香吗&#xff1f;X背景的哦。 2 线串口共阴极 8 段 4 位 LED 驱动控制/7*4 位键盘扫描专用电路 AIP650参考电路图 AIP650引脚定义

scratch小动物的晚会 2024年3月中国电子学会图形化编程 少儿编程 scratch编程等级考试一级真题和答案解析

目录 scratch小动物的晚会 一、题目要求 1、准备工作 2、功能实现 二、案例分析

51单片机入门_江协科技_20.1_Proteus串口仿真

1.为了解决51单片机学习过程中在Proteus中的串口仿真的问题&#xff0c;需要在Proteus中建立串口仿真的环境&#xff08;目前Proteus安装在Win7x64虚拟机环境中&#xff1b; 2. 在CSDN中找到VSPD下载地址&#xff0c;在虚拟机中进行VSPD的安装&#xff0c;具体链接地址如下&am…

HTML块级元素和内联元素(头部和布局)

目录 1.HTML块级和内联标签&#xff1a; 1.块级元素&#xff1a; 2.内联元素: 3.元素嵌套&#xff1a; 4.元素转换&#xff1a; 示例如下: 2.内联框架&#xff1a; 前言&#xff1a; 示例如下: 3.布局&#xff1a; 4.头部标签&#xff1a; 前言&#xff1a; 说明&…

【蓝桥杯-Even Parity】

蓝桥杯-Even Parity 洛谷 UVA11464 Even Parity 暴力思路&#xff1a; 去遍历每个元素&#xff0c;如果不符合要求则翻转 时间复杂度大概在O&#xff08;2^&#xff08;nn&#xff09; nn&#xff09; 改进思路&#xff1a; 先去枚举确定第一行&#xff08;第一行得合法&…

反截屏控制技术对于防止数据外泄都有哪些具体作用?

反截屏控制技术在防止数据外泄方面具有以下具体作用&#xff1a; 智能反截屏技术&#xff1a; 当用户启动截屏操作时&#xff0c;并只有非涉密内容被截屏&#xff0c;所有涉密窗口自动隐藏&#xff0c;防止涉密内容被截屏。这一技术普遍支持目前市面上的各种截屏操作&#xff0…

Apache ECharts-数据统计(详解、入门案例)

简介&#xff1a;Apache ECharts 是一款基于 Javascript 的数据可视化图表库&#xff0c;提供直观&#xff0c;生动&#xff0c;可交互&#xff0c;可个性化定制的数据可视化图表。 1、介绍 图 1.1 Apache ECharts 功能、运行环境 功能&#xff1a; ECharts&#xff…

微信小程序使用icon图标

原因&#xff1a; 微信小程序使用fontawesome库使用icon图标&#xff0c;网上有很多教程&#xff0c;按照网上说法制作&#xff0c;引入到微信小程序中&#xff0c;但是验证成功&#xff0c;只能使用部分图标&#xff0c;结果不尽如人意。后面使用阿里巴巴开源iconfont来使用ic…

【opencv】教程代码 —Histograms_Matching(2)计算直方图、直方图比较、直方图均衡、模板匹配...

计算直方图直方图比较图像进行直方图均衡化处理模板匹配 1. calcHist_Demo.cpp 计算直方图 这段代码的功能是加载图像&#xff0c;分离图像的三个颜色通道&#xff0c;然后分别计算这三个通道的直方图&#xff0c;绘制出来并显示结果。直方图是图像中像素值分布的图形表示&…

【Django学习笔记(四)】JavaScript 语言介绍

JavaScript 语言介绍 前言正文1、JavaScript 小案例2、代码位置2.1 在当前 HTML 文件中2.2 在其他 js 文件中 3、代码注释3.1 HTML的注释3.2 CSS的注释3.3 Javascript的注释 4、变量 & 输出4.1 字符串4.2 数组4.3 对象(python里的字典) 5、条件语句6、函数7、DOM7.1 根据 I…

目标检测——图像中提取文字

一、重要性及意义 图像提取文本&#xff0c;即光学字符识别&#xff08;OCR&#xff09;技术&#xff0c;在现代社会中的重要性和意义日益凸显。以下是关于图像提取文本的重要性和意义的几个关键方面&#xff1a; 信息获取的效率提升 快速处理大量文档&#xff1a;OCR技术可…

报错 | 2023新版IDEA/PyCharm连接远程服务器的Docker需使用密钥认证

文章目录 01 问题情景02 需求场景及工作原理03 解决步骤3.1 在本地生成密钥对3.2 将公钥保存至服务器3.3 本地连接时选择私钥文件 网上有很多文章讲怎么解决&#xff0c;但都要么写得很复杂&#xff0c;要么没有写明白原理或操作详情&#xff0c;造成我一头雾水。 01 问题情景…