rxjava 并行_使用RxJava和Completable并行执行阻塞任务

rxjava 并行

通过RxJava 1.1.1中引入的Completable抽象,如何并行执行阻止“仅副作用”(也称为void)任务的并行执行变得更加容易。

rx_logo_512 正如您可能已经注意到,阅读我的博客时,我主要专注于软件Craft.io和自动代码测试。 但是,此外,我还是持续交付和广义并发的狂热者。 最后一点从C中的纯线程和信号量到更高级别的解决方案(例如ReactiveX和actor模型)不等。 这次是全新RxJava 1.1.1 – rx.Completable引入的非常方便(在特定情况下)功能的用例。 与我的许多博客条目类似,这也反映了我在处理实际任务和用例时遇到的实际事件。

要做的任务

想象一下,一个系统对来自不同来源的异步事件进行了非常复杂的处理。 过滤,合并,转换,分组,丰富等等。 RxJava非常适合这里,特别是如果我们想要React式的话。 假设我们已经实现了它(外观和效果很好),只剩下一件事了。 在开始处理之前,需要告知3个外部系统我们准备好接收消息。 对遗留系统的3个同步调用(通过RMI,JMX或SOAP)。 它们每个都可以持续几秒钟,我们需要等待所有它们之后才能开始。 幸运的是,它们已经实现,我们将它们视为可能成功(或因异常而失败)的黑匣子。 我们只需要调用它们(最好同时调用)并等待完成即可。

rx.Observable –方法1

触手可及的RxJava似乎是显而易见的方法。 首先,可以使用Observable来包装作业执行:

private Observable<Void> rxJobExecute(Job job) {return Observable.fromCallable(() -> { job.execute();return null;});
}

不幸的是(在我们的例子中) Observable期望返回一些元素。 我们需要使用Void并且尴尬地return null (而不是仅仅引用方法job::execute

接下来,我们可以使用subscribeOn()方法来使用另一个线程来执行我们的作业(而不是阻塞主/当前线程–我们不想顺序执行我们的作业)。 Schedulers.io()为调度Schedulers.io()提供了一组用于IO绑定工作的线程。

Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());
Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());

最后,我们需要等待所有它们完成(所有Obvervable s完成)。 为此,可以调整zip功能。 它结合了Obserbable拉链发射的物品的序列号。 在我们的例子中,我们只对每个Observable到的作业中的第一个伪项目感兴趣(我们仅发出null以满足API),并以阻塞的方式等待它们。 zip运算符中的zip函数需要返回某些内容,因此我们需要重复null的变通方法。

Observable.zip(run1, run2, (r1, r2) -> return null).toBlocking().single();

显而易见, Observable设计为Observable使用值流,并且需要进行一些额外的工作才能将其调整为仅产生副作用(不返回任何值)操作。 当我们需要将仅具有副作用的操作与其他返回一些值的值合并(例如合并)时,情况变得更加糟糕–需要更丑陋的转换。 请参阅RxNetty API的实际用例 。

public void execute() {Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());Observable.zip(run1, run2, (r1, r2) -> null).toBlocking().single();
}private Observable<Void> rxJobExecute(Job job) {return Observable.fromCallable(() -> { job.execute();return null;});
}

rx.Observable –方法2

可能会使用另一种方法。 代替生成人工项目,可以将我们的任务的空Observable作为onComplete操作执行。 这迫使我们从zip操作切换到merge 。 结果,我们需要提供一个onNext动作(对于空的Observable永远不会执行),这肯定了我们试图破解该系统的信念。

public void execute() {Observable<Object> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());Observable<Object> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());Observable.merge(run1, run2).toBlocking().subscribe(next -> {});
}private Observable<Object> rxJobExecute(Job job) {return Observable.empty().doOnCompleted(job::execute);
}

rx.Completable

