rxjava 并行_使用RxJava和Completable并行执行阻塞任务

rxjava 并行

通过RxJava 1.1.1中引入的Completable抽象,如何并行执行阻止“仅副作用”(也称为void)任务的并行执行变得更加容易。

rx_logo_512 正如您可能已经注意到,阅读我的博客时,我主要专注于软件Craft.io和自动代码测试。 但是,此外,我还是持续交付和广义并发的狂热者。 最后一点从C中的纯线程和信号量到更高级别的解决方案(例如ReactiveX和actor模型)不等。 这次是全新RxJava 1.1.1 – rx.Completable引入的非常方便(在特定情况下)功能的用例。 与我的许多博客条目类似,这也反映了我在处理实际任务和用例时遇到的实际事件。

要做的任务

想象一下,一个系统对来自不同来源的异步事件进行了非常复杂的处理。 过滤,合并,转换,分组,丰富等等。 RxJava非常适合这里,特别是如果我们想要React式的话。 假设我们已经实现了它(外观和效果很好),只剩下一件事了。 在开始处理之前,需要告知3个外部系统我们准备好接收消息。 对遗留系统的3个同步调用(通过RMI,JMX或SOAP)。 它们每个都可以持续几秒钟,我们需要等待所有它们之后才能开始。 幸运的是,它们已经实现,我们将它们视为可能成功(或因异常而失败)的黑匣子。 我们只需要调用它们(最好同时调用)并等待完成即可。

rx.Observable –方法1

触手可及的RxJava似乎是显而易见的方法。 首先,可以使用Observable来包装作业执行:

private Observable<Void> rxJobExecute(Job job) {return Observable.fromCallable(() -> { job.execute();return null;});
}

不幸的是(在我们的例子中) Observable期望返回一些元素。 我们需要使用Void并且尴尬地return null (而不是仅仅引用方法job::execute

接下来,我们可以使用subscribeOn()方法来使用另一个线程来执行我们的作业(而不是阻塞主/当前线程–我们不想顺序执行我们的作业)。 Schedulers.io()为调度Schedulers.io()提供了一组用于IO绑定工作的线程。

Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());
Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());

最后,我们需要等待所有它们完成(所有Obvervable s完成)。 为此,可以调整zip功能。 它结合了Obserbable拉链发射的物品的序列号。 在我们的例子中,我们只对每个Observable到的作业中的第一个伪项目感兴趣(我们仅发出null以满足API),并以阻塞的方式等待它们。 zip运算符中的zip函数需要返回某些内容,因此我们需要重复null的变通方法。

Observable.zip(run1, run2, (r1, r2) -> return null).toBlocking().single();

显而易见, Observable设计为Observable使用值流,并且需要进行一些额外的工作才能将其调整为仅产生副作用(不返回任何值)操作。 当我们需要将仅具有副作用的操作与其他返回一些值的值合并(例如合并)时,情况变得更加糟糕–需要更丑陋的转换。 请参阅RxNetty API的实际用例 。

public void execute() {Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());Observable.zip(run1, run2, (r1, r2) -> null).toBlocking().single();
}private Observable<Void> rxJobExecute(Job job) {return Observable.fromCallable(() -> { job.execute();return null;});
}

rx.Observable –方法2

可能会使用另一种方法。 代替生成人工项目,可以将我们的任务的空Observable作为onComplete操作执行。 这迫使我们从zip操作切换到merge 。 结果,我们需要提供一个onNext动作(对于空的Observable永远不会执行),这肯定了我们试图破解该系统的信念。

public void execute() {Observable<Object> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());Observable<Object> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());Observable.merge(run1, run2).toBlocking().subscribe(next -> {});
}private Observable<Object> rxJobExecute(Job job) {return Observable.empty().doOnCompleted(job::execute);
}

rx.Completable

RxJava 1.1.1解决了对不返回任何值的Observable的更好支持。 Completable可以视为Observable的简化版本,可以成功完成(发出onCompleted事件)或失败( onError )。 创建Completable实例的最简单方法是使用fromAction方法,该方法采用不返回任何值的Action0 (例如Runnable )。

