Mono.fromRunnable
Mono.fromRunnable
是一种创建 Mono
的方式,它接受一个 Runnable
作为参数。当 Mono
订阅者订阅时,Runnable
会被执行。Mono.fromRunnable
不会发出任何值,只会执行 Runnable
的逻辑,并在完成后发出一个完成信号。
示例
import reactor.core.publisher.Mono;public class FromRunnableExample {public static void main(String[] args) {Mono<Void> mono = Mono.fromRunnable(() -> {System.out.println("Runnable is executed");// 执行你的业务逻辑});// 订阅Mono以触发执行mono.subscribe(null, // onNext,不会被调用,因为fromRunnable不会发出任何值error -> System.err.println("Error: " + error), // onError() -> System.out.println("Runnable execution completed") // onComplete);}
}
在这个示例中,当 Mono
被订阅时,Runnable
中的逻辑会被执行,并且会打印出 “Runnable is executed”。完成后,Mono
会发出一个完成信号,因此会打印 “Runnable execution completed”。
Mono.zip
Mono.zip
用于合并多个 Mono
的结果。它接受多个 Mono
作为参数,并将它们的结果合并成一个新的 Mono
。当所有输入的 Mono
都完成时,Mono.zip
发出一个包含所有输入 Mono
结果的 Tuple
。
示例
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;public class ZipExample {public static void main(String[] args) {Mono<String> mono1 = Mono.just("Hello");Mono<String> mono2 = Mono.just("World");// 使用Mono.zip合并两个Mono的结果Mono<Tuple2<String, String>> zipped = Mono.zip(mono1, mono2);// 订阅以触发执行zipped.subscribe(result -> {String value1 = result.getT1(); // "Hello"String value2 = result.getT2(); // "World"System.out.println("Combined Result: " + value1 + " " + value2);});}
}
在这个示例中,Mono.zip
合并了两个 Mono
的结果。当这两个 Mono
都完成时,Mono.zip
会发出一个包含两个结果的 Tuple
,在 subscribe
中可以访问和处理这个合并的结果。
结合使用
结合使用 Mono.fromRunnable
和 Mono.zip
可以先执行一个同步任务(如 Mono.fromRunnable
),然后并行执行两个或多个异步任务(如使用 Mono.zip
),并在它们完成时合并结果。
示例
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;@Service
@Slf4j
public class LeaveSimulationRoomService {@Resourceprivate MmPromptRecordService mmPromptRecordService;@Resourceprivate RedisService redisService;@Resourceprivate SessionMappingService sessionMappingService;@Resourceprivate MmRehearseRecordSimulationService mmRehearseRecordSimulationService;@Resourceprivate MmRehearseSimulationService mmRehearseSimulationService;public Mono<Void> handleLeaveRoom(String sessionId) {log.info("Handling leave room logic for sessionId: {}", sessionId);String rehearseRecordIdStr = redisService.getRehearseRecordIdBySessionId(sessionId);if (rehearseRecordIdStr == null) {log.error("No rehearse record ID found for sessionId: {}", sessionId);return Mono.empty();}Long rehearseRecordId = Long.parseLong(rehearseRecordIdStr);String rehearseSimulationId = sessionMappingService.getSessionInfo(sessionId).getRehearseSimulationId();if (rehearseSimulationId == null) {log.error("No rehearse simulation ID found for sessionId: {}", sessionId);return Mono.empty();}// 先执行同步任务return Mono.fromRunnable(() -> {MmRehearseRecordSimulation mmRehearseRecordSimulation = mmRehearseRecordSimulationService.getById(rehearseRecordId);Integer communicationRound = mmRehearseRecordSimulation.getCommunicationRound() + 1;long duration = Duration.between(mmRehearseRecordSimulation.getStartTime(), LocalDateTime.now()).getSeconds();mmRehearseRecordSimulationService.updateCommunicationRoundAndDuration(rehearseRecordId, communicationRound, duration, null);}).then(Mono.defer(() -> {// 并行执行异步任务Mono<Boolean> repaymentPlanMono = mmPromptRecordService.getFinalRepaymentPlan(sessionId);Mono<String> contentSummaryMono = mmPromptRecordService.getContentSummary(sessionId);return Mono.zip(repaymentPlanMono, contentSummaryMono).flatMap(tuple -> {Boolean repaymentPlanAchieved = tuple.getT1();String briefSummary = tuple.getT2();// 再次更新communicationRounds并添加briefSummarymmRehearseRecordSimulationService.updateCommunicationRoundAndDuration(rehearseRecordId, null, null, briefSummary);// 更新演练信息MmRehearseSimulationDto dto = MmRehearseSimulationDto.builder().identifierId(rehearseSimulationId).repaymentPlanAchieved(repaymentPlanAchieved).briefSummary(briefSummary).communicationRounds(null) // communicationRounds已更新,不再需要更新.build();return mmRehearseSimulationService.updateRehearseSimulation(dto);});})).then();}
}
在这个代码示例中:
- 使用
Mono.fromRunnable
先同步更新communicationRounds
。 - 然后使用
Mono.defer
创建新的Mono
并并行执行两个异步任务(getFinalRepaymentPlan
和getContentSummary
),利用Mono.zip
合并它们的结果。 - 合并结果后,更新演练记录和演练信息。