Apache pulsar 技术系列-- 消息重推的几种方式

导语

Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案,支持多租户、低延时、读写分离、跨地域复制(GEO replication)、快速扩容、灵活容错等特性。在很多场景下,用户需要通过 MQ 实现消息的重新推送能力,比如超时重推、处理异常时重推等,本文介绍 Apache Pulsar 提供的几种消息重推方案。

在 MQ 实际的使用中,用户消费数据时,可能会遇到消息处理异常或者需要推迟处理的场景,这里就涉及到消息的重推逻辑,Pulsar 自己提供了消息重推的能力。本文主要介绍 Pulsar 的消息重推机制。

消息获取(拉取/推送)机制

Pulsar 的消费采用了推、拉结合的消息获取机制,Consumer 获取消息之前会首先通知 Broker(FLOW 请求),Broker 会根据配置的 ReceiveQueue 大小以及 Consumer 当前可以接收的消息数量来推送消息给 Consumer。

详细的交互流程如下图所示:

图片

  1. Consumer 在创建之后,会以 MaxReceiveQueue 的大小作为 Permit 值,这个值就是 Consumer 可以缓存的的最大消息条数。

  2. 然后,Consumer 向 Broker 发起 FLOW 请求,携带 Permit 信息(Consumer Permit 减少到 0),Broker 接收之后会记录这个 Permit 作为 Consumer 的 AvailablePermit,AvailablePermit 决定 Broker 可以向 Consumer 发送数据的数量(实际是在读取数据时判断)。

  3. 如果 AvailablePermit > 0, Broker 开始读取数据(假设有 N 条),然后推送给 Consumer,推送之后,AvailablePermit 自减 N。

  4. Consumer 接收到消息之后,并不会直接返回给用户,而是放在 ReceiveQueue 中,当用户调用 Receive() 方法来获取消息时,Consumer 将 Permit + 1。

  5. 当 Permit > MaxReceiveQueueSize / 2,Consumer 会再次发起 Flow 请求,并且携带当前的 Permit 值。

上述流程,就是 Consumer 和 Broker 的消息传递过程。

在默认的情况下,数据推送给 Consumer 之后,就完全交给用户处理,数据不会重复推送。这种方式满足不了需要重推的场景,下面介绍目前 Pulsar 的几种重推机制。

SDK 统一的重推

一个比较直观的做法是超过一定时间,如果消息没有 Ack 就重新推送。

目前 Pulsar 提供了通过超时时间来控制数据重推的能力,Consumer 可以配置 AckTimeout(默认关闭),在设置了 AckTimeout 之后,Client 会构建一个 UnAckedMessageTracker ,用户 Receive() 的所有的消息都会被 UnAckedMessageTracker 跟踪。用户 Ack 消息时,会从 UnAckedMessageTracker 删除,对于没有 Ack 的消息,UnAckedMessageTracker 会有定时任务来检查,如果已经超过了 AckTimeout 时间,则会触发重推。

重推是通过 RedeliverUnackMessage 来实现的,UnAckedMessageTracker 会主动发起 Redeliver 的请求,Broker 会根据请求的 MessageId 信息重新推送。

AckTimeout 在 Consumer 初始化时设置:

 Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)                  .ackTimeout(10, TimeUnit.SECOND)

用户决策的重推 – NegativeAck

通过 AckTimeout 实现的重推,是 SDK 内部统一实现的,用户不能控制重推的行为,如果用户希望根据自己的使用场景,决定哪些消息需要重推,Pulsar 提供了 NegativeAck 的能力。

NegativeAck 和 AckTimeout 方式类似,有一个 NegativeAcksTracker 来管理消息的重推,NegativeAcksTracker 只会跟踪用户主动调用 NegativeAcknowledge() 方法的 MessageID,重推的逻辑也是通过 RedeliverUnackMessage 实现。

NegativeAck 可以设置 Redelivery 的 Delay 时间。

 Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)                .negativeAckRedeliveryDelay(1001, TimeUnit.MILLISECONDS)

