Redis延迟队列原理及实例

redis延迟队列

    • 一、Redis延迟队列的原理
    • 二、数据结构说明
      • 2.1、数据结构说明
      • 2.2、为什么Sorted Set适合做延迟队列
      • 2.3、Sorted Set内部结构
    • 三、三种实现方式
      • 3.1、 Jedis实现方式
      • 3.2、Redisson实现(**推荐**)
      • 3.3、RedisTemplate 实现
    • 四、总结

一、Redis延迟队列的原理

在Redis延迟队列中,我们使用有序集合(Sorted Set)来存储消息,其中,消息的到期时间作为有序集合的分数,消息内容作为有序集合的成员。例如,我们向Redis中添加一个到期时间为2023年12月25日的消息,可以使用以下命令:

ZADD delay_queue 1640390400 "message content"

在这个命令中,delay_queue是有序集合的名称,1640390400是消息的到期时间(即2023年12月25日的时间戳),"message content"是消息的内容。

当我们需要消费这个延迟消息时,需要定期轮询有序集合,找到所有到期的消息进行消费。可以使用以下命令:

ZRANGEBYSCORE delay_queue 0 CURRENT_TIMESTAMP

其中,CURRENT_TIMESTAMP为当前时间戳,该命令的作用是获取当前时间之前的所有消息。然后,我们可以遍历返回结果,依次进行消息的处理。

对于已经处理完成的消息,可以选择从有序集合中删除,也可以将其留在有序集合中,等待下一次轮询。如果希望删除消息,可以使用以下命令:

ZREM delay_queue "message content"

通过这种方式,Redis延迟队列可以实现消息的延迟投递和消费。因为消息按照到期时间排序,所以可以保证消息的有序性。此外,由于Redis本身是一种高性能内存数据库,所以延迟队列的处理效率也非常高。

二、数据结构说明

2.1、数据结构说明

Sorted Set(有序集合)在Redis中也被称为 ZSet(有序集合)。在Redis中,ZSet提供了有序的、唯一的成员(member)和对应的分数(score)之间的映射关系。这种数据结构非常适合用来实现延迟队列,因为可以根据分数进行范围查询,从而轻松地获取到期的消息。

2.2、为什么Sorted Set适合做延迟队列

Sorted Set(有序集合)适合用于实现延迟队列的主要原因有以下几点:

  1. 有序性:Sorted Set中的成员按照分数进行排序,这使得我们可以方便地根据到期时间进行范围查询。通过设置分数为消息的到期时间,可以轻松地获取到期的消息,而无需遍历整个队列。

  2. 唯一性:Sorted Set中的成员是唯一的,这意味着我们可以确保消息不会重复。这对于一些需要确保消息处理幂等性的场景非常重要。

  3. 高效性:Redis是一种高性能的内存数据库,Sorted Set的操作效率很高。在使用Sorted Set实现延迟队列时,添加、删除和范围查询等操作都可以在O(logN)的时间复杂度内完成,具有良好的性能表现。

  4. 支持多种操作:除了基本的添加、删除和范围查询,Sorted Set还支持其他一些有用的操作,如获取成员的分数、修改成员的分数、按照分数范围删除成员等。这些操作可以为延迟队列的管理提供更多的灵活性和功能。

综上所述,Sorted Set具备有序性、唯一性、高效性和丰富的操作特性,使其成为实现延迟队列的理想选择。在Redis中利用Sorted Set来实现延迟队列,可以简单高效地处理延迟任务。

2.3、Sorted Set内部结构

Sorted Set 能够按照分数进行排序的原因在于它内部使用了一种数据结构——跳跃表(Skip List)。跳跃表是一种有序链表的数据结构,它通过在不同层级上建立索引的方式,提高了基本有序链表的查找效率。在 Redis 中,Sorted Set 的实现就是基于跳跃表。

通过跳跃表,Sorted Set 在插入新成员时可以保持成员的有序性。当需要按照分数进行排序时,Redis 内部会利用跳跃表的特性,快速地定位到对应分数的成员,从而实现按照分数进行排序的功能。

跳跃表的查询时间复杂度为 O(logN),这意味着无论集合中有多少成员,按照分数进行范围查询的效率都很高。这也是为什么 Sorted Set 能够高效地支持按照分数进行排序的重要原因之一。

总的来说,跳跃表作为 Sorted Set 内部的数据结构,通过其高效的有序性和索引特性,使得 Sorted Set 能够轻松地按照分数进行排序。

三、三种实现方式

