Redis延迟队列详解

以下是对 Redis 延迟队列的详细解释:

一、什么是 Redis 延迟队列

Redis 延迟队列是一种使用 Redis 实现的消息队列,其中的消息在被消费之前会等待一段时间,这段时间就是延迟时间。延迟队列常用于一些需要延迟处理的任务场景,例如订单超时未支付取消、定时提醒等。

二、实现原理

  1. 使用 ZSET(有序集合)存储消息

    • 在 Redis 中,可以使用 ZSET 存储延迟消息。ZSET 的成员是消息的唯一标识,分数(score)是消息的到期时间戳。这样,消息会根据到期时间戳自动排序。
    • 例如,我们可以使用以下 Redis 命令添加一条延迟消息:
     

    收起

    redis

    ZADD delay_queue <timestamp> <message_id>
    
     

    其中 <timestamp> 是消息到期的时间戳,<message_id> 是消息的唯一标识。

  2. 消费者轮询 ZSET

    • 消费者会不断轮询 ZSET,使用 ZRANGEBYSCORE 命令查找分数小于或等于当前时间戳的元素。
    • 例如:
     

    redis

    ZRANGEBYSCORE delay_queue 0 <current_timestamp>
    
     

    这里的 0 表示最小分数,<current_timestamp> 是当前时间戳,这个命令会返回所有到期的消息。

  3. 处理到期消息

    • 当消费者找到到期消息后,会将消息从 ZSET 中移除并进行处理。可以使用 ZREM 命令移除消息:

    redis

    ZREM delay_queue <message_id>
    
     

    然后将消息发送到实际的消息处理程序中。

三、Java 代码示例

以下是一个使用 Jedis(Redis 的 Java 客户端)实现 Redis 延迟队列的简单示例:

java

import redis.clients.jedis.Jedis;
import java.util.Set;public class RedisDelayQueue {private Jedis jedis;public RedisDelayQueue() {jedis = new Jedis("localhost", 6379);}// 生产者添加延迟消息public void addDelayMessage(String messageId, long delayMillis) {long score = System.currentTimeMillis() + delayMillis;jedis.zadd("delay_queue", score, messageId);}// 消费者轮询并处理消息public void consume() {while (true) {// 查找到期的消息Set<String> messages = jedis.zrangeByScore("delay_queue", 0, System.currentTimeMillis(), 0, 1);if (messages.isEmpty()) {try {// 没有消息,等待一段时间再轮询Thread.sleep(100);} catch (InterruptedException e) {Thread.currentThread().interrupt();}continue;}String messageId = messages.iterator().next();// 移除消息Long removed = jedis.zrem("delay_queue", messageId);if (removed > 0) {// 消息成功移除,进行处理System.out.println("Processing message: " + messageId);// 在这里添加实际的消息处理逻辑}}}public static void main(String[] args) {RedisDelayQueue delayQueue = new RedisDelayQueue();// 生产者添加消息,延迟 5 秒delayQueue.addDelayMessage("message_1", 5000);// 启动消费者delayQueue.consume();}
}

代码解释

  • RedisDelayQueue 类封装了延迟队列的基本操作。
  • addDelayMessage 方法:
    • 计算消息的到期时间戳,将消息添加到 delay_queue ZSET 中,使用 jedis.zadd 命令。
  • consume 方法:
    • 不断轮询 delay_queue ZSET,使用 jedis.zrangeByScore 查找到期消息。
    • 如果没有消息,线程休眠 100 毫秒后继续轮询。
    • 若找到消息,使用 jedis.zrem 移除消息,如果移除成功,说明该消息被此消费者处理,进行后续处理。

