架构师系列-消息中间件(九)- RocketMQ 进阶(三)-消费端消息保障

5.2 消费端保障
5.2.1 注意幂等性

应用程序在使用RocketMQ进行消息消费时必须支持幂等消费,即同一个消息被消费多次和消费一次的结果一样,这一点在使用RoketMQ或者分析RocketMQ源代码之前再怎么强调也不为过。

“至少一次送达”的消息交付策略,和消息重复消费是一对共生的因果关系,要做到不丢消息就无法避免消息重复消费,原因很简单,试想一下这样的场景:客户端接收到消息并完成了消费,在消费确认过程中发生了通讯错误,从Broker的角度是无法得知客户端是在接收消息过程中出错还是在消费确认过程中出错,为了确保不丢消息,重发消息是唯一的选择。

有了消息幂等消费约定的基础,RocketMQ就能够有针对性地采取一些性能优化措施,例如:并行消费、消费进度同步机制等,这也是RocketMQ性能优异的原因之一。

5.2.2 消息消费模式

从不同的维度划分,Consumer支持以下消费模式:

  • 广播消费模式下,消息消费失败不会进行重试,消费进度保存在Consumer端;
  • 集群消费模式下,消息消费失败有机会进行重试,消费进度集中保存在Broker端。
5.2.2.1 集群消费

使用相同 Group ID 的订阅者属于同一个集群,同一个集群下的订阅者消费逻辑必须完全一致(包括 Tag 的使用),这些订阅者在逻辑上可以认为是一个消费节点

 

注意事项

  • 消费端集群化部署, 每条消息只需要被处理一次。
  • 由于消费进度在服务端维护, 可靠性更高。
  • 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
  • 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。
5.2.2.2 广播消费

广播消费指的是:一条消息被多个consumer消费,即使这些consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次,广播消费中ConsumerGroup概念可以认为在消息划分方面无意义。

 

注意事项

  • 广播消费模式下不支持顺序消息。
  • 广播消费模式下不支持重置消费位点。
  • 每条消息都需要被相同逻辑的多台机器处理。
  • 消费进度在客户端维护,出现重复的概率稍大于集群模式。
  • 广播模式下,消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
  • 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过, 请谨慎选择。
  • 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
  • 目前仅 Java 客户端支持广播模式。
  • 广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
5.2.2.3 集群模式模拟广播

如果业务需要使用广播模式,也可以创建多个 Group ID,用于订阅同一个 Topic。

 

注意事项

  • 每条消息都需要被多台机器处理,每台机器的逻辑可以相同也可以不一样。
  • 消费进度在服务端维护,可靠性高于广播模式。
  • 对于一个 Group ID 来说,可以部署一个消费端实例,也可以部署多个消费端实例。当部署多个消费端实例时,实例之间又组成了集群模式(共同分担消费消息)。假设 Group ID 1 部署了三个消费者实例 C1、C2、C3,那么这三个实例将共同分担服务器发送给 Group ID 1 的消息。同时,实例之间订阅关系必须保持一致。
5.2.3 消息消费模式

RocketMQ消息消费本质上是基于的拉(pull)模式,consumer主动向消息服务器broker拉取消息。

  • 推消息模式下,消费进度的递增是由RocketMQ内部自动维护的;
  • 拉消息模式下,消费进度的变更需要上层应用自己负责维护,RocketMQ只提供消费进度保存和查询功能。
5.2.3.1 推模式(PUSH)

我们上面使用的消费者都是PUSH模式,也是最常用的消费模式

由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常。

实现方式,代码上使用 DefaultMQPushConsumer

consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送(push)过来的。主要用的也是这种方式。

5.2.3.2 拉模式(PULL)

RocketMQ的PUSH模式是由PULL模式来实现的

由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”)。如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能。

5.2.3.3 注意事项

注意:RocketMQ 4.6.0版本后将弃用DefaultMQPullConsumer

DefaultMQPullConsumer方式需要手动管理偏移量,官方已经被废弃,将在2022年进行删除

 

 

DefaultLitePullConsumer

该类是官方推荐使用的手动拉取的实现类,偏移量提交由RocketMQ管理,不需要手动管理

5.2.4 消息确认机制

consumer的每个实例是靠队列分配来决定如何消费消息的,那么消费进度具体是如何管理的,又是如何保证消息成功消费的?(RocketMQ有保证消息肯定消费成功的特性,失败则重试)

