2019独角兽企业重金招聘Python工程师标准>>>
一个countDown在多线程调度下使用不当的分享
1. 诡异的数据抖动
在一个需求开发过程中,由于有多角色需要获取每个角色下的菜单;结果出现了单角色下拉去菜单没问题,多角色情况下只有一个角色的菜单正常返回的问题。这个问题很忧伤,没有菜单如何进功能页面?
2. 怀疑是缓存
因为多角色下菜单采用了Redis缓存,故而怀疑是其中一个角色下的菜单是缓存失效,但是关闭掉缓存依然不起作用,排除缓存影响。
3. debug发现问题
通过增加日志输出,在关闭掉缓存的实时模式下,依然存在菜单时而有,时而没有的情况。证明应该是代码有问题。
在一个多线程调度调度服务类中,发现一个问题,即在debug到如下代码,会出现后续代码未执行完全,接口结果即被返回的情况。
//经过查询相关API,不会出现时序问题,可放心使用final CountDownLatch latch = new CountDownLatch(callableList.size());for(Callable callable :callableList){ListenableFuture<T> listenableFuture = threadPoolTaskExecutor.submitListenable(callable);listenableFuture.addCallback(new ListenableFutureCallback<T>() {@Overridepublic void onFailure(Throwable throwable) {//过早调用countDown BUGlatch.countDown();LogHelper.EXCEPTION.error("执行任务异常",throwable);if(futureCallback!=null){futureCallback.onFailure(throwable);}}@Overridepublic void onSuccess(T t) {//过早调用countDown BUGlatch.countDown();if(futureCallback!=null){futureCallback.onSuccess(t);}}});}
4. countDown调用时机不对
在调用远程接口返回后,立即执行 lacth.countDown(); 会立刻造成主线程阻塞释放,立即响应结果,丢失部分数据。如下图所示,在第二个任务获取数据处理完成后, 就立即调用latch.countDown(), 致使后续的回调还未执行。主线程在收到 latch的释放阻塞后,返回了不完整的数据结果。
改造后,将countDown放在回调执行完成之后,并放置在 try {} finally { }代码块之中,保证一定得到执行,防止抛异常后,主线程阻塞。 如下所示:
final CountDownLatch latch = new CountDownLatch(callableList.size());for(Callable callable :callableList){ListenableFuture<T> listenableFuture = threadPoolTaskExecutor.submitListenable(callable);listenableFuture.addCallback(new ListenableFutureCallback<T>() {@Overridepublic void onFailure(Throwable throwable) {try{LogHelper.DEFAULT.info("多线程回调,latchCount="+latch.getCount());LogHelper.EXCEPTION.error("执行任务异常",throwable);if(futureCallback!=null){futureCallback.onFailure(throwable);}}finally {latch.countDown();}}@Overridepublic void onSuccess(T t) {try{LogHelper.DEFAULT.info("多线程回调成功,latchCount="+latch.getCount());if(futureCallback!=null){futureCallback.onSuccess(t);}}finally {latch.countDown();}}});}
5. 本次使用的多线程调度说明
使用Future阻塞模式,不会出现以上问题,使用future.get()的阻塞式获取,不需要CountDownLatch工具类配合使用。而且本次其实也可以使用Future来实现同样的功能。但是Future没有提供 onFailure, onSucess 这样的回调接口,考虑到易用性,采用了ListenableFuture;
本次采用的 spring的 ListenableFuture 方式回调来实现回调式聚合。在对CountDownLatch使用不当的情况下,出现了该问题。解决该问题后,功能运行正常。
本次提供的服务类 ThreadPoolExecutorService, 其主要的方法如下:
/*** 线程池服务类** @author David* @since 2018/5/15*/
public interface ThreadPoolExecutorService {/*** 执行多线程任务处理,并通过 futureCallback对结果进行回调处理* @param callableList 异步线程可执行的任务 List* @param futureCallback 异步回调* @param timeout 超时时间,毫秒* @param <T>*/<T> void execute(List<Callable<T>> callableList, ListenableFutureCallback<T> futureCallback, long timeout);/*** 执行多线程任务处理,并将结果聚合成一个List 进行返回** @param callableList 异步线程可执行的任务 List* @param failureCallback 失败的回调方法* @param skipNull 是否忽略Null 结果,如果忽略 null 不会添加到List 之中* @param timeout 超时时间,毫秒* @param <T>*/<T> List<T> executeAndMerge(List<Callable<T>> callableList, FailureCallback failureCallback, boolean skipNull, long timeout);/*** 执行多线程任务处理,并将结果List,聚合成一个List 进行返回** @param callableList 异步线程可执行的任务 List* @param failureCallback 失败的回调方法* @param timeout 超时时间,毫秒* @param <T>*/<T> List<T> executeAndMergeList(List<Callable<List<T>>> callableList, FailureCallback failureCallback, long timeout);
6. Future 和 ListenableFuture的区别
spring 或者 guava 的 ListenableFutrue其使用方式和机理应该是类似的,这里的说明是通用的。本次工具类使用的是spring自带的ListenableFutrue。
6.1 区别说明
ListenableFuture顾名思义就是可以监听的Future,它是对java原生Future的扩展增强。我们知道Future表示一个异步计算任务,当任务完成时可以得到计算结果。如果我们希望一旦计算完成就拿到结果展示给用户或者做另外的计算,就必须使用另一个线程不断的查询计算状态。这样做,代码复杂,而且效率低下。使用ListenableFuture帮我们检测Future是否完成了,如果完成就自动调用回调函数,这样可以让主线程不必阻塞,减少并发程序的复杂度。
6.4 spring ListenableFuture使用示例
一个简单的示例,如果要获得 ListenableFuture,则需要一个对Java线程池进行修饰过的线程池执行器。如下所示:
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><property name="corePoolSize" value="${threadpool.corePoolSize}" /><property name="keepAliveSeconds" value="${threadpool.keepAliveSeconds}" /><property name="maxPoolSize" value="${threadpool.maxPoolSize}" /><property name="queueCapacity" value="${threadpool.queueCapacity}" /><property name="rejectedExecutionHandler"><bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /></property></bean>
guava的请参考以下方式进行修饰,这里不进行详细说明:
//Java线程池的类型是可选的【根据场景自行构建】 ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
得到修饰过的线程池执行器后,即可提交可Callable任务,得到 ListenableFuture 进行处理。
以下为计算1~5的3次方的结果并输出,不需要聚合结果,每个线程计算完毕后,立刻输出结果:
for(int i=1; i<=5; i++){final int num = i;ListenableFuture<Integer> listenableFuture = threadPoolTaskExecutor.submitListenable(new Callable<Integer>() {@Overridepublic Integer call(){return (int)Math.pow(num,3);}});listenableFuture.addCallback(new ListenableFutureCallback<Integer>() {@Overridepublic void onFailure(Throwable throwable) {LogHelper.EXCEPTION.error("处理失败", throwable);}@Overridepublic void onSuccess(Integer result) {System.out.println(num + "的3次方为:"+ result);}});}
6.2 是否可以添加多个callback
答案是肯定的,因为ListenableFuture 的addCallback是添加到ListenableFutureCallbackRegistry 一个注册中心。而注册中心底层是支持多个回调的。在6.3会具体介绍回调方法注册中心的处理逻辑。
public void addCallback(ListenableFutureCallback<? super T> callback) {this.callbacks.addCallback(callback);}
6.3 addCallback是否需要考虑时序
guava 和 spring的 ListenableFuture 均做了时序兼容,在listenableFuture执行的任意时刻调用 addCallback 均可准确的执行回调。这里就不得不说在调用addCallback的时候,其实将回到方法注册到ListenableFutureCallbackRegistry(回调注册中心)。
6.3.1 ListenableFutureCallbackRegistry的特性有:
a. 记录了Callable的3种调度状态:
NEW(新建,还未执行),SUCCESS(返回成功),FAILURE(抛异常,失败)
b. 有两个处理队列,分别是:
successCallbacks:存储执行成功的回调方法;
failureCallbacks: 存储执行失败的回调方法(抛异常)。
c. 存储响应结果
将Callable<T> 返回的结果,也放在回调中心里面;
d. mutex 对象保证线程安全
有一个成员变量:private final Object mutex; 用来保证在添加回调任务,或者设置结果集的时候,注册中心是线程安全的。
其在添加回调任务的时候,处理流程为:
synchronized(this.mutex) {...}
如上图所示,在添加回调任务的时候,会通过synchronized先获取 mutex 排它锁,保证处理的线程安全。继而判断任务的状态:
如果为NEW,标识任务还未执行完毕,这时候需要将任务先放入队列,待任务执行完毕再根据状态调用回调方法;
如果为SUCCESS,标识任务已经执行成功,不需要再放入队列,而是在当前线程中,直接调用 onSuccess方法;
如果为FAILURE, 标识任务已经执行失败,不需要再放入队列,而是在当前线程中,直接调用 onFailure方法;
此外,还有一个流程,即在Callable任务调度完成,返回结果后,如果未抛异常:对successCallbacks队列中的方法进行逐个回调;如果抛出异常,对failureCallbacks队列中的方法进行逐个回调。因流程较简单,这里只是简单说明。
6.4 说明
本文只分析到 ListenableFuture 的使用方式, CountDownLatch的调用时机和ListenableFuture 的特性。
因时间和篇幅有限,具体spring 或 guava在submitListenable 任务之后,内部处理逻辑并没有阐述。感兴趣的同学可以具体到源码查看其内部实行逻辑。