Completable completable1 = Completable.fromAction(job1::execute).subscribeOn(Schedulers.io());
Completable completable2 = Completable.fromAction(job2::execute).subscribeOn(Schedulers.io());

接下来,我们可以使用merge()方法,该方法返回一个Completable实例,该实例立即订阅所有下游Completable ,并在它们全部完成(或其中一个失败)时完成。 由于我们在外部调度程序中使用了subscribeOn方法,因此所有作业都是并行执行的(在不同的线程中)。

Completable.merge(completable1, completable2).await();

await()方法将阻塞,直到所有作业完成(如果发生错误,将重新抛出异常)。 纯净而简单。

public void execute() {Completable completable1 = Completable.fromAction(job1::execute).subscribeOn(Schedulers.io());Completable completable2 = Completable.fromAction(job2::execute).subscribeOn(Schedulers.io());Completable.merge(completable1, completable2).await();
}

java.util.concurrent.CompletableFuture

有人可能会问:为什么不只使用CompletableFuture ? 这将是一个很好的问题。 Java 5中引入的纯Future可能需要我们做更多的工作,而ListenableFuture (来自Guava)和CompletableFuture (来自Java 8)使其变得微不足道。

首先,我们需要运行/安排作业执行。 接下来,使用CompletableFuture.allOf()方法,我们可以创建一个新的CompletableFuture ,它在所有作业完成时就完成了(我们以前没有看到过这个概念吗?)。 get()方法只是阻止等待。

public void execute() {try {CompletableFuture<Void> run1 = CompletableFuture.runAsync(job1::execute);CompletableFuture<Void> run2 = CompletableFuture.runAsync(job2::execute);CompletableFuture.allOf(run1, run2).get();} catch (InterruptedException | ExecutionException e) {throw new RuntimeException("Jobs execution failed", e);}
}

我们需要对受检查的异常做一些事情(很多时候我们不想使用它们来污染我们的API),但是总的来说,这看起来很明智。 但是,值得记住的是,当需要更复杂的链处理时, CompletableFuture不足。 另外,在我们的项目中已经使用RxJava时,使用相同(或相似)的API而不是引入全新的东西通常会很有用。

摘要

多亏了rx.Completable ,使用RxJava仅完成副作用(不返回任何内容)任务的执行更加轻松。 在已经使用RxJava的代码库中,即使在简单情况下,它也可能比CompletableFuture更受欢迎。 但是, Completable提供了许多先进的操作员和技术,此外,还可以轻松地将它与Observable混合使用,这使其功能更加强大。

rxjava-book-small 要了解有关Completable更多信息,您可能需要查看发行说明 。 对于那些想深入了解主题的人,Advanced RxJava博客( 第1部分和第2 部分 )上有关于Completable API的非常详细的介绍。

  • 可以从GitHub获得代码示例的源代码。

顺便说一句,如果您总体上对RxJava感兴趣,我可以凭良心向您推荐一本书,该书目前由Tomasz Nurkiewicz和Ben Christensen – RxJava的React式编程编写 。

翻译自: https://www.javacodegeeks.com/2016/03/parallel-execution-blocking-tasks-rxjava-completable.html

rxjava 并行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/336081.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

系统工程师主要做什么_Filecoin运维工程师在做什么?

前言固然运维这个职能范畴对于绝大多数人来说认知模糊&#xff0c;特别是在分布式存储领域&#xff0c;“运维”常常和“机房”“IDC”等名词相伴&#xff0c;导致很多异业者对于运维的了解停留在物理层面&#xff0c;以搬运机器、上下架服务器、管理网电等为标杆&#xff0c;好…

mysql怎么返回上一行_月球上并没有发射基地,阿波罗飞船是怎么返回地球的?...

美国上世纪六七十年代成功实施了载人登月工程&#xff0c;这一壮举震惊世界。阿波罗系列载人登月飞船和用于发射飞船的土星五号运载火箭名声大噪&#xff0c;而用于发射土星五号&#xff0c;位于卡纳维拉尔角肯尼迪航天发射中心的39号发射台也格外引人瞩目。土星五号全长110多米…

