Java通过Redis进行延时队列,定时发布消息(根据用户选择时间进行发布)

前言

目前很多产品都用到过定时发布或者定时推送等功能,定时推送有两种定义,一种是后台自己有相关规则,通过定时器设置好相应的时间进行推送(例如定时任务框架QuartZ、xxl-job等实现,或者通过springboot自带定时任务@Scheduled注解等实现),这些都是基于后台设定的规则来进行定时推送。
还有一种场景便是根据用户自己选择想要的时间进行推送,这时候再用到上面的方法来做会比较麻烦和复杂,就需要用到延时队列来实现

实现方式

在做这个功能之前,我在网上查阅了想要实现这种根据用户选择时间来推送的相关资料,发现方式还是挺多的,包括但不限于以下几种:

  1. 最简单暴力的方法,通过上述的定时任务框架或者springboot自带的定时器来实现,把cron表达式书写为每分钟一次,然后每分钟都去检查是否和用户设置的时间能匹配上,如能匹配上就进行相关的业务操作
  2. 通过实现springboot自带的SchedulingConfigurer接口来进行动态任务调用
  3. 通过DelayQueue队列进行实现
  4. 通过MQ中间件的发送消费来实现
  5. 通过Redis设置key过期时间触发来进行实现

    上面的几种实现第1、2点比较简单,也很有效果,但是容易出现效率问题和准确性的问题,下面45点的比较不错,但是相比较起来学习成本会高一些,具体实现的思路差不太多,这些都有相关的资料,通过上面的关键字搜索便能查阅到

功能实现

看了那么多的方案之后再结合自身的项目,最终决定用一种新的方案来实现,通过Redis自带的DelayedQueue延时队列来完成,和上面的第45点其实思路差不太多,只不过这个更简单方便一点

定义一个实体类来进行配置

@Data
public class TaskBodyDto implements Serializable {/*** 重试最大次数*/public static final int MAX_RETRY = 3;private String idKey;private String beanName;private String methodName;private Map<String, Object> paramMap;/*** 重试计时器*/private int cnt;/*** 延迟的时间*/private long delay;/*** 延迟的时间单位*/private TimeUnit timeUnit;}

定义RedissonDelayQueue类

