异步线程池配置:
/**
 * Spring configuration that switches on {@code @Async} support and registers
 * the thread pool bean named {@code "taskExecutor"}.
 */
@Configuration
@EnableAsync
public class AsyncConfig {

    /** Value passed to {@code setCorePoolSize}. */
    private static final int CORE_POOL_SIZE = 2;

    /** Value passed to {@code setMaxPoolSize}. */
    private static final int MAX_POOL_SIZE = 5;

    /** Value passed to {@code setQueueCapacity}. */
    private static final int QUEUE_CAPACITY = 500;

    /** Prefix given to the names of threads created by the pool. */
    private static final String THREAD_NAME_PREFIX = "Async-";

    /**
     * Builds and initializes the thread pool exposed as the {@code "taskExecutor"} bean.
     *
     * @return an initialized {@code ThreadPoolTaskExecutor} configured with the constants above
     */
    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setThreadNamePrefix(THREAD_NAME_PREFIX);
        pool.setCorePoolSize(CORE_POOL_SIZE);
        pool.setMaxPoolSize(MAX_POOL_SIZE);
        pool.setQueueCapacity(QUEUE_CAPACITY);
        // All settings must be applied before the pool is initialized.
        pool.initialize();
        return pool;
    }
}
异步线程服务:
/**
 * Runs caller-supplied tasks asynchronously. {@link Callable} tasks are retried on
 * failure with a delay that doubles after every failed attempt; {@link Runnable}
 * tasks are simply executed once.
 */
@Component
public class AsyncTaskService {

    private static final Logger logger = LoggerFactory.getLogger(AsyncTaskService.class);

    /** Number of attempts used by the single-argument overload. */
    private static final int DEFAULT_MAX_ATTEMPTS = 3;

    /** Base delay in milliseconds used by the single-argument overload. */
    private static final long DEFAULT_DELAY_BETWEEN_ATTEMPTS = 1000;

    /**
     * Executes {@code task} with the default retry settings
     * ({@value #DEFAULT_MAX_ATTEMPTS} attempts, {@value #DEFAULT_DELAY_BETWEEN_ATTEMPTS} ms base delay).
     *
     * @param task the work to run
     * @param <T>  the task's result type
     * @return a future completed with the task's result, or completed exceptionally
     *         once every attempt has failed
     */
    @Async
    public <T> CompletableFuture<T> executeAsyncTaskWithRetry(Callable<T> task) {
        return executeAsyncTaskWithRetry(task, DEFAULT_MAX_ATTEMPTS, DEFAULT_DELAY_BETWEEN_ATTEMPTS);
    }

    /**
     * Executes {@code task}, retrying after a failure until {@code maxAttempts}
     * attempts have been made. The wait before retry number {@code k} (0-based count
     * of failures so far minus one) is {@code initialDelay * 2^k} milliseconds.
     *
     * <p>A dedicated single-thread scheduler is created per call to time the retries;
     * it is shut down when the returned future is completed by this class.
     *
     * @param task         the work to run
     * @param maxAttempts  total number of attempts allowed; a value {@code <= 0} fails immediately
     * @param initialDelay base delay in milliseconds before the first retry
     * @param <T>          the task's result type
     * @return a future completed with the task's result, or completed exceptionally with a
     *         {@link RuntimeException} whose suppressed exceptions are the per-attempt failures
     */
    @Async
    public <T> CompletableFuture<T> executeAsyncTaskWithRetry(Callable<T> task, int maxAttempts, long initialDelay) {
        CompletableFuture<T> future = new CompletableFuture<>();
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        executeTaskWithRetry(task, maxAttempts, initialDelay, future, scheduler, 0, new ArrayList<>());
        return future;
    }