凯立德地图导航2020年最新版车载_明明有车载导航,为什么很多人还是选择用手机导航?有5个原因...

提到导航&#xff0c;你首先会想到什么导航产品呢&#xff1f;如果是高德导航、百度导航之类的&#xff0c;那你主要用手机导航。但是如果除了高德导航和百度导航之外&#xff0c;你还能随口说出四维图新导航、凯立德导航&#xff0c;甚至是道道通导航之类的话&#xff0c;那你…

WebRTC Google的 BBR拥塞控制算法解析

正文之前,给出本文的图例: BBR的组成 bbr算法实际上非常简单,在实现上它由5部分组成: 1.即时速率的计算 计算一个即时的带宽bw,该带宽是bbr一切计算的基准,bbr将会根据当前的即时带宽以及其所处的pipe状态来计算pacing rate以及cwnd(见下文),后面我们会看到,这个即时…

python斐波那契数列前20项_兔子繁殖问题带来的智商碾压:斐波那契数列趣谈

本文来自公众号&#xff1a;超级数学建模微信号 &#xff1a;supermodeling原文标题&#xff1a;斐波那契数列趣谈via 善科by BB一般认为斐波那契数列的提出是基于兔子的繁殖问题&#xff1a;如果一开始有一对兔子&#xff0c;它们每月生育一对兔子&#xff0c;小兔在出生后一个…

jboss使用_使用JBoss Cool Store的终极云零售指南

jboss使用我们一直在讨论为什么应用程序开发人员在App Dev Cloud Stack系列中不能再忽略其堆栈了。 带有JBoss Cool Store的App Dev Cloud 我们从头到尾讨论了各个层次&#xff0c;但尚未为您提供除Red Hat Container Development Kit&#xff08;CDK&#xff09;之外的任何应…

C++ 11 深度学习(六)智能指针综述

以下三种智能指针均为类模板 1.shared_ptr 共享指针 &#xff0c;多个指针指向同一个对象&#xff0c;最后一个指针被销毁时&#xff0c;这个对象会被释放。 2.week_ptr 是辅助shared_ptr工作的 3.unique_ptr 是独占式指针&#xff0c;同一时间只能有一个指针能指向该对象…

10 NAT网络地址转换

广域网技术 上面聊的内容都是内网的一些配置&#xff0c;但内网终将要访问外网的&#xff0c;我们需要怎么处理呢&#xff1f;一般使用HDLC&#xff08;高级数据链路控制协议&#xff09;或者PPP&#xff08;点对点协议&#xff09;。 使用PPP安全接入Internet PPP&#xff0…

java应用性能指标_性能与可靠性:Java应用为何像F1汽车

java应用性能指标再想一想。 性能和可靠性相关吗&#xff1f; 还是这些东西相互排斥&#xff1f; 我认为是后者。 如今&#xff0c;现实是IT部门将应用程序的性能和可靠性视为同一事物&#xff0c;但这离事实还差得远。 让我们看看一级方程式车队如何管理性能和可靠性。 上赛…

tomcat ajp协议安全限制绕过漏洞_Apache Tomcat文件包含漏洞(CVE20201938)复现

一、漏洞背景2020年02月20日&#xff0c;国家信息安全漏洞共享平台(CNVD)发布了关于Apache Tomcat文件包含漏洞(CVE-2020-1938/CNVD-2020-10487)的安全公告。Tomcat作为一款免费开源轻量级的web应用服务器&#xff0c;广泛应用于并发量不是很高的场合&#xff0c;Tomact默认端口…

【H.264/AVC视频编解码技术】第三章【熵编码】

熵编码的概念 熵&#xff1a;化学与热力学概念&#xff0c;用于度量能量退化的指标。熵越高&#xff0c;物体/系统做工能力越低。 信息学中的熵&#xff1a;用于度量消息的平均信息量&#xff0c;和信息的不确定性。越是随机的&#xff0c;前后不相关的信息&#xff0c;其熵越…

