延迟队列实现方案总结

日常开发中,可能会遇到一些延迟处理的消息任务,例如以下场景

①订单支付超时未支付
②考试时间结束试卷自动提交
③身份证或其他验证信息超时未提交等场景。
④用户申请退款,一天内没有响应默认自动退款等等。

如何处理这类任务,最简单的方法就是将消息插入到数据库,然后使用定时任务扫描数据库。但是如果如果大量用户请求需要处理,就需要线程频繁的连接数据库,这样可能会对其他数据库请求造成影响,这样情况下我们可以使用延迟队列方式解决此类问题。

1.DelayQueue实现方案

首先使用java自带的DelayQueue完成此方案。
DelayQueue内部使用优先级队列PriorityQueue完成任务存储,而 PriorityQueue 采用二叉堆的思想确保在数据插入到队列中时最小值的排在堆顶,每次从拿数据只要从堆顶取即可。
同时 DelayQueue 还是用了可重入锁 ReentrantLock来确保线程并发安全。
DelayQueue的源码解析可以查看DelayQueue源码解析
使用DelayQueue完成延迟需要定义Delayed 实现类来充当任务元素,具体使用方法:

//DelayQueue的元素必须是Delayed的实现类
class DelayTask implements Delayed {private long time;private Consumer consumer;public DelayTask(long time, Consumer consumer) {this.time = time;this.consumer = consumer;}@Overridepublic long getDelay(TimeUnit unit) {return time - System.currentTimeMillis();}@Overridepublic int compareTo(Delayed o) {DelayTask delayTask = (DelayTask) o;return (int) (time - delayTask.getTime());}public long getTime() {return time;}public void call() {this.consumer.accept(this);}
}public class QueueTest {public static void main(String[] args) throws InterruptedException {long startTime = System.currentTimeMillis();DelayTask d1 = new DelayTask(10000 + startTime, (o) -> System.out.println("3333"));DelayTask d2 = new DelayTask(1000 + startTime, (o) -> System.out.println("1111"));DelayTask d3 = new DelayTask(2000 + startTime, (o) -> System.out.println("2222"));DelayQueue<DelayTask> delayQueue = new DelayQueue<>();delayQueue.add(d1);delayQueue.add(d2);delayQueue.add(d3);while (!delayQueue.isEmpty()) {//阻塞等待,如果有任务到期就取出,如果没有任务到期就等待DelayTask delayTask = delayQueue.take();delayTask.call();}}
}

getDelay()方法用于从队列中取任务时查看是否到期,如果小于等于0则表示可以取出,如果大于,当前线程需要根据是否是leader判断等待时间。
compareTo()方法用于任务入队时,判断该任务元素在堆位置时比较的逻辑。
Consumer consumer;存储实际执行的任务,也可以使用Runnable,Callable以及其他自定义类。

note:该方法支持动态添加和删除任务,而且线程安全,但是只适用于单机环境,而且需要自己定义查询逻辑,实现稍微复杂。

2.定时任务实现方案

通过线程池对象ScheduledExecutorService也可以实现延迟处理任务的功能,而且操作更简单。ScheduledExecutorService是jdk提供的类来完成指定时间或定期执行某些任务。代码如下:

class Task implements Callable {private int idx;public Task(Integer idx) {this.idx = idx;}@Overridepublic Object call() throws Exception {System.out.println("---" + this.idx);return null;}
}public class DelayedTest {public static void main(String[] args) {ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(10);scheduledExecutorService.schedule(new Task(3), 1, TimeUnit.SECONDS);scheduledExecutorService.schedule(new Task(2), 2, TimeUnit.SECONDS);scheduledExecutorService.schedule(new Task(1), 1, TimeUnit.SECONDS);}
}

ScheduledExecutorService继承了ExecutorService,与ExecutorService的逻辑大致相同
schedule()方法是ScheduledExecutorService特有的方法,这个方法会将我们定义的Task封装成ScheduledFutureTask
②然后生成Worker线程(内部存在一个Thread,是真正的执行类)
Worker类
③Worker线程在执行的时候会先判断当前firstTask(就是要执行的Runnable)属性是否为空,如果有就先执行firstTask,执行完成firstTask之后,然后再从workQueue中取任务,红字也是ExecutorService 的执行逻辑,
但是ScheduledExecutorService的 schedule()会先生成null 的firstTask,Worker会直接从workQueue中阻塞的获取任务
Worker类获取task并执行的逻辑
workQueue获取task的逻辑

④workQueue是在我们new对象的时候生成的DelayedWorkQueue,它的逻辑定义和DelayedQueue基本相同,下面是DelayedWorkQueuetake()方法的代码逻辑

public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}

DelayedWorkQueue take()方法代码

scheduleAtFixedRate()是在Worker执行完task后,计算任务下次的执行时间,并重新将任务放入workQueue中来实现循环执行。下面的代码就是ScheduledFutureTask再执行过程中判断逻辑,如果periodic是true则执行run方法。如果period是false则执行计算下次执行时间和重新放入任务的逻辑。
定时器ScheduledExecutorService原理分析
ScheduledFutureTask中run()方法逻辑

note:ScheduledExecutorService是jdk提供的非常方便的延迟消息处理类,支持多线程处理消息,前一个任务的阻塞并不会影响下一个任务的运行。内部使用的是类似DelayQueue的逻辑,而且不需要再实现轮询过程。但是它和DelayQueue一样,只能单机使用。

3.Redis实现方案

通过Redis的Zset结构也可以实现延迟队列的功能。通过将过期时间的时间戳作为score存入Zset中,然后调用zrangebyscore key 0 当前时间命令定时扫描Zset数据,如果返回的结果那一定是已经过期的数据,然后再执行删除命令删除指定的key-value。为了防止多线程执行过程中可能存在的问题,需要配置lua脚本使用。

//lua脚本:定义查询zset数据和删除数据的原子操作
//定义查询的最大值和最小值
local minscore = ARGV[1]
local maxscore = ARGV[2]
local key = KEYS[1]local tables = redis.call("zrangebyscore", key, minscore, maxscore)for i, value in ipairs(tables) doredis.call("zrem", key, value)
endreturn tables
   //java代码:定义轮询线程@Testpublic void test() throws InterruptedException {String script = "lua脚本";String key = "test";long time = System.currentTimeMillis();redisTemplate.opsForZSet().add(key, "123", time + 1000 * 3);Thread scanExpireThread = new Thread(() -> {System.out.println("开始扫描过期数据...");while (true) {try {long currentTime = System.currentTimeMillis();long count = redisTemplate.opsForZSet().count(key, 0, currentTime);if (count == 0) {// 查询最小等待时间并睡眠,减少cpu空转sleep(key);}System.out.println("获取数据...");RedisScript redisScript = new DefaultRedisScript(script, List.class);List<String> expireList = (List) redisTemplate.execute(redisScript, Arrays.asList(key), 0, >System.currentTimeMillis());System.out.println(expireList);if (!CollectionUtils.isEmpty(expireList)) {String msg = RandomStringUtils.random(3, "1234567890");Integer delayedTime = RandomUtils.nextInt(0, 10);System.out.println("随机生成延迟信息:" + msg + ", 延迟时间:" + delayedTime);redisTemplate.opsForZSet().add(key, msg, System.currentTimeMillis() + 1000 * >delayedTime);} else {// 查询最小等待时间并睡眠,减少cpu空转sleep(key);}TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}});scanExpireThread.start();TimeUnit.MINUTES.sleep(10);}public void sleep(String key) throws InterruptedException {Set<DefaultTypedTuple> objs = redisTemplate.opsForZSet().rangeWithScores(key, 0, 0);for (DefaultTypedTuple<String> typedTuple : objs) {Long minTime = typedTuple.getScore().longValue();long diffTime = minTime - System.currentTimeMillis();TimeUnit.MINUTES.sleep(diffTime / (1000 * 60));}}

以上的示例代码只是延迟队列的简单时间。并没有考虑任务失败重试的问题。而且上面的方案还可以优化,比如当获取的元素的集合是空的时候,可以使用LockSupport.park()阻塞线程。只有有延迟任务被推送到redis中时,才重新唤醒轮询线程,避免轮询线程空转。

note:redis实现版本中,并发性高。需要自己定义轮询线程。在消息量较少的时候,会浪费资源,在消息量非常多的时候,又会出现因为轮询间隔设置不合理导致延时时间不准确的问题。

######4.Rabbitmq/Rocketmq实现
很多MQ消息中间件自带延迟消息功能,如果系统本身RocketMQ组件,则可以使用MQ来完成。不仅使用方便,而且可能存在的诸多细节问题。
RocketMQ

RocketMQ本身支持延迟消息功能,但是RocketMQ4.x只支持固定级别的延迟消息,并没有自定义延迟时间的功能。如果想实现自定义延迟消息的功能,可以使用Rocket5.x或者RabbitMQ

实现原理:RocketMQ实现延迟消息的过程是先将消息写入到SCHEDULE_TOPIC_XXXX的topic中,然后根据 level 存入特定的queue,每个queue都有一个调度线程消费消息,如果发现消息到期,就会将消息投递到指定的topic中。
rocketmq延迟消息过程

以下是RocketMQ5.x文档中的示例程序:

//生产者 延时消息发送
MessageBuilder messageBuilder = new MessageBuilderImpl();;
//以下示例表示:延迟时间为10分钟之后的Unix时间戳。
Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
Message message = messageBuilder.setTopic("topic")//设置消息索引键,可根据关键字精确查找某条消息。.setKeys("messageKey")//设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag("messageTag").setDeliveryTimestamp(deliverTimeStamp)//消息体.setBody("messageBody".getBytes()).build();
try {//发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {e.printStackTrace();
}//消费示例一:使用PushConsumer消费定时消息,只需要在消费监听器处理即可。
MessageListener messageListener = new MessageListener() {@Overridepublic ConsumeResult consume(MessageView messageView) {System.out.println(messageView.getDeliveryTimestamp());//根据消费结果返回状态。return ConsumeResult.SUCCESS;}
};

note:RocketMQ的实现版本性能较好,可靠性较高,但是不支持动态添加或删除队列。

RabbitMQ
RabbitMQ也可以根据自身的特性实现延迟消息的功能。比如利用RabbitMQ的TTL和DLX特性
TTL是指存活时间(可以作用在消息中,也可以作用在队列中)
DLX是指死信队列(是指消息被拒绝或者消息过期后存放的)

使用TTL与DLX存在的问题:
1)TTL作用在队列中,需要为每一个延迟时间定义一种队列,灵活性太差。
2)TTL作用在消息上,消息是在即将投递到消费者之前判定是否过期的,所以如果前一个消息阻塞了太长,将导致后面的消息不能即时的被执行。

