redisson的延时队列机制简述

概述

业务中经常会遇到一些延迟执行的需求;通常想到的都是rabbitmq或者rocketmq的延迟消息;
但是系统中不一定集成了mq,但为了控制分布式下的并发,一般redis都是有集成的;
rediskey过期监听那个时间不准确,在集群环境下节点挂了也容易丢失;

那么用redisson的延迟队列,正好可以用来解决轻量级的延时消息;
简单的来说就是消费者生产了一个消息任务,塞到ZSet里(用当前时间戳+延迟时间作为分数),等时间到了,就会放到任务List中,然后消费者真正去执行任务都是从任务List中获取任务;

redisson中的消费者并不是一直轮询获取任务;而是有具体时间的延迟任务,时间到了去任务队列中获取任务;

redisson延时任务机制简述

生产者先将任务pushdelay_queue_timeout等待队列中,延迟时间到了,消费者会把任务从timeout队列挪到SANYOU任务队列中(消费者实际获取任务的队列),然后消费者就能拿到最终要执行的任务了;

这里具体要说的就是客户端通知和获取机制;
消费者在启动时通常都会去get一下队列,达到订阅队列的目的;

RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue("SANYOU");
RDelayedQueue<String> delayQueue = redissonClient.getDelayedQueue(blockingQueue);

这样做的目的:
消费者订阅队列,从delay_queue_timeout等待延迟队列中将已经到达时间的任务挪到真正的任务List队列中,然后再将delay_queue_timeout队列中第一个(也就是第一个要执行的)的任务的时间拿到,用这个时间开启一个延迟任务,时间到了之后,会发布一个消息到时间通知channel中;然后客户端监听到这个channel中的消息后,会再次重复上述步骤,让delay_queue_timeout中的任务,可以都放到真正的任务List队列中;

这样有一个好处就是不用一直while扫描等待,客户端的延迟任务时间和delay_queue_timeout中的延迟时间是一样的,可以精准利用cpu,理论上是没有延迟的,但是实际消息数量大量增加,消费者消费比较慢,还是会造成延迟任务消费延迟;

另外由于客户端都是用lua脚本去redis的同一个List队列中获取任务,lua脚本在redis中都是原子任务,而且redis真正的操作是单线程的,所以不会存在任务广播情况(并发获取时,一个任务不会被多个消费者同时拿到);

捞一张图片
在这里插入图片描述

代码Demo


