Redis 实现的延时队列组件

最近看开源看到一个好用的延时队列组件,已经上生产。代码量很少,主要就是利用Redis监听过期键实现的。然后搞点策略模式柔和柔和。利用Spring Start 封装了一下,全是俺掌握的知识,稍微研究了下就搞懂了。觉得挺有用的,这里分享一下。

Redis 过期键监听

之前写责任链手撸二级缓存的时候,也是借助过期键监听器来更新二级缓存的,详情挪步
CaffeineCache+Redis 接入系统做二层缓存,SPI 思路实现(借鉴 mybatis 二级缓存、自动装配源码)

效果

效果前提: Redis 开启了过期键通知:config set notify-keyspace-events Ex

根据 code 值发布延时任务(10s)。
在这里插入图片描述
对应的code 的处理器,10s后收到通知进行处理任务

在这里插入图片描述
在这里插入图片描述

基于这套组件可实现的功能:订单超时自动取消、会议前 30 分钟自动提醒、订单到点自动收货等,比MQ灵活性更高,RocketMq 老版只支持最高30 分钟的延时任务,这套组件可以指定任意时间。且可无限扩展 topic,满足不同类型的业务。缺点就是严重依赖Redis,需要保证Redis的高可用

RedisExpiredListener配置

利用ApplicationContextAware注册所有的messageHandleRouter处理器,当有消息过来时解析消息格式中的CODE,根据CODE把任务分发给具体的某个messageHandleRouter实现类进行处理。进行业务隔离。

