使用RxJava和Completable并行执行阻止任务

借助RxJava 1.1.1中引入的Completable抽象,如何并行执行阻止“仅副作用”(也称为void)任务的并行执行变得更加容易。

rx_logo_512 正如您可能已经注意到,阅读我的博客时,我主要专注于软件Craft.io和自动代码测试。 但是,此外,我还是持续交付和广义并发的狂热者。 最后一点从C中的纯线程和信号量到更高级别的解决方案(例如ReactiveX和actor模型)不等。 这次是全新RxJava 1.1.1 – rx.Completable引入的非常方便(在特定情况下)功能的用例。 与我的许多博客条目类似,这也反映了我在处理实际任务和用例时遇到的实际事件。

要做的任务

想象一下,一个系统对来自不同来源的异步事件进行了非常复杂的处理。 过滤,合并,转换,分组,丰富等等。 RxJava非常适合这里,特别是如果我们想要反应式的话。 假设我们已经实现了它(外观和效果很好),只剩下一件事了。 在开始处理之前,需要告知3个外部系统我们准备好接收消息。 对旧系统的3个同步调用(通过RMI,JMX或SOAP)。 它们每个都可以持续几秒钟,我们需要在开始之前等待所有它们。 幸运的是,它们已经实现,我们将它们视为可能成功(或失败的例外)的黑匣子。 我们只需要调用它们(最好同时调用)并等待完成即可。

rx.Observable –方法1

触手可及的RxJava似乎是显而易见的方法。 首先,可以通过Observable来包装作业执行:

private Observable<Void> rxJobExecute(Job job) {return Observable.fromCallable(() -> { job.execute();return null;});
}

不幸的是(在我们的例子中) Observable期望返回一些元素。 我们需要使用Void并且尴尬地return null (而不是仅仅引用方法job::execute

接下来,我们可以使用subscribeOn()方法来使用另一个线程来执行我们的工作(而不是阻塞主/当前线程–我们不想顺序地执行我们的工作)。 Schedulers.io()为调度Schedulers.io()提供了一组用于IO绑定工作的线程。

Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());
Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());

最后,我们需要等待所有它们完成(所有Obvervable s完成)。 为此,可以调整zip功能。 它结合了Obserbable拉链发射的物品的序列号。 在我们的例子中,我们只对每个Observable到的作业中的第一个伪项目感兴趣(我们仅发出null以满足API),并以阻塞的方式等待它们。 zip运算符中的zip函数需要返回某些内容,因此我们需要重复null的变通方法。

Observable.zip(run1, run2, (r1, r2) -> return null).toBlocking().single();

显而易见, Observable设计为Observable使用值流,并且需要进行一些额外的工作才能将其调整为仅产生副作用(不返回任何值)操作。 当我们需要将仅具有副作用的操作与其他返回一些值的值合并(例如合并)时,情况变得更加糟糕–需要更丑陋的转换。 请参阅RxNetty API的实际用例 。

public void execute() {Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());Observable.zip(run1, run2, (r1, r2) -> null).toBlocking().single();
}private Observable<Void> rxJobExecute(Job job) {return Observable.fromCallable(() -> { job.execute();return null;});
}

rx.Observable –方法2

可能会使用另一种方法。 代替生成人工项目,可以将我们的任务中的空Observable作为onComplete操作执行。 这迫使我们从zip操作切换到merge 。 结果,我们需要提供一个onNext动作(对于空的Observable永远不会执行),这肯定了我们试图破解该系统的信念。

public void execute() {Observable<Object> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());Observable<Object> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());Observable.merge(run1, run2).toBlocking().subscribe(next -> {});
}private Observable<Object> rxJobExecute(Job job) {return Observable.empty().doOnCompleted(job::execute);
}

可完成

RxJava 1.1.1解决了对不返回任何值的Observable的更好支持。 Completable可以视为Observable的简化版本,可以成功完成(发出onCompleted事件)或失败( onError )。 创建Completable实例的最简单方法是使用fromAction方法,该方法采用不返回任何值的Action0 (例如Runnable )。

Completable completable1 = Completable.fromAction(job1::execute).subscribeOn(Schedulers.io());
Completable completable2 = Completable.fromAction(job2::execute).subscribeOn(Schedulers.io());

接下来,我们可以使用merge()方法,该方法返回一个Completable实例,该实例立即订阅所有下游Completable ,并在所有下游Completable完成(或其中一个失败)时完成。 当我们使用带外部调度程序的subscribeOn方法时,所有作业将并行执行(在不同线程中)。

Completable.merge(completable1, completable2).await();

await()方法将阻塞,直到所有作业完成(如果发生错误,将重新抛出异常)。 纯粹而简单。

public void execute() {Completable completable1 = Completable.fromAction(job1::execute).subscribeOn(Schedulers.io());Completable completable2 = Completable.fromAction(job2::execute).subscribeOn(Schedulers.io());Completable.merge(completable1, completable2).await();
}