import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
@Component
public class RedissonDelayQueueConfig implements InitializingBean {@Resourceprivate RedissonClient redissonClient;//延时队列mapprivate final Map<String, RDelayedQueue<DelayMessageDTO>> delayQueueMap = new ConcurrentHashMap<>(16);/*** 消费者初始化所有队列,订阅对应的队列,并开启第一个过期任务的过期时间对应的延迟任务*/@PostConstructpublic void reScheduleDelayedTasks() {DelayQueueEnum[] queueEnums = DelayQueueEnum.values();for (DelayQueueEnum queueEnum : queueEnums) {RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueEnum.getCode());RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);}}@Overridepublic void afterPropertiesSet() {// 有新的延迟队列在这里添加,队列消费类需要继承DelayQueueConsumerDelayQueueEnum[] queueEnums = DelayQueueEnum.values();for (DelayQueueEnum queueEnum : queueEnums) {DelayQueueConsumer delayQueueConsumer = SpringUtil.getBean(queueEnum.getBeanName());if (delayQueueConsumer == null) {throw new ServiceException("queueName=" + queueEnum.getBeanName() + ",delayQueueConsumer=null,请检查配置...");}// Redisson的延时队列是对另一个队列的再包装,使用时要先将延时消息添加到延时队列中,当延时队列中的消息达到设定的延时时间后,// 该延时消息才会进行进入到被包装队列中,因此,我们只需要对被包装队列进行监听即可。RBlockingQueue<DelayMessageDTO> rBlockingQueue = redissonClient.getBlockingDeque(queueEnum.getCode());//消费者初始化队列RDelayedQueue<DelayMessageDTO> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);//set到map中方便获取delayQueueMap.put(queueEnum.getCode(), rDelayedQueue);// 订阅新元素的到来,调用的是takeAsync(),异步执行rBlockingQueue.subscribeOnElements(delayQueueConsumer::execute);}}public RedissonClient getRedissonClient() {return redissonClient;}public Map<String, RDelayedQueue<DelayMessageDTO>> getDelayQueueMap() {return delayQueueMap;}
}import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;import javax.annotation.Resource;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;@Slf4j
@Component
public class DelayQueueUtil {private static RedissonDelayQueueConfig redissonDelayQueueConfig;@Resourcepublic void setRedissonDelayQueueConfig(RedissonDelayQueueConfig redissonDelayQueueConfig) {DelayQueueUtil.redissonDelayQueueConfig = redissonDelayQueueConfig;}private static Map<String, RDelayedQueue<DelayMessageDTO>> getDelayQueueMap() {if(null == redissonDelayQueueConfig) return Collections.emptyMap();return redissonDelayQueueConfig.getDelayQueueMap();}private static RedissonClient getRedissonClient() {if(null == redissonDelayQueueConfig) return null;return redissonDelayQueueConfig.getRedissonClient();}/*** 添加延迟消息*/public static void addDelayMessage(DelayMessageDTO delayMessage) {log.info("delayMessage={}", delayMessage);Assert.isTrue(getDelayQueueMap().containsKey(delayMessage.getQueueName()), "队列不存在");delayMessage.setCreateTime(DateUtil.now());if(null == delayMessage.getTimeUnit()){delayMessage.setTimeUnit(TimeUnit.SECONDS);}RDelayedQueue<DelayMessageDTO> rDelayedQueue = getDelayQueueMap().get(delayMessage.getQueueName());//移除相同的消息rDelayedQueue.remove(delayMessage);//添加消息rDelayedQueue.offer(delayMessage, delayMessage.getDelayTime(), delayMessage.getTimeUnit());}/*** 移除指定队列中的消息*/public static void removeDelayMessage(DelayMessageDTO delayMessage) {log.info("取消:delayMessage={}", delayMessage);if (!getDelayQueueMap().containsKey(delayMessage.getQueueName())) {log.error("queueName={},该延迟队列不存在,请确认后再试...", delayMessage.getQueueName());return;}RDelayedQueue<DelayMessageDTO> rDelayedQueue = getDelayQueueMap().get(delayMessage.getQueueName());rDelayedQueue.remove(delayMessage);removeDelayQueue(delayMessage);}/*** 从所有队列中删除消息*/public static void removeDelayQueue(DelayMessageDTO value) {DelayQueueEnum[] queueEnums = DelayQueueEnum.values();for (DelayQueueEnum queueEnum : queueEnums) {RBlockingDeque<Object> blockingDeque = getRedissonClient().getBlockingDeque(queueEnum.getCode());RDelayedQueue<Object> delayedQueue = getRedissonClient().getDelayedQueue(blockingDeque);delayedQueue.remove(value);}}}

参考了大佬的博文
https://lhalcyon.com/delay-task/index.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/634572.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GIS复试Tips(特别是南师大)

注&#xff1a;本文仅个人观点&#xff0c;仅供参考 在这提前㊗️24年考南师大GISer成功上岸&#xff01; 当然&#xff0c;考研是个考试&#xff0c;总有人顺利上岸&#xff0c;稳上岸或逆袭上岸&#xff0c;但可能也有人被刷&#xff0c;这是常态。 所以&#xff0c;㊗️你…

idea设置编辑器背景颜色

文章目录 一、Ided常用工具栏显示二、更改idea主题设置三、设置代码编辑器背景颜色为豆沙绿四、设置新项目 默认Jdk配置、maven配置1、settings for new projects2、structre for new projects 五、修改代码中注释的字体颜色六、设置编辑器字体大小七、文件编码的设置(可以设置…

DC-1靶机刷题记录

靶机下载地址&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1GX7qOamdNx01622EYUBSow?pwd9nyo 提取码&#xff1a;9nyo 参考答案&#xff1a; https://c3ting.com/archives/kai-qi-vulnhnbshua-tiDC-1.pdf【【基础向】超详解vulnhub靶场DC-1】 https://www.bilibi…

禅道:从安装到使用,一篇文章带你全面了解

博客前言&#xff1a; 在这个充满竞争和快节奏的世界里&#xff0c;项目管理已经成为了许多行业的关键环节。禅道作为一种功能强大、易用的项目管理工具&#xff0c;正在被越来越多的企业和团队所采用。它不仅能帮助我们高效地管理项目&#xff0c;还能提升团队协作和沟通的效…

为什么JavaScript中0.1 + 0.2 ≠ 0.3

JavaScript中的浮点数运算有时候会出现一点偏差。下面解释为什么0.1 0.2 ≠ 0.3,以及如果你需要精确运算应该怎么做。 如果1 2 3,那么为什么在JavaScript中0.1 0.2 ≠ 0.3?这个原因与计算机科学和浮点数运算有关。 我建议你打开浏览器的控制台,输入0.1 0.2来查看结果。…

带你学C语言-指针(4)

目录 ​编辑 ⚾0.前言 &#x1f3c0;1.回调函数 ⚽2.qsort &#x1f3c9;2.1 qsort函数的模拟实现 &#x1f3be;3.sizeof与strlen对比 &#x1f3be;4.结束语 ⚾0.前言 言C之言&#xff0c;聊C之识&#xff0c;以C会友&#xff0c;共向远方。各位CSDN的各位你们好啊&…

【JS逆向学习】36kr登陆逆向案例(webpack)

在开始讲解实际案例之前&#xff0c;大家先了解下webpack的相关知识 WebPack打包 webpack是一个基于模块化的打包&#xff08;构建&#xff09;工具, 它把一切都视作模块 webpack数组形式&#xff0c;通过下标取值 !function(e) {var t {};// 加载器 所有的模块都是从这个…

【RocketMQ每日一问】RocketMQ nameserver的作用是什么?

Name Server 在 Apache RocketMQ 集群中扮演着以下几个重要作用&#xff1a; 服务注册与发现&#xff1a; Name Server 负责管理和协调整个集群&#xff0c;维护集群中所有 Broker 的信息&#xff0c;包括 Broker 的 IP 地址、端口号、存储容量等。当 Producer 和 Consumer 需…

【消息队列】RocketMQ 生产和消费中的集群模式和广播模式

在消息队列系统中&#xff0c;生产者和消费者的模式通常包括集群模式和广播模式。这两种模式分别用于不同的场景&#xff0c;具有不同的特点和优势。 1. 集群模式&#xff08;Cluster Mode&#xff09;&#xff1a; 在集群模式下&#xff0c;多个相同角色的实例组成一个集群&…

太帅了 soeasy两行命令创建一个文件系统

看三遍 看三遍 看三遍 A file list program that supports multiple storage, powered by Gin and Solidjs. 翻译:一个支持多个存储的文件列表程序&#xff0c;由Gin和Solidjs提供支持。 1.安装 命令1:创建目录 mkdir -p /opt/alist 命令2:创建容器 docker run -d \ --res…

一天吃透Redis面试八股文

目录&#xff1a; Redis是什么&#xff1f;Redis优缺点&#xff1f;Redis为什么这么快&#xff1f;讲讲Redis的线程模型&#xff1f;Redis应用场景有哪些&#xff1f;Memcached和Redis的区别&#xff1f;为什么要用 Redis 而不用 map/guava 做缓存?Redis 数据类型有哪些&…

UE5 C++ 基础变量类型,关于框架的初级练习

一.创建自己的MyGameModed。并在其中设置好GamePlay框架。 1.创建MyGameState,MyGameState,MyHUD,MyPawn&#xff0c;MyPawn&#xff0c;MyPlayerController,MyPlayerState。 2.并在MyGameMode的头文件里面&#xff0c;把GmaeMode里的框架需要的框架类都包含进去。 3.写一个构…

用微服务快速开发框架实现流程化办公!

实现流程化办公&#xff0c;可以助力企业提升市场竞争力&#xff0c;从而实现数字化转型升级。微服务快速开发框架是提升办公协作效率的得力助手&#xff0c;流辰信息以市场为导向&#xff0c;坚持自主研发与创新&#xff0c;始终为行业的进步和发展贡献应有的力量。 1、流程化…

【EI会议征稿通知】2024年通信技术与软件工程国际学术会议 (CTSE 2024)

2024年通信技术与软件工程国际学术会议 (CTSE 2024) 2024 International Conference on Communication Technology and Software Engineering (CTSE 2024) 2024年通信技术与软件工程国际学术会议 (CTSE 2024)将于2024年03月15-17日在中国长沙举行。会议专注于通信技术与软件工…

LLaVA-Plus:多模态大模型的新突破

前言 随着AIGC技术的不断进步&#xff0c;各类多模态大模型&#xff08;MLM&#xff09;开始蓬勃发展。在这一领域中&#xff0c;LLaVA-Plus的推出无疑是一次重大突破。作为LLaVA团队的最新工作&#xff0c;LLaVA-Plus不仅继承了LLaVA的优秀特性&#xff0c;还在此基础上进行了…

ChatGPT提示词保姆级教程

现在越来越多提示词教程&#xff0c;本文列个清单&#xff0c;方便以后整理&#xff0c;不定期更新&#xff0c;欢迎关注留言&#xff01; 后续更新欢迎关注 提示词&#xff08;prompt&#xff09;出来后&#xff0c;被称为一个新的岗位诞生&#xff0c;面向提示词工程师。 …

dns正反解析配置

1.配置正向解析baidu.com 1、下载bind包 [rootlocalhost ~]# yum install bind -y 2、对配置文件修改 [rootlocalhost ~]# vim /etc/named.conf 3、对数据文件修改 [rootlocalhost ~]# vim /var/named/baidu 4、重启服务 [rootlocalhost ~]# systemctl restart named.service 5…

hash应用

目录 一、位图 1.1、引出位图 1.2、位图的概念 1.3、位图的应用 1.4、位图模拟实现 二、布隆过滤器 2.1、什么是布隆过滤器 2.2、布隆过滤器应用的场景 2.3、布隆过滤器的原理 2.4、布隆过滤器的查找 2.5、布隆过滤器的插入 2.6、布隆过滤器的删除 2.7、布隆过滤器…

行云部署前端架构解析-前言 | 京东云技术团队

一个简单的自我介绍 项目规模 截止目前上万次代码提交&#xff0c;总代码行数1超过21万行&#xff0c;其中人工维护的代码超过 13万行&#xff0c;近千个文件。 前端线上服务直接对接的后端服务&#xff0c;达十多个。 跟很多应用一样, 它有行云的入口, 也有独立的服务, 还…

Rust-泄漏

在C中&#xff0c;如果引用计数智能指针出现了循环引用&#xff0c;就会导致内存泄漏。而Rust中也一样存在引用计数智能指针Rc,那么Rust中是否可能制造出内存泄漏呢? 内存泄漏 首先&#xff0c;我们设计一个Node类型&#xff0c;它里面包含一个指针&#xff0c;可以指向其他…