【并行编程框架】AsyncTool

文章目录

  • AsyncTool
    • AsyncTool是什么?
    • AsyncTool快速入门
      • 1)导入依赖
      • 2)自定义Worker
      • 3)编排包装类Wrapper
      • 4)提交任务执行
      • 5)运行结果
    • 并发编程常见的场景
      • 串行
      • 并行
      • 阻塞等待 - 先串行,后并行
      • 阻塞等待 - 先并行,后串行

AsyncTool

AsyncTool是什么?

是京东开源的一个可编排多线程框架,可解决任意的多线程并行、串行、阻塞、依赖、回调的并行框架。可以任意组合各线程的执行顺序,并且带有全链路执行结果回调。是多线程编排一站式解决方案。(注:它是单机的,不支持分布式编排),是对CompletableFuture的进一步封装。这里对框架的使用做一下总结,供日后工作中方便查看。

AsyncTool快速入门

A、B、C串行任务示例。

1)导入依赖

去gitee搜AsyncTool,京东开源项目。

2)自定义Worker

自定义线程任务A、B、C,实现IWorker,ICallback函数式接口,并重写下面的4个方法。

  1. begin():Worker开始执行前,先回调begin()
  2. action():Worker中执行耗时操作的地方,比如RPC接口调用。
  3. result():action()执行完毕后,回调result方法,可以在此处处理action中的返回值。
  4. defaultValue():整个Worker执行异常,或者超时,会回调defaultValue(),Worker返回默认值。

workerA:

(action模拟线程任务耗时操作,此处举例仅对参数+1)

public class WorkerA implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {/*** Worker开始的时候先执行begin*/@Overridepublic void begin() {System.out.println("A - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());}/*** Worker中耗时操作在此执行RPC/IO* @param object      object* @param allWrappers 任务包装* @return*/@Overridepublic Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {Integer res = object + 1;return res;}/*** action执行结果的回调* @param success* @param param* @param workResult*/@Overridepublic void result(boolean success, Integer param, WorkResult<Integer> workResult) {System.out.println("A - param:" + JSON.toJSONString(param));System.out.println("A - result:" + JSON.toJSONString(workResult));System.out.println("A - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());}/*** Worker异常时的回调* @return*/@Overridepublic Integer defaultValue() {System.out.println("A - defaultValue");return 101;}
}

workerB:

(action()模拟线程任务耗时操作,此处举例仅对参数+2)

public class WorkerB implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {/*** Worker开始的时候先执行begin*/@Overridepublic void begin() {System.out.println("B - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());}/*** Worker中耗时操作在此执行RPC/IO* @param object      object* @param allWrappers 任务包装* @return*/@Overridepublic Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {Integer res = object + 2;return res;}/*** action执行结果的回调* @param success* @param param* @param workResult*/@Overridepublic void result(boolean success, Integer param, WorkResult<Integer> workResult) {System.out.println("B - param:" + JSON.toJSONString(param));System.out.println("B - result:" + JSON.toJSONString(workResult));System.out.println("B - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());}/*** Worker异常时的回调* @return*/@Overridepublic Integer defaultValue() {System.out.println("B - defaultValue");return 102;}
}

WorkerC:

(action()模拟线程任务耗时操作,此处举例仅对参数+3)

public class WorkerC implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {/*** Worker开始的时候先执行begin*/@Overridepublic void begin() {System.out.println("C - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());}/*** Worker中耗时操作在此执行RPC/IO* @param object      object* @param allWrappers 任务包装* @return*/@Overridepublic Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {Integer res = object + 3;return res;}/*** action执行结果的回调* @param success* @param param* @param workResult*/@Overridepublic void result(boolean success, Integer param, WorkResult<Integer> workResult) {System.out.println("C - param:" + JSON.toJSONString(param));System.out.println("C - result:" + JSON.toJSONString(workResult));System.out.println("C - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());}/*** Worker异常时的回调* @return*/@Overridepublic Integer defaultValue() {System.out.println("C - defaultValue");return 103;}
}

3)编排包装类Wrapper

Worker创建好之后,使用WorkerWrapper对Worker进行包装以及编排,WorkerWrapper是AsyncTool组件的最小可执行任务单元。

C是最后一步,它没有next。B的next是C,A的next是B。编排顺序就是:C <- B <- A

