异步任务使用场景
根据同步/异步方式划分场景,各场景下常用的技术方案如下:
方式 | 实现 | 特点 | 缺点 |
---|---|---|---|
同步 | HTTP RPC Cache etc. | 指标:RT、QPS、TPS、缓存命中率 等; 关注(准)实时数据,用户可交互 | 1. 处理数据量小:对应的业务妥协有必选筛选项、分页、滑动分页、异步渲染,首屏接口; 2. 并发请求压力不宜过大:利用缓存、CDN、异构存储等方式缓解压力; 3. 调用失败率较高:如调用超时,对应的处理方式有重试、降级、熔断; |
异步 | MQ Job Hive etc. | 指标:吞吐量 等; 最终一致性,处理数据量大,耗时长,定时,削峰,解耦 | 1. 异步用户需等待; 2. 系统复杂度提升; |
Job 业务场景:
- 批量场景:物流发货、订单超时关闭
- 周期性场景:财务结算,业务指标统计
Job 技术场景:
- 定时向其他服务/第三方系统推送数据:如本地消息表补偿发送消息,外部系统数据变更无法和当前系统业务动作不处于一个事务中无法保证业务动作的原子性,故利用本地表存储交互内容,后续通过定时任务查询该表执行外部交互动作,确保该行为一定发生。
- 定时拉取第三方系统数据:一般第三方系统不提供MQ这种数据变更的主动Push方式,而是当前系统主动Pull数据,一般第三方提供HTTP接口。
这里选取Job技术场景进行说明:
本地消息表
微服务之间通常使用 Event Driven 的方式异步处其他领域事件,达到解耦服务的目的。实现上利用MQ,MQ的发布-订阅模式具有高扩展性。
本地业务动作提交成功后,预期是MQ事件消息一定发送成功,但是存在两种异常Case:
常用的解决方式有两种:
- 事务消息
- 本地消息表
由于事务消息集成复杂度高,且事务消息失败后无法自动重试,故本地消息表才是最终解决方案,本地消息表的处理方式如下图:
如果是顺序消息,则 【Try Send Message】步骤不可添加,同时补偿任务发送需要以shardingKey维度串行发送。
如果分库,每个库都必须有单独的本地消息表,避免跨库事务
本地消息表组件可参考:开源中…
如果只是想在事务提交后执行逻辑而不用补偿,可注册TransactionSynchronization
在事务提交后回调执行,代码如下:
import java.util.function.Consumer;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;public class TransactionUtils {public static void executeAfterCommit(Runnable runnable , Consumer<Exception> exceptionConsumer) {// 有事务,注册Synchronization,事务提交后执行if (TransactionSynchronizationManager.isSynchronizationActive()) {TransactionSynchronization transactionSynchronization = new TransactionSynchronizationAdapter() {@Overridepublic void afterCommit() {try {runnable.run();} catch (Exception e) {exceptionConsumer.accept(e);}}};// 注册SynchronizationTransactionSynchronizationManager.registerSynchronization(transactionSynchronization);}// 无事务直接执行runnable.run();}
}
定时拉取第三方系统数据
公司内部不同系统间通信一般通过 RPC 或 MQ的方式交互,但第三方系统出于通用性、安全性考虑,一般只提供HTTP接口,则与第三方系统的交互必须有服务自行发起调用。调用过程中由于不具备原子性,通常需要有对账机制去保证三方系统和内部系统数据的一致性。同时,三方接口调用需要考虑批量参数、调用频次、加密解密等问题。
异步任务实践
任务中断与自动恢复执行
任务执行一般耗时较长,中断可能性高,比如任务在执行过程中有上线行为,则该任务必须被强制打断(虽然有优雅下线的处理方式,但是任务执行时长是不可预知的,所以一般直接打断)。打断后重启执行需要保证上次执行没处理完的任务接续执行,不会因为任务打断导致任务数据不可重试。通常业务模型状态需要加一个“执行中”状态标识该数据正在被任务处理,比如,现在需要推送一单发货单到第三方系统,推送状态机如下:
可以看到加了一个"xx中"状态表示任务执行过程,任务执行过程如下:
如上图所示,即使任务被打断也能自动恢复推送,【推送中】状态表示该项数据已经开始处理,在用户侧可以感知该状态。在向第三方推送完成之后,如果提交失败,下次任务执行会再次推送,这里要求第三方接口幂等。 推送数据添加最大重试推送次数,超过最大次数应告警。
分布式调度与任务并行执行
在分布式环境下,任务调度的执行策略可分为:
- 单机调度:只调度到单一Pod,如第一个、最后一个;
- 分片调度:根据业务参数分片,不同业务参数的任务调度到不同的单机并行执行,类似于MQ的分区顺序;
- 并行调度:完全并行调度,使用较少。
单机调度情况下不一定是串行执行的,任务执行时长可能大于调度间隔,仍然存在并发修改数据问题,这里任务组件可配置【丢弃执行】【排队等待】等阻塞策略,但在任务业务代码层面,如果需要保证严格串行处理,需要在任务执行前加分布式锁;同时加锁的粒度可按照业务维度进一步拆分,最好不要锁整个任务,这样结合分片执行策略可以提升任务处理效率。
第二类丢失更新
异步任务往往需要和其他状态变更动作协作。例如如下场景:
- 现有统计记录表包含统计状态字段,包含两个状态 0:【待统计】 1:【已统计】 ;
- 用户点击【开始统计】,插入/变更统计记录,状态置为【待统计】;
- 有异步任务定时查询【待统计】 状态记录,统计完成后将状态置为【已统计】;
- 【已统计】的记录可再次统计。
考虑如下Case:
用户更新了待统计数据,但是第二次点击【开始统计】后,统计结果仍然是更新前的结果。
解决方式: 统计状态字段不再使用二值表示,而是使用累加值表示待统计次数,类似可重入锁设计;
或者任务最后更新时加上 where update_time = #{selectedUpdateTime}
的乐观锁。
限制任务单次处理batchSize大小、只打印关键日志
任务处理通常需要批量查询/变更表数据,最好加batchSize限制单次任务执行的数量,减少单次任务耗时。同时任务涉及可重试的数据变更应设置最大变更次数,超过最大变更次数应跳过执行并告警。最后,批量任务应只打印关键日志,批量场景下日志过多打印有Pod磁盘占用率升高的风险。