【Redis】redis高阶-使用zset实现延时队列

Hi,大家好,我是抢老婆酸奶的小肥仔。

最近在使用redis时,就想能不能用其实现消息队列?也在网上看了下其他小伙伴写的实现,结合自身业务实现了如下消息队列,希望对大家有用。

废话不多说,直接开撸。

1、为什么zset可以做消息队列?

首先我们来看下,设计消息队列需要考虑的需求:有序性,消息重复性,可靠性。

  • 有序性:zset所有元素可以根据成员关联的score来进行从低到高的排序,例如,我们可以利用时间戳来进行排序
  • 消息重复性:在zset中每个元素都是唯一的,这也保证了消息的唯一性
  • 可靠性:zset会自动维护元素之间的顺序,在添加或删除元素时无需手动排序,提升操作速度。

2、使用的zset命令

命令描述
zadd将一个给定score的成员添加到有序集合中,返回添加元素的个数
zrange根据元素在有序排序中的位置,从有序集合中获取多个元素
rank(K key, Object o)获取指定元素在集合中的索引,索引从0开始

3、代码实现

使用zset实现消息队列时,具体的流程,如下:

生产者流程:

  1. 用户获取消息Id,并封装消息体
  2. 用户发送数据到生产者,先获取锁
  3. 如果获取到锁,则校验该消息体是否已添加到队列中,已添加则直接返回提醒。
  4. 若未添加则调用方法将数据保存到zset集合中,否则等到指定时间后再获取锁。
  5. 推送数据后,释放锁

消费者流程:

  1. 调用方法获取数据
  2. 获取到数据,则直接返回,否则到指定时间后再次获取数据,直到获取到数据并返回。

统一返回类:

/*** @Author: jiangjs* @Description:* @Date: 2021/11/12 15:46**/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ResultUtil<T> implements Serializable {private int code;private String msg;private T data;public static <T> ResultUtil<T> success(){return ResultUtil.<T>builder().code(1000).msg("成功").build();}public static <T> ResultUtil<T> success(T data){return ResultUtil.<T>builder().code(1000).msg("成功").data(data).build();}public static <T> ResultUtil<T> error(String msg){return ResultUtil.<T>builder().code(5000).msg(msg).data(null).build();}public static <T> ResultUtil<T> error(int code,String msg){return ResultUtil.<T>builder().code(code).msg(msg).build();}
}

3.1 消息实体

需添加消息Id,主要防止消息重复消费。

/*** @author: jiangjs* @description: 消息实体* @date: 2023/5/30 11:11**/
@Data
@Accessors(chain = true)
public class QueueTask<T> {/*** 消息Id*/private String taskId;/*** 任务*/private T task;
}

3.2 队列类型

队列类型可以理解为队列的名称,通过枚举,可以随意添加队列名称。

/*** @author: jiangjs* @description: 队列类型* @date: 2023/5/30 10:53**/
public enum QueueTypeEnum {/*** 订单*/ORDER("order");private final String type;QueueTypeEnum(String type){this.type = type;}public String getType(){return type;}
}

3.3 创建消息工具