@Slf4j
@Component
public class RedissonDelayQueueDemo {@Resourceprivate RedissonClient redissonClient;private RBlockingQueue rBlockingQueue;private RDelayedQueue rDelayedQueue;@PostConstructprivate void init() {rBlockingQueue = redissonClient.getBlockingQueue(TaskListener.class.getName());rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);}public void add(TaskBodyDto reqVo) {rDelayedQueue.offer(reqVo, reqVo.getDelay(), reqVo.getTimeUnit());log.info("增加了延时队列{}", reqVo);}/*** 增加订单延时队列 -单位为秒** @param id         id,传入一个唯一标识,可以是业务ID* @param beanName   类名* @param methodName 方法名* @param paramMap   参数* @param delay      延迟时间*/public void add(String id, String beanName, String methodName, Map<String, Object> paramMap, long delay) {TaskBodyDto reqVo= new TaskBodyDto();String idKey = beanName + ":" + methodName + ":" + id;log.info("增加了延时队列" + idKey);reqVo.setIdKey(idKey);reqVo.setBeanName(beanName);reqVo.setMethodName(methodName);reqVo.setParamMap(paramMap);reqVo.setTimeUnit(TimeUnit.SECONDS);reqVo.setDelay(delay);this.add(reqVo);}/*** 删除延时队列** @param id         id,传入一个唯一标识,可以是业务ID* @param beanName   类名* @param methodName 方法名*/public void remove(String id, String beanName, String methodName) {String idKey = beanName + ":" + methodName + ":" + id;log.info("删除了延时队列:" + idKey);RDelayedQueue<TaskBodyDto> delayedQueue = rDelayedQueue;Stream<TaskBodyDto> stream = delayedQueue.stream().filter(s -> idKey.equals(s.getIdKey()));List<TaskBodyDto> c = stream.collect(Collectors.toList());if (!c.isEmpty()) {
//            log.info("删除延时队列{}", c);delayedQueue.remove(c.get(0));}}}

然后写一个工具类方便调用

@Component
public class RedissionDelayQueueUtils {@Autowiredprivate RedissonDelayQueue redissonDelayQueue;@Autowiredprivate static RedissonDelayQueue staticRedissonDelayQueue;@PostConstructpublic void init() {staticRedissonDelayQueue = redissonDelayQueue;}/*** 添加定时任务* @param id 唯一标识,可以是业务ID* @param paramMap 参数 key-value* @param beanName bean类名称 注意类名需要小写* @param methodName 方法名* @param seconds 延迟时间 单位为秒*/public static void addDelayQueue(String id, Map<String, Object> paramMap, String beanName, String methodName, Integer seconds) {staticRedissonDelayQueue.add(id, beanName, methodName, paramMap, seconds);}/*** 删除定时任务* @param id 唯一标识,可以是业务ID* @param beanName bean类名称 注意类名需要小写* @param methodName 方法名*/public static void removeDelayQueue(String id, String beanName, String methodName) {staticRedissonDelayQueue.remove(id, beanName, methodName);}}

然后再配置好监听器,在监听器里面通过反射获取到相关的方法然后执行里面的业务

@Slf4j
@Component
public class TaskListener implements RedisDelayedQueueListener<TaskBodyDto> {private static final List<Class> WRAP_CLASS = Arrays.asList(Integer.class, Boolean.class, Double.class, Byte.class, Short.class, Long.class, Float.class, Double.class, BigDecimal.class, String.class);//队列Queue@Autowiredprivate RedissonDelayQueue redissonDelayQueue;@Autowiredprivate TaskSender taskSender;@Overridepublic void invoke(TaskBodyDto reqVo) {log.info("开始执行监听...{}", reqVo);reqVo.setCnt(reqVo.getCnt() + 1);try {Object bean = ApplicationContextUtil.getBean(reqVo.getBeanName());Method method = ReflectUtil.getMethodByName(bean.getClass(), reqVo.getMethodName());Class target = AopUtils.getTargetClass(bean);Method targetMethod = ReflectUtil.getMethodByName(target, reqVo.getMethodName());List<Object> objects = getMethodParamList(targetMethod, reqVo.getParamMap());method.invoke(bean, objects.toArray());} catch (Exception e) {log.error("invoke task err!", e);if (reqVo.getCnt() > TaskBodyDto.MAX_RETRY) {log.error("重试次数超过最大次数,不再重试。");DeadQueDto deadQueDto = new DeadQueDto();deadQueDto.setBeanName(reqVo.getBeanName());deadQueDto.setMethodName(reqVo.getMethodName());deadQueDto.setParamMap(reqVo.getParamMap());taskSender.sendTask(deadQueDto);} else {//重试,30分钟后重试,秒为单位则用原数据if (reqVo.getTimeUnit().name().equals(TimeUnit.DAYS.name()) || reqVo.getTimeUnit().name().equals(TimeUnit.HOURS.name())) {reqVo.setDelay(30);reqVo.setTimeUnit(TimeUnit.MINUTES);redissonDelayQueue.add(reqVo);} else if (reqVo.getTimeUnit().name().equals(TimeUnit.MINUTES.name()) && reqVo.getDelay() > 30) {reqVo.setDelay(30);reqVo.setTimeUnit(TimeUnit.MINUTES);redissonDelayQueue.add(reqVo);} else {redissonDelayQueue.add(reqVo);}}}}private List<Object> getMethodParamList(Method method, Map<String, Object> paramMap) throws Exception {List<Object> objectList = new ArrayList<>();// 利用Spring提供的类获取方法形参名DefaultParameterNameDiscoverer nameDiscoverer = new DefaultParameterNameDiscoverer();String[] param = nameDiscoverer.getParameterNames(method);for (int i = 0; i < method.getParameterTypes().length; i++) {Class<?> parameterType = method.getParameterTypes()[i];Object object = null;// 基本类型不支持,支持包装类String paramKey = param[i];if (WRAP_CLASS.contains(parameterType)) {if (param != null && paramMap.containsKey(paramKey)) {object = paramMap.get(paramKey);object = ConvertUtils.convert(object, parameterType);}} else if (!parameterType.isPrimitive()) {if (parameterType.isAssignableFrom(List.class) || parameterType.isAssignableFrom(Map.class) || parameterType.isAssignableFrom(Set.class)) {object = paramMap.get(paramKey);} else {object = parameterType.newInstance();BeanUtils.populate(object, paramMap);}}objectList.add(object);}return objectList;}}

都配置好之后,可以写个方法进行测试
比如我要三十分钟之后执行test方法

public class Test {private void test(String name,String value){//执行业务代码}}

然后在需要执行这个功能的地方进行调用,比如用户在界面选择了发布时间之后,后端接口收到请求进行处理