java.util.concurrent.CompletableFuture

有人可能会问:为什么不只使用CompletableFuture ? 这将是一个很好的问题。 Java 5中引入的纯Future可能需要我们做更多的工作,而ListenableFuture (来自Guava)和CompletableFuture (来自Java 8)使其变得微不足道。

首先,我们需要运行/安排作业执行。 接下来,使用CompletableFuture.allOf()方法,我们可以创建一个新的CompletableFuture ,该工作在所有作业完成CompletableFuture完成了(我们之前没有看到过这个概念吗?)。 get()方法只是阻止等待。

public void execute() {try {CompletableFuture<Void> run1 = CompletableFuture.runAsync(job1::execute);CompletableFuture<Void> run2 = CompletableFuture.runAsync(job2::execute);CompletableFuture.allOf(run1, run2).get();} catch (InterruptedException | ExecutionException e) {throw new RuntimeException("Jobs execution failed", e);}
}

我们需要对受检查的异常进行处理(很多时候我们不想用它们来污染我们的API),但是总的来说,这看起来很明智。 但是,值得记住的是,当需要更复杂的链处理时, CompletableFuture不足。 此外,在我们的项目中已经使用RxJava时,使用相同(或相似)的API而不是引入全新的东西通常会很有用。

摘要

多亏了rx.Completable ,使用RxJava仅完成副作用(不返回任何内容)任务的执行要舒适得多。 在已经使用RxJava的代码库中,即使在简单情况下,它也可能比CompletableFuture更为可取。 但是, Completable提供了许多先进的操作员和技术,此外,还可以轻松地将它与Observable混合使用,这使其功能更加强大。

rxjava-book-small 要了解有关Completable更多信息,您可能需要查看发行说明 。 对于那些想更深入地了解主题的人,Advanced RxJava博客( 第1部分和第2 部分 )上有关于Completable API的非常详细的介绍。

  • 可以从GitHub获得代码示例的源代码。

顺便说一句,如果您总体上对RxJava感兴趣,我可以凭良心向您推荐一本书,该书目前由Tomasz Nurkiewicz和Ben Christensen – RxJava的反应式编程编写 。

翻译自: https://www.javacodegeeks.com/2016/03/parallel-execution-blocking-tasks-rxjava-completable.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/354310.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

svn 回退到指定版本无法提交_SVN终端演练-版本回退

1. 版本回退概念以及原因?概念: 是指将代码(本地代码或者服务器代码), 回退到之前记录的某一特定版本原因: 如果代码做错了, 想返回之前某个状态重做;2. 修改了,但未提交的情况下, 回退代码方案1: (大力推荐)svn revert (作用:返回到上次提交后版本对应的最原始的状态)方案2: …

vue-awesome-swiper缩略图无法联动的问题

一&#xff0c;安装 npm install vue-awesome-swiper --save // npm install vue-awesome-swiper3.x --save我安装的版本是 “swiper”: “^6.6.1”, “vue-awesome-swiper”: “^3.1.3”, 在使用vue-awesome-swiper组件的时候&#xff0c;缩略图无法和大图实现联动&#xff…

个人博客13

今天的任务依旧为美化界面。 昨天的任务为美化界面。 遇到的问题为页面的布局&#xff0c;以及颜色的搭配。转载于:https://www.cnblogs.com/qilin20/p/8232942.html

tcp wireshark 过滤syn_使用 WireShark 分析 TCP/IP 三次握手 和 四次挥手

TCP 三次握手 示意图Wireshark 抓包注意事项为了演示一个TCP三次握手建立连接的过程&#xff0c;我们通过 Chrome 访问一个网页。已知 HTTP 协议就是建立在TCP链接上的通过 Cmd 的 ping 命令获取 这个网站对应的 IP地址 183.136.236.13确定 这个IP 有一个非常重要的好处&#x…

Python 基础函数

1.输入输出函数 input() 打印输出的内容后&#xff0c;接收输入用户输入的内容&#xff0c;默认为“字符串”类型 print() 打印输出的内容 2.类型转换函数 int() 转为整数 str() 转为字符串 3.类型判断函数 string.isdigit() 判断string是否为数字格式&#xff0c;返回布尔值 4…

Vue+Element导入导出Excel

一&#xff0c;安装 npm install -S file-saver xlsx npm install -D script-loader 二&#xff0c;导入Excel 1&#xff0c;Element 上传控件 <el-uploadaction"/":on-change"onChange":auto-upload"false":show-file-list"false&quo…

javafx 使用_使用JavaFX AnimationTimer

javafx 使用回想一下&#xff0c;给AnimationTimer起个名字可能不是一个好主意&#xff0c;因为它不仅可以用于动画&#xff0c;还可以用于测量fps速率&#xff0c;碰撞检测&#xff0c;模拟步骤&#xff0c;游戏主循环等。实际上&#xff0c;我大部分时间都在看AnimationTimer…

python 定时任务系统_Python定时任务,实现自动化的方法