public class Test {public static void main(String[] args) {//引入Worker工作单元WorkerA workerA = new WorkerA();WorkerB workerB = new WorkerB();WorkerC workerC = new WorkerC();//包装Worker,编排串行顺序:C <- B <- A//C是最后一步,它没有nextWorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(3)//3+3.build();//B的next是CWorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(2)//2+2.next(wrapperC).build();//A的next是BWorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(1)//1+1.next(wrapperB).build();try {//Action 提交任务🚩🚩🚩Async.beginWork(1000, wrapperA);} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}}
}

或者还有一种写法:可以使用depend方式编排

//A没有depend
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(1).build();//B的depend是A
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(2).depend(wrapperA).build();//C的depend是B
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(3).depend(wrapperB).build();
//begin
Async.beginWork(1000, wrapperA);

4)提交任务执行

通过执行器类Async的beginWork方法提交任务执行。

//默认不定长线程池
private static final ThreadPoolExecutor COMMON_POOL = (ThreadPoolExecutor) Executors.newCachedThreadPool();//提交任务🚩🚩🚩
Async.beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper)
  • Timeout:全组任务超时时间设定,如果Worker任务超时,则Worker结果使用defaultValue()默认值。
  • ExecutorService executorService:自定义线程池,不自定义的话,就走默认的COMMON_POOL。默认的线程池是不定长线程池。
  • WorkerWrapper… workerWrapper:起始任务,可以是多个。注意不要提交中间节点的任务,只需要提交起始任务即可,编排的后续任务会自动执行。

5)运行结果

运行结果:A:1+1=2;B:2+2=4;C:3+3=6

并发编程常见的场景

串行

Worker创建好之后,使用WorkerWrapper对Worker进行包装以及编排,WorkerWrapper是AsyncTool组件的最小可执行任务单元。

C是最后一步,它没有next。B的next是C,A的next是B。编排顺序就是:C <- B <- A

(1)next写法:

