基于Redis的4种延时队列实现方式

延时队列是一种特殊的消息队列,它允许消息在指定的时间后被消费。在微服务架构、电商系统和任务调度场景中,延时队列扮演着关键角色。例如,订单超时自动取消、定时提醒、延时支付等都依赖延时队列实现。

Redis作为高性能的内存数据库,具备原子操作、数据结构丰富和简单易用的特性,本文将介绍基于Redis实现分布式延时队列的四种方式。

1. 基于Sorted Set的延时队列

原理

利用Redis的Sorted Set(有序集合),将消息ID作为member,执行时间戳作为score进行存储。通过ZRANGEBYSCORE命令可以获取到达执行时间的任务。

代码实现

public class RedisZSetDelayQueue {private final StringRedisTemplate redisTemplate;private final String queueKey = "delay_queue:tasks";public RedisZSetDelayQueue(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;}/*** 添加延时任务* @param taskId 任务ID* @param taskInfo 任务信息(JSON字符串)* @param delayTime 延迟时间(秒)*/public void addTask(String taskId, String taskInfo, long delayTime) {// 计算执行时间long executeTime = System.currentTimeMillis() + delayTime * 1000;// 存储任务详情redisTemplate.opsForHash().put("delay_queue:details", taskId, taskInfo);// 添加到延时队列redisTemplate.opsForZSet().add(queueKey, taskId, executeTime);System.out.println("Task added: " + taskId + ", will execute at: " + executeTime);}/*** 轮询获取到期任务*/public List<String> pollTasks() {long now = System.currentTimeMillis();// 获取当前时间之前的任务Set<String> taskIds = redisTemplate.opsForZSet().rangeByScore(queueKey, 0, now);if (taskIds == null || taskIds.isEmpty()) {return Collections.emptyList();}// 获取任务详情List<String> tasks = new ArrayList<>();for (String taskId : taskIds) {String taskInfo = (String) redisTemplate.opsForHash().get("delay_queue:details", taskId);if (taskInfo != null) {tasks.add(taskInfo);// 从集合和详情中移除任务redisTemplate.opsForZSet().remove(queueKey, taskId);redisTemplate.opsForHash().delete("delay_queue:details", taskId);}}return tasks;}// 定时任务示例public void startTaskProcessor() {ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() -> {try {List<String> tasks = pollTasks();for (String task : tasks) {processTask(task);}} catch (Exception e) {e.printStackTrace();}}, 0, 1, TimeUnit.SECONDS);}private void processTask(String taskInfo) {System.out.println("Processing task: " + taskInfo);// 实际任务处理逻辑}
}

优缺点

优点

  • 实现简单,易于理解
  • 任务按执行时间自动排序
  • 支持精确的时间控制

缺点

