使用RxJava和Completable并行执行阻止任务

借助RxJava 1.1.1中引入的Completable抽象,如何并行执行阻止“仅副作用”(也称为void)任务的并行执行变得更加容易。

rx_logo_512 正如您可能已经注意到,阅读我的博客时,我主要专注于软件Craft.io和自动代码测试。 但是,此外,我还是持续交付和广义并发的狂热者。 最后一点从C中的纯线程和信号量到更高级别的解决方案(例如ReactiveX和actor模型)不等。 这次是全新RxJava 1.1.1 – rx.Completable引入的非常方便(在特定情况下)功能的用例。 与我的许多博客条目类似,这也反映了我在处理实际任务和用例时遇到的实际事件。

要做的任务

想象一下,一个系统对来自不同来源的异步事件进行了非常复杂的处理。 过滤,合并,转换,分组,丰富等等。 RxJava非常适合这里,特别是如果我们想要反应式的话。 假设我们已经实现了它(外观和效果很好),只剩下一件事了。 在开始处理之前,需要告知3个外部系统我们准备好接收消息。 对旧系统的3个同步调用(通过RMI,JMX或SOAP)。 它们每个都可以持续几秒钟,我们需要在开始之前等待所有它们。 幸运的是,它们已经实现,我们将它们视为可能成功(或失败的例外)的黑匣子。 我们只需要调用它们(最好同时调用)并等待完成即可。

rx.Observable –方法1

触手可及的RxJava似乎是显而易见的方法。 首先,可以通过Observable来包装作业执行:

private Observable<Void> rxJobExecute(Job job) {return Observable.fromCallable(() -> { job.execute();return null;});
}

不幸的是(在我们的例子中) Observable期望返回一些元素。 我们需要使用Void并且尴尬地return null (而不是仅仅引用方法job::execute

接下来,我们可以使用subscribeOn()方法来使用另一个线程来执行我们的工作(而不是阻塞主/当前线程–我们不想顺序地执行我们的工作)。 Schedulers.io()为调度Schedulers.io()提供了一组用于IO绑定工作的线程。

Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());
Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());

最后,我们需要等待所有它们完成(所有Obvervable s完成)。 为此,可以调整zip功能。 它结合了Obserbable拉链发射的物品的序列号。 在我们的例子中,我们只对每个Observable到的作业中的第一个伪项目感兴趣(我们仅发出null以满足API),并以阻塞的方式等待它们。 zip运算符中的zip函数需要返回某些内容,因此我们需要重复null的变通方法。

Observable.zip(run1, run2, (r1, r2) -> return null).toBlocking().single();

显而易见, Observable设计为Observable使用值流,并且需要进行一些额外的工作才能将其调整为仅产生副作用(不返回任何值)操作。 当我们需要将仅具有副作用的操作与其他返回一些值的值合并(例如合并)时,情况变得更加糟糕–需要更丑陋的转换。 请参阅RxNetty API的实际用例 。

public void execute() {Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());Observable.zip(run1, run2, (r1, r2) -> null).toBlocking().single();
}private Observable<Void> rxJobExecute(Job job) {return Observable.fromCallable(() -> { job.execute();return null;});
}

rx.Observable –方法2

可能会使用另一种方法。 代替生成人工项目,可以将我们的任务中的空Observable作为onComplete操作执行。 这迫使我们从zip操作切换到merge 。 结果,我们需要提供一个onNext动作(对于空的Observable永远不会执行),这肯定了我们试图破解该系统的信念。

public void execute() {Observable<Object> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());Observable<Object> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());Observable.merge(run1, run2).toBlocking().subscribe(next -> {});
}private Observable<Object> rxJobExecute(Job job) {return Observable.empty().doOnCompleted(job::execute);
}

可完成

RxJava 1.1.1解决了对不返回任何值的Observable的更好支持。 Completable可以视为Observable的简化版本,可以成功完成(发出onCompleted事件)或失败( onError )。 创建Completable实例的最简单方法是使用fromAction方法,该方法采用不返回任何值的Action0 (例如Runnable )。

Completable completable1 = Completable.fromAction(job1::execute).subscribeOn(Schedulers.io());
Completable completable2 = Completable.fromAction(job2::execute).subscribeOn(Schedulers.io());

接下来,我们可以使用merge()方法,该方法返回一个Completable实例,该实例立即订阅所有下游Completable ,并在所有下游Completable完成(或其中一个失败)时完成。 当我们使用带外部调度程序的subscribeOn方法时,所有作业将并行执行(在不同线程中)。

Completable.merge(completable1, completable2).await();

await()方法将阻塞,直到所有作业完成(如果发生错误,将重新抛出异常)。 纯粹而简单。

public void execute() {Completable completable1 = Completable.fromAction(job1::execute).subscribeOn(Schedulers.io());Completable completable2 = Completable.fromAction(job2::execute).subscribeOn(Schedulers.io());Completable.merge(completable1, completable2).await();
}

java.util.concurrent.CompletableFuture

有人可能会问:为什么不只使用CompletableFuture ? 这将是一个很好的问题。 Java 5中引入的纯Future可能需要我们做更多的工作,而ListenableFuture (来自Guava)和CompletableFuture (来自Java 8)使其变得微不足道。