为了保证数据不被丢失,RocketMQ支持消息确认机制,即ack。发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。

5.2.4.1 确认消费

业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是消费完成的。

consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);execute();//执行真正消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
})
5.2.4.2 消费异常

如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。

consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);execute();//执行真正消费return ConsumeConcurrentlyStatus.RECONSUME_LATER}
})

 

为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个消费组的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup,而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列,应用可以监控死信队列来做人工干预。

5.2.5 消息重试机制
5.2.5.1 顺序消息的重试

对于顺序消息,当消费者消费消息失败后,消息队列RocketMQ版会自动不断地进行消息重试(每次间隔时间为1秒),这时,应用会出现消息消费被阻塞的情况,因此,建议您使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

5.2.5.2 无序消息的重试

无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

5.2.5.3 重试次数

消息队列RocketMQ版默认允许每条消息最多重试16次,每次重试的间隔时间如下。

第几次重试与上次重试的间隔时间第几次重试与上次重试的间隔时间
110秒97分钟
230秒108分钟
31分钟119分钟
42分钟1210分钟
53分钟1320分钟
64分钟1430分钟
75分钟151小时
86分钟162小时

如果消息重试16次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的4小时46分钟之内进行16次重试,超过这个时间范围消息将不再重试投递。

5.2.5.4 和生产端重试区别

消费者和生产者的重试还是有区别的,主要有两点

  • 默认重试次数:Product默认是2次,而Consumer默认是16次
  • 重试时间间隔:Product是立刻重试,而Consumer是有一定时间间隔的。它照1S,5S,10S,30S,1M,2M····2H进行重试。

注意:Product在异步情况重试失效,而对于Consumer在广播情况下重试失效。

5.2.5.5 重试配置方式

需要重试

消费失败后,重试配置方式,集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):

  • 方式1:返回RECONSUME_LATER(推荐)
  • 方式2:返回Null
  • 方式3:抛出异常

无需重试

集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回Action.CommitMessage,此后这条消息将不会再重试。

//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {//消息处理逻辑抛出异常,消息将重试。try {doConsumeMessage(list);}catch (Exception e){//捕获消费逻辑中的所有异常,并返回Action.CommitMessage;return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}//业务方正常消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});

 

5.3 死信队列

在正常情况下无法被消费(超过最大重试次数)的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列就称为死信队列(Dead-Letter Queue)

当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次 数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。 在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

5.3.1 死信特性
5.3.1.1 死信消息特性
  • 不会再被消费者正常消费
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除,故死信消息应在产生的 3 天内及时处理
5.3.1.2 死信队列特性
  • 一个死信队列对应一个消费者组,而不是对应单个消费者实例
  • 一个死信队列包含了对应的 Group ID 所产生的所有死信消息,不论该消息属于哪个 Topic
  • 若一个 Group ID 没有产生过死信消息,则 RocketMQ 不会为其创建相应的死信队列

6. Redis 轮询队列

redis队列中存放车辆信息,调度系统从队列中获取车辆信息,打车完成后再将车辆信息放回队列中

 

6.1 相关代码
6.1.1 redis获取车辆

从list左侧弹出一个车辆

 

/*** 从Redis List列表中拿取一个车辆ID* 如果没有获取到延时10S** @return*/
public String takeVehicle() {//从Redis List列表中拿取一个车辆IDreturn redisTemplate.opsForList().leftPop(DispatchConstant.VEHICLE_QUEUE, 1, TimeUnit.SECONDS);
}
6.1.2 redis压入车辆

检查车辆状态,并从右侧压入车辆

