延迟队列的时间轮算法实现

业务背景

很多时候,业务需要在一段时间之后完成一个工作任务。例如,滴滴打车订单完成后,如果用户一直不评价,会在48小时后自动评价为5星。
一般来说,实现这类需求需要设置一个定时器,在规定的时间后自动执行相应的操作。

数据结构

高效延时消息,包含两个重要的数据结构:

  • 环形队列,例如可以创建一个包含3600个slot的环形队列(本质是个数组)
  • 任务集合,环上每一个节点是一个任务集合

同时,启动一个timer,这个timer每隔固定时间:如1s,在上述环形队列中移动一格,有一个当前指针来标识正在检测的环节点。

算法执行过程

假设当前指针指向第一格,当有延时消息到达之后,例如希望3610秒之后,触发一个延时消息任务,只需:

  • 计算这个Task应该放在哪一个环节点,现在指向1,3610秒之后,应该是第11格,所以这个Task应该放在第11个环节点的任务集合中
  • 计算这个Task的Cycle-Num,由于环形队列是3600格(每秒移动一格,正好1小时),这个任务是3610秒后执行,所以应该绕3610/3600=1圈之后再执行,于是Cycle-Num=1

当前指针不停的移动,每秒移动到一个新slot环节点,这个环节点中对应的任务集合中每个任务看Cycle-Num是不是0:

  • 如果不是0,说明还需要多移动几圈,将Cycle-Num减1
  • 如果是0,说明马上要执行这个Task了,取出Task-Funciton执行(可以用单独的线程来执行Task),并把这个Task从任务集合中删除

优点

使用了“延时消息”方案之后,“订单48小时后关闭评价”的需求,只需将在订单关闭时,触发一个48小时之后的延时消息即可:

  • 无需轮询全部订单,效率高
  • 一个订单,任务只执行一次
  • 时效性好,精确到秒(控制timer移动频率可以控制精度)

代码实现

  • 基础类,环形队列的实现,需要配合定时器使用以实现延时队列
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;/*** 延时队列,用于延时执行任务* 采用环形队列实现*/
public class DelayQueue implements Runnable {/*** 延时任务*/public static class DelayRunnable implements Runnable {private final Runnable runnable;private final int cycleCount;private int currentCycleCount = 0;public DelayRunnable(Runnable runnable, int cycleCount) {this.runnable = runnable;this.cycleCount = cycleCount;}public void addCurrentCycleCount() {currentCycleCount++;}public int getCurrentCycleCount() {return currentCycleCount;}public int getCycleCount() {return cycleCount;}@Overridepublic void run() {runnable.run();}}private final int queueSize;private final List<Set<DelayRunnable>> queue;private int currentIndex = 0;private final Consumer<Set<DelayRunnable>> runnableConsumer;private final BiConsumer<Set<DelayRunnable>, Exception> exceptionBiConsumer;public DelayQueue(int queueSize, Consumer<Set<DelayRunnable>> runnableConsumer, BiConsumer<Set<DelayRunnable>, Exception> exceptionBiConsumer) {this.queueSize = queueSize;queue = new ArrayList<>(queueSize);this.exceptionBiConsumer = exceptionBiConsumer;for (int i = 0; i < queueSize; i++) {queue.add(new HashSet<>());}this.runnableConsumer = runnableConsumer;}public void execute(Runnable task, int taskDelayCount) {// 计算延时任务下标int index = (taskDelayCount + currentIndex) % queueSize;int cycleCount = (taskDelayCount + currentIndex) / queueSize;Set<DelayRunnable> tasks = queue.get(index);tasks.add(new DelayRunnable(task, cycleCount));}@Overridepublic void run() {// 探测队列中是否有需要执行的任务Set<DelayRunnable> tasks = queue.get(currentIndex);// 移动下标到下一个位置currentIndex = (currentIndex + 1) % queueSize;if (tasks == null || tasks.isEmpty()) {return;}Set<DelayRunnable> executeTasks = new HashSet<>();for (DelayRunnable task : tasks) {if (task.getCurrentCycleCount() < task.getCycleCount()) {task.addCurrentCycleCount();continue;}executeTasks.add(task);}//移除本次会执行完毕的任务tasks.removeAll(executeTasks);if (runnableConsumer != null) {try {runnableConsumer.accept(executeTasks);} catch (Exception e) {if (exceptionBiConsumer != null) {exceptionBiConsumer.accept(executeTasks, e);}}}}
}
  • 秒级精度延时队列实现:通过DelayQueue环形队列和定时任务实现,延时队列的精度为1s
import java.util.Date;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;/*** 秒级延迟队列*/
public class SecondDelayQueue {private final DelayQueue queue;private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();public SecondDelayQueue(Consumer<Set<DelayQueue.DelayRunnable>> runnableConsumer) {this(runnableConsumer, null);}public SecondDelayQueue(Consumer<Set<DelayQueue.DelayRunnable>> runnableConsumer, BiConsumer<Set<DelayQueue.DelayRunnable>, Exception> exceptionBiConsumer) {this(100, runnableConsumer, exceptionBiConsumer);}public SecondDelayQueue(int queueSize, Consumer<Set<DelayQueue.DelayRunnable>> runnableConsumer, BiConsumer<Set<DelayQueue.DelayRunnable>, Exception> exceptionConsumer) {this.queue = new DelayQueue(queueSize, runnableConsumer, exceptionConsumer);}public void execute(Runnable task, int delaySeconds) {queue.execute(task, delaySeconds);}public void start() {start(0);}public void shutdown() {scheduledExecutorService.shutdown();}public void start(long delayMills) {scheduledExecutorService.scheduleAtFixedRate(queue, delayMills, 1, TimeUnit.SECONDS);}public static void main(String[] args) throws InterruptedException {SecondDelayQueue queue = new SecondDelayQueue(delayRunnables -> {System.out.println("执行任务");for (DelayQueue.DelayRunnable delayRunnable : delayRunnables) {delayRunnable.run();}}, (delayRunnables, e) -> {System.out.println("异常:" + e.getMessage());});queue.start();queue.execute(() -> System.out.println("第一个任务:" + new Date()), 2);queue.execute(() -> System.out.println("第二个任务:" + new Date()), 1);queue.execute(() -> System.out.println("第三个任务:" + new Date()), 10);queue.execute(() -> System.out.println("第四个任务:" + new Date()), 101);//第四个任务无法执行Thread.sleep(1000*20);queue.shutdown();}
}

其他延迟队列实现方式