而且使用上面的方式需要定义普通交换机和死信交换机,所以一般使用延迟消息插件 rabbitmq-delayed-message-exchange来完成。使用插件生成的消息不会立即进入对应队列,而是先将消息保存至 Mnesia (RabbitMQ中的一种数据存储形式) ,然后插件会尝试确认是否过期,再投递到对应绑定的队列之中
下载地址: rabbitmq-delayed-message-exchange
插件使用步骤:

  • 下载延迟插件,然后解压放置到 RabbitMQ 的插件目录。注意一定要解压并且把版本名字取消
    如果是使用docker安装的rabbitmq,可以使用docker cp rabbitmq_delayed_message_exchange containerId:/RabbitMQ_HOME/plugins/拷贝到RabbitMQ容器中。
  • 进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    可以使用rabbitmq-plugins list查看生效的插件,插件前面带上*号的才是真正生效的插件
    RabbitMQ启用插件
  • 最后重启 RabbitMQ,就可以在管理界面看到Type是x-delayed-messageexchange
    RabbitMQ管理界面
    更加详细的安装步骤可以查看链接 RabbitMQ 学习笔记 – 13 使用插件方式实现延迟队列

接下来就可以测试RabbitMQ的延迟消息功能了,以下是示例代码:
Rabbitmq配置