  • 需要轮询获取到期任务,消耗CPU资源
  • 大量任务情况下,ZRANGEBYSCORE操作可能影响性能
  • 没有消费确认机制,需要额外实现

2. 基于List + 定时轮询的延时队列

原理

这种方式使用多个List作为存储容器,按延迟时间的不同将任务分配到不同的队列中。通过定时轮询各个队列,将到期任务移动到一个立即执行队列。

代码实现

public class RedisListDelayQueue {private final StringRedisTemplate redisTemplate;private final String readyQueueKey = "delay_queue:ready";  // 待处理队列private final Map<Integer, String> delayQueueKeys;  // 延迟队列,按延时时间分级public RedisListDelayQueue(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;// 初始化不同延迟级别的队列delayQueueKeys = new HashMap<>();delayQueueKeys.put(5, "delay_queue:delay_5s");     // 5秒delayQueueKeys.put(60, "delay_queue:delay_1m");    // 1分钟delayQueueKeys.put(300, "delay_queue:delay_5m");   // 5分钟delayQueueKeys.put(1800, "delay_queue:delay_30m"); // 30分钟}/*** 添加延时任务*/public void addTask(String taskInfo, int delaySeconds) {// 选择合适的延迟队列String queueKey = selectDelayQueue(delaySeconds);// 任务元数据,包含任务信息和执行时间long executeTime = System.currentTimeMillis() + delaySeconds * 1000;String taskData = executeTime + ":" + taskInfo;// 添加到延迟队列redisTemplate.opsForList().rightPush(queueKey, taskData);System.out.println("Task added to " + queueKey + ": " + taskData);}/*** 选择合适的延迟队列*/private String selectDelayQueue(int delaySeconds) {// 找到最接近的延迟级别int closestDelay = delayQueueKeys.keySet().stream().filter(delay -> delay >= delaySeconds).min(Integer::compareTo).orElse(Collections.max(delayQueueKeys.keySet()));return delayQueueKeys.get(closestDelay);}/*** 移动到期任务到待处理队列*/public void moveTasksToReadyQueue() {long now = System.currentTimeMillis();// 遍历所有延迟队列for (String queueKey : delayQueueKeys.values()) {boolean hasMoreTasks = true;while (hasMoreTasks) {// 查看队列头部任务String taskData = redisTemplate.opsForList().index(queueKey, 0);if (taskData == null) {hasMoreTasks = false;continue;}// 解析任务执行时间long executeTime = Long.parseLong(taskData.split(":", 2)[0]);// 检查是否到期if (executeTime <= now) {// 通过LPOP原子性地移除队列头部任务String task = redisTemplate.opsForList().leftPop(queueKey);// 任务可能被其他进程处理,再次检查if (task != null) {// 提取任务信息并添加到待处理队列String taskInfo = task.split(":", 2)[1];redisTemplate.opsForList().rightPush(readyQueueKey, taskInfo);System.out.println("Task moved to ready queue: " + taskInfo);}} else {// 队列头部任务未到期,无需检查后面的任务hasMoreTasks = false;}}}}/*** 获取待处理任务*/public String getReadyTask() {return redisTemplate.opsForList().leftPop(readyQueueKey);}/*** 启动任务处理器*/public void startTaskProcessors() {// 定时移动到期任务ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);// 移动任务线程scheduler.scheduleAtFixedRate(this::moveTasksToReadyQueue, 0, 1, TimeUnit.SECONDS);// 处理任务线程scheduler.scheduleAtFixedRate(() -> {String task = getReadyTask();if (task != null) {processTask(task);}}, 0, 100, TimeUnit.MILLISECONDS);}private void processTask(String taskInfo) {System.out.println("Processing task: " + taskInfo);// 实际任务处理逻辑}
}

优缺点

优点

  • 分级队列设计,降低单队列压力
  • 相比Sorted Set占用内存少
  • 支持队列监控和任务优先级

缺点

  • 延迟时间精度受轮询频率影响
  • 实现复杂度高
  • 需要维护多个队列
  • 时间判断和队列操作非原子性,需特别处理并发问题

3. 基于发布/订阅(Pub/Sub)的延时队列

原理

结合Redis发布/订阅功能与本地时间轮算法,实现延迟任务的分发和处理。任务信息存储在Redis中,而时间轮负责任务的调度和发布。

代码实现

public class RedisPubSubDelayQueue {private final StringRedisTemplate redisTemplate;private final String TASK_TOPIC = "delay_queue:task_channel";private final String TASK_HASH = "delay_queue:tasks";private final HashedWheelTimer timer;public RedisPubSubDelayQueue(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;// 初始化时间轮,刻度100ms,轮子大小512this.timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);// 启动消息订阅subscribeTaskChannel();}/*** 添加延时任务*/public void addTask(String taskId, String taskInfo, long delaySeconds) {// 存储任务信息到RedisredisTemplate.opsForHash().put(TASK_HASH, taskId, taskInfo);// 添加到时间轮timer.newTimeout(timeout -> {// 发布任务就绪消息redisTemplate.convertAndSend(TASK_TOPIC, taskId);}, delaySeconds, TimeUnit.SECONDS);System.out.println("Task scheduled: " + taskId + ", delay: " + delaySeconds + "s");}/*** 订阅任务通道*/private void subscribeTaskChannel() {redisTemplate.getConnectionFactory().getConnection().subscribe((message, pattern) -> {String taskId = new String(message.getBody());// 获取任务信息String taskInfo = (String) redisTemplate.opsForHash().get(TASK_HASH, taskId);if (taskInfo != null) {// 处理任务processTask(taskId, taskInfo);// 删除任务redisTemplate.opsForHash().delete(TASK_HASH, taskId);}}, TASK_TOPIC.getBytes());}private void processTask(String taskId, String taskInfo) {System.out.println("Processing task: " + taskId + " - " + taskInfo);// 实际任务处理逻辑}// 模拟HashedWheelTimer类public static class HashedWheelTimer {private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);private final long tickDuration;private final TimeUnit unit;public HashedWheelTimer(long tickDuration, TimeUnit unit, int wheelSize) {this.tickDuration = tickDuration;this.unit = unit;}public void newTimeout(TimerTask task, long delay, TimeUnit timeUnit) {long delayMillis = timeUnit.toMillis(delay);scheduler.schedule(() -> task.run(null), delayMillis, TimeUnit.MILLISECONDS);}public interface TimerTask {void run(Timeout timeout);}public interface Timeout {}}
}

优缺点

优点