python教程栏目介绍实现自动化的方法。1. 安装cron基本上所有的Linux发行版在默认情况下都预安装了cron工具。即使未预装cron&#xff0c;也很简单&#xff0c;执行几条简单的命令就可手动安装# 检查是否已经预装了cronservice cron status复制代码安装并启动服务安装&#xff…

sudo su 和 sudo -s区别

sudo su 和 sudo -s区别 sudo su 和 sudo -s都是切换到root用户&#xff0c;不同的是&#xff1a; sudo su 环境用的是目标用户(root)的环境 sudo -s 环境用的是当前用户本身的环境 posted on 2018-01-08 22:25 老于601 阅读(...) 评论(...) 编辑 收藏 转载于:https://www.cnbl…

js UTF-8编码转为字符串

// UTF8编码转成汉字字符串 export function revertUTF8(szInput) {var x,wch,wch1,wch2,uch"",szRet"";for (x0; x<szInput.length; x) {if (szInput.charAt(x)"%") {wch parseInt(szInput.charAt(x) szInput.charAt(x),16);if (!wch) {bre…

.Net Core 简洁架构事件(这个不完整,待仔细补充)

.Net Core的架构 - 根据微软官方文档 微软给出了.Net Core的架构方法&#xff0c;无论是在web&#xff0c;azure&#xff0c;uwp等等 微软的github地址&#xff1a;https://github.com/dotnet-architecture/eShopOnWeb 转载于:https://www.cnblogs.com/bijinshan/p/8250512.htm…

原理图中如何连线_Altium Designer10绘制原理图

在进行原理图绘制之前,应先将原理图库与PCB库相关联,在原理图绘制完成后,在生成PCB图,如何将原理图库与PCB库相关联?先进入原理图库,如下图所示。双击元器件文件,进行元器件配置。 元器件配置界面如下图所示。选择Add...进行添加对应的PCB库。 选择浏览进行查找相关PCB库…

vue 父组件与子组件之间的传值(普通传值)

一&#xff0c;子组件向父组件传值&#xff08;$emit&#xff09;&#xff1a; 1、定义子组件 <template><div>子组件:<span>{{childValue}}</span><!-- 定义一个子组件传值的方法 --><input type"button" value"点击触发&q…

NetBeans Java EE技巧3:数据库中的RESTful Web服务

许多现代的Web应用程序正朝着使用HTTP使用无状态通信的方向发展。 REST&#xff08;代表性状态转移&#xff09;体系结构样式通常用于设计网络应用程序&#xff0c;而使用Java EE 7&#xff0c;很容易开发用于数据库通信的RESTful后端。 使用简单的POJO&#xff08;普通的Java旧…

python的序列包括字符串列表和什么_Python基础:03序列:字符串、列表和元组

一&#xff1a;序列1&#xff1a;连接操作符()这个操作符允许把一个序列和另一个相同类型的序列做连接&#xff0c;生成新的序列。语法如下&#xff1a;sequence1 sequence2该表达式的结果是一个包含sequence1和sequence2 的内容的新序列。注意&#xff0c;这个操作不是合并操…

vue 父组件与子组件之间的传值(主动传值)

一&#xff0c;父组件主动传值 1&#xff0c;父组件 <Settlement-Table1 ref"comp1"></Settlement-Table1>click(){this.$refs.comp1.getData(this.list1) } // this.list1 是需要穿的值2&#xff0c;子组件 getData(data){console.log("父组件…

参数php_PHP多参数方法的重构

php中文网最新课程每日17点准时技术干货分享假设我们要完成一个保存文章的功能&#xff0c;如果采用函数编程的方式&#xff0c;大概会是下面这个样子&#xff1a;<?php function saveArticle($title, $content, $categoryId){ // ...}?>每个参数代表一个属性&#…

[转载].NET平台微服务项目汇集

最近博客园出现了一篇文章《微服务时代之2017年五军之战&#xff1a;Net PHP谁先死》&#xff0c;掀起了一波撕逼&#xff0c;作者只是从一个使用者的角度来指点江山&#xff0c;这个姿势是不对的。.NET Core就是专门针对模块化的微服务架构而设计&#xff0c;在微服务架构这方…

k8s集群部署成功后某个节点突然出现notready状态的问题原因分析和解决办法

文章目录 1、问题描述2、查看node03的日志3、错误原因分析4、解决办法 1、问题描述 k8s集群配置为 一主三个节点&#xff1b;刚开始运行一直正常&#xff1b;某天突然node03主机状态变为notready&#xff0c;问题如下&#xff1a; 在master节点使用&#xff1a; #master节点…

el-tree与el-transfer结合成树形穿梭框(tree-transfer)

下载 npm install el-tree-transfer --save<tree-transfer :title"[模块&#xff08;菜单&#xff09;访问权限, 拥有的操作权限]":from_datafromData:to_datatoData:defaultProps"{label:label, children: children}"addBtnaddremoveBtnremove:modemo…