package com.jiashn.springbootproject.redis.utils;import com.jiashn.springbootproject.redis.domain.QueueTask;
import com.jiashn.springbootproject.utils.ResultUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;/*** @author: jiangjs* @description: redis实现消息队列* @date: 2023/5/30 10:51**/
public class RedisQueueUtil<T> {private static final Logger log = LoggerFactory.getLogger(RedisQueueUtil.class);private RedisTemplate<String,QueueTask<T>> redisTemplate;/*** 队列类型,即名称*/private final QueueTypeEnum typeEnum;public RedisQueueUtil(QueueTypeEnum typeEnum,RedisTemplate<String,QueueTask<T>> redisTemplate){this.typeEnum = typeEnum;this.redisTemplate = redisTemplate;}/*** 添加消息数据* @param queueTask 消息* @param time 延迟时间,单位s*/public ResultUtil<String> sendQueueTask(QueueTask<T> queueTask, long time){//加锁if (getLock()){try {Long rank = redisTemplate.opsForZSet().rank(typeEnum.getType(), queueTask);if (Objects.nonNull(rank)){return ResultUtil.error(6000,"消息数据已经存在,不予添加......");}Boolean result = redisTemplate.opsForZSet().add(typeEnum.getType(), queueTask, System.currentTimeMillis() + time*1000);if (Objects.nonNull(result) && result){log.info("添加消息数据成功:" + queueTask + ",添加时间:" + LocalDateTime.now());return ResultUtil.success("添加消息数据成功");}return ResultUtil.error("添加消息数据失败");}finally {//释放锁releaseLock();}} else {log.info("未获取到锁,稍后再试");return ResultUtil.error("未获取到锁,稍后再试");}}/*** 获取zset前count数据* @param count 数据数* @return 返回获取到数据*/public Set<QueueTask<T>> loopGetTask(int count) {//rangeByScore,根据score顺序获取zset数据的值return redisTemplate.opsForZSet().rangeByScore(typeEnum.getType(), 0, System.currentTimeMillis(), 0, count-1);}/*** 注销消息队列* @param typeEnum 消息队列名称*/public void destroy(QueueTypeEnum typeEnum){redisTemplate.opsForZSet().remove(typeEnum.getType());}/*** 获取任务Id* @return 返回消息Id*/public String getTaskId(){return typeEnum.getType() + "_" + UUID.randomUUID().toString().replace("-","");}/*** 获取锁* @return 返回加锁状态*/private boolean getLock(){Boolean absent = redisTemplate.opsForValue().setIfAbsent(typeEnum.getType() + "_Locked", null, 30L, TimeUnit.MINUTES);return Objects.nonNull(absent) ? absent : false;}/*** 释放锁*/public void releaseLock(){redisTemplate.delete(typeEnum.getType() + "_Locked");}
}

在消息工具类中,创建消息任务时添加了锁,只有在获取锁的前提下才能添加消息任务。

提供获取消息Id的方法是为了让提交消息任务前,先获取Id,即使在提交时网络发生问题,提交的Id还是同一个,再进行消息消费时,可以根据这个Id来进行判断该消息任务是否已被消费,被消费则直接丢弃。

3.4 消费消息

/*** @author: jiangjs* @description: 启动消费* @date: 2023/5/30 14:27**/
@Component
public class CustomerTaskLineRunner implements CommandLineRunner {@Resourceprivate RedisTemplate<String,QueueTask<String>> redisTemplate;private final static String QUEUE_TYPE = QueueTypeEnum.ORDER.getType();private final static Logger log = LoggerFactory.getLogger(CustomerTaskLineRunner.class);@Overridepublic void run(String... args) throws Exception {RedisQueueUtil<String> queueUtil = new RedisQueueUtil<>(QueueTypeEnum.ORDER,redisTemplate);while (true){Set<QueueTask<String>> queueTasks = queueUtil.loopGetTask(10);if (CollectionUtils.isNotEmpty(queueTasks)){for (QueueTask<String> queueTask : queueTasks) {//校验当前消息是否已消费,主要防止网络延时,导致多次提交同一任务 存在QueueTask<String> stringQueueTask = redisTemplate.opsForValue().get(QUEUE_TYPE + "_" + queueTask.getTaskId());if (Objects.nonNull(stringQueueTask)){log.info("该任务已经消费,不能重复消费");redisTemplate.opsForZSet().remove(QUEUE_TYPE,queueTask);continue;}Long removeNum = redisTemplate.opsForZSet().remove(QUEUE_TYPE,queueTask);if (Objects.nonNull(removeNum) && removeNum > 0){String task = queueTask.getTask();log.info("消费任务数据:" + task);//设置过期时间,10分钟内则默认是重复提交redisTemplate.opsForValue().set(QUEUE_TYPE + "_" + queueTask.getTaskId(),queueTask,10L, TimeUnit.MINUTES);}}}log.info("------1分钟后再次获取------");Thread.sleep(60000);}}
}

校验重复消息,若消息重复且在10分钟内未被消费,则直接将该消息从队列中删除。在消息任务被消费后,将数据从队列中移除。

执行结果:

谢谢大家,今天的分享就到这,不合理的地方希望大家指正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/21471.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

hmcode硬件编程1

在/home/golemon/hmcode/applications/sample/wifi-iot/app内创建文件夹。 这里创建了d_6_3文件夹 . ├── BUILD.gn ├── d_6_3 │ ├── BUILD.gn │ └── lab.c ├── demolink │ ├── BUILD.gn │ └── helloworld.c ├── iothardware │ ├── B…

安装Lubuntu24.04

Lubuntu24.04安装过程与22.04、20.04等完全一致。 记录 01 02 03 04 05 09 给出提示 10 11 12 13 特点 Lubuntu 22.04的特点主要包括以下几点&#xff1a; 轻量级且高效&#xff1a;Lubuntu作为Ubuntu的一个轻量级分支&#xff0c;专注于为低端电脑、老旧电脑或需要最大限…

【Java】设计一个支持敏感数据存储和传输安全的加解密平台

一、问题解析 在一个应用系统运行过程中&#xff0c;需要记录、传输很多数据&#xff0c;这些数据有的是非常敏感的&#xff0c;比如用户姓名、手机号码、密码、甚至信用卡号等等。这些数据如果直接存储在数据库&#xff0c;记录在日志中&#xff0c;或者在公网上传输的话&…

如何在QGIS中加载MapBox图源

在设计行业中需要多风格地图的调用&#xff0c;不管是规划、建筑设计还是景观&#xff0c;分析图的工作量都大&#xff0c;有好的底图&#xff0c;会事半功倍。 针对不同项目&#xff0c;会选择不同配色的底图&#xff0c;以便让设计内容中的呈现足够清晰。 这里就来分享一个…

DP读书:《半导体物理学(第八版)》(七) 金属与半导体的接触- 10 min 速通(载流子分布)

《半导体物理学&#xff08;第八版&#xff09;》10 min 速通 金属与半导体的接触 7.1 金属与半导体的接触及其能带图7.1.1 金属和半导体的功函数7.1.2 接触电势差7.1.3 表面态对接触势垒的影响 7.2 金属半导体接触整流理论7.2.1 扩散理论7.2.2 热电子发射理论7.2.3 镜像力和隧…

深度神经网络——什么是梯度下降?

如果对神经网络的训练有所了解&#xff0c;那么很可能已经听说过“梯度下降”这一术语。梯度下降是提升神经网络性能、降低其误差率的主要技术手段。然而&#xff0c;对于机器学习新手来说&#xff0c;梯度下降的概念可能稍显晦涩。本文旨在帮助您直观理解梯度下降的工作原理。…

论文精读--Swin Transformer

想让ViT像CNN一样分成几个block&#xff0c;做层级式的特征提取&#xff0c;从而使提取出的特征有多尺度的概念 Abstract This paper presents a new vision Transformer, called Swin Transformer, that capably serves as a general-purpose backbone for computer vision. …

cesium 的初步认识

Cesium是一个基于JavaScript开发的WebGL三维地球和地图可视化库。它利用了现代Web技术&#xff0c;如HTML5、WebGL和WebAssembly&#xff0c;来提供跨平台和跨浏览器的三维地理空间数据可视化。Cesium的主要特点包括&#xff1a; 跨平台、跨浏览器&#xff1a;无需额外插件&am…

常见4种时间管理方法及实施步骤(收藏版)

有效的时间管理方法&#xff0c;不仅能够保证项目按时交付&#xff0c;还能提高开发效率&#xff0c;减少成本超支和质量风险。如果缺乏明确的时间规划&#xff0c;可能会导致任务延误&#xff1b;容易造成资源分配不当&#xff0c;导致整体效率低下和成本增加。 因此有效的时间…

docker 安装mysql,redis,rabbitmq

文章目录 docker 安装ngnix&#xff0c;mysql,redis,rabbitmq安装docker1.安装下载docker-ce源命令2.安装docker3.查看版本4.查看docker状态5.启动docker6.测试安装ngnix 安装mysql8.0.361.拉取mysql镜像2.安装mysql8 安装redis1.拉取redis7.0.11镜像2.安装redis3.进入容器内部…

独立游戏开发的 6 个步骤

&#x1f482; 个人网站:【 摸鱼游戏】【神级代码资源网站】【工具大全】&#x1f91f; 一站式轻松构建小程序、Web网站、移动应用&#xff1a;&#x1f449;注册地址&#x1f91f; 基于Web端打造的&#xff1a;&#x1f449;轻量化工具创作平台&#x1f485; 想寻找共同学习交…

高安全且适应不同业务模式的跨网文件交换系统

在当今的商业环境中&#xff0c;文件的快速和安全传输对于企业运营至关重要。特别是在金融、医疗和政府等对数据保护和合规性要求极高的领域&#xff0c;传统的文件传输方式已经显得力不从心。因此&#xff0c;跨网络文件交换系统成为了企业数据传输不可或缺的工具&#xff0c;…

文件访问被拒绝,原来可以这样处理!

在使用电脑的过程中&#xff0c;我们有时会遇到无法访问某些文件的情况&#xff0c;通常会看到“文件访问被拒绝”的错误提示。这种情况可能是由于文件权限设置问题、文件正在被其他程序使用、系统错误或者病毒感染等原因引起的。本文将介绍三种解决文件访问被拒绝问题的方法&a…

【遂愿赠书 - 1期】:安恒“网安三剑客”-大模型时代下的网络安全实战指南

文章目录 一、图书背景二、网安实战宝典2.1《内网渗透技术》2.2《渗透测试技术》2.3《Web应用安全》 三、校企合作&#xff0c;产学研结合四、大模型时代的数字安全五、 网络安全无小事 一、图书背景 大模型风潮已掀起&#xff0c;各大巨头争相入局&#xff0c;从ChatGPT到Sor…

【自然语言处理】Transformer中的一种线性特征

相关博客 【自然语言处理】【大模型】语言模型物理学 第3.3部分&#xff1a;知识容量Scaling Laws 【自然语言处理】Transformer中的一种线性特征 【自然语言处理】【大模型】DeepSeek-V2论文解析 【自然语言处理】【大模型】BitNet&#xff1a;用1-bit Transformer训练LLM 【自…

干货分享:搭建知识库系统的优势和技巧

如何搭建一个高效、实用的知识库系统成为很多企业绞尽脑汁的问题&#xff0c;知识库系统能够帮助我们整理、存储和快速检索各种知识信息。本文将给大家分享搭建知识库系统的优势以及技巧&#xff0c;接着往下看吧&#xff01; 一、搭建知识库系统的优势 提升工作效率&#xff1…

编辑任何场景! 3DitScene:通过语言引导的解耦 Gaussian Splatting开源来袭!

文章&#xff1a;https://arxiv.org/pdf/2405.18424 项目&#xff1a;https://zqh0253.github.io/3DitScene/ huggingface:https://huggingface.co/spaces/qihang/3Dit-Scene 场景图像编辑在娱乐、摄影和广告设计中至关重要。现有方法仅专注于2D个体对象或3D全局场景编辑&…

遥感卫星影像处理流程

当空中的遥感卫星获取了地球数字影像&#xff0c;并传回地面&#xff0c;是否工作就结束了&#xff1f;答案显然是否定的&#xff0c;相反&#xff0c;这正是遥感数字图像处理工作的开始。 遥感数字图像&#xff08;Digital image&#xff0c;后简称“遥感影像”&#xff09;是…

24、Linux网络端口

Linux网络端口 1、查看网络接口信息ifconfig ens33 eth0 文件 ifconfig 当前设备正在工作的网卡&#xff0c;启动的设备。 ifconfig -a 查看所有的网络设备。 ifconfig ens33 查看指定网卡设备。 ifconfig ens33 up/down 对指定网卡设备进行开关 基于物理网卡设备虚拟的…

Vue3生命周期钩子

Vue2和Vue3的生命周期对比 选项式API下的生命周期钩子组合式API下的生命周期钩子beforeCreate不需要&#xff0c;直接写到setup函数中created不需要&#xff0c;直接写到setup函数中beforeMountonBeforeMountmountedonMountedbeforeUpdateonBeforeUpdateupdatedonUpdatedbefor…