  • 即时触发,无需轮询
  • 高效的时间轮算法
  • 可以跨应用订阅任务
  • 分离任务调度和执行,降低耦合

缺点

  • 依赖本地时间轮,非纯Redis实现
  • Pub/Sub模式无消息持久化,可能丢失消息
  • 服务重启时需要重建时间轮
  • 订阅者需要保持连接

4. 基于Redis Stream的延时队列

原理

Redis 5.0引入的Stream是一个强大的数据结构,专为消息队列设计。结合Stream的消费组和确认机制,可以构建可靠的延时队列。

代码实现

public class RedisStreamDelayQueue {private final StringRedisTemplate redisTemplate;private final String delayQueueKey = "delay_queue:stream";private final String consumerGroup = "delay_queue_consumers";private final String consumerId = UUID.randomUUID().toString();public RedisStreamDelayQueue(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;// 创建消费者组try {redisTemplate.execute((RedisCallback<String>) connection -> {connection.streamCommands().xGroupCreate(delayQueueKey.getBytes(), consumerGroup, ReadOffset.from("0"), true);return "OK";});} catch (Exception e) {// 消费者组可能已存在System.out.println("Consumer group may already exist: " + e.getMessage());}}/*** 添加延时任务*/public void addTask(String taskInfo, long delaySeconds) {long executeTime = System.currentTimeMillis() + delaySeconds * 1000;Map<String, Object> task = new HashMap<>();task.put("executeTime", String.valueOf(executeTime));task.put("taskInfo", taskInfo);redisTemplate.opsForStream().add(delayQueueKey, task);System.out.println("Task added: " + taskInfo + ", execute at: " + executeTime);}/*** 获取待执行的任务*/public List<String> pollTasks() {long now = System.currentTimeMillis();List<String> readyTasks = new ArrayList<>();// 读取尚未处理的消息List<MapRecord<String, Object, Object>> records = redisTemplate.execute((RedisCallback<List<MapRecord<String, Object, Object>>>) connection -> {return connection.streamCommands().xReadGroup(consumerGroup.getBytes(),consumerId.getBytes(),StreamReadOptions.empty().count(10),StreamOffset.create(delayQueueKey.getBytes(), ReadOffset.from(">")));});if (records != null) {for (MapRecord<String, Object, Object> record : records) {String messageId = record.getId().getValue();Map<Object, Object> value = record.getValue();long executeTime = Long.parseLong((String) value.get("executeTime"));String taskInfo = (String) value.get("taskInfo");// 检查任务是否到期if (executeTime <= now) {readyTasks.add(taskInfo);// 确认消息已处理redisTemplate.execute((RedisCallback<String>) connection -> {connection.streamCommands().xAck(delayQueueKey.getBytes(),consumerGroup.getBytes(),messageId.getBytes());return "OK";});// 可选:从流中删除消息redisTemplate.opsForStream().delete(delayQueueKey, messageId);} else {// 任务未到期,放回队列redisTemplate.execute((RedisCallback<String>) connection -> {connection.streamCommands().xAck(delayQueueKey.getBytes(),consumerGroup.getBytes(),messageId.getBytes());return "OK";});// 重新添加任务(可选:使用延迟重新入队策略)Map<String, Object> newTask = new HashMap<>();newTask.put("executeTime", String.valueOf(executeTime));newTask.put("taskInfo", taskInfo);redisTemplate.opsForStream().add(delayQueueKey, newTask);}}}return readyTasks;}/*** 启动任务处理器*/public void startTaskProcessor() {ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() -> {try {List<String> tasks = pollTasks();for (String task : tasks) {processTask(task);}} catch (Exception e) {e.printStackTrace();}}, 0, 1, TimeUnit.SECONDS);}private void processTask(String taskInfo) {System.out.println("Processing task: " + taskInfo);// 实际任务处理逻辑}
}

优缺点

优点

  • 支持消费者组和消息确认,提供可靠的消息处理
  • 内置消息持久化机制
  • 支持多消费者并行处理
  • 消息ID包含时间戳,便于排序

缺点