四、注意事项

  1. 并发处理

    • 多个消费者同时轮询 ZSET 时,可能会出现竞争条件,需要注意消息的重复处理问题。可以使用 Redis 的事务(MULTIEXEC)或 Lua 脚本保证原子性。
    • 例如,可以使用 Lua 脚本将查找和移除操作合并为一个原子操作:

    lua

    local message = redis.call('ZRANGEBYSCORE', 'delay_queue', 0, ARGV[1], 'LIMIT', 0, 1)
    if #message > 0 thenif redis.call('ZREM', 'delay_queue', message[1]) == 1 thenreturn message[1]end
    end
    return nil
    
     

    然后在 Java 中调用这个脚本:

    java

    String script = "local message = redis.call('ZRANGEBYSCORE', 'delay_queue', 0, ARGV[1], 'LIMIT', 0, 1)\n" +"if #message > 0 then\n" +"    if redis.call('ZREM', 'delay_queue', message[1]) == 1 then\n" +"        return message[1]\n" +"    end\n" +"end\n" +"return nil";
    while (true) {String messageId = (String) jedis.eval(script, 0, String.valueOf(System.currentTimeMillis()));if (messageId!= null) {System.out.println("Processing message: " + messageId);// 在这里添加实际的消息处理逻辑} else {try {Thread.sleep(100);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
    }
    
  2. 消息持久化

    • Redis 是内存数据库,需要考虑消息的持久化问题,确保在 Redis 重启后不会丢失重要消息。可以使用 Redis 的 RDB 或 AOF 持久化机制,但要注意性能和数据安全的平衡。

五、使用 Redis 模块

除了上述基本实现,还可以使用 Redis 的一些第三方模块,如 Redis 的 Redisson 库,它提供了更高级的延迟队列实现,使用更加方便和可靠:

java

import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import java.util.concurrent.TimeUnit;public class RedissonDelayQueueExample {public static void main(String[] args) {Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379");RedissonClient redisson = Redisson.create(config);RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("myQueue");RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);// 生产者添加延迟消息delayedQueue.offer("message_1", 5, TimeUnit.SECONDS);// 消费者new Thread(() -> {while (true) {try {String message = blockingQueue.take();System.out.println("Processing message: " + message);// 在这里添加实际的消息处理逻辑} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}).start();}
}

代码解释

  • Redisson 是一个功能强大的 Redis 客户端库。
  • RBlockingQueue 是阻塞队列,RDelayedQueue 是延迟队列。
  • 使用 delayedQueue.offer("message_1", 5, TimeUnit.SECONDS) 添加延迟消息。
  • 消费者通过 blockingQueue.take() 阻塞等待消息,当消息到期时,会自动从延迟队列转移到阻塞队列并被消费者接收。

通过上述几种方法,可以使用 Redis 实现延迟队列,满足不同场景下的延迟任务处理需求。根据具体情况,可以选择简单的 ZSET 实现或使用更高级的第三方库,同时要注意并发处理和消息持久化等问题,以确保延迟队列的稳定性和可靠性。

总之,Redis 延迟队列是一种高效且灵活的实现延迟任务的方式,在分布式系统中具有广泛的应用,利用 Redis 的特性可以轻松处理延迟消息,减少系统的复杂性和开发成本。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/67517.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

利用免费GIS工具箱实现高斯泼溅切片,将 PLY 格式转换为 3dtiles

在地理信息系统&#xff08;GIS&#xff09;和三维数据处理领域&#xff0c;不同数据格式有其独特应用场景与优势。PLY&#xff08;Polygon File Format&#xff09;格式常用于存储多边形网格数据&#xff0c;而 3DTiles 格式在 Web 端三维场景展示等方面表现出色。将 PLY 格式…

【数据分析】02- A/B 测试:玩转假设检验、t 检验与卡方检验

一、背景&#xff1a;当“审判”成为科学 1.1 虚拟场景——法庭审判 想象这样一个场景&#xff1a;有一天&#xff0c;你在王国里担任“首席审判官”。你面前站着一位嫌疑人&#xff0c;有人指控他说“偷了国王珍贵的金冠”。但究竟是他干的&#xff0c;还是他是被冤枉的&…

ZooKeeper 核心知识全解析:架构、角色、节点与应用

1.ZooKeeper 分布式锁怎么实现的 ZooKeeper 是一个高效的分布式协调服务&#xff0c;它提供了简单的原语集来构建更复杂的同步原语和协调数据结构。利用 ZooKeeper 实现分布式锁主要依赖于它的顺序节点&#xff08;Sequential Node&#xff09;特性以及临时节点&#xff08;Ep…

数据结构与算法之递归: LeetCode 47. 全排列 II (Ts, Py, Go版)

全排列 II https://leetcode.cn/problems/permutations-ii/description/ 描述 给定一个可包含重复数字的序列 nums &#xff0c;按任意顺序 返回所有不重复的全排列 示例 1 输入&#xff1a;nums [1,1,2] 输出&#xff1a; [[1,1,2],[1,2,1],[2,1,1]]示例 2 输入&#x…

深入理解 Windows Server 的核心功能:现代 IT 架构的基石

深入理解 Windows Server 的核心功能:现代 IT 架构的基石 在现代 IT 基础架构中,Windows Server 一直扮演着不可或缺的角色。它不仅是一个强大的服务器操作系统,更是企业级解决方案的核心支柱。从中小型企业到跨国公司,Windows Server 提供了从身份管理到高可用性的一系列…

Spark任务提交流程

当包含在application master中的spark-driver启动后&#xff0c;会与资源调度平台交互获取其他执行器资源&#xff0c;并通过反向注册通知对应的node节点启动执行容器。此外&#xff0c;还会根据程序的执行规划生成两个非常重要的东西&#xff0c;一个是根据spark任务执行计划生…

54,【4】BUUCTF WEB GYCTF2020Ezsqli

进入靶场 吓我一跳&#xff0c;但凡放个彭于晏我都不说啥了 提交个1看看 1 and 11 1# 还尝试了很多&#xff0c;不过都被过滤了&#xff0c;头疼 看看别人的WP 竟然要写代码去跑&#xff01;&#xff01;&#xff01;&#xff0c;不会啊&#xff0c;先用别人的代码吧&#xf…

Kivy App开发之UX控件Spinner选择框

Spinner也是一个下拉列表,在选择框中快速地从一组值中选择一个值,默认状态下,Spinner会显示当前text的属性值,点击时会显示一个下拉菜单,从其中选择一个新的值。 常用属性如下 属性说明values下拉列表的值,默认空列表[]is_open是否展开,默认falsesync_height是否更改下…

从零搭建一套远程手机的桌面操控和文件传输的小工具

从零搭建一套远程手机的桌面操控和文件传输的小工具 --ADB连接专题 一、前言 前面的篇章中&#xff0c;我们确定了通过基于TCP连接的ADB控制远程手机的操作思路。本篇中我们将进行实际的ADB桥接的具体链路搭建工作&#xff0c;从原理和实际部署和操作层面上&#xff0c;从零…

【深度学习实战】kaggle 自动驾驶的假场景分类

本次分享我在kaggle中参与竞赛的历程&#xff0c;这个版本是我的第一版&#xff0c;使用的是vgg。欢迎大家进行建议和交流。 概述 判断自动驾驶场景是真是假&#xff0c;训练神经网络或使用任何算法来分类驾驶场景的图像是真实的还是虚假的。 图像采用 RGB 格式并以 JPEG 格式…

如何使用MaskerLogger防止敏感数据发生泄露

关于MaskerLogger MaskerLogger是一款功能强大的记录工具&#xff0c;该工具可以有效防止敏感数据泄露的发生。 MaskerLogger旨在保护目标系统的日子安全&#xff0c;此格式化程序可确保你的日志安全并防止敏感数据泄露。例如使用此格式化程序&#xff0c;打印下列数据&#x…

android Recyclerview viewholder统一封装

Recyclerview holder 统一封装 ViewHolder类 import android.annotation.SuppressLint import android.content.Context import android.graphics.Color import android.graphics.drawable.GradientDrawable import android.os.Build import android.os.CountDownTimer import…

【md文档】公式简单介绍

在Markdown文档中&#xff0c;可以使用LaTeX语法来插入数学公式。以下是一些常见的LaTeX公式示例及其在Markdown中的写法&#xff1a; 1. 行内公式 行内公式使用单个美元符号 $ 包裹。 ‘’’ 这是一个行内公式&#xff1a;$E mc^2$效果&#xff1a; 这是一个行内公式&…

【网络协议】RFC3164-The BSD syslog Protocol

引言 Syslog常被称为系统日志或系统记录&#xff0c;是一种标准化的协议&#xff0c;用于网络设备、服务器和应用程序向中央Syslog服务器发送日志消息。互联网工程任务组&#xff08;IETF&#xff09;发布的RFC 3164&#xff0c;专门定义了BSD Syslog协议的规范和实现方式。通…

ArkUI概述

鸿蒙操作系统&#xff08;HarmonyOS&#xff09;是华为公司推出的一款面向未来、面向全场景的分布式操作系统。它不仅能够支持各种不同的设备&#xff0c;从手机、平板到智能穿戴和智能家居产品&#xff0c;而且为开发者提供了一套统一的开发环境和工具链。对于想要深入鸿蒙开发…

oracle使用case when报错ORA-12704字符集不匹配原因分析及解决方法

问题概述 使用oracle的case when函数时&#xff0c;报错提示ORA-12704字符集不匹配&#xff0c;如下图&#xff0c;接下来分析报错原因并提出解决方法。 样例演示 现在有一个TESTTABLE表&#xff0c;本表包含的字段如下图所示&#xff0c;COL01字段是NVARCHAR2类型&#xff0…

springboot医院信管系统

摘 要 随着信息技术和网络技术的飞速发展&#xff0c;人类已进入全新信息化时代&#xff0c;传统管理技术已无法高效&#xff0c;便捷地管理信息。为了迎合时代需求&#xff0c;优化管理效率&#xff0c;各种各样的管理系统应运而生&#xff0c;各行各业相继进入信息管理时代&a…

[leetcode](找到vector中的特定元素并删除)无重复字符的最长子串

一.找到vector中的特定元素并删除 #include <iostream> #include <vector> #include <algorithm> int main() { // 示例 vector std::vector<int> vec {1, 2, 3, 4, 5, 6}; // 要删除的元素 int aim 3; // 查找元素 auto it std::fin…

LabVIEW 实现线路板 PCB 可靠性测试

在电子设备制造领域&#xff0c;线路板 PCB&#xff08;Printed Circuit Board&#xff09;的可靠性直接影响产品的整体性能和使用寿命。企业在生产新型智能手机主板时&#xff0c;需要对 PCB 进行严格的可靠性测试&#xff0c;以确保产品在复杂环境下能稳定运行。传统的测试方…

【docker踩坑记录】

docker踩坑记录 踩坑记录(持续更新中.......)docker images 权限问题 踩坑记录(持续更新中…) docker images 权限问题 permission denied while trying to connect to the Docker daemon socket at unix:///var/run/docker.sock: Head "http://%2Fvar%2Frun%2Fdocker.s…