延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue

一、接着上文

上文我们讲述了使用redisson的RDelayedQueue实现分布式延迟队列,本文我们将自己JDK的延迟队列DelayQueue实现。

相比前者的实现,作为进程内的延迟队列,它会遇到许多技术难点:

  • 如何支持分布式的多个节点部署场景
  • 应用重启会恢复延时队列
  • 冷数据如何转换为热数据
  • 如何删除延迟队列中的任务

随后,我们也将提及:

  • 保存任务至延迟队列(生产者)
  • 读取延迟队列中的任务(消费者)

二、设计概要

在这里插入图片描述

  • 冷数据:mysql表中的任务数据

  • 热数据:jdk 延迟队列中的任务

  • 广播事件:删除延迟队列中的任务,发布的是广播事件,可以使用redis topic实现。

  • 本地事件:分布式多节点部署的时候,每个任务只保存在其中一个节点的延迟队列中,可以使用spring事件驱动实现。

  • 延迟队列 DelayQueueJob, 它实现了接口Delayed

包括任务的交易流水号和过期时间(即任务的回调时间)

import lombok.Builder;
import lombok.Data;import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/*** @author xxx*/
@Builder
@Data
public class DelayQueueJob implements Delayed {/*** 交易流水号*/private String transNo;/*** 到期时间*/private Date expireDate;public DelayQueueJob(String transNo, Date expireDate) {super();this.transNo = transNo;this.expireDate = expireDate;}/*** 用于队列中排序过期时间** @param o* @return*/@Overridepublic int compareTo(Delayed o) {return Long.valueOf(this.expireDate.getTime()).compareTo(Long.valueOf(((DelayQueueJob) o).expireDate.getTime()));}/*** 用于获取过期时间* 延迟关闭时间 = 过期时间 - 当前时间** @param unit* @return*/@Overridepublic long getDelay(TimeUnit unit) {return this.expireDate.getTime() - System.currentTimeMillis();}
}

三、应用启动流程

解决恢复延迟队列的问题。因为DelayQueue是进程内的,一旦重启,将被销毁。

在这里插入图片描述

