延时消息队列

目录

前言

一、延时队列实用场景

二、DelayQueue 

DelayQueue的实现

使用延迟队列 

 DelayQueue实现延时任务的优缺点

三、RocketMQ

原理

四、Kafka

原理

实现 

DelayMessage定义

消息发送代码 

消费者代码 

参考



前言

延时队列的内部是有序的,最重要的特性就体现在它的延时属性上,延时队列就是用来存放需要在指定时间点被处理的元素的队列

队列是存储消息的载体,延时队列存储的对象是延时消息。所谓的延时消息,是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费。


一、延时队列实用场景

  • 淘宝七天自动确认收货。在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将货款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能;

  • 订单在三十分钟之内未支付则自动取消;

  • 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒;

  • 用户注册成功后,如果三天内没有登陆则进行短信提醒

  • 用户发起退款,如果三天内没有得到处理则通知相关运营人员;

  • 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

二、DelayQueue 

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {
}

DelayQueue是一个无界的BlockingQueue,是线程安全的(无界指的是队列的元素数量不存在上限,队列的容量会随着元素数量的增加而扩容,阻塞队列指的是当队列内元素数量为0的时候,试图从队列内获取元素的线程将被阻塞或者抛出异常)

以上是阻塞队列的特点,而延迟队列还拥有自己如下的特点:

DelayQueue中存入的必须是实现了Delayed接口的对象(Delayed定义了一个getDelay的方法,用来判断排序后的元素是否可以从Queue中取出,并且Delayed接口还继承了Comparable用于排序),插入Queue中的数据根据compareTo方法进行排序(DelayQueue的底层存储是一个PriorityQueue,PriorityQueue是一个可排序的Queue,其中的元素必须实现Comparable接口的compareTo方法),并通过getDelay方法返回的时间确定元素是否可以出队,只有小于等于0的元素(即延迟到期的元素)才能够被取出

延迟队列不接收null元素

DelayQueue的实现

public class UserDelayTask implements Delayed {@Getterprivate UserRegisterMessage message;private long delayTime;public UserDelayTask(UserRegisterMessage message, long delayTime) {this.message = message;// 延迟时间加当前时间this.delayTime = System.currentTimeMillis() + delayTime;}// 获取任务剩余时间@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {return Long.compare(delayTime, ((UserDelayTask) o).delayTime);}
}

定义延迟队列并交付容器管理 

    /*** 延迟队列*/@Bean("userDelayQueue")public DelayQueue<UserDelayTask> orderDelayQueue() {return new DelayQueue<UserDelayTask>();}

使用延迟队列 

@Resource
private DelayQueue<UserDelayTask> orderDelayQueue;UserDelayTask task = new UserDelayTask(message, 1000 * 60);orderDelayQueue.add(task);

开启线程处理延迟任务