RxJava 1.1.1解决了对不返回任何值的Observable的更好支持。 Completable可以视为Observable的简化版本,可以成功完成(发出onCompleted事件)或失败( onError )。 创建Completable实例的最简单方法是使用fromAction方法,该方法采用不返回任何值的Action0 (例如Runnable )。

Completable completable1 = Completable.fromAction(job1::execute).subscribeOn(Schedulers.io());
Completable completable2 = Completable.fromAction(job2::execute).subscribeOn(Schedulers.io());

接下来,我们可以使用merge()方法,该方法返回一个Completable实例,该实例立即订阅所有下游Completable ,并在它们全部完成(或其中一个失败)时完成。 由于我们在外部调度程序中使用了subscribeOn方法,因此所有作业都是并行执行的(在不同的线程中)。

Completable.merge(completable1, completable2).await();

await()方法将阻塞,直到所有作业完成(如果发生错误,将重新抛出异常)。 纯净而简单。

public void execute() {Completable completable1 = Completable.fromAction(job1::execute).subscribeOn(Schedulers.io());Completable completable2 = Completable.fromAction(job2::execute).subscribeOn(Schedulers.io());Completable.merge(completable1, completable2).await();
}

java.util.concurrent.CompletableFuture

有人可能会问:为什么不只使用CompletableFuture ? 这将是一个很好的问题。 Java 5中引入的纯Future可能需要我们做更多的工作,而ListenableFuture (来自Guava)和CompletableFuture (来自Java 8)使其变得微不足道。

首先,我们需要运行/安排作业执行。 接下来,使用CompletableFuture.allOf()方法,我们可以创建一个新的CompletableFuture ,它在所有作业完成时就完成了(我们以前没有看到过这个概念吗?)。 get()方法只是阻止等待。

public void execute() {try {CompletableFuture<Void> run1 = CompletableFuture.runAsync(job1::execute);CompletableFuture<Void> run2 = CompletableFuture.runAsync(job2::execute);CompletableFuture.allOf(run1, run2).get();} catch (InterruptedException | ExecutionException e) {throw new RuntimeException("Jobs execution failed", e);}
}

我们需要对受检查的异常做一些事情(很多时候我们不想使用它们来污染我们的API),但是总的来说,这看起来很明智。 但是,值得记住的是,当需要更复杂的链处理时, CompletableFuture不足。 另外,在我们的项目中已经使用RxJava时,使用相同(或相似)的API而不是引入全新的东西通常会很有用。

摘要

多亏了rx.Completable ,使用RxJava仅完成副作用(不返回任何内容)任务的执行更加轻松。 在已经使用RxJava的代码库中,即使在简单情况下,它也可能比CompletableFuture更受欢迎。 但是, Completable提供了许多先进的操作员和技术,此外,还可以轻松地将它与Observable混合使用,这使其功能更加强大。

rxjava-book-small 要了解有关Completable更多信息,您可能需要查看发行说明 。 对于那些想深入了解主题的人,Advanced RxJava博客( 第1部分和第2 部分 )上有关于Completable API的非常详细的介绍。

  • 可以从GitHub获得代码示例的源代码。

顺便说一句,如果您总体上对RxJava感兴趣,我可以凭良心向您推荐一本书,该书目前由Tomasz Nurkiewicz和Ben Christensen – RxJava的React式编程编写 。

翻译自: https://www.javacodegeeks.com/2016/03/parallel-execution-blocking-tasks-rxjava-completable.html

rxjava 并行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/336081.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

系统工程师主要做什么_Filecoin运维工程师在做什么?

前言固然运维这个职能范畴对于绝大多数人来说认知模糊&#xff0c;特别是在分布式存储领域&#xff0c;“运维”常常和“机房”“IDC”等名词相伴&#xff0c;导致很多异业者对于运维的了解停留在物理层面&#xff0c;以搬运机器、上下架服务器、管理网电等为标杆&#xff0c;好…

Leecode 136. 只出现一次的数字

原题链接 解法&#xff1a;异或运算 A^A0 A^0A class Solution { public:int singleNumber(vector<int>& nums) {int ret 0;for(auto p : nums) ret^p;return ret;} };