@Configuration
public class RabbitMqConfig {//定义队列交换机,队列,路由public static final String DELAYED_QUEUE = "delayed_queue";public static final String DELAYED_EXCHANGE = "delayed_exchange";public static final String DELAYED_ROUTINGKEY = "delayed_test";@Bean(DELAYED_EXCHANGE)public Exchange DELAYED_EXCHANGE() {HashMap<String, Object> map = new HashMap<>(1);map.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, map);}@Bean(DELAYED_QUEUE)public Queue DELAYED_QUEUE() {return new Queue(DELAYED_QUEUE);}//队列绑定交换机,@Beanpublic Binding >BINDING_DELAYED_QUEUE(@Qualifier(DELAYED_QUEUE) Queue queue,@Qualifier(DELAYED_EXCHANGE) Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DELAYED_ROUTINGKEY).noargs();}
}

延迟消息生产者

@RestController
public class TestController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/test")public String test(@RequestParam(required = false) Integer delay) {rabbitTemplate.convertAndSend(RabbitMqConfig.DELAYED_EXCHANGE, RabbitMqConfig.DELAYED_ROUTINGKEY,"hello", msg -> {//设置消息的延迟时间msg.getMessageProperties().setDelay(delay * 1000);//设置优先级msg.getMessageProperties().setPriority(9);//设置消息的持久化方式>msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//设置唯一标识msg.getMessageProperties().setMessageId(UUID.randomUUID().toString());return msg;});return "success";}
}