    /**
     * Runs one attempt of {@code task} and either completes {@code future}, schedules the
     * next attempt, or reports exhaustion.
     *
     * @param attempt  0-based index of the attempt about to run
     * @param failures causes collected from the attempts that have already failed
     */
    private <T> void executeTaskWithRetry(Callable<T> task, int maxAttempts, long initialDelay,
            CompletableFuture<T> future, ScheduledExecutorService scheduler, int attempt,
            List<Throwable> failures) {
        if (attempt >= maxAttempts) {
            failExhausted(maxAttempts, future, scheduler, failures);
            return;
        }

        // No executor is passed, so the attempt runs on CompletableFuture's default async pool.
        CompletableFuture<T> attemptFuture = CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        });

        attemptFuture.whenComplete((result, throwable) -> {
            if (throwable == null) {
                future.complete(result);
                scheduler.shutdown();
                return;
            }

            // Record the underlying cause as a Throwable: the task may fail with an Error,
            // and casting that to Exception would throw here and leave the future pending forever.
            Throwable cause = throwable.getCause() != null ? throwable.getCause() : throwable;
            failures.add(cause);

            int nextAttempt = attempt + 1;
            if (nextAttempt >= maxAttempts) {
                // That was the last allowed attempt: report failure now instead of
                // sleeping through one more back-off period first.
                failExhausted(maxAttempts, future, scheduler, failures);
                return;
            }

            long delay = (long) (initialDelay * Math.pow(2, attempt));
            logger.info("任务失败,将在 {} 毫秒后重试,尝试第:{} 次", delay, nextAttempt);
            scheduler.schedule(
                    () -> executeTaskWithRetry(task, maxAttempts, initialDelay, future, scheduler,
                            nextAttempt, failures),
                    delay, TimeUnit.MILLISECONDS);
        });
    }

    /**
     * Completes {@code future} exceptionally after all attempts are used up, attaching every
     * recorded failure as a suppressed exception, then releases the scheduler and logs the error.
     */
    private <T> void failExhausted(int maxAttempts, CompletableFuture<T> future,
            ScheduledExecutorService scheduler, List<Throwable> failures) {
        RuntimeException exception = new RuntimeException("任务失败,已达最大重试次数: " + maxAttempts);
        failures.forEach(exception::addSuppressed);
        future.completeExceptionally(exception);
        scheduler.shutdown();
        logger.error("任务失败,已达最大重试次数: {}", maxAttempts, exception);
    }

    /**
     * Runs {@code task} once, without retry and without a result.
     *
     * @param task the work to run
     */
    @Async
    public void executeAsyncRunnableTask(Runnable task) {
        task.run();
    }
}
执行测试:
/**
 * REST endpoints that hand demo tasks to {@code AsyncTaskService}: a slow task,
 * a fire-and-forget runnable, and a task that fails at random to exercise retries.
 */
@RestController
public class AsyncController {

    /** Source of the random number that decides whether the retry demo task fails. */
    private static final Random RANDOM = new Random();

    @Resource
    private AsyncTaskService genericAsyncService;

    /**
     * Submits the slow demo task and returns immediately without waiting for its result.
     *
     * @return a fixed confirmation message
     */
    @GetMapping("/startAsyncTask")
    @ApiOperation("执行异步任务")
    public String startAsyncTask() {
        Callable<String> slowTask = AsyncController::runSlowTask;
        genericAsyncService.executeAsyncTaskWithRetry(slowTask);
        return "异步任务已启动";
    }

    /**
     * Submits the demo runnable, which produces no result.
     *
     * @return a fixed confirmation message
     */
    @GetMapping("/startAsyncRunnableTask")
    @ApiOperation("不需要返回值的操作的任务")
    public String startAsyncRunnableTask() {
        Runnable job = AsyncController::runRunnableTask;
        genericAsyncService.executeAsyncRunnableTask(job);
        return "异步Runnable任务已启动";
    }

    /**
     * Submits the randomly failing demo task through the retrying service method.
     *
     * @return a 200 response whose body is the future returned by the service
     */
    @GetMapping("/executeTask")
    @ApiOperation("执行异步任务[重试]")
    public ResponseEntity<CompletableFuture<String>> executeTask() {
        Callable<String> flakyTask = AsyncController::runRandomlyFailingTask;
        CompletableFuture<String> pending = genericAsyncService.executeAsyncTaskWithRetry(flakyTask);
        return new ResponseEntity<>(pending, HttpStatus.OK);
    }

    /** Prints a start line, sleeps five seconds, prints a finish line, and returns a fixed result. */
    private static String runSlowTask() throws InterruptedException {
        System.out.println("执行异步任务: " + currentThreadName());
        Thread.sleep(5000);
        System.out.println("异步任务完成: " + currentThreadName());
        return "任务结果";
    }

    /** Prints a start line and a finish line, each tagged with the executing thread's name. */
    private static void runRunnableTask() {
        System.out.println("执行异步Runnable任务: " + currentThreadName());
        System.out.println("异步Runnable任务完成: " + currentThreadName());
    }

    /**
     * Draws a number in [0, 10); throws when it is below 5, otherwise returns a success message.
     *
     * @throws Exception when the drawn number is less than 5
     */
    private static String runRandomlyFailingTask() throws Exception {
        int drawn = RANDOM.nextInt(10);
        System.out.println("我是任务 ====== 随机数" + drawn);
        if (drawn >= 5) {
            return "Task executed successfully";
        }
        System.out.println("随机数: " + drawn + ",任务将失败。");
        throw new Exception("随机失败");
    }

    /** Name of the thread currently executing the caller. */
    private static String currentThreadName() {
        return Thread.currentThread().getName();
    }
}