redis — 基于Spring Boot实现redis延迟队列

1. 业务场景

延时队列场景在我们日常业务开发中经常遇到,它是一种特殊类型的消息队列,它允许把消息发送到队列中,但不立即投递给消费者,而是在一定时间后再将消息投递给消费者。延迟队列的常见使用场景有以下几种:

  • 在各种购物平台上下单,订单超过30分钟未支付,自动关闭。
  • 订单完成后, 如果用户一直未评价, 5天后自动好评。
  • 会员到期前15天, 到期前3天分别发送短信提醒。
  • 当订单一直处于未支付状态时,如何及时的关闭订单,并退还库存?
  • 如何定期检查处于退款状态的订单是否已经退款成功?

2. Redis延迟队列实现原理

目前延迟队列的类型主要实现有:

  • 基于消息的延迟:指为每条消息设置不同的延迟时间,那么每当队列中有新消息进入的时候就会重新根据延迟时间排序,或者定义时间轮,新消息落在指定位置;
  • 基于队列的延迟: 设置不同延迟级别的队列,比如5s、1min、30mins、1h等,每个队列中消息的延迟时间都是相同的。

基于第一种不少组件都有实现方案,比如redis的sortset间接实现,kafka内部时间轮,rabbitMQ可安装插件实现。第一种实时性高,不过主观看会比较依赖组件本身,但自己实现就得考虑持久化、高可用等问题,建议直接使用组件本身;第二种方案可以基于组件去实现,通用性会高点,不过实时性不高,更适合用于重试业务场景。当然Redis本身并不支持延迟队列,所以我们只是实现一个比较简单的延迟队列,而且Redis不太适合大量消息堆积,所以只适合比较简单的场景,然假如我们对消息的实时性以及可靠性要求非常高,可能就需要使用MQ或kafka来实现了。

消息延迟流程图如下:
在这里插入图片描述
Redis延迟队列可以通过 zset 来实现,因为 zset 中有一个 score,我们可以把时间作为 score,将 value 存到 redis 中,然后通过轮询的方式,去不断的读取消息出来,整体思路为:

  1. 消息体设置有效期,设置好score,然后放入zset中
  2. 通过排名拉取消息
  3. 有效期到了,就把当前消息从zset中移除

zadd命令

使用方式:ZADD key score member [[score member][score member] …]
将一个或多个 member 元素及其 score 值加入到有序集 key 当中。如果 key 不存在,则创建一个空的有序集并执行 ZADD 操作。如果某个 member 已经是有序集的成员,那么更新这个 member 的 score 值,并通过重新插入这个 member 元素,来保证该 member 在正确的位置上。score 值可以是整数值或双精度浮点数。

ZRANGEBYSCORE命令

使用方式:ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]

  1. 返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。有序集成员按 score 值递增(从小到大)次序排列。
  2. 具有相同 score 值的成员按字典序来排列
  3. 可选的 LIMIT 参数指定返回结果的数量及区间(就像SQL中的 SELECT LIMIT offset, count ),注意当 offset 很大时,定位 offset 的操作可能需要遍历整个有序集,此过程最坏复杂度为 O(N) 时间。
  4. 可选的 WITHSCORES 参数决定结果集是单单返回有序集的成员,还是将有序集成员及其 score 值一起返回。

ZREM命令

使用方式:ZREM key member [member …]
移除有序集 key 中的一个或多个成员,不存在的成员将被忽略。
当 key 存在但不是有序集类型时,返回一个错误。

3. 基于springboot实现redis延迟队列

3.1 引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>${version}</version>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>${version}</version>
</dependency>

3.2 redis基础方法

定义RedisService基础服务方法,本次案例只涉及到以下三个基础方法:

    /*** 添加 ZSet 元素** @param key* @param value* @param score*/@Overridepublic boolean add(String key, Object value, double score) {return redisTemplate.opsForZSet().add(key, value, score);}/*** 返回 分数范围内 指定 count 数量的元素集合, 并且从 offset 下标开始(从小到大,不带分数的集合)** @param key* @param min* @param max* @param offset 从指定下标开始* @param count  输出指定元素数量* @return*/@Overridepublic Set<Object> rangeByScore(String key, double min, double max, long offset, long count) {return redisTemplate.opsForZSet().rangeByScore(key, min, max, offset, count);}/*** Zset 删除一个或多个元素** @param key* @param values* @return*/@Overridepublic Long removeZset(String key, Object... values) {return redisTemplate.opsForZSet().remove(key, values);}