延迟消息消费者

@Component
public class RabbitmqHandler {//监听delayed_test队列@RabbitListener(queues = {RabbitMqConfig.DELAYED_QUEUE})public void receive_delayed_test(Message message, Channel channel) {System.out.println("----delayed_test----");System.out.println("properties:" + message.getMessageProperties().toString());System.out.println("body:" + new String(message.getBody()));System.out.println();}
}

note:RabbitMQ支持集群,分布式,高并发场景,性能较好,可靠性高,不需要自己处理轮询线程。

参照:
延迟队列解决方案
有赞延迟队列设计
盘点JAVA中延时任务的几种实现方式
RabbitMQ之延迟队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/124828.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MAC缓解WebUI提示词反推

当前环境信息&#xff1a; 在mac上安装好stable diffusion后&#xff0c;能做图片生成了之后&#xff0c;遇到一些图片需要做提示词反推&#xff0c;这个时候需要下载一个插件&#xff0c;参考&#xff1a; https://gitcode.net/ranting8323/stable-diffusion-webui-wd14-tagg…

66 内网安全-域横向批量atschtasksimpacket

目录 演示案例:横向渗透明文传递at&schtasks 案例2-横向渗透明文HASH传递atexec-impacket案例3-横向渗透明文HASH传递批量利用-综合案例5-探针主机域控架构服务操作演示 传递攻击是建立在明文和hash值的一个获取基础上的攻击&#xff0c;也是在内网里面常见协议的攻击&…

一道简单的C#面试题

试题&#xff1a; 抽顺序问题&#xff1a;有10位面试者&#xff0c;需要随机抽号面试。 1&#xff09;总共十个号数&#xff0c;用数组表示&#xff1b; 2&#xff09;每一位面试者输入1开始抽签&#xff0c;然后得到抽签号&#xff0c;输入2结束抽签&#xff1b; 3&#x…

Linux玩物志:好玩却无用的软件探秘

W...Y的主页 &#x1f60a; 代码仓库分享&#x1f495; &#x1f354;前言&#xff1a; 我们已经学习了yum指令&#xff0c;可以在Linux中安装一些软件的指令。下面我们就盘点一些可玩性很高但是却没有什么用的软件&#xff0c;在枯燥的学习中增添一丝乐趣&#xff01; For…

CSS宽度100%和宽度100vw之间有什么不同?

vw和vh分别代表视口宽度和视口高度。 使用width: 100vw代替的区别在于width: 100%&#xff0c;虽然100%将使元素适合所有可用空间&#xff0c;但视口宽度具有特定的度量&#xff0c;在这种情况下&#xff0c;可用屏幕的宽度 。 如果设置样式body { margin: 0 }&#xff0c;则1…

2000-2021年上市公司产融结合度量数据

2000-2021年上市公司产融结合度量数据 1、时间&#xff1a;2000-2021年 2、指标&#xff1a;股票代码、年份、是否持有银行股份、持有银行股份比例、是否持有其他金融机构股份、产融结合 3、来源&#xff1a;上市公司年报 4、范围&#xff1a;上市公司 5、样本量&#xff…

gRPC源码剖析-Builder模式

一、Builder模式 1、定义 将一个复杂对象的构建与表示分离&#xff0c;使得同样的构建过程可以创建不同的的表示。 2、适用场景 当创建复杂对象的算法应独立于该对象的组成部分以及它们的装配方式时。 当构造过程必须允许被构造的对象有不同的表示时。 说人话&#xff1a…

java基础篇-环境变量

java基础 编程学习的关键点、重点1.环境变量设置待续 编程学习的关键点、重点 输入输出 Java语言、C语言、Python语言、甚至SQL语言&#xff0c;都需要实战、做大量输入输出等 1.环境变量设置 1.下载jdk安装 jdk官网下载直达链接&#xff1a;https://www.oracle.com/java/te…

2023香港秋灯展丨移远通信闪耀亮相,开启Matter生态互联新篇章

10月27日&#xff0c;2023香港国际秋季灯饰展于香港会议展览中心正式开幕。 移远通信携最新一站式Matter解决方案、Wi-Fi模组&#xff0c;以及多款代表前沿技术的智能灯具、插座等终端重磅亮相。同时&#xff0c;公司产品总监丁子文围绕“Matter生态互联新篇章”主题发表演讲&a…