jdk 9和jdk8_了解有关JDK9紧凑弦乐的信息(视频评论Charlie Hunt)

jdk 9和jdk8JDK 9引入了一个称为紧凑字符串的新功能。 鉴于Java程序中字符串的普遍存在&#xff0c;我认为这是一个非常重要的更改&#xff0c;所有Java开发人员都需要理解。 在此视频中&#xff0c;查理亨特&#xff08;Charlie Hunt&#xff09;解释了此新功能的历史和实现。…

mysql怎么返回上一行_月球上并没有发射基地,阿波罗飞船是怎么返回地球的?...

美国上世纪六七十年代成功实施了载人登月工程&#xff0c;这一壮举震惊世界。阿波罗系列载人登月飞船和用于发射飞船的土星五号运载火箭名声大噪&#xff0c;而用于发射土星五号&#xff0c;位于卡纳维拉尔角肯尼迪航天发射中心的39号发射台也格外引人瞩目。土星五号全长110多米…

java中接口私有反方_Java 8:在接口中声明私有和受保护的方法

java中接口私有反方引入Java 8时&#xff0c;我们可以在接口中使用默认方法。 此功能的主要驱动程序是允许接口扩展&#xff0c;同时保留对旧接口版本的向后兼容性。 一个示例是在现有Collection类中引入stream()方法。 有时候&#xff0c;当我们想引入几种默认方法时&#xf…

凯立德地图导航2020年最新版车载_明明有车载导航,为什么很多人还是选择用手机导航?有5个原因...

提到导航&#xff0c;你首先会想到什么导航产品呢&#xff1f;如果是高德导航、百度导航之类的&#xff0c;那你主要用手机导航。但是如果除了高德导航和百度导航之外&#xff0c;你还能随口说出四维图新导航、凯立德导航&#xff0c;甚至是道道通导航之类的话&#xff0c;那你…

WebRTC Google的 BBR拥塞控制算法解析

正文之前,给出本文的图例: BBR的组成 bbr算法实际上非常简单,在实现上它由5部分组成: 1.即时速率的计算 计算一个即时的带宽bw,该带宽是bbr一切计算的基准,bbr将会根据当前的即时带宽以及其所处的pipe状态来计算pacing rate以及cwnd(见下文),后面我们会看到,这个即时…

javafx阴影_JavaFX技巧23:节省内存! 属性的阴影场

javafx阴影Java 8中引入的属性和属性绑定是非常有用的编程概念。 当您开发用户界面时&#xff0c;它们特别有用。 实际上&#xff0c;它们是如此有用&#xff0c;以至于开发人员成为所有事物都应该是属性而不是原始属性的想法的受害者。 不幸的是&#xff0c;他们很容易忘记诸如…

Leetcode 151. 翻转字符串里的单词

原题链接 题解思路&#xff0c;操作分解&#xff0c;先把整体翻转&#xff0c;然后使用双指针算法分割出单个单词再次进行翻转。 class Solution { public:string reverseWords(string s) {int k 0;//保存单词首字母位置for (int i 0; i < s.size(); i ) {if (s[i] )…

python斐波那契数列前20项_兔子繁殖问题带来的智商碾压:斐波那契数列趣谈

本文来自公众号&#xff1a;超级数学建模微信号 &#xff1a;supermodeling原文标题&#xff1a;斐波那契数列趣谈via 善科by BB一般认为斐波那契数列的提出是基于兔子的繁殖问题&#xff1a;如果一开始有一对兔子&#xff0c;它们每月生育一对兔子&#xff0c;小兔在出生后一个…

jboss使用_使用JBoss Cool Store的终极云零售指南

jboss使用我们一直在讨论为什么应用程序开发人员在App Dev Cloud Stack系列中不能再忽略其堆栈了。 带有JBoss Cool Store的App Dev Cloud 我们从头到尾讨论了各个层次&#xff0c;但尚未为您提供除Red Hat Container Development Kit&#xff08;CDK&#xff09;之外的任何应…