首先,我们需要运行/安排作业执行。 接下来,使用CompletableFuture.allOf()方法,我们可以创建一个新的CompletableFuture ,该工作在所有作业完成CompletableFuture完成了(我们之前没有看到过这个概念吗?)。 get()方法只是阻止等待。

public void execute() {try {CompletableFuture<Void> run1 = CompletableFuture.runAsync(job1::execute);CompletableFuture<Void> run2 = CompletableFuture.runAsync(job2::execute);CompletableFuture.allOf(run1, run2).get();} catch (InterruptedException | ExecutionException e) {throw new RuntimeException("Jobs execution failed", e);}
}

我们需要对受检查的异常进行处理(很多时候我们不想用它们来污染我们的API),但是总的来说,这看起来很明智。 但是,值得记住的是,当需要更复杂的链处理时, CompletableFuture不足。 此外,在我们的项目中已经使用RxJava时,使用相同(或相似)的API而不是引入全新的东西通常会很有用。

摘要

多亏了rx.Completable ,使用RxJava仅完成副作用(不返回任何内容)任务的执行要舒适得多。 在已经使用RxJava的代码库中,即使在简单情况下,它也可能比CompletableFuture更为可取。 但是, Completable提供了许多先进的操作员和技术,此外,还可以轻松地将它与Observable混合使用,这使其功能更加强大。

rxjava-book-small 要了解有关Completable更多信息,您可能需要查看发行说明 。 对于那些想更深入地了解主题的人,Advanced RxJava博客( 第1部分和第2 部分 )上有关于Completable API的非常详细的介绍。

  • 可以从GitHub获得代码示例的源代码。

顺便说一句,如果您总体上对RxJava感兴趣,我可以凭良心向您推荐一本书,该书目前由Tomasz Nurkiewicz和Ben Christensen – RxJava的反应式编程编写 。

翻译自: https://www.javacodegeeks.com/2016/03/parallel-execution-blocking-tasks-rxjava-completable.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/354310.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

javafx 使用_使用JavaFX AnimationTimer

javafx 使用回想一下&#xff0c;给AnimationTimer起个名字可能不是一个好主意&#xff0c;因为它不仅可以用于动画&#xff0c;还可以用于测量fps速率&#xff0c;碰撞检测&#xff0c;模拟步骤&#xff0c;游戏主循环等。实际上&#xff0c;我大部分时间都在看AnimationTimer…

.Net Core 简洁架构事件(这个不完整,待仔细补充)

.Net Core的架构 - 根据微软官方文档 微软给出了.Net Core的架构方法&#xff0c;无论是在web&#xff0c;azure&#xff0c;uwp等等 微软的github地址&#xff1a;https://github.com/dotnet-architecture/eShopOnWeb 转载于:https://www.cnblogs.com/bijinshan/p/8250512.htm…

原理图中如何连线_Altium Designer10绘制原理图

在进行原理图绘制之前,应先将原理图库与PCB库相关联,在原理图绘制完成后,在生成PCB图,如何将原理图库与PCB库相关联?先进入原理图库,如下图所示。双击元器件文件,进行元器件配置。 元器件配置界面如下图所示。选择Add...进行添加对应的PCB库。 选择浏览进行查找相关PCB库…

NetBeans Java EE技巧3:数据库中的RESTful Web服务

许多现代的Web应用程序正朝着使用HTTP使用无状态通信的方向发展。 REST&#xff08;代表性状态转移&#xff09;体系结构样式通常用于设计网络应用程序&#xff0c;而使用Java EE 7&#xff0c;很容易开发用于数据库通信的RESTful后端。 使用简单的POJO&#xff08;普通的Java旧…

参数php_PHP多参数方法的重构

php中文网最新课程每日17点准时技术干货分享假设我们要完成一个保存文章的功能&#xff0c;如果采用函数编程的方式&#xff0c;大概会是下面这个样子&#xff1a;<?php function saveArticle($title, $content, $categoryId){ // ...}?>每个参数代表一个属性&#…

k8s集群部署成功后某个节点突然出现notready状态的问题原因分析和解决办法

文章目录 1、问题描述2、查看node03的日志3、错误原因分析4、解决办法 1、问题描述 k8s集群配置为 一主三个节点&#xff1b;刚开始运行一直正常&#xff1b;某天突然node03主机状态变为notready&#xff0c;问题如下&#xff1a; 在master节点使用&#xff1a; #master节点…

kickstart_具有Java Kickstart的MongoDB

kickstartNoSQL数据库由于其可伸缩性而变得越来越流行。 适当使用时 NoSQL数据库可以提供真正的好处。 MongoDB是使用C 编写的高度可扩展的开源NoSQL数据库。 1.安装MongoDB 您可以按照所使用的操作系统&#xff0c;按照MongoDB官方网站上的说明安装MongoDB&#xff0c;而不会…

DataGuard之DG环境搭建