/*** 设置车辆状态为Ready** @param vehicleId*/
public void readyDispatch(String vehicleId) {//检查车辆状态DispatchConstant.DispatchType vehicleDispatchType = taxiVehicleStatus(vehicleId);//如果车辆时运行状态if (vehicleDispatchType.isRunning() || vehicleDispatchType.isReady()) {redisTemplate.opsForValue().set(DispatchConstant.VEHICLE_STATUS_PREFIX + vehicleId, DispatchConstant.DispatchType.READY.toString());//从右侧压入车辆redisTemplate.opsForList().rightPush(DispatchConstant.VEHICLE_QUEUE, vehicleId);}
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/3322.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Hive主要介绍

Hive介绍 hive是基于 Hadoop平台操作 HDFS 文件的插件工具 可以将结构化的数据文件映射为一张数据库表 可以将 HQL 语句转换为 MapReduce 程序 1.hive 是由驱动器组成&#xff0c;驱动器主要由4个组件组成&#xff08;解析器、编译器、优化器、执行器&#xff09; 2.hive本身不…

【安卓13-Framework】SystemUI定制之屏蔽下拉状态栏部分快捷按钮

1、需求 屏蔽下拉状态栏谷歌录屏、省电模式、二维码扫描器等快捷按钮。 2、修改路径 普及&#xff1a;安卓的SystemUI包提供了状态栏、导航栏、通知中心等重要的用户界面元素。 状态栏小部件UI显示修改路径&#xff1a;frameworks/base/packages/SystemUI/src/com/android/s…

Java虚拟机(jvm)常见问题总结

1.电脑怎样认识我们编写的Java代码 首先先了解电脑是二进制的系统&#xff0c;他只认识 01010101比如我们经常要编写 HelloWord.java 电脑是怎么认识运行的HelloWord.java是我们程序员编写的&#xff0c;我们人可以认识&#xff0c;但是电脑不认识 Java文件编译的过程 1. 程…

git lab 2.7版本修改密码命令

1.gitlab-rails console -e production Ruby: ruby 2.7.5p203 (2021-11-24 revision f69aeb8314) [x86_64-linux] GitLab: 14.9.0-jh (51fb4a823f6) EE GitLab Shell: 13.24.0 PostgreSQL: 12.7 2根据用户名修改密码 user User.find_by(username: ‘username’) # 替换’use…

无人驾驶(移动机器人)路径规划之RRT与RRTStar算法及其matlab实现

在自动驾驶与移动机器人路径规划时&#xff0c;必定会用到经典的算法RRT与RRT Star。下面是RRT与RRTStar的matlab实现效果。可以发现RRTStar效果明显改善。 目录 一、效果比较 1.1 RRT算法效果&#xff08;黑色为障碍物&#xff0c;红色线为最终路径&#xff0c;蓝色三角形为…

C++之STL-vector+模拟实现

目录 一、vector的介绍和基本使用的方法 1.1 介绍 1.2 迭代器 1.3 vector的一些基本使用 1.3.1 构造函数 1.3.2 迭代器 1.3.3 有关容量的接口 1.3.4 增删查改 二、模拟实现vector 2.1 成员变量 2.2 迭代器的实现 2.3 容量接口的实现 2.3.1 size函数实现 2.3.2 capa…

阿斯达年代记三强争霸新手开荒注意事项 搬砖攻略和注意问题分享

阿斯达年代记三强争霸新手开荒注意事项 搬砖攻略和注意问题分享 阿斯达年代三强争霸这款游戏刚开始公测就获得了玩家们的集体关注&#xff0c;这是一款根据影视剧改编的MMORPG游戏&#xff0c;玩家将置身于名为阿斯大陆的奇幻世界&#xff0c;加入阿斯达、亚高、不法者三大势力…

Prompt之美:如何设计提示词让大模型变“聪明”

目录 一. Prompt关键要素 二. Prompt技巧 三. 实战中的Prompt优化 四. 参考文献 一. Prompt关键要素 Prompt是一个简短的文本输入&#xff0c;用于引导AI模型生成特定的回答或执行特定任务。换句话说&#xff0c;Prompt是你与AI模型沟通的方式。一个好的Prompt可以让AI更准…

从现在开始:让AI写代码,你只负责敲tab键

如果你是一名程序员&#xff0c;你一定有过这样的经历&#xff1a;在编写代码的时候&#xff0c;突然遇到了一个棘手的问题&#xff0c;需要花费大量的时间去查找资料、尝试不同的解决方案&#xff0c;甚至有时候还需要去问同事或者在网上寻求帮助。这样的情况不仅会浪费你的时…

用立方样条联合SHAP分析在危险因素鉴定中的作用

用立方样条联合SHAP分析在危险因素鉴定中的作用 1. SHAP分析告诉我们变量之间的关系 SHAP分析计算的SHAP值代表了某变量对于结局指标的贡献&#xff0c;代表了相关性的趋势&#xff0c;SHAP分析中的散点图是对以上关系的可视化&#xff0c;从中我们可以直观看到随着变量值的变…

百度 | 如何白嫖文心一言4.0,偷偷的用!

文心一言4.0 官方价一个月 59.9&#xff0c;贵不贵&#xff0c;很贵啊 现在有个白嫖文心一言4.0的方法 分享给大家 效果比3.0好用 如何使用 这里用到文心智能体平台&#xff0c;也是百度出的&#xff0c;和字节跳动的coze很像 这里打开文心智能体平台&#xff0c;自行百度…

diskMirror docker 使用容器部署 diskMirror 服务器!!!

Welcome to diskMirror-docker 获取项目 这个项目是 diskMirror-spring-boot 镜像版本的项目&#xff0c;您可以使用下面的命令将此项目编译为一个镜像&#xff01; # 进入到您下载的源码包目录 cd diskMirror-docker# 点击脚本来进行版本的设置以及对应版本的下载 设置 和 编…

JavaEE:File类查询一个文件的路径(举例+源码 )

一、File类概述 Java 中通过 java.io.File 类来对一个文件&#xff08;包括目录&#xff09;进行抽象的描述。File 类中的方法可以对文件路径以及文件名等信息进行查询&#xff0c;也可以对文件进行各项增删改操作&#xff0c;本文主要介绍 File 类的查询方法。 二、代码示例 …

Python入门第10篇(编码)

目录 一、编码是什么&#xff1f; 二、Python中编码 1.读取文件引发的问题 2.其实是Windows的问题 3.试着改改问题 4.各种骚操作 5.终极解决 6.推荐方案 总结 Python系列文章目录 前言 编码存在于所有文件&#xff0c;比较常见的ASCII、utf8、gbk等。最常用的还是ut…

大模型 AI 框架昇思 MindSpore 2.3.RC1 发布,训练、推理性能大幅提升,JIT 编译强化

经过社区开发者们几个月的开发与贡献&#xff0c;现正式发布昇思 MindSpore2.3.RC1 版本&#xff0c;通过多维混合并行以及确定性 CKPT 来实现超大集群的高性能训练&#xff0c;支持大模型训推一体架构&#xff0c;大模型开发训练推理更简、更稳、更高效&#xff0c;并在训推一…

【产品设计】B端产品权限设计~功能权限设计篇

对于B端设计而言&#xff0c;良好的权限设计架构是支持其复杂业务的基础和关键。 一、什么是权限管理 权限管理&#xff0c;一般指根据系统设置的安全规则或者安全策略&#xff0c;用户可以访问而且只能访问自己被授权的资源。 简而言之&#xff0c;用户登录系统后&#xff0…

使用 Redux 管理全局状态

Redux 是个状态集中管理框架&#xff0c;状态可以跨组件共享&#xff0c;状态更新后&#xff0c;调用监听器。其实状态可以认为就是个全局对象&#xff0c;为什么要做一个框架来管理呢&#xff1f;如果我们自己使用一个全解字典来管理状态是不是也行&#xff1f;如果不做任何控…

想在游泳健身的同时畅听音乐,随时哈氪漫游这款IP68防水的骨传导耳机

平时健身的过程中&#xff0c;音乐是许多健身爱好者的忠实伴侣。无论是在室内的健身房&#xff0c;还是户外的自然风光中&#xff0c;一副优质的耳机可以极大地提升我们的锻炼体验。现在市面上专为运动设计的耳机选择非常多&#xff0c;我喜欢用骨传导耳机&#xff0c;目前在用…

深入理解多线程编程

title: 深入理解多线程编程 date: 2024/4/25 17:32:02 updated: 2024/4/25 17:32:02 categories: 后端开发 tags:线程同步互斥锁死锁避免竞态条件线程池异步编程性能优化 第一章&#xff1a;多线程基础 1.1 线程概念与原理 线程&#xff1a;在操作系统中&#xff0c;一个程序…

ADB常用命令

大家好&#xff0c;今天给大家分享一些ADB的常用命令&#xff0c;我们作为测试了解ADB命令能给我们带来哪些好处呢&#xff1f; 1、ADB 命令可以帮助测试人员与Android设备进行交互&#xff0c;如安装、卸载应用&#xff0c;获取设备信息等&#xff0c;便于更深入地测试设备功能…