  • 要求Redis 5.0+版本
  • 实现相对复杂
  • 仍需轮询获取到期任务
  • 对未到期任务的处理相对繁琐

性能对比与选型建议

实现方式性能可靠性实现复杂度内存占用适用场景
Sorted Set★★★★☆★★★☆☆任务量适中,需要精确调度
List + 轮询★★★★★★★★☆☆高并发,延时精度要求不高
Pub/Sub + 时间轮★★★★★★★☆☆☆实时性要求高,可容忍服务重启丢失
Stream★★★☆☆★★★★★可靠性要求高,需要消息确认

总结

在实际应用中,可根据系统规模、性能需求、可靠性要求和实现复杂度等因素进行选择,也可以组合多种方式打造更符合业务需求的延时队列解决方案。无论选择哪种实现,都应关注可靠性、性能和监控等方面,确保延时队列在生产环境中稳定运行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/901944.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GN ninja 工程化构建例程

文章目录 1. 前言✨2. 工程实例🚩2.1 工程目录结构2.2 工程顶层.gn文件2.3 工具链配置.gn文件2.4 编译配置.gn文件2.5 编译目标配置.gn文件2.6 工程接口文件2.7 动态库编译.gn文件2.8 动态库源文件2.9 静态库编译.gn文件2.10 静态库源文件2.11 主程序编译.gn文件2.12 主程序源…

基于亚博K210开发板——内存卡读写文件

开发板 亚博K210开发板 实验目的 本实验主要学习 K210 通过 SPI 读写内存卡文件的功能 实验准备 实验元件 开发板自带的 TF 卡、LCD 显示屏 &#xff08;提前准备好 FAT32 格式的TF 卡。TF 插入 TF 卡槽的时候注意方向&#xff0c;TF 卡的金手指那一面需要面向开发板&am…

51单片机实验五:A/D和D/A转换

一、实验环境与实验器材 环境&#xff1a;Keli&#xff0c;STC-ISP烧写软件,Proteus. 器材&#xff1a;TX-1C单片机&#xff08;STC89C52RC&#xff09;、电脑。 二、 实验内容及实验步骤 1.A/D转换 概念&#xff1a;模数转换是将连续的模拟信号转换为离散的数字信…

C++ 常用的智能指针

C 智能指针 一、智能指针类型概览 C 标准库提供以下智能指针&#xff08;需包含头文件 <memory>&#xff09;&#xff1a; unique_ptr&#xff1a;独占所有权&#xff0c;不可复制&#xff0c; 可移动shared_ptr&#xff1a;共享所有权&#xff0c;用于引用计数weak_pt…

6.8.最小生成树

一.复习&#xff1a; 1.生成树&#xff1a; 对于一个连通的无向图&#xff0c;假设图中有n个顶点&#xff0c;如果能找到一个符合以下要求的子图&#xff1a; 子图中包含图中所有的顶点&#xff0c;同时各个顶点保持连通&#xff0c; 而且子图的边的数量只有n-1条&#xff0…

Spring Boot 集成金蝶 API 演示

✨ Spring Boot 集成金蝶 API 演示&#xff1a;登录 / 注销 Cookie 保存 本文将通过 Spring Boot 完整实现一套金蝶接口集成模型&#xff0c;包括&#xff1a; ✅ 普通登录✅ AppSecret 登录✅ 注销✅ Cookie 保存与复用 &#x1f4c5; 项目结构 src/ ├── controller/ │…

React 受控表单绑定基础

React 中最常见的几个需求是&#xff1a; 渲染一组列表绑定点击事件表单数据与组件状态之间的绑定 受控表单绑定是理解表单交互的关键之一。 &#x1f4cd;什么是受控组件&#xff1f; 在 React 中&#xff0c;所谓“受控组件”&#xff0c;指的是表单元素&#xff08;如 &l…

基于FPGA的AES加解密系统verilog实现,包含testbench和开发板硬件测试

目录 1.课题概述 2.系统测试效果 3.核心程序与模型 4.系统原理简介 4.1 字节替换&#xff08;SubBytes&#xff09; 4.2 行移位&#xff08;ShiftRows&#xff09; 4.3 列混合&#xff08;MixColumns&#xff09; 4.4 轮密钥加&#xff08;AddRoundKey&#xff09; 4.…

6.5 GitHub监控系统实战:双通道采集+动态调度打造高效运维体系

GitHub Sentinel Agent 定期更新功能设计与实现 关键词:GitHub API 集成、定时任务调度、Python 爬虫开发、SMTP 邮件通知、系统稳定性保障 1. GitHub 项目数据获取功能 1.1 双通道数据采集架构设计 #mermaid-svg-ZHJIMXcMAyDHVhmV {font-family:"trebuchet ms",v…

Explorer++:轻量级高效文件管理器!!

项目简介 Explorer 是一款专为Windows操作系统设计的轻量级且高效的文件管理器。作为Windows资源管理器的强大替代方案&#xff0c;它提供了丰富的特性和优化的用户体验&#xff0c;使得文件管理和组织变得更加便捷高效。无论是专业用户还是普通用户&#xff0c;都能从中受益&a…

7、生命周期:魔法的呼吸节奏——React 19 新版钩子

一、魔法呼吸的本质 "每个组件都是活体魔法生物&#xff0c;呼吸节奏贯穿其生命始终&#xff0c;"邓布利多的冥想盆中浮现三维相位图&#xff0c;"React 19的呼吸式钩子&#xff0c;让组件能量流转如尼可勒梅的炼金术&#xff01;" ——以霍格沃茨魔法生理…

理解计算篇--正则表达式转NFA--理论部分

空正则表达式转NFA单字符正则表达式转NFA拼接正则表达式转NFA选择正则表达式转NFA重复正则表达式转NFA 正则表达式转NFA–实战部分 空正则表达式转NFA 转换步骤&#xff1a; 构建1个只有1个状态的NFA起始状态也是接受状态没有规则&#xff0c;即规则集为空 单字符正则表达式…

稳态模型下的异步电机调速【运动控制系统】

异步电动机&#xff1a; n1是同步转速&#xff08;电机和磁芯同步时候的转速&#xff09; n&#xff1a;电机的实际转速 异步电动机恒压频比的概念&#xff0c;为什么基频以下可以采取恒压频率&#xff0c;基频以上不可以采用恒压频比&#xff1a; 异步电动机的恒压频比&…

【KWDB 创作者计划】_算法篇---Stockwell变换

文章目录 前言一、Stockwell变换原理详解1.1 连续S变换定义1.2 离散S变换1.3简介 二、S变换的核心特点2.1频率自适应的时频分辨率2.1.1高频区域2.1.2低频区域 2.2无交叉项干扰2.3完全可逆2.4相位保持2.5与傅里叶谱的直接关系 三、应用领域3.1地震信号分析3.2生物医学信号处理3.…

云计算(Cloud Computing)概述——从AWS开始

李升伟 编译 无需正式介绍亚马逊网络服务&#xff08;Amazon Web Services&#xff0c;简称AWS&#xff09;。作为行业领先的云服务提供商&#xff0c;AWS为全球开发者提供了超过170项随时可用的服务。 例如&#xff0c;Adobe能够独立于IT团队开发和更新软件。通过AWS的服务&…

Python爬虫第17节-动态渲染页面抓取之Selenium使用下篇

目录 引言 一、获取节点信息 1.1 获取属性 1.2 获取文本值 1.3 获取ID、位置、标签名、大小 二、切换Frame 三、延时等待 3.1 隐式等待 3.2 显式等待 四、前进后退 五、Cookies 六、选项卡管理 七、异常处理 引言 这一节我们继续讲解Selenium的使用下篇&#xff0…

容器docker入门学习

这里写目录标题 容器容器的软件厂商 dockerdocker引擎 虚拟化虚拟化技术 docker安装详解1、安装检查2、安装yum相关的工具3、安装docker-ce软件4、查看docker版本5、启动docker服务6、设置docker开机启动7、查看有哪些docker容器运行进程8、查看容器里有哪些镜像9、下载nginx软…

文献总结:NIPS2023——车路协同自动驾驶感知中的时间对齐(FFNet)

FFNet 一、文献基本信息二、背景介绍三、相关研究1. 以自车为中心的3D目标检测2. 车路协同3D目标检测3. 特征流 四、FFNet网络架构1. 车路协同3D目标检测任务定义2. 特征流网络2.1 特征流生成2.2 压缩、传输与解压缩2.3 车辆传感器数据与基础设施特征流融合 3. 特征流网络训练流…

git 出现 port 443 Connection timed out

梯子正常延迟不算严重&#xff0c;但在使用git push时反复出现 fatal: unable to access https://github.com/irvingwu5/xxxx.git/ Error in the HTTP2 framing layer Failed to connect to github.com port 443 after 136353 ms: Connection timed out 将git的网络配置与梯子…