使用的时候,需要明确调用。

// call the API to send negative acknowledgmentconsumer.negativeAcknowledge(message);

用户决策的重推 – RLQ

除了 NegativeAck 的方式,用户还可以通过重试队列( RLQ )来实现主动的消息重推,RLQ 一般会使用在用户暂时不能处理某些消息,并且希望之后再处理的场景。

Pulsar 提供了 ReconsumeLater() 方法来实现重试队列,和 Negative 不同的是,RLQ 会创建一个新的 Topic,Topic 的格式是 TopicName-SubscriptionName_RLQ , 每次 ReconsumeLater() 时,都会产生一个新的消息写入到 RLQ Topic 中,并且会对之前的消息 Ack。

设置了 RLQ 的 Consumer,SDK 内部默认会启动 RLQ 的订阅,所以 RLQ 的消息也会被 Consumer 消费到。

RLQ 是通过 DeadLetterPolicy 来配置的(DLQ 下文会解释)。

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)     .topic("my-topic")     .subscriptionName("my-subscription")    .subscriptionType(SubscriptionType.Shared)     .enableRetry(true)    .deadLetterPolicy(DeadLetterPolicy.builder()     .maxRedeliverCount(maxRedeliveryCount)    .build())     .subscribe();

RLQ Topic 中的消息属性中会添加一下信息:

Special propertyDescription
REAL_TOPIC原始 Topic 名称
ORIGIN_MESSAGE_ID原始 MessageId
RECONSUMETIMES重复消费的次数
DELAY_TIME投递的延迟时间

RLQ 也需要主动调用: consumer.reconsumeLater(msg, 3, TimeUnit.SECONDS)。

为重推次数加上限制–DLQ

对于数据持续处理失败,一直重试并不是一个很好的策略,此时死信队列(DLQ)就是一个比较好的选择,DLQ 允许用户将持续处理失败的数据写入到一个独立的 Dead Letter Topic 中,DLQ 的数据需要单独的订阅来消费。

DLQ Topic 的格式为 TopicName-SubscriptionName_DLQ。DLQ 需要为重试设置一个上限,当重试次数超过上限之后,就会被写入到 DLQ Topic 中。

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)       .topic("my-topic")      .subscriptionName("my-subscription")      .subscriptionType(SubscriptionType.Shared)         .deadLetterPolicy(DeadLetterPolicy.builder()             .maxRedeliverCount(maxRedeliveryCount)             .build())         .subscribe();

几种重推和 DLQ 的关系

如果配置了 DLQ,那么使用 AckTimeout、NegativeAck 或者 ReconsumeLater 引起的数据重推都会触发 DLQ,也就是说重试的次数达到上限之后,都会被写入到 DLQ topic 里。

重试次数的统计有所区别:

AckTimeout 和 NegativeAck 都是通过 Redelivery 机制来计数的,SDK 发起 Redelivery 请求之后,Broker 侧的 RedeliveryTracker 会记录重推的次数,并且在推送给 Consumer 的 Message 中会包含 RedeliveryCount 的字段。

对于 RLQ,则是从 RECONSUMETIMES 属性中获取重复消费的次数,这个属性在 Client 生成,并且也是在 Client 计数。

总的来说,Apache Pulsar 提供了多种消息重推的方式,用户可以结合自己的场景,灵活使用,满足自己的业务需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/9021.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【算法】求欧拉函数(包括完整的证明以及代码模板,建议收藏)

求欧拉函数 前置知识 互质&#xff1a;互质是公约数只有1的两个整数&#xff0c;叫做互质整数。 欧拉函数定义 1 ∼ N − 1 1∼N-1 1∼N−1中与N互质的数的个数被称为欧拉函数&#xff0c;记为 ϕ ( N ) \phi(N) ϕ(N)。 若在算数基本定理中&#xff0c; N p 1 a 1 p 2 a 2 .…