mock 抛出一个异常如何终止_教你使用Mock完成单元测试

更多精彩文章请关注本人微信公众号1、什么是Mock?mock是在测试过程中&#xff0c;对于一些不容易构造/获取的对象&#xff0c;创建一个mock对象来模拟对象的行为。比如说你需要调用B服务&#xff0c;可是B服务还没有开发完成&#xff0c;那么你就可以将调用B服务的那部分给Moc…

jconsole查看连接数_在JConsole和VisualVM中查看DiagnosticCommandMBean

jconsole查看连接数我已经将JConsole用作合适的通用JMX客户端已有很多年了。 该工具通常随Oracle JDK一起提供&#xff0c;并且易于使用。 在JMX交互方面&#xff0c;JConsole优于VisualVM的最大优点是JConsole带有内置的MBeans选项卡&#xff0c;而必须为VisualVM中的相同功能…

如何知道一个域名是否存在_域名检测API实现查看一个域名在微信中是否被封

针对微信转发分享链接过程中&#xff0c;您宝贵的域名被微信检测系统过滤拦截而无法正常浏览&#xff0c;例如该网页包含诱导分享内容&#xff0c;被多人投诉等&#xff0c;又例如提示该网页已停止访问等提示。怎么查询域名在微信中是否被封了呢?以上接口可检测到域名的四种异…

量化指标公式源码_通达信指标公式源码线上阴线指标公式

工作线:(EMA(C,14)),POINTDOT,LINETHICK3,COLOR22ACDE;生命线:(MA(C,25)),LINETHICK1,COLORMAGENTA;不惑线:(MA(C,40)),COLORCYAN,LINETHICK1;姊妹线:(EMA(C,56)),POINTDOT,COLOR33CCDD,LINETHICK1;A3:EMA((((SLOPE(C,21)) * 20) C),68);A4:EMA(C,10);A5:REF(C,2);A6:((SMA((M…

jsf和jsp_带有JSF,Servlet和CDI的DynamicReports和JasperReports

jsf和jsp在此示例中&#xff0c;我将展示如何将DynamicReport和JasperReports与Servlet和CDI集成。 工具&#xff1a; TIBCO Jaspersoft Studio-6.0.4。最终版 Eclipse Luna服务版本2&#xff08;4.4.2&#xff09;。 WildFly 8.x应用程序服务器。 这是Eclipse上项目层次结…

接口隔离原则_设计模式的三大分类及六大原则

设计模式(Design pattern)代表了最佳的实践&#xff0c;通常被有经验的面向对象的软件开发人员所采用。设计模式是软件开发人员在软件开发过程中面临的一般问题的解决方案。这些解决方案是众多软件开发人员经过相当长的一段时间的试验和错误总结出来的。设计模式分为三大类&…

Leetcode 24. 两两交换链表中的节点

原题链接 题解&#xff1a; 1.头部会改变创建虚拟头结点 2.前两个点定义为a,b&#xff0c;c 3.p指向b,a指向c,b指向a ; p指向本段最后一个结点。 class Solution { public:ListNode* swapPairs(ListNode* head) {auto dummy new ListNode(-1);dummy->next head;for (a…

python 获取文件名_真实需求 | Python+os+openpyxl 批量获取Excel的文件名和最大行数...

1. 提出需求 这已经不知道是粉丝问我的第几个办公自动化的问题了&#xff0c;并且这些问题都是大家在学习和工作中碰到过的真实问题场景。其实从下图中已经可以很明确的看出别人的需求了&#xff0c;我这里就不用在赘述了&#xff0c;下面直接上思路吧&#xff01;2. 解题思路为…

apache ignite_从In Memory Data Grid,Apache Ignite快速入门

apache igniteIMDG或内存数据网格不是内存中关系数据库&#xff0c;NOSQL数据库或关系数据库。 它是另一种软件数据存储库。 数据模型分布在单个位置或多个位置的许多服务器上。 这种分布称为数据结构。 这种分布式模型被称为“无共享”架构。 IMDG具有以下特征&#xff1a; 所…