一、本文内容
本文将简单梳理下,延迟任务通知服务的webhook模块实现。
这里的回调接口,请求方式约定为post,参数通过body传递参数。
实现比较简单,先梳理其流程图,再是简单的代码实现。
二、流程图
三、异步执行任务
对线程池进行自定义其配置。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.ThreadPoolExecutor;/*** Async注解的配置.** @author xxx*/
@Configuration
@EnableAsync
public class AsyncConfig {@Bean(name = "taskExecutor")public AsyncTaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 核心线程数executor.setCorePoolSize(10);// 最大线程数executor.setMaxPoolSize(50);// 队列容量executor.setQueueCapacity(2000);// 线程名称前缀executor.setThreadNamePrefix("DelayTask-");// 其他配置executor.setKeepAliveSeconds(60);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}
}
四、任务通知的代码实现
@Async("taskExecutor")public void handleTask(String transNo, Date notifyDate) {// 分布式锁if (!stringRedisTemplate.opsForValue().setIfAbsent(String.format(LOCK_KEY_TASK_EXECUTE, transNo),"1", 10, TimeUnit.SECONDS)) {log.warn("任务正在执行中, transNo={},notifyDate={}", transNo, notifyDate);return;}// 任务执行前,判断任务的状态NotifyTask notifyTask = this.getNotifyTask(transNo);if (null == notifyTask) {log.warn("任务不存在, transNo={},notifyDate={}", transNo, notifyDate);return;}if (notifyTask.getIsFinished()) {log.warn("任务已执行, transNo={},notifyUrl={},notifyParams={},notifyDate={}",transNo, notifyTask.getNotifyUrl(), notifyTask.getNotifyParams(), notifyDate);return;}// 判断任务的执行时间是否与期望执行时间一致if (null != notifyDate && !DateUtil.isSameTime(notifyDate, notifyTask.getNotifyDate())) {log.warn("任务的执行时间与期望执行时间不一致, transNo={},notifyUrl={},notifyParams={},notifyDate={},expectDate={}",transNo, notifyTask.getNotifyUrl(), notifyTask.getNotifyParams(), notifyDate, notifyTask.getNotifyDate());return;}boolean success = this.callback(notifyTask.getNotifyUrl(), notifyTask.getNotifyParams());if (!success) {// 判断任务是否支持重试if (notifyTask.getIsRetry() && notifyTask.getRetryTimes() < commonConfig.getMaxRetryTimes()) {notifyTask.retry();this.saveOrUpdateNotifyTask(notifyTask);} else {this.removeTaskFromRedis(transNo);}} else {// 更新任务的状态及完成时间notifyTask.finish();this.saveOrUpdateNotifyTask(notifyTask);}}private boolean callback(String notifyUrl, String notifyParams) {if (log.isInfoEnabled()) {log.info("回调接口, notifyUrl={}, notifyParams={}", notifyUrl, notifyParams);}HttpResponse httpResponse = HttpUtil.createPost(notifyUrl).body(notifyParams).setConnectionTimeout(3000).setReadTimeout(5000).execute();return httpResponse.isOk();}
五、总结
webhook模块,它跟使用什么延迟队列无关,主要步骤包括:
- 对任务的交易流水号进行加分布式锁,防止不同节点的重复回调。(虽然我们要求业务方的回调接口是要满足幂等性的)
- 判断任务是否存在,任务是否已完成
- 任务可能修改了回调时间,作为任务的版本,当延迟队列中的任务和最新的版本不一样时,给与拒绝回调。
- 回调失败,如果支持重试,则更新任务的回调时间;反之删除任务,不再执行。
- 回调成功,更新任务的执行状态和完成时间。
附:相关系列文章链接
延时任务通知服务的设计及实现(一)-- 设计方案
延时任务通知服务的设计及实现(二)-- redisson的延迟队列RDelayedQueue
延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue
延时任务通知服务的设计及实现(四)-- webhook执行任务