3.1、 Jedis实现方式

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;import java.util.Set;public class RedisDelayQueue {private static final String DELAY_QUEUE_KEY = "delay_queue";public static void main(String[] args) {// 连接 RedisJedis jedis = new Jedis("localhost");// 添加延迟消息addMessageToDelayQueue(jedis, "message1", 5000);  // 延迟 5 秒发送addMessageToDelayQueue(jedis, "message2", 10000); // 延迟 10 秒发送// 处理延迟消息processDelayQueue(jedis);// 断开连接jedis.close();}// 将消息添加到延迟队列private static void addMessageToDelayQueue(Jedis jedis, String message, long delayMillis) {long currentTime = System.currentTimeMillis();long delayTime = currentTime + delayMillis;jedis.zadd(DELAY_QUEUE_KEY, delayTime, message);System.out.println("Message added to delay queue: " + message);}// 处理延迟队列中的消息private static void processDelayQueue(Jedis jedis) {while (true) {long currentTime = System.currentTimeMillis();// 获取当前时间之前的第一个消息及其分数Set<Tuple> messages = jedis.zrangeByScoreWithScores(DELAY_QUEUE_KEY, 0, currentTime, 0, 1);if (messages.isEmpty()) {// 延迟队列为空,暂停一段时间再继续轮询try {Thread.sleep(1000); // 暂停 1 秒钟} catch (InterruptedException e) {e.printStackTrace();}continue;}// 处理消息,这里简单打印消息内容for (Tuple tuple : messages) {String message = tuple.getElement();double score = tuple.getScore();System.out.println("Processing message: " + message);System.out.println("Scheduled time: " + score);// 从延迟队列中移除已处理的消息jedis.zrem(DELAY_QUEUE_KEY, message);}}}
}

3.2、Redisson实现(推荐

Redisson 是一个基于 Redis 的 Java 客户端,Redisson 提供了 RDelayedQueue 接口和 RQueue, RBlockingDeque 接口来实现延迟队列。
原理如下:

  • 首先,你需要创建一个基本的队列,然后将它包装在一个延迟队列中。将延迟任务添加到延迟队列中,指定任务的内容和延迟时间(以毫秒为单位)。
  • 延迟队列会自动处理过期的任务并将它们移动到基本队列中。你可以从基本队列中获取任务并进行处理。

过程有点像RabbitMq, 延迟队列 + TTL + 死信队列,用两个队列结合使用。

下面是使用 Redisson 实现延迟队列的示例代码:

  1. 创建两个队列,并关联
@Bean
public RDelayedQueue delayedQueue(RedissonClient redissonClient) {// 创建阻塞队列RBlockingDeque<String> queue = redissonClient.getBlockingDeque("redisson_delay_Queue");// 创建延迟队列并关联到基本队列RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(queue);return delayedQueue;
}
  1. 添加延迟任务到延迟队列中
// 添加延迟任务
delayedQueue.offer("Task1", 5000); // 5秒延迟
delayedQueue.offer("Task2", 10000); // 10秒延迟
  1. 处理延迟任务
    延迟队列会自动处理过期的任务并将它们移动到阻塞队列中。你可以从阻塞队列中获取任务并进行处理。
while (true){// 获取并移除队首元素,如果队列为空,则阻塞等待String take = test.take();log.info("当前时间:{},值:{}", LocalDateTime.now(),take);
}

为什么要使用阻塞队列 RBlockingDeque,而不使用普通队列RQueue,普通队列在获取队列元素时没有阻塞的功能,在while(true)中一直循环,导致cpu空转,造成资源浪费。阻塞队列的话,队列没有元素会阻塞着。

  1. 单测实例
@Test
public void redisDelayedQueueTest() {RBlockingDeque<String> test = redissonClient.getBlockingDeque("test");RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(test);delayedQueue.offer("1",20, TimeUnit.SECONDS);delayedQueue.offer("2",10, TimeUnit.SECONDS);log.info("当前时间:{}", LocalDateTime.now());while (true){String take = test.take();log.info("当前时间:{},值:{}", LocalDateTime.now(),take);}
}

3.3、RedisTemplate 实现

import com.alibaba.fastjson.JSON;
import com.jingdianjichi.redis.util.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.Date;
import java.util.Set;
import java.util.stream.Collectors;/*** @Description: 群发任务延时service* @DateTime: 2023/12/8 23:24*/
@Service
@Slf4j
public class MassMailTaskService {@Resourceprivate RedisUtil redisUtil;private static final String MASS_MAIL_TASK_KEY = "massMailTask";public void pushMassMailTaskQueue(MassMailTask massMailTask) {Date startTime = massMailTask.getStartTime();if (startTime == null) {return;}if (startTime.compareTo(new Date()) <= 0) {return;}log.info("定时群发任务加入延时队列,massMailTask:{}", JSON.toJSON(massMailTask));redisUtil.zAdd(MASS_MAIL_TASK_KEY, massMailTask.getTaskId().toString(), startTime.getTime());}public Set<Long> poolMassMailTaskQueue() {Set<String> taskIdSet = redisUtil.rangeByScore(MASS_MAIL_TASK_KEY, 0, System.currentTimeMillis());log.info("获取延迟群发任务,taskIdSet:{}", JSON.toJSON(taskIdSet));if (CollectionUtils.isEmpty(taskIdSet)) {return Collections.emptySet();}redisUtil.removeZsetList(MASS_MAIL_TASK_KEY, taskIdSet);return taskIdSet.stream().map(n -> Long.parseLong(n)).collect(Collectors.toSet());}}
@Component
@Slf4j
public class RedisUtil {@Resourceprivate RedisTemplate redisTemplate;public Boolean zAdd(String key, String value, Long score) {return redisTemplate.opsForZSet().add(key, value, Double.valueOf(String.valueOf(score)));}public void removeZsetList(String key, Set<String> value) {value.stream().forEach((val) -> redisTemplate.opsForZSet().remove(key, val));}public Set<String> rangeByScore(String key, long start, long end) {return redisTemplate.opsForZSet().rangeByScore(key, Double.valueOf(String.valueOf(start)), Double.valueOf(String.valueOf(end)));}}

四、总结

个人建议使用Redisson的方式。因为队列中没有元素时,会阻塞。从而避免了while(true) 一直循环,空转cpu, 浪费资源~ 。其他两种方式在while(true)中获取不到元素会一直死循环,要手动设置线程休眠时间~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/242100.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2016年第五届数学建模国际赛小美赛A题臭氧消耗预测解题全过程文档及程序

2016年第五届数学建模国际赛小美赛 A题 臭氧消耗预测 原题再现&#xff1a; 臭氧消耗包括自1970年代后期以来观察到的若干现象&#xff1a;地球平流层&#xff08;臭氧层&#xff09;臭氧总量稳步下降&#xff0c;以及地球极地附近平流层臭氧&#xff08;称为臭氧空洞&#x…

数据结构和算法-二叉排序树(定义 查找 插入 删除 时间复杂度)

文章目录 二叉排序树总览二叉排序树的定义二叉排序树的查找二叉排序树的插入二叉排序树的构造二叉排序树的删除删除的是叶子节点删除的是只有左子树或者只有右子树的节点删除的是有左子树和右子树的节点 查找效率分析查找成功查找失败 小结 二叉排序树 总览 二叉排序树的定义 …

【LeetCode:1954. 收集足够苹果的最小花园周长 | 等差数列 + 公式推导】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

C语言—每日选择题—Day60

指针相关博客 打响指针的第一枪&#xff1a;指针家族-CSDN博客 深入理解&#xff1a;指针变量的解引用 与 加法运算-CSDN博客 第一题 1. 下列for循环的循环体执行次数为&#xff08;&#xff09; for(int i 10, j 1; i j 0; i, --j) A&#xff1a;0 B&#xff1a;1 C&#…

蓝桥小课堂-平方和【算法赛】

问题描述 蓝桥小课堂开课啦&#xff01; 平方和公式是一种用于计算连续整数的平方和的数学公式。它可以帮助我们快速求解从 1 到 n 的整数的平方和&#xff0c;其中 n 是一个正整数。 平方和公式的表达式如下&#xff1a; 这个公式可以简化计算过程&#xff0c;避免逐个计算…

2024年【制冷与空调设备运行操作】免费试题及制冷与空调设备运行操作试题及解析

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 制冷与空调设备运行操作免费试题根据新制冷与空调设备运行操作考试大纲要求&#xff0c;安全生产模拟考试一点通将制冷与空调设备运行操作模拟考试试题进行汇编&#xff0c;组成一套制冷与空调设备运行操作全真模拟考…

Android Native Hook 深入理解PLT hook

前言 本文介绍NativeHook技术里的PLT hook,参考开源的xhook和bhook进行理解,本文不涉及该hook技术源码的分析,只分析大体原理,用于在进行修复稳定性问题时候寻找hook点使用。 基本搬运如下两篇文章,主要是加深自己理解: 字节跳动开源 Android PLT hook 方案 bhook xhoo…

K8S面试题

请问一下&#xff0c;在K8S中&#xff0c;deployment和RS有什么区别和联系&#xff1f; 在Kubernetes&#xff08;K8s&#xff09;中&#xff0c;Deployment和ReplicaSet&#xff08;RS&#xff09;是用于管理应用程序副本的两个重要概念&#xff0c;它们之间存在着一些区别和…

【FPGA】分享一些FPGA协同MATLAB开发的书籍

在做FPGA工程师的这些年&#xff0c;买过好多书&#xff0c;也看过好多书&#xff0c;分享一下。 后续会慢慢的补充书评。 【FPGA】分享一些FPGA入门学习的书籍【FPGA】分享一些FPGA协同MATLAB开发的书籍 【FPGA】分享一些FPGA视频图像处理相关的书籍 【FPGA】分享一些FPGA高速…

python时间处理方法和模块

在 Python 中&#xff0c;有一些内置的模块和库&#xff0c;可以帮助我们处理日期和时间的表示、计算和转换。 1. 时间模块&#xff08;time&#xff09; Python 的 time 模块提供了一系列函数来处理时间相关的操作。通过这个模块&#xff0c;可以获取当前时间、睡眠指定时间…

【设计模式】RBAC 模型详解

其他系列文章导航 Java基础合集数据结构与算法合集 设计模式合集 多线程合集 分布式合集 ES合集 文章目录 其他系列文章导航 文章目录 前言 一、什么是 RBAC 呢&#xff1f; 二、RBAC 的组成 三、RBAC 的优缺点 3.1 优点&#xff1a; 3.2 缺点&#xff1a; 四、RBAC 的…

如何用Python画坤坤?

前言 最近闲得慌&#xff0c;突然想起坤坤了&#xff0c;那就画一个吧。 一、Python画坤坤 坤坤大家都熟悉不过了&#xff0c;也就是蔡徐坤。 代码&#xff1a; from turtle import * from math import * #高级椭圆参数方程&#xff08;颜色&#xff09;&#xff0c;sita为…

MySQL报错:1366 - Incorrect integer value: ‘xx‘ for column ‘xx‘ at row 1的解决方法

我在插入表数据时遇到了1366报错&#xff0c;报错内容&#xff1a;1366 - Incorrect integer value: Cindy for column name at row 1&#xff0c;下面我演示解决方法。 根据上图&#xff0c;原因是Cindy’对应的name字段数据类型不正确。我们在左侧找到该字段所在的grade_6表&…

轻松学会在 Linux 上使用 Docker

前言 本文将带你轻松掌握在 CentOS 和 Ubuntu 上安装和配置 Docker 的步骤。Docker 是一个强大的容器化平台&#xff0c;让应用程序的打包、交付和运行变得简单易行。 步骤 步骤 1: 安装所需工具 在 CentOS 和 Ubuntu 上安装必要的工具&#xff0c;确保系统能够通过 HTTPS …

【图文教程】windows 下 MongoDB 介绍下载安装配置

文章目录 介绍MySQL 之间的区别和适用场景差异数据模型&#xff1a;查询语言&#xff1a;可扩展性&#xff1a;数据一致性&#xff1a; 下载安装环境变量配置 介绍 MongoDB 是一种开源的、面向文档的 NoSQL 数据库管理系统。它使用灵活的文档模型来存储数据&#xff0c;这意味…

HarmonyOS - 基础组件绘制

文章目录 所有组件开发 tipsBlankTextImageTextInputButtonLoadingProgress 本文改编自&#xff1a;<HarmonyOS第一课>从简单的页面开始 https://developer.huawei.com/consumer/cn/training/course/slightMooc/C101667360160710997 所有组件 在 macOS 上&#xff0c;组…

k8s部署nginx-ingress服务

k8s部署nginx-ingress服务 经过大佬的拷打&#xff0c;终于把这块的内容配置完成了。 首先去 nginx-ingress官网查看相关内容。 核心就是这个&#xff1a; kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/controller-v1.8.2/deploy/static/prov…

计算机网络——数据链路层-点对点协议(组成部分、PPP帧格式、透明传输、差错检测、工作状态)

目录 介绍 组成部分 PPP帧格式 透明传输 字节填充法 比特填充法 差错检测 工作状态 本篇我们介绍点对点协议PPP 介绍 点对点协议PPP&#xff08;Point-to-Point Protocol&#xff09;是目前使用最广泛的点对点数据链路层协议。 请大家想想看&#xff1a;一般的英特…

算法模板之队列图文详解

&#x1f308;个人主页&#xff1a;聆风吟 &#x1f525;系列专栏&#xff1a;算法模板、数据结构 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 &#x1f4cb;前言一. ⛳️模拟队列1.1 &#x1f514;用数组模拟实现队列1.1.1 &#x1f47b;队列的定…

Kafka集群架构服务端核心概念

目录 Kafka集群选举 controller选举机制 Leader partition选举 leader partition自平衡 partition故障恢复机制 follower故障 leader故障 HW一致性保障 HW同步过程 Epoch Kafka集群选举 1. 在多个broker中, 需要选举出一个broker, 担任controller. 由controller来管理…