  • Java.util.concurrent 包下 DelayQueue 也可以直接使用。队列中的元素只有到了 Delay 时间才允许从队列中取出。
  • Redis 的数据结构 Zset ,同样可以实现延迟队列的效果,主要利用它的 score 属性, redis 通过 score 来为集合中的成员进行从小到大的排序。 通过 zadd 命令向队列 delayqueue 中添加元素,并设置 score 值表示元素过期的时间
  • 利用 RabbitMQ 做延时队列是比较常见的一种方式,而实际上 RabbitMQ 自身并没有直接支持提供延迟队列功能,而是通过 RabbitMQ 消息队列的 TTL 和 DXL 这两个属性间接实现的。

总结

  • 环形队列可以高效实现延时队列
  • 通过提高定时器的频率可以提高延时队列的精度
  • 如果想实现消息队列的延时队列,也可以订阅对应消息,在本地中转后再重新发布消息,从而达到实现延时队列的目的(MQ如果有延迟队列建议使用自带延迟队列方案)
  • 延迟队列有很多实现方案

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/18487.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

芯片原厂驱动开发工程师:初学到精通,如何快速成长?

01 前言 大家好&#xff0c;我是XX&#xff0c;来自湖南XX学院&#xff0c;电子信息18级&#xff0c;也曾在创新基地控制组学习过两三年&#xff0c;毕业后就职于一家芯片原厂的解决方案部&#xff0c;担任驱动工程师的职位&#xff0c;算上实习期&#xff0c;我的工作时长已有…

Redis操作之Jedis

Jedis是Redis官方推荐的Java连接开发工具&#xff0c;它是一个流行的Redis客户端中间件&#xff0c;提供了简单易用的API和高性能的连接池管理。Jedis是一个轻量级的库&#xff0c;适用于大多数Redis应用场景&#xff0c;包括数据缓存、消息队列等。 一、简介 功能全面&#…

htb-Mailing

因为做windows服务器渗透较少&#xff0c;不妥的地方还请师傅们指出 可先看思路&#xff0c;实在不行再看writeup 任意文件下载拿pop3登录邮箱——》利用邮件服务器漏洞拿下NTLM——》利用组件版本漏洞拿下 拿shell 端口扫描开放服务 Host is up (0.91s latency). Not shown:…

CSS学习笔记:rem实现移动端适配的原理——媒体查询

移动端适配 移动端即手机端&#xff0c;也称M端 移动端适配&#xff1a;同一套移动端页面在不同屏幕尺寸的手机上可以实现宽度和高度的自适应&#xff0c;也就是页面中元素的宽度和高度可以根据屏幕尺寸的变化等比缩放 rem配合媒体查询可实现移动端适配 rem单位 媒体查询 …

SpringAdminClient如何将Httpbasic账号密码告知SpringAdminServer

场景&#xff0c;因为Config Service开了权限校验&#xff0c;注册到eureka之后&#xff0c;SpringAdmin查看信息会报错401&#xff0c;如果想在SpringAdmin中正确的看到Config Service的actuator信息则需要将账号密码告知给SpringAdmin&#xff0c;磁力用的是Eureka作为发现服…

1045. 买下所有产品的客户

1045. 买下所有产品的客户 题目链接&#xff1a;1045. 买下所有产品的客户 代码如下&#xff1a; # Write your MySQL query statement below select customer_id from Customer where product_key in (select product_key from Product) group by customer_id having count(…

WPF 如何调试

简述 它是一种系统机制&#xff0c;用于识别和修复一段代码中的错误或缺陷&#xff0c;这些错误或缺陷的行为与您的预期不同。调试子系统紧密耦合的复杂应用程序并不容易&#xff0c;因为修复一个子系统中的错误可能会在另一个子系统中创建错误。 在 C# 中调试 在 WPF 应用程序…

javaIO流知识点概况

一、前言&#xff1a; 1.1.流的概念: java将输入与输出比喻为"流"&#xff0c;英文:Stream. 就像生活中的"电流","水流"一样,它是以同一个方向顺序移动的过程.只不过这里流动的是字节(2进制数据).所以在IO中有输入流和输出流之分,我们理解他们…

Vue3中使用 filter 方法通过 id 删除数组中的指定对象

Vue中使用 filter 方法通过 id 删除数组中的指定对象 一、前言1、示例2、案例 一、前言 在 Vue3 中&#xff0c;我们经常需要处理数据并进行相应操作&#xff0c;比如删除数组中的特定对象。在这篇文章中&#xff0c;我们将学习如何使用 filter 方法和响应式变量来实现这一目标…

单点11.2.0.3备份恢复到单点11.2.0.4

保命法则&#xff1a;先备份再操作&#xff0c;磁盘空间紧张无法备份就让满足&#xff0c;给自己留退路。 场景说明&#xff1a; 1.本文档的环境为同平台、不同版本&#xff08;操作系统版本可以不同&#xff0c;数据库小版本不同&#xff09;&#xff0c;源机器和目标机器部…

swiftui基础组件Image加载图片,以及记载gif动图示例

想要在swiftui中展示图片&#xff0c;可以使用Image这个组件&#xff0c;这个组件可以加载本地图片和网络图片&#xff0c;也可以调整图片大小等设置。先大概看一下Image的方法有哪些可以用。 常用的Image属性 1.调整图像尺寸&#xff1a; 使用 resizable() 方法使图像可调整…

Java如何分块读取大文件

在Java中&#xff0c;分块读取大文件通常使用FileInputStream或BufferedInputStream结合循环来实现。以下是一个基本的示例&#xff0c;展示如何分块读取大文件&#xff1a; import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException…

黑龙江等保测评:强化网络安全的北方防线

在数字化时代&#xff0c;网络空间已成为国家发展的新领域&#xff0c;其安全直接关系到国家安全、社会稳定和个人隐私。作为中国东北的重要省份&#xff0c;黑龙江省积极响应国家网络安全战略&#xff0c;深入实施信息安全等级保护制度&#xff08;简称“等保”&#xff09;&a…

量子密钥分发系统基础器件(一):光纤干涉仪

干涉仪的基本原理是利用波的叠加来获得波的相位信息&#xff0c;从而获取实验中所关心的物理量。光纤干涉仪是由光学干涉仪发展而来的&#xff0c;利用光纤实现光的干涉&#xff0c;由于光纤取代透镜系统构成的光路具有柔软、形状可随意变化、传输距离远等特点&#xff0c;当前…

【Linux】23. 线程封装

如何理解C11中的多线程(了解) #include <iostream> #include <unistd.h> #include <thread>void thread_run() {while (true){std::cout << "我是新线程..." << std::endl;sleep(1);} } int main() {// 任何语言需要在Linux上实现多线…

项目结构与模块划分策略

项目结构与模块划分策略可以根据项目的规模、功能需求和团队组成进行合理的设计。以下是一些常见的策略&#xff1a; 按功能划分&#xff1a;将项目按照不同的功能划分为不同的模块。每个模块负责处理特定的功能&#xff0c;如用户管理、订单处理、支付等。这种划分方式使得代码…

Vue组件通讯$refs获取组件实例例子

在Vue中&#xff0c;$refs 是一个对象&#xff0c;它持有注册过 ref 特性 (attribute) 的所有 DOM 元素和子组件实例。你可以使用 $refs 在父组件中直接访问子组件的实例或者 DOM 元素。 下面是一个使用 $refs 获取子组件实例的例子&#xff1a; 首先&#xff0c;我们有一个子…

解决IDEA菜单栏找不到VCS的问题,且使用IDEA推送新项目到托管仓库

问题描述&#xff1a; 在idea软件中使用git推送项目&#xff0c;idea页面顶部菜单栏无VCS 解决方案&#xff1a; 一&#xff1a;File->Settings->Version Control-> 点击 ->选择项目->VCS:->点击ok&#xff1a; 二&#xff1a;托管平台创建一个Git仓库来保…

Mysql 8.0 主从复制及读写分离搭建记录

前言 搭建参考&#xff1a;搭建Mysql主从复制 为什么要做主从复制&#xff1f; 做数据的热备&#xff0c;作为后备数据库&#xff0c;主数据库服务器故障后&#xff0c;可切换到从数据库继续工作&#xff0c;避免数据丢失。架构的扩展。业务量越来越大&#xff0c;I/O访问频…

MySQL忘记密码怎么办?教你无密码登录

MySQL免密钥登录 文章目录 MySQL免密钥登录一、修改配置文件二、无密码登录三、修改root密码四、使用新密码登录 一、修改配置文件 # 这个配置项的意思是告诉mysql跳过权限验证&#xff0c;允许任何用户以任何密码登录 [rootmysql ~]# echo "skip-grant-tables" >…