背景
很多开放平台都使用Webhook的方式进行外部系统数据同步或者通知,对于Webhook请求的对外发送不进行重试显然有点说不过去。使用简单的while循环按条件立即重试N次往往达不到什么效果,只能说是有重试而已;而引入消息队列中间件依赖又显得太重,于是索性自己动手写了一个基于有界内存队列的抽象泛型延迟重试队列组件。
1. 实现思路及考虑
- 按照重试间隔(RetryDelaySeconds)和重试持续结束时间(RetryTask.retryEndTime)进行重试,以支持较长的重试周期(例如,保障可跨天、持续24小时的重试);
- 重试基于有界内存队列(BlockingQueue)以避免失败后发送线程的阻塞以及重试任务积压较多时发生OOM;
- 重试队列的消费按照重试任务哈希值(RetryTask.retryHash)选择执行线程,以避免不同业务之间的重试任务互相影响,同时保证相同业务的重试任务串行执行;
- 如果可以接受重试任务不落盘(程序意外重启时直接丢弃尚未达到最大重试持续时间的任务),则 flushTask() 和 recoverTask() 方法可以空实现(它们是抽象方法,子类仍需提供方法体);
2. 代码实现
2.1 重试队列组件代码
2.1.1 RetryQueue
package retry;import lombok.Data;
import org.apache.commons.collections.CollectionUtils;import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;/*** RetryQueue* <p>* 1、按照重试间隔(RetryDelaySeconds)& 重试持续结束时间(RetryTask.retryEndTime)进行重试以支持较长的重试周期(例如,保障一定可跨天的持续24小时的重试);* 2、重试基于有界内存队列(BlockingQueue<T extends RetryTask>)以避免失败后发送线程的阻塞以及重试任务积压较多时发生OOM;* 3、重试队列的消费使用重试任务哈希值(RetryTask.retryHash)得到的线程进行执行以避免不同业务之间重试任务的互相影响以及相同业务重试任务的串行执行;* 4、如果接受重试任务不落盘(程序意外重启时直接丢弃没有达到最大重试持续时间的任务)则可以不实现:flushTask() 和 recoverTask() 方法;** @author chenx*/
public abstract class RetryQueue<T extends RetryQueue.RetryTask> {private final int maxQueueSize;private final int scheduledExecutorPoolSize;private final int awaitTerminationSeconds = 0;private boolean isStart = false;private int retryDelaySeconds;private BlockingQueue<T> queue;private ExecutorService executor;private ScheduledExecutorService[] scheduledExecutors;protected RetryQueue(int maxQueueSize, int scheduledExecutorPoolSize) {this.maxQueueSize = maxQueueSize;this.scheduledExecutorPoolSize = scheduledExecutorPoolSize;this.queue = new LinkedBlockingQueue<>(this.maxQueueSize);this.executor = Executors.newSingleThreadExecutor();// init scheduledExecutorsthis.scheduledExecutors = new ScheduledExecutorService[this.scheduledExecutorPoolSize];for (int i = 0; i < this.scheduledExecutorPoolSize; i++) {this.scheduledExecutors[i] = Executors.newSingleThreadScheduledExecutor();}}/*** process(重试任务处理)** @param task*/public abstract void process(T task);/*** flushTask(重试任务落盘)** @param taskList*/public abstract void flushTask(List<T> taskList);/*** recoverTask(重试任务恢复)** @return*/public abstract List<T> recoverTask();/*** startup*/public void startup(int retryDelaySeconds) {System.out.println("===RetryQueue startup begin===");if (this.isStart) {System.out.println("RetryQueue startup already!");return;}this.isStart = true;this.retryDelaySeconds = retryDelaySeconds;// 落盘任务恢复this.recoverTaskProcess();this.executor.execute(() -> {while (this.isStart) {try {T task = this.queue.take();this.onRetryTaskTaken(task);} catch (InterruptedException ex) {Thread.currentThread().interrupt();System.out.println("RetryQueue interrupted!");} catch (Exception ex) {System.out.println("retryQueue.startup() error!" 
+ ex.getMessage());}}});System.out.println("===RetryQueue startup done===");}/*** shutdown*/public void shutdown() {System.out.println("===RetryQueue shutdown begin===");if (!this.isStart) {System.out.println("RetryQueue shutdown already!");return;}this.isStart = false;this.executor.shutdown();this.flushTask(this.drainToTask());try {if (!this.executor.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS)) {this.executor.shutdownNow();}} catch (InterruptedException ex) {System.out.println("RetryQueue.executor.shutdown() InterruptedException!" + ex.getMessage());this.executor.shutdownNow();Thread.currentThread().interrupt();}for (int i = 0; i < this.scheduledExecutorPoolSize; i++) {ScheduledExecutorService scheduler = this.scheduledExecutors[i];scheduler.shutdown();try {if (!scheduler.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS)) {scheduler.shutdownNow();}} catch (InterruptedException ex) {System.out.println("RetryQueue.scheduledExecutors.shutdown() InterruptedException!" + ex.getMessage());scheduler.shutdownNow();Thread.currentThread().interrupt();}}System.out.println("===RetryQueue shutdown done===");}/*** enqueue** @param task*/public void enqueue(T task) {try {if (task.getRetryEndTime() < System.currentTimeMillis()) {System.out.println("Discarded an expired RetryTask: " + task);return;}if (!this.queue.offer(task)) {System.out.println("RetryQueue.enqueue() is full!");return;}System.out.println("RetryQueue.enqueue() done, task: " + task);} catch (Exception ex) {System.out.println("RetryQueue.enqueue() error!" + ex.getMessage());}}/*** onRetryTaskTaken** @param task*/private void onRetryTaskTaken(T task) {try {if (Objects.isNull(task)) {System.out.println("RetryTask is null!");return;}ScheduledExecutorService scheduler = this.getScheduledExecutorService(task);scheduler.schedule(() -> this.process(task), this.retryDelaySeconds, TimeUnit.SECONDS);} catch (Exception ex) {System.out.println("RetryQueue.onRetryTaskTaken() error!" 
+ ex.getMessage());}}/*** getScheduledExecutorService** @param task* @return*/private ScheduledExecutorService getScheduledExecutorService(T task) {if (Objects.isNull(task)) {throw new RuntimeException("RetryTask is null!");}int hashCode = task.getRetryHash();if (hashCode == Integer.MIN_VALUE) {hashCode = 0;}return this.scheduledExecutors[Math.abs(hashCode) % this.scheduledExecutorPoolSize];}/*** recoverTaskProcess*/private void recoverTaskProcess() {List<T> recoverTaskList = this.recoverTask();System.out.println("recoverTask size is: " + (CollectionUtils.isEmpty(recoverTaskList) ? 0 : recoverTaskList.size()));if (CollectionUtils.isEmpty(recoverTaskList)) {return;}for (T task : recoverTaskList) {this.enqueue(task);}}/*** drainToTask(获取所有未执行任务并清空队列:任务落盘时使用)** @return*/private List<T> drainToTask() {List<T> list = new ArrayList<>();this.queue.drainTo(list);System.out.println("recoverTask size is: " + list.size());return list;}@Datapublic static class RetryTask {/*** retryEndTime*/private Long retryEndTime;/*** retryHash*/private Integer retryHash;}
}
备注:
1、不使用Disruptor而用LinkedBlockingQueue作为有界内存队列的原因是:Disruptor不提供获取队列中所有未消费条目的方法(而停机落盘时需要取出这些条目)。本来很想用Disruptor,毕竟效率高,也不必用while(true)循环的方式去take。
2、Disruptor 简介及使用示例:https://blog.csdn.net/camelials/article/details/123492015
2.2 测试代码
2.2.1 FooRetryQueue
package retry;import lombok.Data;import java.util.List;/*** FooRetryQueue** @author chenx*/public class FooRetryQueue extends RetryQueue<FooRetryQueue.FooRetryTask> {/*** 队列容量(可改为走配置)*/private static final int MAX_QUEUE_SIZE = 10000;/*** 延迟任务执行线程池线程数(可改为走配置)*/private static final int SCHEDULED_EXECUTOR_POOL_SIZE = 64;private FooRetryQueue() {super(MAX_QUEUE_SIZE, SCHEDULED_EXECUTOR_POOL_SIZE);}/*** getInstance*/public static FooRetryQueue getInstance() {return FooRetryQueue.SingletonHolder.INSTANCE;}@Overridepublic void process(FooRetryTask task) {FooSendService.getInstance().sendMessage(task.getFooMessage(), task.getRetryEndTime());}@Overridepublic void flushTask(List<FooRetryTask> taskList) {/*** 这里不实现重试任务落盘* 思路:将taskList数据自行进行持久化,例如:写本地磁盘文件、保存到其他中间件中(例如:数据库)*/}@Overridepublic List<FooRetryTask> recoverTask() {/*** 这里不实现恢重试任务恢复* 思路:将flushTask(List<T> taskList)中的持久数据恢复为List<T>*/return null;}/*** SingletonHolder*/private static class SingletonHolder {public static final FooRetryQueue INSTANCE = new FooRetryQueue();}@Datapublic static class FooRetryTask extends RetryTask {/*** fooMessage*/private FooSendService.FooMessage fooMessage;}
}
2.2.2 FooSendService
package retry;import lombok.Builder;
import lombok.Data;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;/*** FooSendService** @author chenx*/
public class FooSendService {/*** 最大重试持续时间(单位:秒),实际使用时建议走配置;*/public static final int MAX_RETRY_DURATION = 30;/*** 重试延迟时间(单位:秒),实际使用时建议走配置;*/public static final int RETRY_DELAY_SECONDS = 5;private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");private FooSendService() {// just do nothing}/*** getInstance** @return*/public static FooSendService getInstance() {return SingletonHolder.INSTANCE;}/*** startup*/public void startup() {FooRetryQueue.getInstance().startup(RETRY_DELAY_SECONDS);}/*** sendMessage*/public void sendMessage(FooMessage msg, long retryEndTime) {boolean isRetry = false;try {// 模拟消息发送失败isRetry = true;System.out.println("[" + LocalDateTime.now().format(formatter) + "]" + "FooSender.sendMessage() failed! " + msg.toString());} finally {if (isRetry) {FooRetryQueue.FooRetryTask task = new FooRetryQueue.FooRetryTask();task.setRetryEndTime(retryEndTime <= 0 ? getRetryEndTime() : retryEndTime);task.setRetryHash(msg.getApplicationId().hashCode());task.setFooMessage(msg);FooRetryQueue.getInstance().enqueue(task);}}}/*** shutdown*/public void shutdown() {FooRetryQueue.getInstance().shutdown();}/*** getRetryEndTime** @return*/private static long getRetryEndTime() {return System.currentTimeMillis() + MAX_RETRY_DURATION * 1000;}/*** 测试Foo消息发送重试*/public static void main(String[] args) throws InterruptedException {FooMessage fooMessage = FooMessage.builder().applicationId("app1").message("msg123").timestamp(System.currentTimeMillis()).build();// 模拟服务启动FooSendService.getInstance().startup();// 模拟服务发送一个需要失败重试的消息FooSendService.getInstance().sendMessage(fooMessage, -1L);// 模拟服务退出Thread.sleep(MAX_RETRY_DURATION * 1000 + RETRY_DELAY_SECONDS * 1000);FooSendService.getInstance().shutdown();}/*** SingletonHolder*/private static class SingletonHolder {public static final FooSendService INSTANCE = new FooSendService();}@Data@Builderpublic static class FooMessage {/*** applicationId*/private String applicationId;/*** 
message*/private String message;/*** timestamp*/private Long timestamp;}
}
3. 执行结果
FooSendService中的main方法执行结果如下,从执行结果可以看出确实可以按照预期进行重试:最多持续30秒,每5秒进行一次重试。
另外需要补充说明的是:如果大家希望对重试任务进行落盘,则需要:
1:实现recoverTask()和flushTask(List taskList)这2个方法(实现思路参考注释即可);
2:程序需要支持优雅停机(Java实现优雅停机的方法很多,例如通过Runtime.getRuntime().addShutdownHook()注册停机钩子),并且在停机阶段调用RetryQueue.shutdown()方法。
以上代码主要用于展示思路及实现,因此其中的sout输出和硬编码(hardcode)在正式使用时需要进行相应调整(sout改为日志输出,硬编码参数改为走配置)。同时大家也可以以此为脚手架代码进行改造,例如:将重试时间间隔改为逐步增大(当前为固定重试间隔,改造做法可以为:扩展RetryTask属性,增加retryCount并在每次重新入队时递增,然后在RetryQueue.onRetryTaskTaken()中将延迟时间设置为:retryCount * retryDelaySeconds)。