import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;import java.util.List;
import java.util.concurrent.TimeUnit;@Slf4j
@Service
@RequiredArgsConstructor
public class ApplicationStartupListener implements ApplicationListener<ApplicationReadyEvent> {@Overridepublic void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {// 实现代码参考上面的流程图}
}

四、定时任务流程

解决冷数据如何转换为热数据的问题,防止延时任务过多导致消耗过多的jvm内存,所以只有回调时间将近的任务才放入延迟队列。

在这里插入图片描述

五、如何删除延迟队列中的任务

删除延迟队列的任务:发送广播消息通知所有的节点,当不是当前节点的时候,执行删除。

if (!NetUtil.getLocalhostStr().equals(ipAddress)) {DelayQueueSingleton.getDelayQueue().remove(transNo);
}

DelayQueueSingletons是一个单例类,详见下:

public class DelayQueueSingleton {private static volatile CustomDelayQueue<DelayQueueJob> delayQueue;private DelayQueueSingleton() {}public static CustomDelayQueue<DelayQueueJob> getDelayQueue() {if (delayQueue == null) {synchronized (DelayQueueSingleton.class) {if (delayQueue == null) {delayQueue = new CustomDelayQueue<>();}}}return delayQueue;}}

这里为了删除延迟队列的任务,我们对DelayQueue进行了重写。


import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;public class CustomDelayQueue<T extends Delayed> {private final DelayQueue<T> queue = new DelayQueue<>();private final Map<String, T> map = new ConcurrentHashMap<>();public boolean put(T task, String taskId) {// 如果任务已存在,则删除旧任务,防止重复添加this.remove(taskId);map.put(taskId, task);return queue.add(task);}public boolean remove(String taskId) {// 先删除map,再删除queueT task = map.remove(taskId);if (task != null) {return queue.remove(task);}return false;}public T take() throws InterruptedException {return queue.take();}
}

六、保存任务至延迟队列(生产者)


// 如果通知时间在一定时间范围内
if (DateUtil.offsetMinute(new DateTime(), commonConfig.getHotDataTimeLine()).after(event.getNotifyDate())) {DelayQueueSingleton.getDelayQueue().put(DelayQueueJob.builder().transNo(event.getTransNo()).expireDate(event.getNotifyDate()).build(), event.getTransNo());}

七、读取延迟队列中的任务(消费者)

作为延迟队列的消费者,它的实现和上一篇文章实现类似。不同的是take()获取任务不一样。

String transNo = null;
Date notifyDate = null;DelayQueueJob job = DelayQueueSingleton.getDelayQueue().take();
if (null != job) {transNo = job.getTransNo();notifyDate = job.getExpireDate();
}if (null == transNo) {return;
}if (log.isInfoEnabled()) {log.info("开始执行延迟队列中的任务,transNo={},notifyDate={}", transNo, notifyDate);
}// 异步执行你的操作
notifyTaskService.handleTask(transNo, notifyDate);

八、总结

作为进程内的延迟队列,在多点部署的分布式集群环境下, 代码明显比上一篇要复杂得多。

它们都需要的步骤是:

  • 任务的生产
  • 任务的消费
  • 移除任务

DelayQueue额外多出来的步骤是:

  • 应用启动的时候拉取回调时间将近的未完成任务(更新marked标记为true,防止重复拉取冷数据)
  • 定时拉取未标记且回调时间将近的未完成任务(和上面必须是互斥,等待上一步执行完成,否则会导致重复拉取)
  • 删除延迟队列DelayQueue的任务,必须发布广播消息给全部节点。(引入广播消息机制)

由此可见,任务表的字段marked仅供DelayQueue使用,防止重复拉取数据库的任务到热数据区。

    @Column(name = "marked", nullable = false, columnDefinition = "TINYINT(1) default 0 COMMENT '是否已标记为热数据'")private Boolean marked;

附:相关系列文章链接

延时任务通知服务的设计及实现(一)-- 设计方案

延时任务通知服务的设计及实现(二)-- redisson的延迟队列RDelayedQueue

延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue

延时任务通知服务的设计及实现(四)-- webhook执行任务

延时任务通知服务的设计及实现(五)-- Netty时间轮HashedWheelTimer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/832712.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Game Theory In Competitive Programming|Part2(原创)

在上一个Part部分&#xff0c;我们介绍了Bash game、Nim game、Misere Nim game 这三个游戏的玩法、必胜策略&#xff0c;以及必胜策略的证明&#xff0c;并介绍了有关必胜态以及必败态的两条定理&#xff0c;接下来我们会以Part1为基础&#xff0c;深挖其中的理论。 文章目录 …

【Oracle直播课】5月19日Oracle 19c OCM认证大师课 (附课件预览)

Oracle 19c OCM认证大师培训 - 课程体系 - 云贝教育 (yunbee.net) 部分课件预览 OCM部分课件预览 Oracle Database 19c Certified Master Exam (OCM) 认证大师 25 天 / 150课时 什么是Oracle 19c OCM&#xff1f; Oracle Certified Master (OCM)是Oracle认证大师&#xff0c;…

如何使用 iOS系统恢复软件修复 iPhone 问题

苹果公司向世界推出了他们可以拥有的最智能的手机。但即使是 iPhone 也无法避免智能手机常见的损坏和问题。您将熟悉最常见的问题。屏幕黑屏或卡在 Apple 徽标上&#xff1b;冻结或卡在恢复模式的 iPhone。但这样的问题不胜枚举&#xff0c;每天都有 iOS 用户在他们的设备中遇到…

利用傅里叶变换公式理解camera raw中的纹理和清晰度的概念(可惜的是camera raw的计算公式应该不会是这个傅里叶变换,只能说类似于这里的效果)

知乎说&#xff1a;在Adobe官方的解释中&#xff0c;就像图片可以分解成彩色通道&#xff08;如&#xff1a;红绿蓝通道&#xff09;&#xff0c;同样的&#xff0c;图片也可以分解成不同的频率&#xff0c;一张图片可以是由高频&#xff0c;中频和低频组成&#xff0c;例如&am…

VALSE 2024年度进展评述内容分享-视觉基础大模型的进展

2024年视觉与学习青年学者研讨会&#xff08;VALSE 2024&#xff09;于5月5日到7日在重庆悦来国际会议中心举行。本公众号将全方位地对会议的热点进行报道&#xff0c;方便广大读者跟踪和了解人工智能的前沿理论和技术。欢迎广大读者对文章进行关注、阅读和转发。文章是对报告人…

docker安装redis命令及运行

docker安装redis&#xff1a; docker run -d -p 6379:6379 --name redis redis:latest -d: 以 守护进程模式 运行容器&#xff0c;容器启动后会进入后台运行&#xff0c;并脱离当前命令行会话。 -p: 显示端口号。 -p 6379:6379: 将容器内部的 6379 端口映射到宿主机 6379 端…

Redis学习3——Redis应用之缓存

引言 缓存的意义 Redis作为一个NoSql数据库&#xff0c;被广泛的当作缓存数据库使用&#xff0c;所谓缓存&#xff0c;就是数据交换的缓冲区。使用缓存的具体原因有&#xff1a; 缓存数据存储于代码中&#xff0c;而代码运行在内存中&#xff0c;内存的读写性能远高于磁盘&a…

2024年第十三届工程与创新材料国际会议(ICEIM 2024)即将召开!

2024年第十三届工程与创新材料国际会议&#xff08;ICEIM 2024&#xff09;将于2024年9月6-8日在日本东京举行。ICEIM 2024由东京电机大学主办&#xff0c;会议旨在材料科学与工程、材料特性、测量方法和应用等相关领域进行学术交流与合作&#xff0c;在材料的微观世界里&#…

npm install 及使用cordova打包常见错误大全(附解决方案)

问题1、cb() 这是我们在install过程中最最常见问题&#xff0c;网络上的解决方式也都是大同小异&#xff0c;要么就是升级node(误人子弟)&#xff0c;项目里的node是不可以随意升级的&#xff0c;它有可能会导致其他依赖又不适配&#xff0c;起始很多时候就是由于咱们配置的镜像…

【docker】常用的Docker编排和调度平台

常用的Docker编排和调度平台 Kubernetes (K8s): Kubernetes是目前市场上最流行和功能最全面的容器编排和调度平台。它由Google开发并开源&#xff0c;现由CNCF&#xff08;云原生计算基金会&#xff09;维护。Kubernetes设计用于自动化容器部署、扩展和管理&#xff0c;支持跨…

v-for中的key是什么作用

在使用v-for进行列表渲染时&#xff0c;我们通常会给元素或者组件绑定一个key属性。 这个key属性有什么作用呢?我们先来看一下官方的解释&#xff1a; key属性主要用在Vue的虚拟DOM算法&#xff0c;在新Inodes对比时辨识VNodes&#xff1b; 如果不使用key&#xff0c;Vue会使用…

计算机系列之信息安全技术

15、信息安全技术 1、信息安全和信息系统安全 信息安全系统的体系架构 X轴是“安全机制”&#xff0c;为提供某些安全服务&#xff0c;利用各种安全技 术和技巧&#xff0c;所形成的一个较为完善的机构体系。 Y轴是“OSI网络参考模型”。 Z轴是“安全服务”。就是从网络中的各…

Spring框架学习笔记(一):Spring基本介绍(包含容器底层结构)

1 官方资料 1.1 官网 https://spring.io/ 1.2 进入 Spring5 下拉 projects, 进入 Spring Framework 进入 Spring5 的 github 1.3 在maven项目中导入依赖 <dependencies><!--加入spring开发的基本包--><dependency><groupId>org.springframework<…

【软考】模拟考卷错题本2024-05-05

1 算法 关键词&#xff1a;按照单位重量价值大优先&#xff0c;那就是1、2、3即430&#xff1b;之后的根据排除法又可以得到630&#xff1b;故C。 2 UML 序列图 上图已经基本上有解析&#xff1b;重点在于在四个选项中选正确的。根据概念排除&#xff1a;异步和同步是不一样的&…

Tomact安装配置及使用(超详细)

文章目录 web相关知识概述web简介(了解)软件架构模式(掌握)BS&#xff1a;browser server 浏览器服务器CS&#xff1a;client server 客户端服务器 B/S和C/S通信模式特点(重要)web资源(理解)资源分类 URL请求路径(理解)作用介绍格式浏览器通过url访问服务器的过程 服务器(掌握)…

Typecho文章采集器火车头插件

目前市面上基本没有typecho火车头采集器 而分享的这一款采集器 内置使用方法与教程&#xff01;

基于大语言模型多智体的综述:进步和挑战!

源自&#xff1a; 人工智能前沿讲习 “人工智能技术与咨询” 发布 声明:公众号转载的文章及图片出于非商业性的教育和科研目的供大家参考和探讨&#xff0c;并不意味着支持其观点或证实其内容的真实性。版权归原作者所有&#xff0c;如转载稿涉及版权等问题&#xff0c;请立即…

Python高级编程-DJango2

Python高级编程-DJango2 没有清醒的头脑&#xff0c;再快的脚步也会走歪&#xff1b;没有谨慎的步伐&#xff0c;再平的道路也会跌倒。 目录 Python高级编程-DJango2 1.显示基本网页 2.输入框的形式&#xff1a; 1&#xff09;文本输入框 2&#xff09;单选框 3&#xff…

【docker 】 IDEA 安装 Docker 工具

打开File->Settings->Plugins 配置 Docker 的远程访问连接 Engine APIURL &#xff1a;tcp://192.168.0.1:2375 &#xff08;换成自己的docker开放端口&#xff09; 使用diea的docker插件 查看已有的镜像 创建一个容器 下面是最近更新的文章&#xff1a; 【docker 】 …

Spring与Mybatis-增删改查(注解方式与配置文件方式)

Spring框架下Mybaits的使用 准备数据库配置application.propertiespom.xml添加lombok依赖创建Emp实体类准备Mapper接口&#xff1a;EmpMapper预编译SQL根据id查询数据Mapper接口方法配置application.properties开启自动结果映射单元测试 条件模糊查询Mapper接口方法单元测试 根…