Clion开发STM32之OLED屏(软件i2c测试)

前言 本篇内容需要参考之前的文章: Clion开发stm32之微妙延迟(采用nop指令实现)Clion开发STM32之日志模块(参考RT-Thread)Clion开发STM32之I2C驱动(参考RT-Thread)Clion开发STM32之HAL库硬件I2C驱动OLED 使用的是0.96寸OLED屏 测试文件 /**********************************…

Python+Playwright自动化测试--标签页操作(tab)

1.简介 标签操作其实也是基于浏览器上下文&#xff08;BrowserContext&#xff09;进行操作的&#xff0c;而且宏哥在之前的BrowserContext也有提到过&#xff0c;但是有的童鞋或者小伙伴还是不清楚怎么操作&#xff0c;或者思路有点模糊&#xff0c;因此今天单独来对其进行讲…

nvidia-smi输出的结果代表什么

nvidia-smi(NVIDIA System Management Interface) 是基于nvml的gpu的系统管理接口,主要用于显卡的管理和状态监控。 nvidia-smi简称NVSMI&#xff0c;提供监控GPU使用情况和更改GPU状态的功能&#xff0c;是一个跨平台工具&#xff0c;支持所有标准的NVIDIA驱动程序支持的Linu…

【RS】基于规则的面向对象分类

ENVI使用最多的工具就是分类&#xff0c;这也是很多卫星影像的用途。在ENVI中有很多分类工具&#xff0c;如最基础的监督分类&#xff08;最大似然法、最小距离、支持向量机、随机森林&#xff09;、非监督分类&#xff08;K-means、IsoData&#xff09;&#xff0c;还有面向对…

13、PHP面向对象2(方法的访问控制、子类继承、常量)

1、类中的方法可以被定义为公有&#xff0c;私有或受保护。如果没有设置这些关键字&#xff0c;则该方法默认为公有。 public定义的方法&#xff0c;可以在类外使用。 protected定义的方法&#xff0c;只能在本类或子类的定义内使用。 private定义的方法&#xff0c;只能在本…

php学习当中遇到过哪些问题

PHP是一种流行的服务器端脚本语言&#xff0c;广泛用于Web应用程序和网站的开发。虽然PHP相对容易学习&#xff0c;但它仍然可能会给初学者带来一些问题和挑战。本文将探讨一些PHP初学者可能会遇到的常见问题&#xff0c;并提供一些解决方案。 理解PHP的基本语法和语义 PHP的语…

17网商品详情API:使用与数据解析方法

17网是一家知名的电商平台&#xff0c;提供了大量的商品选择。开发者可以通过17网的商品详情API来快速获取和展示商品的详细信息。 17网商品详情API简介 介绍17网商品详情API的作用和目的&#xff0c;解释为何使用该API可以实现丰富的商品详情展示功能。 获取API访问权限 说…

ubuntu开机自启动

ubuntu开机自启动 1、建一个test.sh脚本&#xff0c;并写入 #!/bin/sh gnome-terminal -x bash -c ‘cd /home/文件路径/;python3 main.py’ exit 0 2、:wq!保存 3、创建rc-local.service文件&#xff08;sudo vim /etc/systemd/system/rc-local.service&#xff09;&#xf…

Linux系统安装部署MySQL完整教程(图文详解)

前言&#xff1a;最近网上翻阅了大量关于Linux安装部署MySQL的教程&#xff0c;在自己部署的时候总是存在一些小问题&#xff0c;例如&#xff1a;版本冲突&#xff0c;配置失败和启动失败等等&#xff0c;功夫不负有心人&#xff0c;最后还是安装部署成功了&#xff0c;所以本…

[SQL系列] 从头开始学PostgreSQL 事务 锁 子查询

[SQL系列] 从头开始学PostgreSQL 索引 修改 视图_Edward.W的博客-CSDN博客https://blog.csdn.net/u013379032/article/details/131818865 事务 事务是一系列逻辑相关的数据库操作&#xff0c;可以作为一个整体进行操作或者回滚。事务通常会包含一个序列的读或者写操作&#xf…

Flink任务优化分享

Flink任务优化分享 1.背景介绍 线上计算任务在某版本上线之后发现每日的任务时长都需要三个多小时才能完成&#xff0c;计算时间超过了预估时间&#xff0c;通过Dolphinscheduler的每日调度任务看&#xff0c;在数据层 dwd 的数据分段任务存在严重的性能问题&#xff0c;每天…

【python】flask查询更新指定的某一条记录

PackageRecord.query.filter_by(idpackage_id).update(json_data) 这段代码的问题在于它不能正确地更新指定的记录。这是因为 update() 方法是 SQLAlchemy 提供的一种批量更新的方法&#xff0c;他通过接收一个字典对象来更新记录。但是在你的代码中&#xff0c;json_data 应该…

20230721在WIN10下安装openssl并解密AES-128加密的ts视频切片

20230721在WIN10下安装openssl并解密AES-128加密的ts视频切片 2023/7/21 22:58 1、前言&#xff1a; AES-128加密的ts视频切片【第一个】&#xff0c;打开有时间限制的&#xff01; https://app1ce7glfm1187.h5.xiaoeknow.com/v2/course/alive/l_64af6130e4b03e4b54da1681?typ…

研发机器配网方案(针对禁止外网电脑的组网方案)

背景&#xff1a;公司是研发型小公司&#xff0c;难免会使用A某D和K某l 等国内免费软件&#xff0c;这两个是业界律师函发得最多的软件。最简单的方案是离网使用&#xff0c;但是离网使用比较麻烦的是要进行文件传输&#xff0c;需要使用U盘拷贝&#xff0c;另外研发型企业一般…

ChatGPT:人机交互新境界,AI智能引领未来

一、ChatGPT&#xff1a;智能交流的新标杆 ChatGPT是基于GPT技术的最新版本&#xff0c;拥有深度学习模型的基因。它通过在大量数据上进行预训练&#xff0c;可以理解和生成自然语言&#xff0c;从而实现了与人类更加自然流畅的对话和交流。 二、ChatGPT的技术背景和工作原理 …

【动态规划上分复盘】这是你熟悉的地下城游戏吗?

欢迎 前言一、动态规划五步曲二、地下城游戏题目分析思路&#xff1a;动态规划具体代码如下 总结 前言 本文讲解关于动态规划思路的两道题目。 一、动态规划五步曲 1.确定状态表示&#xff08;确定dp数组的含义&#xff09;2.确定状态转移方程&#xff08;确定dp的递推公式&a…

Docker:Docker-Compose

Docker:Docker-Compose 一、Docker-Compose 介绍1.1 Docker-Compose 概述二、Docker-Compose 安装2.1 Docker Compose 环境安装2.2 下载2.3 安装三、Docker-Compose 使用3.1 YAML 文件格式及编写注意事项3.2 使用 YAML 时需要注意下面事项3.3 Docker-Compose配置常用字段3.4 D…

Python中pyecharts模块

pyecharts模块 官网&#xff1a;pyecharts官网 pyecharts框架画廊 如果想要做出数据可视化效果图, 可以借助pyecharts模块来完成概况 : Echarts 是个由百度开源的数据可视化&#xff0c;凭借着良好的交互性&#xff0c;精巧的图表设计&#xff0c;得到了众多开发者的认可. 而…

java面试整理

一、 JVM部分 JVM内存溢出(一)之排查初体验_少负 | 气节的博客-CSDN博客 JVM内存溢出(二)之双亲委派机制_少负 | 气节的博客-CSDN博客 JVM内存溢出(三)之JVM8内存模型_少负 | 气节的博客-CSDN博客 JVM内存溢出(四)之垃圾回收器_少负 | 气节的博客-CSDN博客 JVM内存溢出(五…