业务背景
很多时候,业务需要在一段时间之后完成一个工作任务。例如,滴滴打车订单完成后,如果用户一直不评价,会在48小时后自动评价为5星。
一般来说,实现这类需求需要设置一个定时器,在规定的时间后自动执行相应的操作。
数据结构
高效延时消息,包含两个重要的数据结构:
- 环形队列,例如可以创建一个包含3600个slot的环形队列(本质是个数组)
- 任务集合,环上每一个节点是一个任务集合
同时,启动一个timer,这个timer每隔固定时间:如1s,在上述环形队列中移动一格,有一个当前指针来标识正在检测的环节点。
算法执行过程
假设当前指针指向第一格,当有延时消息到达之后,例如希望3610秒之后,触发一个延时消息任务,只需:
- 计算这个Task应该放在哪一个环节点,现在指向1,3610秒之后,应该是第11格,所以这个Task应该放在第11个环节点的任务集合中
- 计算这个Task的Cycle-Num,由于环形队列是3600格(每秒移动一格,正好1小时),这个任务是3610秒后执行,所以应该绕3610/3600=1圈之后再执行,于是Cycle-Num=1
当前指针不停的移动,每秒移动到一个新slot环节点,这个环节点中对应的任务集合中每个任务看Cycle-Num是不是0:
- 如果不是0,说明还需要多移动几圈,将Cycle-Num减1
- 如果是0,说明马上要执行这个Task了,取出Task-Funciton执行(可以用单独的线程来执行Task),并把这个Task从任务集合中删除
优点
使用了“延时消息”方案之后,“订单48小时后关闭评价”的需求,只需将在订单关闭时,触发一个48小时之后的延时消息即可:
- 无需轮询全部订单,效率高
- 一个订单,任务只执行一次
- 时效性好,精确到秒(控制timer移动频率可以控制精度)
代码实现
- 基础类,环形队列的实现,需要配合定时器使用以实现延时队列
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;/*** 延时队列,用于延时执行任务* 采用环形队列实现*/
public class DelayQueue implements Runnable {/*** 延时任务*/public static class DelayRunnable implements Runnable {private final Runnable runnable;private final int cycleCount;private int currentCycleCount = 0;public DelayRunnable(Runnable runnable, int cycleCount) {this.runnable = runnable;this.cycleCount = cycleCount;}public void addCurrentCycleCount() {currentCycleCount++;}public int getCurrentCycleCount() {return currentCycleCount;}public int getCycleCount() {return cycleCount;}@Overridepublic void run() {runnable.run();}}private final int queueSize;private final List<Set<DelayRunnable>> queue;private int currentIndex = 0;private final Consumer<Set<DelayRunnable>> runnableConsumer;private final BiConsumer<Set<DelayRunnable>, Exception> exceptionBiConsumer;public DelayQueue(int queueSize, Consumer<Set<DelayRunnable>> runnableConsumer, BiConsumer<Set<DelayRunnable>, Exception> exceptionBiConsumer) {this.queueSize = queueSize;queue = new ArrayList<>(queueSize);this.exceptionBiConsumer = exceptionBiConsumer;for (int i = 0; i < queueSize; i++) {queue.add(new HashSet<>());}this.runnableConsumer = runnableConsumer;}public void execute(Runnable task, int taskDelayCount) {// 计算延时任务下标int index = (taskDelayCount + currentIndex) % queueSize;int cycleCount = (taskDelayCount + currentIndex) / queueSize;Set<DelayRunnable> tasks = queue.get(index);tasks.add(new DelayRunnable(task, cycleCount));}@Overridepublic void run() {// 探测队列中是否有需要执行的任务Set<DelayRunnable> tasks = queue.get(currentIndex);// 移动下标到下一个位置currentIndex = (currentIndex + 1) % queueSize;if (tasks == null || tasks.isEmpty()) {return;}Set<DelayRunnable> executeTasks = new HashSet<>();for (DelayRunnable task : tasks) {if (task.getCurrentCycleCount() < task.getCycleCount()) {task.addCurrentCycleCount();continue;}executeTasks.add(task);}//移除本次会执行完毕的任务tasks.removeAll(executeTasks);if (runnableConsumer != null) {try {runnableConsumer.accept(executeTasks);} catch (Exception e) {if (exceptionBiConsumer != null) {exceptionBiConsumer.accept(executeTasks, e);}}}}
}
- 秒级精度延时队列实现:通过DelayQueue环形队列和定时任务实现,延时队列的精度为1s
import java.util.Date;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;/*** 秒级延迟队列*/
public class SecondDelayQueue {private final DelayQueue queue;private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();public SecondDelayQueue(Consumer<Set<DelayQueue.DelayRunnable>> runnableConsumer) {this(runnableConsumer, null);}public SecondDelayQueue(Consumer<Set<DelayQueue.DelayRunnable>> runnableConsumer, BiConsumer<Set<DelayQueue.DelayRunnable>, Exception> exceptionBiConsumer) {this(100, runnableConsumer, exceptionBiConsumer);}public SecondDelayQueue(int queueSize, Consumer<Set<DelayQueue.DelayRunnable>> runnableConsumer, BiConsumer<Set<DelayQueue.DelayRunnable>, Exception> exceptionConsumer) {this.queue = new DelayQueue(queueSize, runnableConsumer, exceptionConsumer);}public void execute(Runnable task, int delaySeconds) {queue.execute(task, delaySeconds);}public void start() {start(0);}public void shutdown() {scheduledExecutorService.shutdown();}public void start(long delayMills) {scheduledExecutorService.scheduleAtFixedRate(queue, delayMills, 1, TimeUnit.SECONDS);}public static void main(String[] args) throws InterruptedException {SecondDelayQueue queue = new SecondDelayQueue(delayRunnables -> {System.out.println("执行任务");for (DelayQueue.DelayRunnable delayRunnable : delayRunnables) {delayRunnable.run();}}, (delayRunnables, e) -> {System.out.println("异常:" + e.getMessage());});queue.start();queue.execute(() -> System.out.println("第一个任务:" + new Date()), 2);queue.execute(() -> System.out.println("第二个任务:" + new Date()), 1);queue.execute(() -> System.out.println("第三个任务:" + new Date()), 10);queue.execute(() -> System.out.println("第四个任务:" + new Date()), 101);//第四个任务无法执行Thread.sleep(1000*20);queue.shutdown();}
}
其他延迟队列实现方式
- Java.util.concurrent 包下 DelayQueue 也可以直接使用。队列中的元素只有到了 Delay 时间才允许从队列中取出。
- Redis 的数据结构 Zset ,同样可以实现延迟队列的效果,主要利用它的 score 属性, redis 通过 score 来为集合中的成员进行从小到大的排序。 通过 zadd 命令向队列 delayqueue 中添加元素,并设置 score 值表示元素过期的时间
- 利用 RabbitMQ 做延时队列是比较常见的一种方式,而实际上 RabbitMQ 自身并没有直接支持提供延迟队列功能,而是通过 RabbitMQ 消息队列的 TTL 和 DXL 这两个属性间接实现的。
总结
- 环形队列可以高效实现延时队列
- 通过提高定时器的频率可以提高延时队列的精度
- 如果想实现消息队列的延时队列,也可以订阅对应消息,在本地中转后再重新发布消息,从而达到实现延时队列的目的(MQ如果有延迟队列建议使用自带延迟队列方案)
- 延迟队列有很多实现方案