 @Overridepublic void afterPropertiesSet() throws Exception {new Thread(() -> {while (true) {try {UserDelayTask task = orderDelayQueue.take();// 当队列为null的时候,poll()方法会直接返回null, 不会抛出异常,但是take()方法会一直等待,// 因此会抛出一个InterruptedException类型的异常。(当阻塞方法收到中断请求的时候就会抛出InterruptedException异常)UserRegisterMessage message = task.getMessage();execute(message);// 执行业务} catch (Exception ex) {log.error("afterPropertiesSet", ex);}}}).start();}

 DelayQueue实现延时任务的优缺点

使用DelayQueue实现延时任务非常简单,而且简便,全部都是标准的JDK代码实现,不用引入第三方依赖(不依赖redis实现、消息队列实现等),非常的轻量级。

它的缺点就是所有的操作都是基于应用内存的,一旦出现应用单点故障,可能会造成延时任务数据的丢失。如果订单并发量非常大,因为DelayQueue是无界的,订单量越大,队列内的对象就越多,可能造成OOM的风险。所以使用DelayQueue实现延时任务,只适用于任务量较小的情况。
 

三、RocketMQ

RocketMQ 和本身就有延迟队列的功能,但是开源版本只能支持固定延迟时间的消息,不支持任意时间精度的消息(这个好像只有阿里云版本的可以)。

他的默认时间间隔分为 18 个级别,基本上也能满足大部分场景的需要了。

默认延迟级别:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h。

使用起来也非常的简单,直接通过setDelayTimeLevel设置延迟级别即可。

setDelayTimeLevel(level)

原理

实现原理说起来比较简单,Broker 会根据不同的延迟级别创建出多个不同级别的队列,当我们发送延迟消息的时候,根据不同的延迟级别发送到不同的队列中,同时在 Broker 内部通过一个定时器去轮询这些队列(RocketMQ 会为每个延迟级别分别创建一个定时任务),如果消息达到发送时间,那么就直接把消息发送到指 topic 队列中。

RocketMQ 这种实现方式是放在服务端去做的,同时有个好处就是相同延迟时间的消息是可以保证有序性的。

谈到这里就顺便提一下关于消息消费重试的原理,这个本质上来说其实是一样的,对于消费失败需要重试的消息实际上都会被丢到延迟队列的 topic 里,到期后再转发到真正的 topic 中。

四、Kafka

对于 Kafka 来说,原生并不支持延迟队列的功能,需要我们手动去实现,这里我根据 RocketMQ 的设计提供一个实现思路。

这个设计,我们也不支持任意时间精度的延迟消息,只支持固定级别的延迟,因为对于大部分延迟消息的场景来说足够使用了。

只创建一个 topic,但是针对该 topic 创建 18 个 partition,每个 partition 对应不同的延迟级别,这样做和 RocketMQ 一样有个好处就是能达到相同延迟时间的消息达到有序性。

原理

  • 首先创建一个单独针对延迟队列的 topic,同时创建 18 个 partition 针对不同的延迟级别
  • 发送消息的时候根据消息延迟等级发送到延迟 topic 对应的 partition,同时把原 topic 保存到 延迟消息 中。
  • 内嵌的consumer单独设置一个ConsumerGroup去消费延迟 topic 消息,消费到消息之后如果没有达到延迟时间那么就进行pause,然后seek到当前ConsumerRecordoffset位置,同时使用定时器去轮询延迟的TopicPartition,达到延迟时间之后进行resume。

       KafkaConsumer 提供了暂停和恢复的API函数,调用消费者的暂停方法后就无法再拉取到新的消息,同时长时间不消费kafka也不会认为这个消费者已经挂掉了。

  • 如果达到了延迟时间,那么就获取到延迟消息中的真实 topic ,直接转发

这里为什么要进行pauseresume呢?因为如果不这样的话,如果超时未消费达到max.poll.interval.ms 最大时间(默认300s),那么将会触发 Rebalance。

实现 

DelayMessage定义

/*** 延迟消息** @author yangyanping* @date 2023-08-31*/
@Getter
@Setter
@ToString
public class DelayMessage<T> implements DTO {/*** 消息级别,共18个,对应18个partition*/private Integer level;/*** 业务类型,真实投递到的topic*/private String topic;/*** 目标消息key*/private String key;/*** 事件*/private DomainEvent<T> event;
}

消息发送代码 

public void publishAsync(DelayMessage delayMessage) {String topic = "delay_topic";try {Integer level = delayMessage.getLevel();Integer delayPartition = level - 1;String data = JSON.toJSONString(delayMessage);ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, delayPartition, "", data);ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(producerRecord);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onSuccess(SendResult<String, String> result) {//发送成功后回调log.info("{}-异步发送成功, result={}。", topic, result.getRecordMetadata().toString());}@Overridepublic void onFailure(Throwable throwable) {//发送失败回调log.error("{}-异步发送失败。", topic, throwable);}});} catch (Exception ex) {log.error("{}-异步发送异常。", topic, ex);}}

消费者代码 

/*** 参考RocketMQ支持延迟消息设计,不支持任意时间精度的延迟消息,只支持特定级别的延迟消息,* 将消息延迟等级分为1s、5s、10s 、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h,共18个级别,* 只创建一个有18个分区的延时topic,每个分区对应不同延时等级。** https://blog.csdn.net/weixin_40270946/article/details/121293032** https://zhuanlan.zhihu.com/p/365802989** @author yangyanping* @date 2023-08-30*/
@Slf4j
@Component
public class DelayConsumer implements ConsumerSeekAware {/*** 锁*/private final Object lock = new Object();/*** 间隔*/private final int interval = 5000;/*** 消费者*/@Resource(name = "kafkaConsumer")private KafkaConsumer<String, String> kafkaConsumer;/*** 延迟消息发布*/@Resourceprivate DelayMessagePublisher delayMessagePublisher;@PostConstructpublic void init() {ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();//当系统需要循环间隔一定时间执行某项任务的时候可以使用scheduleWithFixedDelay方法来实现executorService.scheduleWithFixedDelay(() -> {synchronized (lock) {resume();lock.notifyAll();log.info("DelayConsumer-notifyAll");}}, 0, interval, TimeUnit.MILLISECONDS);}/*** 批量消费消息*/@KafkaListener(topics = "#{'${delayTopic.topic}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}")public void onMessage(List<ConsumerRecord<String, String>> records, Consumer consumer) {synchronized (lock) {try {if (CollectionUtil.isEmpty(records)) {log.info("DelayConsumer-records is empty !");consumer.commitSync();return;}boolean delay = false;for (ConsumerRecord<String, String> record : records) {long timestamp = record.timestamp();String value = record.value();JSONObject jsonObject = JSON.parseObject(value);Integer level = Convert.toInt(jsonObject.get("level"));String targetTopic = Convert.toStr(jsonObject.get("topic"));String event = Convert.toStr(jsonObject.get("event"));TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());long delayTime = getDelayTime(timestamp, level);if (delayTime <= System.currentTimeMillis()) {log.info("DelayConsumer-delayTime={} <= currentTime={}", delayTime, System.currentTimeMillis());// 处理消息processMessage(record, consumer, topicPartition, targetTopic, event);} else {log.info("DelayConsumer-delayTime={} > currentTime={}", delayTime, System.currentTimeMillis());// 暂停消费consumer.pause(Collections.singletonList(topicPartition));consumer.seek(topicPartition, record.offset());delay = true;break;}}if (delay) {lock.wait();}} catch (Exception var10) {log.error("{}.onMessage#error . message={}");throw new BizException("事件消息消费失败", var10);}}}/*** 消息级别,共18个* level-1 :30s* level-2 : 1m* level-3 : 5m* level-4 : 10m* level-5 : 20m* level-6 : 30m*/private Long getDelayTime(long timestamp, Integer level) {switch (level) {case 1:return timestamp + 1 * 1000;case 2:return timestamp + 5 * 1000;case 3:return timestamp + 10 * 1000;case 4:return timestamp + 30 * 1000;case 5:return timestamp + 1 * 60 * 1000;case 6:return timestamp + 2 * 60 * 1000;//.........  省略}return timestamp;}/*** 处理消息 并提交消息*/private void processMessage(ConsumerRecord<String, String> record, Consumer consumer, TopicPartition topicPartition, String targetTopic, String event) {OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1);HashMap<TopicPartition, OffsetAndMetadata> metadataHashMap = new HashMap<>();metadataHashMap.put(topicPartition, offsetAndMetadata);delayMessagePublisher.sendMessage(targetTopic, event);log.info("DelayConsumer-records#offset={},targetTopic={},event={}", record.offset() + 1, targetTopic, event);consumer.commitSync(metadataHashMap);}/*** 重启消费*/private void resume() {try {kafkaConsumer.resume(kafkaConsumer.paused());} catch (Exception ex) {log.error("DelayConsumer-resume", ex);}}
}

参考

RabbitMQ、RocketMQ、Kafka延迟队列实现-腾讯云开发者社区-腾讯云

延迟消息队列设计-腾讯云开发者社区-腾讯云

用Kafka实现延迟消息_kafka延迟消费_alvin.yao的博客-CSDN博客

怎么设计一个合适的延时队列?

基于kafka实现延迟队列 - 知乎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/73854.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux之NFS服务器

目录 Linux之NFS服务器 简介 NFS背景介绍 生产应用场景 NFS工作原理 NFS工作流程图 流程 NFS的安装 安装nfs服务 安装rpc服务 启动rpcbind服务同时设置开机自启动 启动nfs服务同时设置开机自启动 NFS的配置文件 主配置文件分析 示例 案例 --- 建立NFS服务器&#…

【Python】环境的搭建

前言 要想能够进行 Python 开发, 就需要搭建好 Python 的环境. 需要安装的环境主要是两个部分: 运行环境: Python开发环境: PyCharm 一、安装 Python 1.找到官方网站 官网&#xff1a;Welcome to Python.org 2.找到下载页面 点击download中的Windows 3.选择稳定版中的Win…

【计算机网络】HTTPS

文章目录 1. HTTPS的概念2. 加密常见的加密方式对称加密非对称加密 3. HTTPS的工作过程的探究方案1 —— 只使用对称加密方案2 —— 只使用 非对称加密方案3 —— 双方都是用非对称加密方案4 —— 非对称加密对称加密中间人攻击引入证书CA认证理解数据签名 方案5 —— 非对称加…

Java守护线程的理解及应用

在Java中有两类线程&#xff0c;分别是User Thread&#xff08;用户线程&#xff09;和Daemon Thread&#xff08;守护线程&#xff09; 。 用户线程很好理解&#xff0c;我们日常开发中编写的业务逻辑代码&#xff0c;运行起来都是一个个用户线程。而守护线程相对来说则要特别…

C#__资源访问冲突和死锁问题

/// 线程的资源访问冲突&#xff1a;多个线程同时申请一个资源&#xff0c;造成读写错乱。 /// 解决方案&#xff1a;上锁&#xff0c;lock{执行的程序段}:同一时刻&#xff0c;只允许一个线程访问该程序段。 /// 死锁问题&#xff1a; /// 程序中的锁过多&#xf…

vscode debug python launch.json添加args不起作用

问题 为了带入参数调试python 程序&#xff0c;按照网上搜到的教程配置了lauch.json文件&#xff0c;文件中添加了"args": [“model” “0” “path”] {// 使用 IntelliSense 了解相关属性。 // 悬停以查看现有属性的描述。// 欲了解更多信息&#xff0c;请访问: h…

深入浅出学Verilog--基础语法

1、简介 Verilog的语法和C语言非常类似&#xff0c;相对来说还是非常好学的。和C语言一样&#xff0c;Verilog语句也是由一连串的令牌&#xff08;Token&#xff09;组成。1个令牌必须由1个或1个以上的字符&#xff08;character&#xff09;组成&#xff0c;令牌可以是&#x…

day3_C++

day3_C 思维导图用C的类完成数据结构 栈的相关操作用C的类完成数据结构 循环队列的相关操作 思维导图 用C的类完成数据结构 栈的相关操作 stack.h #ifndef STACK_H #define STACK_H#include <iostream> #include <cstring>using namespace std;typedef int datat…

2023 年全国大学生数学建模竞赛题D 题 圈养湖羊的空间利用率思路详解+Python源码(二)

昨天已经将E题第一二问的详解和思路源码都写了出来&#xff0c;大家如果想从E题下手的话推荐参考本人文章&#xff0c;个人认为E题在建模上是优于D题的&#xff0c;毕竟有给出数据而且有明确的建模思路&#xff0c;E题我直接提供了Python源码直接可以运行即可&#xff1a; 202…

STC15单片机特有的PWM寄存器和普通定时器实现PWM输出

STC15单片机特有的PWM寄存器和普通定时器实现PWM输出 🌿主要针对STC15W4型号特有的6通道15位专门的高精度PWM。 ✨STC15W4K32S4系列单片机具有6通道15位专门的高精度PWM(带死区控制)和2通道CCP(利用它的高速脉冲输出功能可实现11~16位PWM);(STC15F/L2K60S2系列单片机具有3通…

Android逆向学习(番外一)smali2java部分文件无法反编译的bug与修复方法

Android逆向学习&#xff08;番外一&#xff09;smali2java部分文件无法反编译的bug与修复方法 一、前言 昨天我和往常一样准备着android逆向&#xff08;四&#xff09;的博客&#xff0c;结果发现smali2java对某些文件无法进行逆向&#xff0c;我不知道windows会不会产生这…

视频汇聚/视频云存储/视频监控管理平台EasyCVR安全检查的相关问题及解决方法2.0

开源EasyDarwin视频监控TSINGSEE青犀视频平台EasyCVR能在复杂的网络环境中&#xff0c;将分散的各类视频资源进行统一汇聚、整合、集中管理&#xff0c;在视频监控播放上&#xff0c;TSINGSEE青犀视频安防监控汇聚平台可支持1、4、9、16个画面窗口播放&#xff0c;可同时播放多…

windows10使用wheel安装tensorflow2.13.0/2.10.0 (保姆级教程)

安装过程 安装虚拟环境安装virtualenv安装满足要求的python版本使用virtualenv创建指定python版本的虚拟环境 安装tensorflow安装tensorflow-docs直接下载使用wheel下载 在VSCode编辑器中使用虚拟环境下的python解释器&#xff0c;并使用tensorflow常见错误 注意&#xff1a; t…

反序列化中_wakeup的绕过

文章目录 前言绕过方法变量引用属性个数不匹配(cve-2016-7124)C绕过fast-destruct其余GC回收机制 前言 反序列化中_wakeup扮演着非常重要的角色&#xff0c;ctf碰到很多的题目都有涉及到_wakeup绕过&#xff0c;写下这篇博客来总结下大部分绕过方法&#xff0c;其中会有例题具…

大数据导论 笔记

一、大数据方向 1、技术发展 计算机网络云计算大数据时代人工智能&#xff08;本科&#xff1a;使用&#xff0c;研究生&#xff1a;推导&#xff0c;博士&#xff1a;创新&#xff09; 2023年 大数据模型 人工智能元年 2、基础课程 hadoop 大数据基础 三大件&#xff1a;HDF…

java实现调用百度地图

这里使用的springbootthymeleaf实现&#xff0c;所以需要有springboot技术使用起来更方便 当然&#xff0c;只使用html加js也可以实现&#xff0c;下面直接开始 首先我们需要去百度地图注册一个AK&#xff08;百度地图开放平台 | 百度地图API SDK | 地图开发&#xff09; 找到左…

学会用命令行创建uni-app项目并用vscode开放项目

(创作不易&#xff0c;感谢有你&#xff0c;你的支持&#xff0c;就是我前行的最大动力&#xff0c;如果看完对你有帮助&#xff0c;请留下您的足迹&#xff09; 目录 创建 uni-app 项目 命令行创建 uni-app 项目 编译和运行 uni-app 项目&#xff1a; 用 VS Code 开发 uni…

教你如何快速阅读葡萄酒标签

我们经常被问及葡萄酒标签上写了什么&#xff0c;总体而言这些信息可以分为四个关键部分&#xff0c;第一品牌或生产商&#xff1b;第二国家或地区&#xff1b;第三葡萄品种&#xff1b;第四年份。 第一品牌或生产商&#xff0c;在寻找葡萄酒的制造商时&#xff0c;著名的品牌名…

一篇文章教会你什么是高度平衡二叉搜索(AVL)树

高度平衡二叉搜索树 AVL树的概念1.操作2.删除3.搜索4.实现描述 AVL树的实现1.AVL树节点的定义2.AVL树的插入3.AVL树的旋转3.1 新节点插入较高右子树的右侧---右右:左单旋3.2 新节点插入较高左子树的左侧---左左:右单旋3.3 新节点插入较高左子树的右侧---左右&#xff1a;先左单…

尖端AR技术如何在美国革新外科手术实践?

AR智能眼镜已成为一种革新性的工具&#xff0c;在外科领域具有无穷的优势和无限的机遇。Vuzix与众多医疗创新企业建立了长期合作关系&#xff0c;如Pixee Medical、Medacta、Ohana One、Rods & Cones、Proximie等。这些公司一致认为Vuzix智能眼镜可有效提升手术实践&#x…