文章目录
- AsyncTool
- AsyncTool是什么?
- AsyncTool快速入门
- 1)导入依赖
- 2)自定义Worker
- 3)编排包装类Wrapper
- 4)提交任务执行
- 5)运行结果
- 并发编程常见的场景
- 串行
- 并行
- 阻塞等待 - 先串行,后并行
- 阻塞等待 - 先并行,后串行
AsyncTool
AsyncTool是什么?
是京东开源的一个可编排多线程框架,可解决任意的多线程并行、串行、阻塞、依赖、回调的并行框架。可以任意组合各线程的执行顺序,并且带有全链路执行结果回调。是多线程编排一站式解决方案。(注:它是单机的,不支持分布式编排),是对CompletableFuture的进一步封装。这里对框架的使用做一下总结,供日后工作中方便查看。
AsyncTool快速入门
A、B、C串行任务示例。
1)导入依赖
去gitee搜AsyncTool,京东开源项目。
2)自定义Worker
自定义线程任务A、B、C,实现IWorker,ICallback函数式接口,并重写下面的4个方法。
- begin():Worker开始执行前,先回调begin()
- action():Worker中执行耗时操作的地方,比如RPC接口调用。
- result():action()执行完毕后,回调result方法,可以在此处处理action中的返回值。
- defaultValue():整个Worker执行异常,或者超时,会回调defaultValue(),Worker返回默认值。
workerA:
(action模拟线程任务耗时操作,此处举例仅对参数+1)
public class WorkerA implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {/*** Worker开始的时候先执行begin*/@Overridepublic void begin() {System.out.println("A - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());}/*** Worker中耗时操作在此执行RPC/IO* @param object object* @param allWrappers 任务包装* @return*/@Overridepublic Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {Integer res = object + 1;return res;}/*** action执行结果的回调* @param success* @param param* @param workResult*/@Overridepublic void result(boolean success, Integer param, WorkResult<Integer> workResult) {System.out.println("A - param:" + JSON.toJSONString(param));System.out.println("A - result:" + JSON.toJSONString(workResult));System.out.println("A - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());}/*** Worker异常时的回调* @return*/@Overridepublic Integer defaultValue() {System.out.println("A - defaultValue");return 101;}
}
workerB:
(action()模拟线程任务耗时操作,此处举例仅对参数+2)
public class WorkerB implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {/*** Worker开始的时候先执行begin*/@Overridepublic void begin() {System.out.println("B - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());}/*** Worker中耗时操作在此执行RPC/IO* @param object object* @param allWrappers 任务包装* @return*/@Overridepublic Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {Integer res = object + 2;return res;}/*** action执行结果的回调* @param success* @param param* @param workResult*/@Overridepublic void result(boolean success, Integer param, WorkResult<Integer> workResult) {System.out.println("B - param:" + JSON.toJSONString(param));System.out.println("B - result:" + JSON.toJSONString(workResult));System.out.println("B - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());}/*** Worker异常时的回调* @return*/@Overridepublic Integer defaultValue() {System.out.println("B - defaultValue");return 102;}
}
WorkerC:
(action()模拟线程任务耗时操作,此处举例仅对参数+3)
public class WorkerC implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {/*** Worker开始的时候先执行begin*/@Overridepublic void begin() {System.out.println("C - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());}/*** Worker中耗时操作在此执行RPC/IO* @param object object* @param allWrappers 任务包装* @return*/@Overridepublic Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {Integer res = object + 3;return res;}/*** action执行结果的回调* @param success* @param param* @param workResult*/@Overridepublic void result(boolean success, Integer param, WorkResult<Integer> workResult) {System.out.println("C - param:" + JSON.toJSONString(param));System.out.println("C - result:" + JSON.toJSONString(workResult));System.out.println("C - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());}/*** Worker异常时的回调* @return*/@Overridepublic Integer defaultValue() {System.out.println("C - defaultValue");return 103;}
}
3)编排包装类Wrapper
Worker创建好之后,使用WorkerWrapper对Worker进行包装以及编排,WorkerWrapper是AsyncTool组件的最小可执行任务单元。
C是最后一步,它没有next。B的next是C,A的next是B。编排顺序就是:C <- B <- A
public class Test {public static void main(String[] args) {//引入Worker工作单元WorkerA workerA = new WorkerA();WorkerB workerB = new WorkerB();WorkerC workerC = new WorkerC();//包装Worker,编排串行顺序:C <- B <- A//C是最后一步,它没有nextWorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(3)//3+3.build();//B的next是CWorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(2)//2+2.next(wrapperC).build();//A的next是BWorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(1)//1+1.next(wrapperB).build();try {//Action 提交任务🚩🚩🚩Async.beginWork(1000, wrapperA);} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}}
}
或者还有一种写法:可以使用depend方式编排
//A没有depend
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(1).build();//B的depend是A
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(2).depend(wrapperA).build();//C的depend是B
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(3).depend(wrapperB).build();
//begin
Async.beginWork(1000, wrapperA);
4)提交任务执行
通过执行器类Async的beginWork方法提交任务执行。
//默认不定长线程池
private static final ThreadPoolExecutor COMMON_POOL = (ThreadPoolExecutor) Executors.newCachedThreadPool();//提交任务🚩🚩🚩
Async.beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper)
- Timeout:全组任务超时时间设定,如果Worker任务超时,则Worker结果使用defaultValue()默认值。
- ExecutorService executorService:自定义线程池,不自定义的话,就走默认的COMMON_POOL。默认的线程池是不定长线程池。
- WorkerWrapper… workerWrapper:起始任务,可以是多个。注意不要提交中间节点的任务,只需要提交起始任务即可,编排的后续任务会自动执行。
5)运行结果
运行结果:A:1+1=2;B:2+2=4;C:3+3=6
并发编程常见的场景
串行
Worker创建好之后,使用WorkerWrapper对Worker进行包装以及编排,WorkerWrapper是AsyncTool组件的最小可执行任务单元。
C是最后一步,它没有next。B的next是C,A的next是B。编排顺序就是:C <- B <- A
(1)next写法:
public class Test {public static void main(String[] args) {//引入Worker工作单元WorkerA workerA = new WorkerA();WorkerB workerB = new WorkerB();WorkerC workerC = new WorkerC();//包装Worker,编排串行顺序:C <- B <- A//C是最后一步,它没有nextWorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(3)//3+3.build();//B的next是CWorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(2)//2+2.next(wrapperC).build();//A的next是BWorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(1)//1+1.next(wrapperB).build(); try {//Action 提交任务🚩🚩🚩Async.beginWork(1000, wrapperA);} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}}
}
并行
WorkerWrapper并行编排:A\B\C都没有next和depend, 3个WorkerWrapper一起begin。
Async.beginWork(1000, wrapperA, wrapperB, wrapperC);
public class Test {public static void main(String[] args) {//引入Worker工作单元WorkerA workerA = new WorkerA();WorkerB workerB = new WorkerB();WorkerC workerC = new WorkerC();/*** 包装Worker,编排并行顺序*///AWorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(1)//1+1.build();//BWorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(2)//2+2.build();//CWorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(3)//3+3.build();try {//3个WorkerWrapper一起begin🚩🚩🚩Async.beginWork(1000, wrapperA, wrapperB, wrapperC);} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}}
}
阻塞等待 - 先串行,后并行
阻塞等待 - 先串行,后并行场景模拟:A先执行,对参数+1;A执行完毕之后,B\C同时并行执行,B任务基于A的返回值+2,C任务基于A的返回值+3
(1)next写法:
public static void nextWork() {//引入Worker工作单元WorkerA workerA = new WorkerA();WorkerB workerB = new WorkerB();WorkerC workerC = new WorkerC();//C是最后一步,它没有nextWorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(null)//没有参数,根据A的返回值+3.build();//B是最后一步,它没有nextWorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(null)//没有参数,根据A的返回值+2.build();//A的next是B、CWorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(1)//1+1//next是B、C.next(wrapperB, wrapperC).build();try {//ActionAsync.beginWork(1000, wrapperA);} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}
}
(2)depend写法:
//A没有depend,就是开始
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(1).build();//C depend A
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(null).depend(wrapperA)//🚩依赖A.build();
W
//B depend A
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(null).depend(wrapperA)//🚩依赖A.build();
阻塞等待 - 先并行,后串行
B\C并行执行。B对参数+2,C对参数+3,B\C全部执行完后,A = B返回值+C返回值。
注意:需要B和C同时begin。Async.beginWork(4000, wrapperB, wrapperC);
(1)next写法:
public static void nextWork() {//引入Worker工作单元WorkerA workerA = new WorkerA();WorkerB workerB = new WorkerB();WorkerC workerC = new WorkerC();//A是最后一步,没有nextWorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(null)//参数是null,A = B + C.build();//C next AWorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(3)//3+3 = 6.next(wrapperA).build();//B next AWorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(2)//2+2 = 4.next(wrapperA).build();try {new SynchronousQueue<Runnable>();//ActionAsync.beginWork(4000, wrapperB, wrapperC);} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}
}
(2)depend写法:
//C没有depend,是起始节点
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(3)//3+3 = 6.build();
//B没有depend,是起始节点
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(2)//2+2 = 4.build();
//A depend B,C
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(null)//参数是null,A = B + C.depend(wrapperB, wrapperC).build();