3.3 定义Spring消息事件推送

@Getter
@ToString
public class DelayMsg extends ApplicationEvent {private String msg;private String topic;public DelayMsg(Object source, String msg, String topic) {super(source);this.msg = msg;this.topic = topic;}
}

3.4 消息获取

定义redis获取延迟队列消息方法:

/*** 从zset中取出score小于当前时间戳的数据** @param key* @return*/
public String getDelayOne(String key) {//先查后删,一次拿3个做备选,这样抢占到的概率就会高一些Set<Object> sets = redisService.rangeByScore(key, 0, System.currentTimeMillis(), 0, 3);if (CollectionUtils.isEmpty(sets)) {return null;}for (Object val : sets) {if (1L.equals(redisService.removeZset(key, val))) {// 删除成功,表示抢占到return val.toString();}}return null;
}

这里每次查询时取了三个数据,然后遍历获取到的数据,依次尝试去删除,若删除成功,则表示当前实例抢占到了这个消息

  1. 为什么这样设计? 这里有两个点,先解释第一个,为啥先查后删

如果我们按照正常的实现流程,每次从zset中取一个,但是无法保证这个时候就只有我一个人拿到了这个数据,在多实例的场景下,可能存在多个实例同时拿到了它,那么如何才能表示只有一个实例抢占到呢?

借助redis的单线程机制,只可能有一个实例会删除成功,所以拿到并删除成功的那个小伙伴,就是最终的幸运儿;

因此实现细节就是先查,后删,若删除成功,表示获取成功;否则表示被其他的实例捷足先登。

  1. 接下来再看第二个,为啥一次拿三个

从上面的分析可以看出,如果我一次只拿一个,那么我抢占到的几率并不太大,特别是当实例比较多时,可能会做多次的无效操作;为了减少这个可能性,所以我一次多拿几个做备选,这样抢占到的概率就会高一些,至于为什么是3,这个就看实际的实例与定时任务的执行间隔了。

上面定义了如何获取延迟队列中已到期的消息,接下来需要定时轮训获取消息:

/*** 每5s定时轮训消息*/
@Scheduled(fixedRate = 5000)
public void schedule() {for (String specialTopic : topic) {String msg = redisDelayQueue.getDelayOne(specialTopic);logger.info("开始轮训获取消息 {}", msg);if (StringUtil.isNotEmpty(msg)) {//使用Spring推送事件处理applicationContext.publishEvent(new DelayMsg(this, msg, specialTopic));}}
}

上面的定时任务,直接借助Spring的@Schedule来实现,遍历所有的topic,捞出数据之后,通过spring的 event/listener事件机制来实现消息处理的解耦

3.5 定义消费者注解和切面处理

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface Consumer {String topic();
}

注意这个注解上面还有 @EventListener,表明它可以监听的spring的事件

3.6 定义延时业务的切面处理

@Aspect
@Component
public class ConsumerAspect {@Around("@annotation(consumer)")public Object around(ProceedingJoinPoint joinPoint, Consumer consumer) throws Throwable {Object[] args = joinPoint.getArgs();boolean check = false;for (Object obj : args) {if (obj instanceof DelayMsg) {check = consumer.topic().equals(((DelayMsg) obj).getTopic());}}if (!check) {// 不满足条件,直接忽略return null;}// topic匹配成功,执行return joinPoint.proceed();}
}

3.7 消息监听

	//使用自定义的consumer注解监听topic延迟队列@Consumer(topic = RedisKeyConstant.DELAY_QUEUE)public void consumer(DelayMsg delayMsg) {logger.info("预约单延时确认: " + delayMsg.getMsg() + " at:" + System.currentTimeMillis());//延迟业务具体实现//...//...}

3.8 业务facade层调用延迟处理

经过以上的延迟队列封装处理,在facade层,也就是我们的业务中就可以直接调用:

@Autowired
private DelayListWrapper delayListWrapper;
...
delayListWrapper.publish(RedisKeyConstant.DELAY_QUEUE, xxxId, xxx);

4 总结

本文以redis的zset来实现延时队列,并基于SpringBoot实现了延迟队列的推送和消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/37224.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HoudiniVex笔记_P23_SDFBasics有向距离场

原视频&#xff1a;https://www.youtube.com/playlist?listPLzRzqTjuGIDhiXsP0hN3qBxAZ6lkVfGDI Bili&#xff1a;Houdini最强VEX算法教程 - VEX for Algorithmic Design_哔哩哔哩_bilibili Houdini版本&#xff1a;19.5 1、什么是SDF Houdini支持两种体积类型&#xff0c;…

使用wxPython和PyMuPDF提取PDF页面指定页数的内容的应用程序

在本篇博客中&#xff0c;我们将探讨如何使用wxPython和PyMuPDF库创建一个简单的Bokeh应用程序&#xff0c;用于选择PDF文件并提取指定页面的内容&#xff0c;并将提取的内容显示在文本框中。 C:\pythoncode\new\pdfgetcontent.py 准备工作 首先&#xff0c;确保你已经安装了…

44 | 酒店预订及取消的数据分析

1.背景介绍 数据集来自Kaggle网站上公开的Hotel booking demand项目 该数据集包含了一家城市酒店和一家度假酒店的预订信息,包括预订时间、入住时间、成人、儿童或婴儿数量、可用停车位数量等信息。 数据集容量约为12万32 本次数据分析主要包含如下内容: 总览数据,完成对…

大数据-玩转数据-Flink网页埋点PV统计

一、说明 衡量网站流量一个最简单的指标&#xff0c;就是网站的页面浏览量&#xff08;Page View&#xff0c;PV&#xff09;。用户每次打开一个页面便记录1次PV&#xff0c;多次打开同一页面则浏览量累计。 一般来说&#xff0c;PV与来访者的数量成正比&#xff0c;但是PV并不…

虹科干货 | 化身向量数据库的Redis Enterprise——快速、准确、高效的非结构化数据解决方案!

用户期望在他们遇到的每一个应用程序和网站都有搜索功能。然而&#xff0c;超过80%的商业数据是非结构化的&#xff0c;以文本、图像、音频、视频或其他格式存储。Redis Enterprise如何实现矢量相似性搜索呢&#xff1f;答案是&#xff0c;将AI驱动的搜索功能集成到Redis Enter…

STABLE DIFFUSION模型及插件的存放路径

记录下学习SD的一些心得&#xff0c;使用的是秋叶大佬的集成webui&#xff0c;下载了之后点击启动器即可开启&#xff0c;文件夹中的内容如下 主模型存放在models文件下的stable-diffusion文件夹内&#xff0c;一些扩展类的插件是存放在extensions文件夹下

【MFC】12.双缓冲序列化机制-笔记

双缓冲 双缓冲在之前写字符雨的时候&#xff0c;已经简单介绍过&#xff0c;今天我们来写一个简单的程序来体会双缓冲机制 我们实现一个在屏幕上画直线的功能&#xff1a; 在类中添加变量&#xff0c;保存起点坐标和终点坐标&#xff1a; //定义一个容器&#xff0c;保存每…

【189】Java Spring利用HTTP轮询远程控制树莓派4B继电器开关

因为项目需求&#xff0c;要实现PC远程控制警铃的效果。警铃结构简单&#xff0c;只需要通上12V的直流电就可以报警。本文的树莓派设备是在树莓派4B的基础上找硬件厂商搞的定制化产品。树莓派4B通过4G网卡连接互联网&#xff0c;并利用GPIO控制12V直流电的继电器开关。树莓派4B…

【设计模式】责任链模式

顾名思义&#xff0c;责任链模式&#xff08;Chain of Responsibility Pattern&#xff09;为请求创建了一个接收者对象的链。这种模式给予请求的类型&#xff0c;对请求的发送者和接收者进行解耦。这种类型的设计模式属于行为型模式。 在这种模式中&#xff0c;通常每个接收者…

移动端预览指定链接的pdf文件流

场景 直接展示外部系统返回的获取文件流时出现了跨域问题&#xff1a; 解决办法 1. 外部系统返回的请求头中调整&#xff08;但是其他系统不会给你改的&#xff09; 2. 我们系统后台获取文件流并转为新的文件流提供给前端 /** 获取传入url文件流 */ GetMapping("/get…

Java 正则表达式【非贪婪匹配、格式验证、反向引用、API】

非贪婪匹配 非贪婪匹配的元字符是问号 ? 当此字符跟在任何其他限定符&#xff08;*、、&#xff1f;、{n}、{m}、{n,m}&#xff09;之后&#xff0c;匹配模式是 "非贪心的"。非贪心的意思就是每次匹配搜索到的尽可能短的字符串&#xff0c;可以是0个。 案例 对…

30 | 中国高校数据分析

一、数据源 本项目使用了两个csv的数据文件,一个是中国高校(大学)的数据,一个是中国高校专业设置的数据 数据基本栏位:高校(大学)的数据高校专业设置的数据学校学校省份专业类别城市专业名称地址国家特色专业水平层次办学类别办学类型985211双一流二、数据分析目标 本…

电脑打开对话框中没有桌面这个选项解决办法

问题描述&#xff1a; 左侧栏中的桌面图标不显示 解决方法&#xff1a; 左侧的空白处右键-显示所有的文件夹 这时所有的文件夹都显示了&#xff01;

从鲁大师十五年,寻找软件的生存法则

千禧之年&#xff0c;国内互联网用户数量首次突破1000万大关&#xff0c;互联网的腾飞正式拉开序幕。 从彼时算起&#xff0c;中国互联网发展也不过23年&#xff0c;而我们记忆中那个摇着蒲扇的老头&#xff0c;却占据了其中关键的十五年。 这十五年中有太多曾经为人熟知的软件…

pointpillars怎么查看tensorboard

在PointPillars中使用TensorBoard来可视化训练过程和模型性能是很常见的做法。TensorBoard是TensorFlow提供的一个强大的工具&#xff0c;用于可视化训练过程、模型图、损失曲线、准确率等。下面是在PointPillars中使用TensorBoard的一般步骤&#xff1a; 请注意&#xff0c;上…

CHATGPT源码简介与使用指南

CHATGPT源码的基本介绍 CHATGPT源码备受关注&#xff0c;它是一款基于人工智能的聊天机器人&#xff0c;旨在帮助开发者快速搭建自己的聊天机器人&#xff0c;无需编写代码。下面是对CHATGPT搭建源码的详细介绍。 CHATGPT源码的构建和功能 CHATGPT源码是基于Google的自然语言…

flutter开发实战-MethodChannel实现flutter与iOS双向通信

flutter开发实战-MethodChannel实现flutter与iOS双向通信 最近开发中需要iOS与flutter实现通信&#xff0c;这里使用的MethodChannel 如果需要flutter与Android实现双向通信&#xff0c;请看 https://blog.csdn.net/gloryFlow/article/details/132218837 这部分与https://bl…

Linux——基础IO(1)

目录 0. 文件先前理解 1. C文件接口 1.1 写文件 1.2 读文件 1.3 输出信息到显示器 1.4 总结 and stdin & stdout & stderr 2. 系统调用文件I/O 2.1 系统接口使用示例 2.2 接口介绍 2.3 open函数返回值 3. 文件描述符fd及重定向 3.1 0 & 1 & 2 3.2…

【Spring Cloud Alibaba】RocketMQ的基础使用,如何发送消息和消费消息

在现代分布式架构的开发中&#xff0c;消息队列扮演着至关重要的角色&#xff0c;用于解耦系统组件、保障可靠性以及实现异步通信。RocketMQ作为一款开源的分布式消息中间件&#xff0c;凭借其高性能、高可用性和良好的扩展性&#xff0c;成为了众多企业在构建高可靠性、高吞吐…

运维面试大全

文章目录 第一阶段平常怎么处理故障,思路是什么样的公网和私网分类以及范围,本机地址,网络地址,广播地址交换机的工作原理ICMP是什么干什么用的,它有哪些命令TCP和UDP协议的区别tcp有哪些控制位,分别是什么意思你是用过哪些Linux命令Linux 系统安全优化与内核优化经常使用…