public class Test {public static void main(String[] args) {//引入Worker工作单元WorkerA workerA = new WorkerA();WorkerB workerB = new WorkerB();WorkerC workerC = new WorkerC();//包装Worker,编排串行顺序:C <- B <- A//C是最后一步,它没有nextWorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(3)//3+3.build();//B的next是CWorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(2)//2+2.next(wrapperC).build();//A的next是BWorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(1)//1+1.next(wrapperB).build();    try {//Action 提交任务🚩🚩🚩Async.beginWork(1000, wrapperA);} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}}
}

并行

WorkerWrapper并行编排:A\B\C都没有next和depend, 3个WorkerWrapper一起begin。

Async.beginWork(1000, wrapperA, wrapperB, wrapperC);

public class Test {public static void main(String[] args) {//引入Worker工作单元WorkerA workerA = new WorkerA();WorkerB workerB = new WorkerB();WorkerC workerC = new WorkerC();/*** 包装Worker,编排并行顺序*///AWorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(1)//1+1.build();//BWorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(2)//2+2.build();//CWorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(3)//3+3.build();try {//3个WorkerWrapper一起begin🚩🚩🚩Async.beginWork(1000, wrapperA, wrapperB, wrapperC);} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}}
}

阻塞等待 - 先串行,后并行

阻塞等待 - 先串行,后并行场景模拟:A先执行,对参数+1;A执行完毕之后,B\C同时并行执行,B任务基于A的返回值+2,C任务基于A的返回值+3

(1)next写法:

public static void nextWork() {//引入Worker工作单元WorkerA workerA = new WorkerA();WorkerB workerB = new WorkerB();WorkerC workerC = new WorkerC();//C是最后一步,它没有nextWorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(null)//没有参数,根据A的返回值+3.build();//B是最后一步,它没有nextWorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(null)//没有参数,根据A的返回值+2.build();//A的next是B、CWorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(1)//1+1//next是B、C.next(wrapperB, wrapperC).build();try {//ActionAsync.beginWork(1000, wrapperA);} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}
}

(2)depend写法:

//A没有depend,就是开始
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(1).build();//C depend A
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(null).depend(wrapperA)//🚩依赖A.build();
W
//B depend A
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(null).depend(wrapperA)//🚩依赖A.build();

阻塞等待 - 先并行,后串行

B\C并行执行。B对参数+2,C对参数+3,B\C全部执行完后,A = B返回值+C返回值。

注意:需要B和C同时begin。Async.beginWork(4000, wrapperB, wrapperC);

(1)next写法:

public static void nextWork() {//引入Worker工作单元WorkerA workerA = new WorkerA();WorkerB workerB = new WorkerB();WorkerC workerC = new WorkerC();//A是最后一步,没有nextWorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(null)//参数是null,A = B + C.build();//C next AWorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(3)//3+3 = 6.next(wrapperA).build();//B next AWorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(2)//2+2 = 4.next(wrapperA).build();try {new SynchronousQueue<Runnable>();//ActionAsync.beginWork(4000, wrapperB, wrapperC);} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}
}

(2)depend写法:

//C没有depend,是起始节点
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>().id("workerC").worker(workerC).callback(workerC).param(3)//3+3 = 6.build();
//B没有depend,是起始节点
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>().id("workerB").worker(workerB).callback(workerB).param(2)//2+2 = 4.build();
//A depend B,C
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>().id("workerA").worker(workerA).callback(workerA).param(null)//参数是null,A = B + C.depend(wrapperB, wrapperC).build();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/670650.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

电力负荷预测 | 基于TCN的电力负荷预测(Python)———数据预处理

文章目录 效果一览文章概述源码设计参考资料效果一览 文章概述 基于TCN的电力负荷预测(Python) python3.8 keras2.6.0 matplotlib3.5.2 numpy1.19.4 pandas1.4.3 tensorflow==2.6.0

浏览器提示ERR_SSL_KEY_USAGE_INCOMPATIBLE解决

ERR_SSL_KEY_USAGE_INCOMPATIBLE报错原因 ERR_SSL_KEY_USAGE_INCOMPATIBLE 错误通常发生在使用 SSL/TLS 连接时,指的是客户端和服务器之间进行安全通信尝试失败,原因是证书中的密钥用途(Key Usage)或扩展密钥用途(Extended Key Usage, EKU)与正在尝试的操作不兼容。这意味…

Unity笔记:相机移动

基础知识 鼠标输入 在Unity中&#xff0c;开发者在“Edit” > “Project Settings” > “Input Manager”中设置输入&#xff0c;如下图所示&#xff1a; 在设置了Mouse X后&#xff0c;Input.GetAxis("Mouse X")返回的是鼠标在X轴上的增量值。这意味着它会…

STM32TIM定时器(3)

文章目录 前言一、介绍部分输入捕获简介频率测量捕获比较通道主从模式输入捕获基本结构PWMI基本结构 二、代码部分使用输入捕获捕获另一个端口的PWM输入线路连接代码内容 PWMI获取频率占空比线路连接与上个案例一致代码实现 总结相关函数PSC、ARR都有1的误差 前言 这部分主要介…

Spring Boot整合新版Spring Security:Lambda表达式配置优雅安全

文章目录 1. 引言2. 项目依赖配置3. 使用Lambda表达式配置Spring Security4. 自定义身份验证逻辑5. 认证与授权注解5.1 Secured注解5.2 PreAuthorize和PostAuthorize注解 6. 总结 &#x1f389;Spring Boot整合新版Spring Security&#xff1a;Lambda表达式配置优雅安全 ☆* o(…

spring cloud stream

背景 主要解决不同消息中间件切换问题。实现不同中间件的代码解耦。 链接: 支持的中间件 后文使用kafka测试。 引入依赖 <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId></depende…

[EFI]英特尔 猛兽峡谷NUC11BTM电脑 Hackintosh 黑苹果efi引导文件

硬件型号驱动情况主板 猛兽峡谷NUC11BTM 处理器 Intel Core™ i9-11900KB 处理器 已驱动内存英睿达 DDR4 16G 3200MHz*2已驱动硬盘铠侠RC20 1T已驱动显卡AMD Radeon RX 6600 XT已驱动声卡USB音频已驱动网卡以太网控制器 i225-LM已驱动无线网卡蓝牙奋威t919Sonoma以上版本自行安…

Spring Web Body 转化常见错误

在 Spring 中&#xff0c;对于 Body 的处理很多是借助第三方编解码器来完成的。例如常见的 JSON 解析&#xff0c;Spring 都是借助于 Jackson、Gson 等常见工具来完成。所以在 Body 处理中&#xff0c;我们遇到的很多错误都是第三方工具使用中的一些问题。 真正对于 Spring 而…

【C#学习路线——超详细介绍】

C#学习路线——超详细介绍 1. 介绍2. C#基础3. 进阶技术4. .NET框架核心5. 前端和桌面应用开发6. Web 后端开发7. 游戏开发8. 软件开发实践9. 性能调优和最佳实践10. 实战经验11. 持续学习 1. 介绍 C#是由微软开发的一种强类型、面向对象的编程语言&#xff0c;主要用于.NET框…

2024年 复习 HTML5+CSS3+移动web 笔记 之CSS遍 第5天

第 五 天 整个网站例 5.1 准备工作 项目目录与版心 base.css 5.2 网页制作思路 5.3 header 区域-整体布局 5.4 header区域-logo 5.5 header区域-导航 index.html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8">&l…

掌握数据预测的艺术:线性回归模型详解

线性回归是统计学中用于建模两个或多个变量之间线性关系的一种方法,广泛应用于数据分析、机器学习等领域。从数学建模的角度出发,线性回归旨在找到一个线性方程,最好地描述自变量(或称为解释变量、特征变量)和因变量(或称为目标变量)之间的关系。本文将通过Python代码示…

网桥与网关

文章目录 概要网桥网关联系与区别参考文章 概要 网桥和网关的理解 网桥 几个名词的概念 网关 联系与区别 参考文章 如何通俗地解释什么是网桥&#xff1f; 网关到底是什么求通俗易懂讲解? 网桥&#xff1a;网桥也叫桥接器&#xff0c;是连接两个局域网的一种存储/转发…

C语言第十九弹---指针(三)

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】 指针 1、数组名的理解 2、使用指针访问数组 3、⼀维数组传参的本质 4、冒泡排序 5、二级指针 6、指针数组 7、指针数组模拟二维数组 总结 1、数组名的理解…

测试开发体系

软件测试 通过手工或者工具对 “被测对象”进行测试验证实际结果与预期结果之间是否存在差异 软件测试作用 通过测试工作可以发现并修复软件当中存在的缺陷&#xff0c;从而提高用户对产品的使用信心测试可以降低同类型产品开发遇到问题的风险 软件缺陷 软件缺陷被测试工程…

快速渲染效果图:设计师的高效工作流揭秘

渲染技能是每个建模设计师需求的一个重要技能&#xff0c;尽管在许多设计公司里&#xff0c;建模和渲染往往是分开由各自的专家来完成。不过&#xff0c;一个全能型的建模师还是应该精通渲染技术。对于那些接外包项目来制作渲染效果图的设计师来说&#xff0c;掌握如何提速渲染…

神经网络基本原理

神经网络是一门重要的机器学习技术。它是目前最为火热的研究方向–深度学习的基础。 神经网络是一种模拟人脑的神经网络以期能够实现类人工智能的机器学习技术。人脑中的神经网络是一个非常复杂的组织。成人的大脑中估计有1000亿个神经元之多。 1 介绍 下面是一个包含三个层…

使用webstorm调试vue 2 项目

学习目标&#xff1a; 使用webstorm调试vue 2 项目 笔者环境&#xff1a; npm 6.14.12 webstorm 2023.1 vue 2 学习内容&#xff1a; 例如&#xff1a; 正常启动npm 项目 配置javaScruot dubug 配置你的项目地址就好 使用dubug运行你配置的调式页 问题 如果进入了js页无…

【推荐算法】userid是否建模

看到一个din的源码&#xff0c;将userid也构建了emb table。 于是调研了一下。即推荐算法需要建模userid吗&#xff1f; 参考&#xff1a; 推荐算法user_id在train和serving时应该怎么用&#xff1f; - 知乎 深度学习推荐算法中user-id和item-id是否需要放入模型中作为特征进…

使用pandas将excel转成json格式

1.Excel数据 2.我们想要的JSON格式 {"0": {"raw_data1": "Sam","raw_data2": "Wong","raw_data3": "Good","layer": "12v1"},"1": {"raw_data1": "Lucy…

算法效率的度量-时间空间复杂度

常对幂指阶 1.时间复杂度 事前预估 算法 时间开销 T(n) 与 问题规模 n 的关系&#xff08; T 表示 “ time ”&#xff09; 一般默认问题规模为n。 1.单循环 2.嵌套两层循环都为n 3.单层循环指数递增型 4.搜索型 链接 &#xff1a;第七章查找算法&#xff01;&#xff01…