						//延时队列Map<String, Object> map = new HashMap<>();map.put("name","张三");map.put("value","这是value");RedissionDelayQueueUtils.addDelayQueue("唯一标识",map,"test","test", (int) DateUtil.between("用户选择的时间",new Date(), DateUnit.SECOND));

注意事项:这上面的map便是被执行的方法需要的一些参数,切记不能直接传入Object类,只能通过基本数据类型进行传递,传入的bean类名也需要小写,DateUtil.between()这个方法是用的hutool工具类里面的日期工具类,为了算出用户选择的时间和当前时间相差多少秒,可自行更改为适合自己的方法,反正最后只需要取到两者时间差多少秒即可

后续redis的配置那些照常配置即可

总结

总结下来其实思路还是比较明确,就是通过redis延时队列的机制,这边配置好相关的参数然后加入到redis里面去,配置好监听器之后由redis进行监听触发,然后再通过反射的方式取到需要执行的bean和方法进行执行即可,其实延时队列的方法很多,我上面还推荐了一些其他的方法,通过给出的关键字即可查阅相关的资料,总之根据自身的情况选择最适合的方法就行

最后不管采取哪种方式,建议在触发以及执行的地方及时把日志打印出来,方便后期调试以及对问题的定位

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/181131.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

什么时候适合做ui自动化测试?什么时候做接口自动化测试

UI自动化测试和接口自动化测试都是软件测试中非常重要的部分&#xff0c;它们各自有适合的应用场景。 适合做UI自动化测试的场景包括&#xff1a; 用户界面&#xff08;UI&#xff09;变化频繁的应用程序。需要测试用户交互和流程的应用程序。需要验证页面布局、样式和交互的…

PHP连接数据库 错误抑制 三元运算符 学习资料

PHP连接数据库 PHP可以通过不同的扩展和库来连接各种类型的数据库。下面是一个使用MySQL数据库的连接示例&#xff1a; <?php $servername "localhost"; $username "your_username"; $password "your_password"; $dbname "your_d…

华为P40无法链接adb的解决记录

真的很讨厌华为的设备&#xff0c;很多东西啥设备都能跑得好好的&#xff0c;就华为会出问题&#xff0c;简直就是手机界的IE。 情况&#xff1a;突然无法链接adb到P40&#xff0c;拔插无效&#xff0c;关闭开发人员选项再打开也无效&#xff0c;撤销USB调试授权也无效&#x…

使用elasticsearch-head插件修改elasticsearch数据

1、先使用elasticsearch-head插件基本查询功能找到要修改的数据&#xff0c;看看是否存在 2、切换到elasticsearch-head复合查询界面&#xff0c;输入数据修改地址&#xff1a; http://es的ip地址:端口号/索引名称/文档类型&#xff08;没特殊设置过就是_doc&#xff09;/文档…

Unity 轨道展示系统(DollyMotion)

DollyMotion &#x1f371;功能展示&#x1f959;使用&#x1f4a1;设置路径点&#x1f4a1;触发点位切换&#x1f4a1;动态更新路径点&#x1f4a1;事件触发&#x1f4a1;设置路径&#x1f4a1;设置移动方案固定速度方向最近路径方向 &#x1f4a1;设置移动速度曲线 传送门 &a…

小程序开发中SSL证书的重要作用

随着互联网技术的发展&#xff0c;越来越多的企业和个人开始开发自己的小程序来满足各种需求。然而&#xff0c;在这个过程中&#xff0c;安全性和稳定性成为了开发者必须关注的重点之一。为了保障用户的隐私安全和体验效果&#xff0c;越来越多的小程序开发者开始采用SSL证书进…

Python的哈希映射:字典

# 创建一个字典 my_dict {name: John, age: 25, city: New York}# 访问字典中的值 print(my_dict[name]) # 输出: John# 添加新的键值对 my_dict[gender] Male# 更新字典中的值 my_dict[age] 26# 删除键值对 del my_dict[city]# 检查键是否存在 if name in my_dict:print(N…

RabbitMQ高级特性2 、TTL、死信队列和延迟队列

MQ高级特性 1.削峰 设置 消费者 测试 添加多条消息 拉取消息 每隔20秒拉取一次 一次拉取五条 然后在20秒内一条一条消费 TTL Time To Live&#xff08;存活时间/过期时间&#xff09;。 当消息到达存活时间后&#xff0c;还没有被消费&#xff0c;会被自动清除。 RabbitMQ…

linaro交叉编译工具链下载与使用笔记

笔记 文章目录 笔记确定目标 &#xff08;aarch64&#xff09;选择版本&#xff08;7.5&#xff09;选择目标&#xff08;aarch64-linux-gnu&#xff09;下载地址工具链&#xff08;gcc-linaro-7.5.0-2019.12-x86_64_aarch64-linux-gnu.tar.xz&#xff09;编译测试 &#xff08…

ICC2/innovus设置no 1x gap的方法

我正在「拾陆楼」和朋友们讨论有趣的话题,你⼀起来吧? 拾陆楼知识星球入口 ICC2设置no 1x的方法如下: 1) set_placement_spacing_label -name X -lib_cells {*} -side right set_placement_spacing_label -name Y -lib_cells {*} -side left 2) set_placement_spacing_rul…

Vue2 若依框架头像上传 全部代码

<template><div><div class"user-info-head" click"editCropper()"><img v-bind:src"options.img" title"点击上传头像"class"img-circle img-lg" /></div><el-dialog :title"title&…

什么是高层设计 - 学习系统设计

高层设计或HLD指的是整体系统设计&#xff0c;包括系统架构和设计的描述&#xff0c;是一种通用的系统设计&#xff0c;包括&#xff1a; •系统架构•数据库设计•对系统、服务、平台和模块之间关系的简要描述。 高层设计或HLD也被称为宏观级别设计。 什么是高层设计文档&…

解锁 ElasticJob 云原生实践的难题

发生了什么 最近在逛 ElasticJob 官方社区时发现很多小伙伴都在头疼这个 ElasticJob 上云的问题&#xff0c;ElasticJob 本就号称分布式弹性任务调度框架&#xff0c;怎么在云原生环境就有了问题了呢&#xff0c;这就要从 Kubenertes 和 ElasticJob 的一些状态化说起。 有意思的…

1076 Forwards on Weibo (链接表层序遍历)

题意&#xff1a;给出关注列表&#xff0c;博主的粉丝会给博主点赞&#xff0c;粉丝的粉丝也会给博主点赞&#xff0c;一直递推到最多L层&#xff0c;求&#xff0c;最后会有多少人给博主点赞。 思路&#xff1a;将关注的粉丝用链接表存储&#xff0c;再对博主进行层序遍历&am…

2023年生肖在不同时间段的运势预测

随着信息技术的飞速发展&#xff0c;API已经成为了数据获取和交互的重要途径。很多网站和APP都在运用API来获取数据。今天我们来介绍一个十分有趣的API——《十二生肖运势预测API》&#xff0c;通过这个API&#xff0c;我们可以获取到每个生肖在不同时间段的运势预测&#xff0…

linux(2)之buildroot使用手册

Linux(2)之buildroot配置toolchain Author&#xff1a;Onceday Date&#xff1a;2023年11月27日 漫漫长路&#xff0c;才刚刚开始… 参考文档&#xff1a; Buildroot - Making Embedded Linux Easy 文章目录 Linux(2)之buildroot配置toolchain1. 构建配置1.1 配置config生成…

Java NIO SelectionKey

在 Java NIO&#xff08;New I/O&#xff09;中&#xff0c;SelectionKey 是与选择器 Selector 绑定的对象&#xff0c;用于表示通道 Channel 注册到选择器上的状态和事件。SelectionKey 提供了管理和操作通道的能力&#xff0c;可以监视通道的可读、可写、连接和接受事件&…

【LeetCode:1670. 设计前中后队列 | 数据结构设计】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

Python怎么在py文件中执行某个命令行,这个命令行是运行另外一个程序的命令,例如“python aa.py”

1、使用os.system os.system方法可以用来运行命令行命令。它比subprocess简单&#xff0c;但功能也更有限&#xff0c;不如subprocess那样灵活。 import oscommand "python properties_computer/on.py --input_datasets 12.csv" os.system(command) 2、通过“subp…

微信小程序仿网易严选(附精选源码32套,涵盖商城团购等)

商城主要实现的功能 首页、专题、分类、购物车、我的小程序授权登陆获取用户信息首页包含品牌制造页、品牌制造详情页面、新品首发页面、人气推荐页面、各分类列表商品详情页面&#xff0c;包含常见问题、大家都在看商品列表、加入购物车、收藏商品、立即购买、下订单、选择收…