深度学习之基于YoloV8的行人跌倒目标检测系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介 二、功能三、行人跌倒目标检测系统四. 总结 一项目简介 世界老龄化趋势日益严重&#xff0c;现代化的生活习惯又使得大多数老人独居&#xff0c;统计数据表…

Zynq UltraScale+ XCZU7EV 纯VHDL解码 IMX214 MIPI 视频,2路视频拼接输出,提供vivado工程源码和技术支持

目录 1、前言免责声明 2、我这里已有的 MIPI 编解码方案3、本 MIPI CSI2 模块性能及其优越性4、详细设计方案设计原理框图IMX214 摄像头及其配置D-PHY 模块CSI-2-RX 模块Bayer转RGB模块伽马矫正模块VDMA图像缓存Video Scaler 图像缓存DP 输出 5、vivado工程详解PL端FPGA硬件设计…

SurfaceFliger绘制流程

前景提要&#xff1a; 当HWComposer接收到Vsync信号时&#xff0c;唤醒DisSync线程&#xff0c;在其中唤醒EventThread线程&#xff0c;调用DisplayEventReceiver的sendObjects像BitTub发送消息&#xff0c;由于在SurfaceFlinger的init过程中创建了EventThread线程&#xff0c…

Android环境变量macOS环境变量配置

关于作者&#xff1a;CSDN内容合伙人、技术专家&#xff0c; 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 &#xff0c;擅长java后端、移动开发、商业变现、人工智能等&#xff0c;希望大家多多支持。 目录 一、导读二、概览macOS基础知识 三、设置环境变量3.1 终…

Spring两大核心之一:AOP(面向切面编程)含设计模式讲解,通知类型切点;附有案例,实现spring事务管理

模拟转账业务 pom.xml <dependencies><!--spring--><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.3.29</version></dependency><!--lombok-->…

物联网二维码核销盒对接文档

核销盒是干嘛的&#xff1f; 1.在某些场景下快速核销订单或打卡签到等&#xff0c;通过核销盒能快速将订单信息发送到后端进行处理。 一&#xff0c;首先你需要有一台核销设备&#xff0c;也就是核销盒。 二&#xff0c;通过接口激活或更新核销盒 ​​​​​​​ 简要描述 激…

2.Vue — 模板语法、数据绑定、el与data的写法、数据代理

文章目录 一、模板语法1.1 插值语法1.2指令语法 二、数据绑定语法2.1 单向数据绑定2.2 双向数据绑定 三、el与data的两种写法3.1 el3.2 data 四、数据代理4.1 Object.defineProperty4.2 Vue数据代理4.2.1 展示数据代理4.2.2 Vue数据代理 一、模板语法 root容器里面的代码被称为…

边缘计算技术的崭新篇章:赋能未来智能系统

边缘计算是近年来云计算和物联网技术发展的重要趋势。通过将数据处理和分析从云端迁移到设备边缘&#xff0c;边缘计算能够实现更低的延迟和更高的数据安全。本文将探索边缘计算技术的最新进展及其在不同行业中的应用场景。 1. 实时数据处理与决策 在需要快速响应的场景中&…

Seata入门系列【15】@GlobalLock注解使用场景及源码分析

1 前言 在Seata 中提供了一个全局锁注解GlobalLock&#xff0c;字面意思是全局锁&#xff0c;搜索相关文档&#xff0c;发现资料很少&#xff0c;所以分析下它的应用场景和基本原理&#xff0c;首先看下源码中对该注解的说明&#xff1a; // 声明事务仅在单个本地RM中执行 //…

从0到1之微信小程序快速入门(03)

目录 什么是生命周期函数 WXS脚本 ​编辑 与 JavaScript 不同 纯数据字段 组件生命周期 定义生命周期方法 代码示例 组件所在页面的生命周期 代码示例 插槽 什么是插槽 启用多插槽 ​编辑 定义多插槽 组件通信 组件间通信 监听事件 触发事件 获取组件实例 自…

推荐免费的文本转语音工具TTS-Vue【且开源】

标签&#xff1a; 文本转语音&#xff1b; 免费文本转语音软件&#xff1b; 网上有很多文本转语音的工具&#xff0c;但收费具多。 这里推荐一个免费的文本转语音工具。 不需要注册&#xff0c;下载安装就可以使用。且代码开源。 TTS-Vue 软件主页&#xff1a;https://loker…