DG 环境搭建 1.设置归档模式 DG环境的搭建必须要把数据库启动到归档模式&#xff0c;并且为了避免开发人员使用nologging语句&#xff0c;我们还要把数据库设置为force logging。 查看数据库是否运行在归档模式&#xff1a; #su - oracle $sqlplus / as sysdba SQL>archive…

a算法解决八数码问题_javascript,八皇后问题解决

八皇后问题 八皇后问题&#xff0c;是一个古老而著名的问题&#xff0c;是回溯算法的典型案例。 该问题是国际西洋棋棋手马克斯贝瑟尔于1848年提出&#xff1a; 在88格的国际象棋上摆放八个皇后&#xff0c;使其不能互相攻击&#xff0c; 即&#xff1a;任意两个皇后都不能处于…

CentOS6.9部署zabbix3.0监控系统

环境&#xff1a; [rootredis ~]# uname -a Linux redis 2.6.32-696.el6.x86_64 #1 SMP Tue Mar 21 19:29:05 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux [rootredis ~]# cat /etc/redhat-release CentOS release 6.9 (Final) [rootredis ~]# getenforce Disabled [rootredis …

输出整形变量语句_Python合集之Python变量

在上一节的合集中&#xff0c;我们了解了Python的保留字与标识符&#xff0c;本节我们将进一步了解一下Python中关于变量的问题。变量严格意义上来讲应该称之为“名称”&#xff0c;也可以理解为标签。在Python中&#xff0c;不需要先声明变量名及其数据类型&#xff0c;直接赋…

activemq 连接_ActiveMQ网络连接器

activemq 连接这篇文章对我以及对ActiveMQ的网络连接器的工作方式可能感兴趣的任何ActiveMQ贡献者都适用。 我最近花了一些时间查看代码&#xff0c;并认为最好画一些快速的图表来帮助我记住我学到的知识&#xff0c;并在将来发现问题时帮助将来确定在哪里进行调试。 如果我输入…

如何让fragment刷新界面_快速实现android版抖音主界面的心得

原文作者&#xff1a;DK_BurNIng如何快速确定竞品某个界面的实现方式&#xff1f;当你收到产品一个需求是模仿某个竞品且时间很短没有过多时间给你调研技术方案的时候&#xff0c;如何尽快确定这个功能的技术方案呢&#xff1f; 这里我给出我自己的一个小窍门&#xff0c;可以避…

使用JBoss Cool Store的终极云零售指南

我们一直在讨论为什么应用程序开发人员在App Dev Cloud Stack系列中不能再忽略其堆栈了。 带有JBoss Cool Store的App Dev Cloud 我们从头到尾讨论了各个层&#xff0c;但尚未为您提供除Red Hat Container Development Kit&#xff08;CDK&#xff09;之外的任何应用程序开发…

02.Python 3.6.4下载与安装

02.Python 3.6.4下载与安装 https://www.python.org/downloads/release/python-364/ Windows x86-64可执行文件安装程序视窗对于AMD64 / EM64T / x64&#xff0c;不是安腾处理器bee5746dc6ece6ab49573a9f54b5d0a131684744SIG我下载的是这个&#xff1a; https://www.python.or…

2018年1月 常用的linux命令

项目中经常用到的Linux命令 &#xff08;注意&#xff1a;linux命令要小写哦&#xff01;&#xff09; &#xff08;1&#xff09;、ls 显示当前目录下的文件 &#xff08;2&#xff09;、vi vim 进入编辑器&#xff0c;可以选择你要编辑的文档&#xff0c;一般我们将项目打…

javafx2_JavaFX 2 GameTutorial第4部分

javafx2介绍 这是与JavaFX 2游戏教程相关的六个部分系列的第四部分。 如果您错过了第1部分 &#xff0c; 第2部分或第3部分 &#xff0c;我建议您在开始本教程之前仔细阅读它们。 回顾一下&#xff0c;在第3部分中&#xff0c;我为您提供了许多经典街机风格游戏以及所使用的不同…

vue项目 乐橙云 轻应用直播SDK imouplayer.js

官网案例&#xff1a;https://open.imoulife.com/book/light/sdk.html 文档&#xff1a; https://open.imou.com/developDoc/31 1&#xff0c;下载 对应的资源 https://open.imoulife.com/book/readme/upload.html 2&#xff0c;引入资源 2.1 把下载的资源(static&#xff0c…

jmx 复用 jmx_JMX:一些入门说明

jmx 复用 jmxJMX&#xff08;Java管理扩展&#xff09;是一种J2SE技术&#xff0c;可以管理和监视Java应用程序。 基本思想是实现一组管理对象&#xff0c;并将实现注册到平台服务器&#xff0c;在平台服务器上&#xff0c;可以使用一组连接器或适配器从本地或远程调用这些实现…

git 上下载的项目在本地安装依赖时报错 Could not resolve dependency

安装依赖时报错&#xff1a;无法安装依赖 看报错里面的提示&#xff1a;this command with --force, or --legacy-peer-deps –force 会无视冲突&#xff0c;并强制获取远端npm库资源&#xff0c;即使本地有资源也会覆盖掉&#xff1b;–legacy-peer-deps&#xff1a;安装时…