一、无返回值任务函数
List < List < StatisticsDTO > > batches = Lists . partition ( statisticsList, BATCH_SIZE ) ;
List < CompletableFuture < Void > > futures = new ArrayList < > ( batches. size ( ) ) ;
for ( int i = 0 ; i < batches. size ( ) ; i++ ) { logger. info ( "批次 " + i + " 开始处理..." ) ; String logId = LogIdThreadLocal . getLogId ( ) ; List < StatisticsDTO > batchData = batches. get ( i) ; CompletableFuture < Void > future = CompletableFuture . runAsync ( ( ) -> { try { LogIdThreadLocal . setLogId ( logId) ; processBatch ( batchData) ; } finally { LogIdThreadLocal . clean ( ) ; } } ) ; futures. add ( future) ;
}
CompletableFuture < Void > allOf = CompletableFuture . allOf ( futures. toArray ( new CompletableFuture [ 0 ] ) ) ;
allOf. join ( ) ;
二、带返回值任务函数
List < List < StatisticsDTO > > batches = Lists . partition ( statisticsList, BATCH_SIZE ) ;
List < CompletableFuture < List < StatisticsDTO > > > futures = new ArrayList < > ( batches. size ( ) ) ;
for ( int i = 0 ; i < batches. size ( ) ; i++ ) { logger. info ( "批次 " + i + " 开始处理..." ) ; String logId = LogIdThreadLocal . getLogId ( ) ; List < StatisticsDTO > batchData = batches. get ( i) ; CompletableFuture < List < DoctorAvatarAnalysisDTO > > future = CompletableFuture . supplyAsync ( ( ) -> { try { LogIdThreadLocal . setLogId ( logId) ; return processBatch ( batchData) ; } finally { LogIdThreadLocal . clean ( ) ; } } ) ; futures. add ( future) ;
}
CompletableFuture < Void > allOf = CompletableFuture . allOf ( futures. toArray ( new CompletableFuture [ 0 ] ) ) ;
List < StatisticsDTO > result = allOf. thenApply ( v -> futures. stream ( ) . map ( CompletableFuture :: join ) . flatMap ( List :: stream ) . collect ( Collectors . toList ( ) )
) . join ( ) ;