Java轻量延迟重试队列实现

背景

很多开放平台都使用Webhook的方式进行外部系统数据同步或者通知,对于Webhook请求的对外发送不进行重试显然有点说不过去。使用简单的while一个条件去重试N次好像达不到什么效果,只能是说有重试而已,而使用消息队列中间件好像依赖又太重,于是索性自己动手写了一个基于有界内存队列的抽象范型延迟重试队列组件。

1. 实现思路及考虑

  • 按照重试间隔(RetryDelaySeconds)& 重试持续结束时间(RetryTask.retryEndTime)进行重试以支持较长的重试周期(例如,保障一定可跨天的持续24小时的重试);
  • 重试基于有界内存队列(BlockingQueue)以避免失败后发送线程的阻塞以及重试任务积压较多时发生OOM;
  • 重试队列的消费使用重试任务哈希值(RetryTask.retryHash)得到的线程进行执行以避免不同业务之间重试任务的互相影响以及相同业务重试任务的串行执行;
  • 如果接受重试任务不落盘(程序意外重启时直接丢弃没有达到最大重试持续时间的任务)则可以不实现:flushTask() 和 recoverTask() 方法;

2. 代码实现

2.1 重试队列组件代码

2.1.1 RetryQueue

package retry;import lombok.Data;
import org.apache.commons.collections.CollectionUtils;import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;/*** RetryQueue* <p>* 1、按照重试间隔(RetryDelaySeconds)& 重试持续结束时间(RetryTask.retryEndTime)进行重试以支持较长的重试周期(例如,保障一定可跨天的持续24小时的重试);* 2、重试基于有界内存队列(BlockingQueue<T extends RetryTask>)以避免失败后发送线程的阻塞以及重试任务积压较多时发生OOM;* 3、重试队列的消费使用重试任务哈希值(RetryTask.retryHash)得到的线程进行执行以避免不同业务之间重试任务的互相影响以及相同业务重试任务的串行执行;* 4、如果接受重试任务不落盘(程序意外重启时直接丢弃没有达到最大重试持续时间的任务)则可以不实现:flushTask() 和 recoverTask() 方法;** @author chenx*/
public abstract class RetryQueue<T extends RetryQueue.RetryTask> {private final int maxQueueSize;private final int scheduledExecutorPoolSize;private final int awaitTerminationSeconds = 0;private boolean isStart = false;private int retryDelaySeconds;private BlockingQueue<T> queue;private ExecutorService executor;private ScheduledExecutorService[] scheduledExecutors;protected RetryQueue(int maxQueueSize, int scheduledExecutorPoolSize) {this.maxQueueSize = maxQueueSize;this.scheduledExecutorPoolSize = scheduledExecutorPoolSize;this.queue = new LinkedBlockingQueue<>(this.maxQueueSize);this.executor = Executors.newSingleThreadExecutor();// init scheduledExecutorsthis.scheduledExecutors = new ScheduledExecutorService[this.scheduledExecutorPoolSize];for (int i = 0; i < this.scheduledExecutorPoolSize; i++) {this.scheduledExecutors[i] = Executors.newSingleThreadScheduledExecutor();}}/*** process(重试任务处理)** @param task*/public abstract void process(T task);/*** flushTask(重试任务落盘)** @param taskList*/public abstract void flushTask(List<T> taskList);/*** recoverTask(重试任务恢复)** @return*/public abstract List<T> recoverTask();/*** startup*/public void startup(int retryDelaySeconds) {System.out.println("===RetryQueue startup begin===");if (this.isStart) {System.out.println("RetryQueue startup already!");return;}this.isStart = true;this.retryDelaySeconds = retryDelaySeconds;// 落盘任务恢复this.recoverTaskProcess();this.executor.execute(() -> {while (this.isStart) {try {T task = this.queue.take();this.onRetryTaskTaken(task);} catch (InterruptedException ex) {Thread.currentThread().interrupt();System.out.println("RetryQueue interrupted!");} catch (Exception ex) {System.out.println("retryQueue.startup() error!" + ex.getMessage());}}});System.out.println("===RetryQueue startup done===");}/*** shutdown*/public void shutdown() {System.out.println("===RetryQueue shutdown begin===");if (!this.isStart) {System.out.println("RetryQueue shutdown already!");return;}this.isStart = false;this.executor.shutdown();this.flushTask(this.drainToTask());try {if (!this.executor.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS)) {this.executor.shutdownNow();}} catch (InterruptedException ex) {System.out.println("RetryQueue.executor.shutdown() InterruptedException!" + ex.getMessage());this.executor.shutdownNow();Thread.currentThread().interrupt();}for (int i = 0; i < this.scheduledExecutorPoolSize; i++) {ScheduledExecutorService scheduler = this.scheduledExecutors[i];scheduler.shutdown();try {if (!scheduler.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS)) {scheduler.shutdownNow();}} catch (InterruptedException ex) {System.out.println("RetryQueue.scheduledExecutors.shutdown() InterruptedException!" + ex.getMessage());scheduler.shutdownNow();Thread.currentThread().interrupt();}}System.out.println("===RetryQueue shutdown done===");}/*** enqueue** @param task*/public void enqueue(T task) {try {if (task.getRetryEndTime() < System.currentTimeMillis()) {System.out.println("Discarded an expired RetryTask: " + task);return;}if (!this.queue.offer(task)) {System.out.println("RetryQueue.enqueue() is full!");return;}System.out.println("RetryQueue.enqueue() done, task: " + task);} catch (Exception ex) {System.out.println("RetryQueue.enqueue() error!" + ex.getMessage());}}/*** onRetryTaskTaken** @param task*/private void onRetryTaskTaken(T task) {try {if (Objects.isNull(task)) {System.out.println("RetryTask is null!");return;}ScheduledExecutorService scheduler = this.getScheduledExecutorService(task);scheduler.schedule(() -> this.process(task), this.retryDelaySeconds, TimeUnit.SECONDS);} catch (Exception ex) {System.out.println("RetryQueue.onRetryTaskTaken() error!" + ex.getMessage());}}/*** getScheduledExecutorService** @param task* @return*/private ScheduledExecutorService getScheduledExecutorService(T task) {if (Objects.isNull(task)) {throw new RuntimeException("RetryTask is null!");}int hashCode = task.getRetryHash();if (hashCode == Integer.MIN_VALUE) {hashCode = 0;}return this.scheduledExecutors[Math.abs(hashCode) % this.scheduledExecutorPoolSize];}/*** recoverTaskProcess*/private void recoverTaskProcess() {List<T> recoverTaskList = this.recoverTask();System.out.println("recoverTask size is: " + (CollectionUtils.isEmpty(recoverTaskList) ? 0 : recoverTaskList.size()));if (CollectionUtils.isEmpty(recoverTaskList)) {return;}for (T task : recoverTaskList) {this.enqueue(task);}}/*** drainToTask(获取所有未执行任务并清空队列:任务落盘时使用)** @return*/private List<T> drainToTask() {List<T> list = new ArrayList<>();this.queue.drainTo(list);System.out.println("recoverTask size is: " + list.size());return list;}@Datapublic static class RetryTask {/*** retryEndTime*/private Long retryEndTime;/*** retryHash*/private Integer retryHash;}
}

备注:
1、不使用Disruptor而用LinkedBlockingQueue做为有界内存队列的原因是:Disruptor不提供获取队列中所有未消费条目的方法。本来很想用Disruptor,毕竟效率高,也不必while true的方式去take。
2、Disruptor 简介及使用示例:https://blog.csdn.net/camelials/article/details/123492015

2.2 测试代码

2.2.1 FooRetryQueue

package retry;import lombok.Data;import java.util.List;/*** FooRetryQueue** @author chenx*/public class FooRetryQueue extends RetryQueue<FooRetryQueue.FooRetryTask> {/*** 队列容量(可改为走配置)*/private static final int MAX_QUEUE_SIZE = 10000;/*** 延迟任务执行线程池线程数(可改为走配置)*/private static final int SCHEDULED_EXECUTOR_POOL_SIZE = 64;private FooRetryQueue() {super(MAX_QUEUE_SIZE, SCHEDULED_EXECUTOR_POOL_SIZE);}/*** getInstance*/public static FooRetryQueue getInstance() {return FooRetryQueue.SingletonHolder.INSTANCE;}@Overridepublic void process(FooRetryTask task) {FooSendService.getInstance().sendMessage(task.getFooMessage(), task.getRetryEndTime());}@Overridepublic void flushTask(List<FooRetryTask> taskList) {/*** 这里不实现重试任务落盘* 思路:将taskList数据自行进行持久化,例如:写本地磁盘文件、保存到其他中间件中(例如:数据库)*/}@Overridepublic List<FooRetryTask> recoverTask() {/*** 这里不实现恢重试任务恢复* 思路:将flushTask(List<T> taskList)中的持久数据恢复为List<T>*/return null;}/*** SingletonHolder*/private static class SingletonHolder {public static final FooRetryQueue INSTANCE = new FooRetryQueue();}@Datapublic static class FooRetryTask extends RetryTask {/*** fooMessage*/private FooSendService.FooMessage fooMessage;}
}

2.2.2 FooSendService

package retry;import lombok.Builder;
import lombok.Data;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;/*** FooSendService** @author chenx*/
public class FooSendService {/*** 最大重试持续时间(单位:秒),实际使用时建议走配置;*/public static final int MAX_RETRY_DURATION = 30;/*** 重试延迟时间(单位:秒),实际使用时建议走配置;*/public static final int RETRY_DELAY_SECONDS = 5;private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");private FooSendService() {// just do nothing}/*** getInstance** @return*/public static FooSendService getInstance() {return SingletonHolder.INSTANCE;}/*** startup*/public void startup() {FooRetryQueue.getInstance().startup(RETRY_DELAY_SECONDS);}/*** sendMessage*/public void sendMessage(FooMessage msg, long retryEndTime) {boolean isRetry = false;try {// 模拟消息发送失败isRetry = true;System.out.println("[" + LocalDateTime.now().format(formatter) + "]" + "FooSender.sendMessage() failed! " + msg.toString());} finally {if (isRetry) {FooRetryQueue.FooRetryTask task = new FooRetryQueue.FooRetryTask();task.setRetryEndTime(retryEndTime <= 0 ? getRetryEndTime() : retryEndTime);task.setRetryHash(msg.getApplicationId().hashCode());task.setFooMessage(msg);FooRetryQueue.getInstance().enqueue(task);}}}/*** shutdown*/public void shutdown() {FooRetryQueue.getInstance().shutdown();}/*** getRetryEndTime** @return*/private static long getRetryEndTime() {return System.currentTimeMillis() + MAX_RETRY_DURATION * 1000;}/*** 测试Foo消息发送重试*/public static void main(String[] args) throws InterruptedException {FooMessage fooMessage = FooMessage.builder().applicationId("app1").message("msg123").timestamp(System.currentTimeMillis()).build();// 模拟服务启动FooSendService.getInstance().startup();// 模拟服务发送一个需要失败重试的消息FooSendService.getInstance().sendMessage(fooMessage, -1L);// 模拟服务退出Thread.sleep(MAX_RETRY_DURATION * 1000 + RETRY_DELAY_SECONDS * 1000);FooSendService.getInstance().shutdown();}/*** SingletonHolder*/private static class SingletonHolder {public static final FooSendService INSTANCE = new FooSendService();}@Data@Builderpublic static class FooMessage {/*** applicationId*/private String applicationId;/*** message*/private String message;/*** timestamp*/private Long timestamp;}
}

3. 执行结果

FooSendService中的main方法执行结果如下,从执行结果可以看出确实可以按照预期进行重试:最多持续30秒,每5秒进行一次重试。
在这里插入图片描述
另外需要补充说明是:如果大家希望对重试任务进行落盘则需要:
1:实现recoverTask()和flushTask(List taskList)这2个方法(实现思路参考注释即可);
2:程序需要支持优雅停机(关于java优雅停机方法一搜一大把),并且在停机阶段调用RetryQueue.shutdown()方法。

以上代码主要为展示过程及实现,因此sout和hardCode代码在正式使用需要进行相关调整(sout信息使用日志输出、设置走配置)。同时大家也可以以此为脚手架代码进行改造,例如:重试时间间隔改为逐步增大等(当前为固定重试时间,改造为逐步增大的做法可以为:扩展RetryTask属性,增加retryCount,然后将延迟时间设置为:retryCount * retryDelaySeconds)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/720500.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【论文阅读】High-Resolution Image Synthesis with Latent Diffusion Model

High-Resolution Image Synthesis with Latent Diffusion Model 引用&#xff1a; Rombach R, Blattmann A, Lorenz D, et al. High-resolution image synthesis with latent diffusion models[C]//Proceedings of the IEEE/CVF conference on computer vision and pattern re…

SkyWalking链路追踪上下文TraceContext的追踪身份traceId生成的实现原理剖析

结论先行 SkyWalking 通过字节码增强技术实现&#xff0c;结合依赖注入和控制反转思想&#xff0c;以SkyWalking方式将追踪身份traceId编织到链路追踪上下文TraceContext中。 是不是很有趣&#xff0c;很有意思&#xff01;&#xff01;&#xff01; 实现原理剖析 TraceConte…

1.1_2 性能指标——速率、带宽、吞吐量

文章目录 1.1_2 性能指标——速率、带宽、吞吐量&#xff08;一&#xff09;速率&#xff08;二&#xff09;带宽&#xff08;三&#xff09;吞吐量 1.1_2 性能指标——速率、带宽、吞吐量 &#xff08;一&#xff09;速率 速率即数据率或称数据传输率或比特率。 速率就是“快…

Redis的设计与实现

Redis的设计与实现 数据结构和内部编码 type命令实际返回的就是当前键的数据结构类型&#xff0c;它们分别是&#xff1a;string(字符串)hash(哈希)、list(列表)、set(集合)、zset (有序集合)&#xff0c;但这些只是Redis对外的数据结构。 实际上每种数据结构都有自己底层的…

Docker Protainer可视化平台,忘记登录密码,重置密码。

由于好久没有登录portainer系统&#xff0c;导致忘记了登录密码&#xff0c;试了好多常用的密码都不对&#xff0c;无奈只能重置密码。 一、停止protainer 容器 查看容器ID和COMMAND 用于停止容器 docker ps -a停止容器 docker stop portainer二、查找volume data 宿主机所在…

JavaEE之多线程

一.认识线程 1.多进程实现并发编程的不足之处&#xff1a; 引入多个进程的核心&#xff1a;实现并发编程&#xff08;c的CGI技术就是通过多进程的方式实现的网站后端开发&#xff09;。因为现在是一个多核cpu的时代&#xff0c;并发编程就是刚需。多进程实现并发编程&#xf…

达梦、金仓、南大、瀚高、优炫:从社区建设看企业技术自信心

正文约950字&#xff0c;预计阅读时间2分钟 国产技术厂商在面对自身产品问题时&#xff0c;往往保持回避态度&#xff0c;不愿公之于众&#xff0c;主要原因有2方面&#xff1a; 1&#xff0c;产品技术层面问题较多&#xff0c;如某些根本性缺陷难以攻克&#xff0c;或问题发…

java找工作之Mybatis(入门及xml配置相关)

Mybatis 学习Mybatis就要学会查看官网&#xff0c;官网地址如下&#xff1a;<MyBatis中文网 > 1、简介 1.1什么是Mybatis MyBatis 是一款优秀的持久层框架&#xff0c;它支持自定义 SQL、存储过程以及高级映射。MyBatis 免除了几乎所有的 JDBC 代码以及设置参数和获取…

【Vue3】3-6 : 仿ElementPlus框架的el-button按钮组件实

文章目录 前言 本节内容实现需求完整代码如下&#xff1a; 前言 上节,我们学习了 slot插槽&#xff0c;组件内容的分发处理 本节内容 本小节利用前面学习的组件通信知识&#xff0c;来完成一个仿Element Plus框架的el-button按钮组件实现。 仿造的地址&#xff1a;uhttps://…

预充电阻器选型报告

1. 客户基础条件 预充时间 t≤200ms &#xff0c;电容 C1280uf &#xff0c;电池包最高电压 U410V&#xff0c;预充深度 98% &#xff0c;30 秒内连续预充 15 次。 1.1 现选型号 现选EAK预充电阻额定功率 60W&#xff0c;标称阻值为 35Ω&#xff0c;在 此条件下单次预充…

Unity 协程(Coroutine)到底是什么?

参考链接&#xff1a;Unity 协程(Coroutine)原理与用法详解_unity coroutine-CSDN博客 为啥在Unity中一般不考虑多线程 因为在Unity中&#xff0c;只能在主线程中获取物体的组件、方法、对象&#xff0c;如果脱离这些&#xff0c;Unity的很多功能无法实现&#xff0c;那么多线程…

红黑树的简单介绍

红黑树 红黑树的概念 红黑树&#xff0c;是一种二叉搜索树&#xff0c;但在每个结点上增加一个存储位表示结点的颜色&#xff0c;可以是Red或Black。 通过对任何一条从根到叶子的路径上各个结点着色方式的限制&#xff0c;红黑树确保没有一条路径会比其他路径长出俩倍&#x…

Python类 __init__() 是一个特殊的方法

设计者&#xff1a;ISDF工软未来 版本&#xff1a;v1.0 日期&#xff1a;2024/3/5__init__() 是一个特殊的方法 类似c# C的构造函数 两头都包含两个下划线&#xff0c;这是约定&#xff0c;用于与普通的函数保持区分class User:用户类def __init__(self,first_name,last_name):…

Linux 运维:CentOS/RHEL防火墙和selinux设置

Linux 运维&#xff1a;CentOS/RHEL防火墙和selinux设置 一、防火墙常用管理命令1.1 CentOS/RHEL 7系统1.2 CentOS/RHEL 6系统 二、临时/永久关闭SELinux2.1 临时更改SELinux的执行模式2.2 永久更改SELinux的执行模式 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;…

Finetuning Large Language Models: Sharon Zhou

Finetuning Large Language Models 课程地址&#xff1a;https://www.deeplearning.ai/short-courses/finetuning-large-language-models/ 本文是学习笔记。 Goal&#xff1a; Learn the fundamentals of finetuning a large language model (LLM). Understand how finetu…

STM32(16)使用串口向电脑发送数据

发送字节 发送数组 发送字符和字符串 字符&#xff1a; 字符串&#xff1a; 字符串在电脑中以字符数组的形式存储

ElasticSearch之分布式模型介绍,选主,脑裂

写在前面 本文看下es分布式模型相关内容。 1&#xff1a;分布式模型 1.1&#xff1a;分布式特征 支持水平扩展&#xff0c;可以存储PB级别数据&#xff0c;每个就能都有自己唯一的名称,默认名称时elasticsearch&#xff0c;可以通过配置文件&#xff0c;如cluster.name: my…

PowerBI怎么修改数据库密码

第一步&#xff1a;点击转换数据 第二步&#xff1a;点击数据源设置 第三步&#xff1a;点击编辑权限 第四步&#xff1a;点击编辑 第五步&#xff1a;输入正要修改的密码就可以了

STM32启动过程及反汇编

STM32从Flash启动的过程&#xff0c;主要是从上电复位到main函数的过程&#xff0c;主要有以下步骤&#xff1a; 1.初始化堆栈指针 SP_initial_sp&#xff0c;初始化 PC 指针Reset_Handler 2.初始化中断向量表 3.配置系统时钟 4.调用 C 库函数_main 初始化用户堆栈&#xf…

SAP HANA中PAL算法使用入门

1 应用场合 SAP HANA作为一款内存数据库产品, 使得数据常驻内存, 物理磁盘的存储作为数据备份与日志记录, 以防断电内存中数据丢失. 这种构架大大的缩短了数据存取的时间, 使得SAP HANA很”高速”. 在传统数据模型中,数据库只是作为存取数据一个工具,对于类似下图所示的应用, 客…