python基础论文_北大博士Python学习笔记,Python基础语法总结,一篇文章带你入门...

image.png网上现在Python学习资料有很多&#xff0c;但是很杂。很多初学Python的朋友就不知道该怎么去抉择&#xff0c;那些是自己当下所需要的。刚好朋友是北大的博士&#xff0c;在IT行业也工作八年了。就把他学习Python的笔记做了一些整理写下了本文。这份资料非常纯粹&…

C++ 11 深度学习(四)结构、权限修饰符

1.struct struct MyStruct {int a;char b; }; struct默认权限为public,其中最重要的是涉及到结构体数据对齐。 数据对齐&#xff0c;1.数据对齐是指存在处理结构体成员时&#xff0c;成员在内存中的起始地址编码必须是成员类型字节的整倍数。2.要以结构体中最深层的基本数据…

C++ 11 深度学习(五)类型转换:static_cast dynamic_cast const_cast reinterpret_cast

四种cast 通用形式&#xff1a;强制类型转换名<type>(express) 强制类型转换名&#xff0c;以上四种 &#xff1b;type:想要转成成的目标类型 &#xff1b; express&#xff0c;需要转换的目标 static_cast 内置数据类型转换&#xff0c;具有继承关系的指针和引用&am…

小程序triggerevent 传参_微信小程序——无限递归的层次列表

——上礼拜踩的坑1、关于为什么不直接操作DOM对象&#xff1f;因为微信小程序里没有document对象。2、为什么坑了这多时间&#xff1f;因为之前看了个过期的帖子&#xff0c;完美避开了解决方案。下面进入正文&#xff0c;需求是在微信小程序里构造一棵文件树。3、解决思路定义…

java ab工具_(ab)使用Java 8 FunctionalInterfaces作为本地方法

java ab工具如果您正在使用更高级的语言&#xff08;例如Scala或Ceylon&#xff0c;甚至JavaScript&#xff09;进行编程&#xff0c;则“嵌套函数”或“本地函数”是您非常常见的习惯用法。 例如&#xff0c;您将编写诸如fibonacci函数之类的东西&#xff1a; def f() {def …

C++ 11 深度学习(六)智能指针综述

以下三种智能指针均为类模板 1.shared_ptr 共享指针 &#xff0c;多个指针指向同一个对象&#xff0c;最后一个指针被销毁时&#xff0c;这个对象会被释放。 2.week_ptr 是辅助shared_ptr工作的 3.unique_ptr 是独占式指针&#xff0c;同一时间只能有一个指针能指向该对象…

python的仿真效果好吗_Python SimPy 仿真系列 (1)

本系列文章旨在介绍 SimPy 在工业仿真中的应用。在物流行业/工厂制造业/餐饮服务业存在大量急需优化的场景&#xff0c; 例如&#xff1a;如何最优化快递分拣人员的排班表以满足双十一突发的快递件量如何估算餐厅在用餐高峰的排队时长估算特定工序下&#xff0c;工厂生产所需要…

jax-rs jax-ws_您的JAX-RS API并非天生就等于:使用动态功能

jax-rs jax-ws这次&#xff0c;我们将讨论一些有关JAX-RS 2.0 API的内容&#xff0c;并涉及规范的一个非常有趣的方面&#xff1a; 动态功能及其实用性。 传统上&#xff0c;当配置和部署JAX-RS 2.0 API&#xff08;使用Application类&#xff0c;从servlet引导或通过RuntimeD…

10 NAT网络地址转换

广域网技术 上面聊的内容都是内网的一些配置&#xff0c;但内网终将要访问外网的&#xff0c;我们需要怎么处理呢&#xff1f;一般使用HDLC&#xff08;高级数据链路控制协议&#xff09;或者PPP&#xff08;点对点协议&#xff09;。 使用PPP安全接入Internet PPP&#xff0…