package com.zzh.mybatisplus5.mq;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;import java.util.HashMap;public class RedisExpiredListener implements MessageListener, ApplicationContextAware {/*** 客户端监听订阅的topic,当有消息的时候,会触发该方法;* 并不能得到value, 只能得到key。* 姑且理解为: redis服务在key失效时(或失效后)通知到java服务某个key失效了, 那么在java中不可能得到这个redis-key对应的redis-value。*/protected HashMap<Integer, DelayedMessageHandler> handlerRouter;private static final Logger logger = LoggerFactory.getLogger(RedisExpiredListener.class);@Overridepublic void onMessage(Message message, byte[] bytes) {String expiredKey = message.toString();// TASK:CODE:VALUE结构String[] split = expiredKey.split(":");if (split.length < 2 || !expiredKey.startsWith("TASK:")) {return;}logger.info("[Redis键失效通知] key=" + expiredKey);StringBuilder value = new StringBuilder();for (int i = 2; i < split.length; i++) {value.append(split[i]);if (i != split.length - 1) {value.append(":");}}int code = Integer.parseInt(split[1]);DelayedMessageHandler handler = handlerRouter.get(code);if (handler != null) {handler.handle(value.toString());}}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.handlerRouter = (HashMap<Integer, DelayedMessageHandler>) applicationContext.getBean("messageHandleRouter");}
}

DelayedMessageQueue实现类

基础配置类,里面配置了监听哪个 Redis 库的过期事件

package com.zzh.mybatisplus5.mq;import com.zzh.mybatisplus5.component.CacheComponent;
import org.springframework.beans.factory.annotation.Autowired;import java.util.concurrent.Callable;public class RedisNotifyDelayedMessageQueueImpl implements DelayedMessageQueue {@Autowiredprivate CacheComponent cacheComponent;@Overridepublic Boolean publishTask(Integer code, String value, Integer delay) {if (delay < 0) {delay = 1;}cacheComponent.putRaw(assembleKey(code, value), "", delay);return true;}@Overridepublic Boolean deleteTask(Integer code, String value) {cacheComponent.del(assembleKey(code, value));return true;}@Overridepublic Long getTaskTime(Integer code, String value) {return cacheComponent.getKeyExpire(assembleKey(code, value));}@Overridepublic Boolean publishTask(Callable task, Integer delay) {throw new RuntimeException();}public String assembleKey(Integer code, String value) {if (value == null) {value = "";}StringBuilder sb = new StringBuilder("TASK:");sb.append(code + ":");sb.append(value);return sb.toString();}
}

Redis 配置

package com.zzh.mybatisplus5.mq;import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;@Configuration
public class RedisAutoConfig {private static final Logger logger = LoggerFactory.getLogger(RedisAutoConfig.class);@Value("${spring.redis.database}")private Integer cacheDB;@Beanpublic Map<Integer, DelayedMessageHandler> messageHandleRouter(List<DelayedMessageHandler> delayedMessageHandlerList) {return delayedMessageHandlerList.stream().collect(Collectors.toMap(DelayedMessageHandler::getCode, v -> v));}@Beanpublic RedisExpiredListener redisExpiredListener() {return new RedisExpiredListener();}/*** 指定 redis 库运行 config set notify-keyspace-events Ex 即可,不然监听无法生效* redis服务端需要配置 notify-keyspace-events 参数 ,至少包含k或者e* K 键空间通知,所有通知以 __keyspace@<db>__ 为前缀* E 键事件通知,所有通知以 __keyevent@<db>__ 为前缀* g DEL 、 EXPIRE 、 RENAME 等类型无关的通用命令的通知* $ 字符串命令的通知* l 列表命令的通知* s 集合命令的通知* h 哈希命令的通知* z 有序集合命令的通知* x 过期事件:每当有过期键被删除时发送* e 驱逐(evict)事件:每当有键因为 maxmemory 政策而被删除时发送* A 参数 g$lshzxe 的别名** @后边可以指定db库,*代表所有库,0代表0库 __keyevent@0__:expired 0库过期的数据* __keyspace@0__:mykey   0库mykey这个键的所有操作* __keyevent@0__:del     0库所有del这个命令*/@Beanpublic RedisMessageListenerContainer container(LettuceConnectionFactory defaultLettuceConnectionFactory, RedisExpiredListener expiredListener) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(defaultLettuceConnectionFactory);//监听指定库的过期keycontainer.addMessageListener(expiredListener, new PatternTopic("__keyevent@" + cacheDB + "__:expired"));return container;}@Beanpublic DelayedMessageQueue delayedMessageQueue() {return new RedisNotifyDelayedMessageQueueImpl();}@Beanpublic LettuceConnectionFactory defaultLettuceConnectionFactory(RedisConfiguration defaultRedisConfig,GenericObjectPoolConfig defaultPoolConfig) {LettuceClientConfiguration clientConfig =LettucePoolingClientConfiguration.builder().commandTimeout(Duration.ofMillis(5000)).poolConfig(defaultPoolConfig).build();return new LettuceConnectionFactory(defaultRedisConfig, clientConfig);}@Beanpublic RedisTemplate<String, String> redisTemplate(LettuceConnectionFactory defaultLettuceConnectionFactory) {RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(defaultLettuceConnectionFactory);return redisTemplate;}@Beanpublic StringRedisTemplate stringRedisTemplate(LettuceConnectionFactory defaultLettuceConnectionFactory) {StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();stringRedisTemplate.setConnectionFactory(defaultLettuceConnectionFactory);return stringRedisTemplate;}@Configurationpublic static class DefaultRedisConfig {@Value("${spring.redis.master-name}")private String masterName;@Value("${spring.redis.mode}")private String mode;@Value("${spring.redis.host:127.0.0.1:6379}")private String host;@Value("${spring.redis.password:}")private String password;@Value("${spring.redis.database:0}")private Integer database;@Value("${spring.redis.lettuce.pool.max-active:8}")private Integer maxActive;@Value("${spring.redis.lettuce.pool.max-idle:8}")private Integer maxIdle;@Value("${spring.redis.lettuce.pool.max-wait:-1}")private Long maxWait;@Value("${spring.redis.lettuce.pool.min-idle:0}")private Integer minIdle;@Beanpublic GenericObjectPoolConfig defaultPoolConfig() {GenericObjectPoolConfig config = new GenericObjectPoolConfig();config.setMaxTotal(maxActive);config.setMaxIdle(maxIdle);config.setMinIdle(minIdle);config.setMaxWaitMillis(maxWait);return config;}@Beanpublic RedisConfiguration defaultRedisConfig() {return getRedisConfiguration(masterName, mode, host, password, database);}}private static RedisConfiguration getRedisConfiguration(String masterName, String mode, String host, String password, Integer database) {if (mode.equals("single")) {RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();String[] hostArray = host.split(":");config.setHostName(hostArray[0]);config.setPassword(RedisPassword.of(password));config.setPort(Integer.parseInt(hostArray[1]));config.setDatabase(database);return config;} else if (mode.equals("sentinel")) {RedisSentinelConfiguration configuration = new RedisSentinelConfiguration();configuration.setMaster(masterName);String[] hostList = host.split(",");List<RedisNode> serverList = new LinkedList<>();for (String hostItem : hostList) {String[] hostArray = hostItem.split(":");RedisServer redisServer = new RedisServer(hostArray[0], Integer.parseInt(hostArray[1]));serverList.add(redisServer);}configuration.setSentinels(serverList);logger.info("[Redis] 哨兵节点: masterName={}, host={}", masterName, host);return configuration;} else {return null;}}}

顶级策略接口

没啥好说的,老三样

package com.zzh.mybatisplus5.mq;import java.util.concurrent.Callable;public interface DelayedMessageQueue {/*** 添加延迟秒数,RingDelayedMessageQueueImpl专用,单机实现* * @param delay seconds* @return*/public Boolean publishTask(Callable task, Integer delay);/*** RedisNotifyDelayedMessageQueueImpl专用,集群实现* 这两个都会被拼接为 TASK:(随机码):CODE:VALUE 当成key存入redis中,因为回调时只会返回key,而不会返回key对应的值* @param code 回调时用来选择的Handler的CODE* @param value 回调时使用的值* @param delay 多少秒后调用* @return*/public Boolean publishTask(Integer code, String value, Integer delay);/*** 删除已有任务* @param code* @param value* @return*/public Boolean deleteTask(Integer code, String value);/*** 获取指定任务还有多少时间执行,如果不存在,返回-2* @param code* @param value* @return*/public Long getTaskTime(Integer code, String value);
}
package com.zzh.mybatisplus5.mq;/*** 延迟消息处理器*/
public interface DelayedMessageHandler {/**** @param value* @return 处理成功的返回大于0结果,失败返回0*/public int handle(String value);public int getCode();
}

延时队列设计思路

和我之前使用策略模式封装的多个OSS使用,写法简直是一毛一样,详情挪步。策略模式调优(多Oss存储导致代码冗余的问题)在这里插入图片描述

延时队列消息丢失怎么解决

开个定时任务,每隔一分钟定时进行扫表,加了索引、延时消息丢失不多的情况下,查数据会很快。扫到有超时的订单,接着丢到 Redis 延时队列里面,双重保险。同时定时任务加个分布式锁,一台机器运行即可。​代码爆红是因为,我拉的开源项目,没跑起来直接看的源码。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/46315.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

006-三台交换机堆叠

三台交换机堆叠 链形连接和环形连接 链形配置IRF与环形配置IRF的区别 三个交换机链形配置IRF与三个交换机环形配置IRF的主要区别体现在以下几个方面&#xff1a; 物理位置要求&#xff1a; 链形连接&#xff1a;对成员设备的物理位置要求相对较低&#xff0c;主要适用于成员…

HybridCLR + Addressable 热更新篇(一)

目录 前言一、HybridCLR 和 Addressable 是什么&#xff1f;1. HybridCLR2. Addressable 二、使用步骤1.HybridCLR导入2.HybridCLR配置3.Addressable导入4.Addressable配置 前言 随着移动互联网和游戏行业的快速发展&#xff0c;热更新技术变得越来越重要。热更新能够在不重新…

图——定义和基本术语

图是数据结构中非常重要的一章&#xff0c;这篇文章就先介绍一下图的定义和基本术语。 一&#xff0c;图的构成 图&#xff1a;Graph(V,E) V&#xff1a;顶点(数据元素)的有穷非空集合&#xff1b; E&#xff1a;边的有穷集合。 如下面这个图&#xff0c;由点集和边集可以确定…

mupdf 编译说明

进入官网下载源码&#xff1a;https://www.mupdf.com/releases 挑选需要的版本&#xff0c;下载解压&#xff0c;然后打开解决方案&#xff0c;进行编译

python 怎样生成窗体

通过import tkinter导入Tkinter模块&#xff0c;没有这句下面的都不成立了。 wintkinter.Tk()&#xff0c;这句是创建windows的窗口对象&#xff0c;注意后面的Tk&#xff0c;大小写。 win.title("窗口")&#xff0c;这段是设置窗口上的标题。 另外窗口的大小你可以通…

java实战项目-学生管理系统(附带全套源代码)--《基础篇》

一、前言 第一个java小型学生管理系统&#xff0c;思路和其他语言都一样&#xff0c;因为有C语言的基础&#xff0c;写这个并不是太难&#xff0c;不过&#xff0c;进阶篇的就难太多了。明天晚上更新进阶篇&#xff0c;因为目前代码还没有完善&#xff0c;保守估计需要500行代…

网络请求优化:如何让你的API飞起来

网络请求优化&#xff1a;如何让你的API飞起来 亲爱的开发者朋友们&#xff0c;你是否曾经遇到过这样的场景:用户疯狂点击刷新按钮,你的服务器却像老年人散步一样慢吞吞地响应。或者,你的应用像个贪吃蛇,疯狂吞噬用户的流量包。如果你对这些情况再熟悉不过,那么恭喜你,你正需要…

Unity ColorSpace 之 【颜色空间】相关说明,以及【Linear】颜色校正 【Gamma】的简单整理

Unity ColorSpace 之 【颜色空间】相关说明&#xff0c;以及【Linear】颜色校正 【Gamma】的简单整理 目录 Unity ColorSpace 之 【颜色空间】相关说明&#xff0c;以及【Linear】颜色校正 【Gamma】的简单整理 一、简单介绍 二、在Unity中设置颜色空间 三、Unity中的Gamma…

部队物资仓库出入库管理系统|实现物资有效的战备保障

随着科技的不断发展&#xff0c;智慧营区已成为现代军事管理的重要方向。后勤物资管控作为营区管理的重要组成部分&#xff0c;对于保障营区正常运转和提高部队战斗力具有重要意义。智慧营区后勤物资管控平台作为数字化后勤建设的重要组成部分&#xff0c;能够实现营区物资的智…

Ubuntu下载安装chrome浏览器

方法一&#xff1a;wget下载并安装 1、创建文件夹存安装包 cd /root/Downloads mkdir chrome 2、下载安装包到文件夹内 wget -c https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb -P /root/Downloads/chrome 3、安装 cd chrome sudo dpkg -i go…

药品类别功能助力智慧校园医务管理向前迈进

在智慧校园的医务管理框架下&#xff0c;药品类别管理模块发挥着举足轻重的作用&#xff0c;它以智能化的方式优化药品的存储、分配流程&#xff0c;确保每一步都符合安全与效率的标准。这一功能围绕着科学分类的核心理念&#xff0c;细致入微地组织药品信息&#xff0c;为校园…

TCP传输控制协议二

TCP 是 TCP/IP 模型中的传输层一个最核心的协议&#xff0c;不仅如此&#xff0c;在整个 4 层模型中&#xff0c;它都是核心的协议&#xff0c;要不然模型怎么会叫做 TCP/IP 模型呢。 它向下使用网络层的 IP 协议&#xff0c;向上为 FTP、SMTP、POP3、SSH、Telnet、HTTP 等应用…

威纶通触摸屏连接MySQL数据库步骤

目录 概要威纶通支持数据库的触摸屏类型测试Step 1 选择触摸屏型号Step 2 新增数据库服务器Step 3 添加SQL数据库查询功能Step 4 仿真测试 概要 通过使用威纶通带数据库类型的触摸屏&#xff0c;实现连接本地/远程MySQL数据库&#xff0c;并实现数据查询功能 威纶通支持数据库…

专业条码二维码扫描设备和手机二维码扫描软件的区别?

条码二维码技术已广泛应用于我们的日常生活中&#xff0c;从超市结账到公交出行&#xff0c;再到各类活动的入场验证&#xff0c;条码二维码的便捷性不言而喻&#xff0c;而在条码二维码的扫描识别读取过程中&#xff0c;专业扫描读取设备和手机二维码扫描软件成为了两大主要工…

ssh升级

文章目录 ssh升级一、解包ssh、ssl二、更新安装ssl三、手动更新手动复制库文件四、创建符号链接五、更新库路径六、验证库文件七、设置库路径环境变量八、配置、编译、安装OpenSSH&#xff1a;意外&#xff1a;缺少 zlib 的开发库解决方法&#xff1a; 九、刷新ssh服务、查看ss…

力扣第九题

回文数 提示&#xff1a; 给你一个整数 x &#xff0c;如果 x 是一个回文整数&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 回文数是指正序&#xff08;从左向右&#xff09;和倒序&#xff08;从右向左&#xff09;读都是一样的整数。 代码展示&#…

AI绘画Stable Diffusion 零基础入门 —AI 绘画原理与工具介绍,万字解析AI绘画的使用教程

大家好&#xff0c;我是设计师阿威 想要入门 AI 绘画&#xff0c;首先需要了解它的原理是什么样的。 其实很早就已经有人基于深度学习模型展开了对图像生成的研究了&#xff0c;但在那时&#xff0c;生成的图像分辨率和内容都非常抽象。 直到近两年&#xff0c;AI 产出的图像…

防火墙nat基础实验

一&#xff0c;实验拓扑&#xff1a; 二&#xff0c;实验需求&#xff1a; 1&#xff0c;办公区设备可以通过电信链路和移动链路上网(多对多的NAT&#xff0c;并且需要保留一个公网IP不能用来转换) 2&#xff0c;分公司设备可以通过总公司的移动链路和电信链路访问到Dmz区的ht…

俄罗斯VK 平台广告投放的注意事项有哪些?

以下是为您制定适合 VK 平台的广告投放策略的一些建议&#xff1a; 市场调研 深入了解俄罗斯市场的需求、趋势和竞争情况。 分析目标受众的兴趣、行为和消费习惯&#xff0c;以确定最有潜力的细分市场。 明确目标 确定具体、可衡量的广告目标&#xff0c;例如增加品牌知名度…

Appium自动化测试系列: 2. 使用Appium启动APP(真机)

历史文章&#xff1a;Appium自动化测试系列: 1. Mac安装配置Appium_mac安装appium-CSDN博客 一、准备工作 1. 安卓测试机打开调试模式&#xff0c;然后使用可以传输数据的数据线连接上你的电脑。注意&#xff1a;你的数据线一定要支持传输数据&#xff0c;有的数据线只支持充…