惯用并发:flatMap()与parallel()– RxJava常见问题解答

简单,有效和安全的并发是RxJava的设计原则之一。 然而,具有讽刺意味的是,它可能是该库中最容易被误解的方面之一。 让我们举一个简单的例子:假设我们有一堆UUID并且对于每个UUID ,我们必须执行一组任务。 第一个问题是每个UUID都要执行I / O密集型操作,例如,从数据库加载对象:

Flowable<UUID> ids = Flowable.fromCallable(UUID::randomUUID).repeat().take(100);ids.subscribe(id -> slowLoadBy(id));

首先,为了测试,我将生成100个随机UUID。 然后,对于每个UUID,我想使用以下方法加载记录:

Person slowLoadBy(UUID id) {//...
}

slowLoadBy()的实现是无关紧要的,请记住它是缓慢且阻塞的。 使用subscribe()调用slowLoadBy()有许多缺点:

  • subscribe()根据设计是单线程的,无法解决。 每个UUID顺序加载
  • 当您调用subscribe() ,无法进一步转换Person对象。 这是终端操作

一种更健壮,甚至更残破的方法是map()每个UUID

Flowable<Person> people = ids.map(id -> slowLoadBy(id));  //BROKEN

这是非常可读的,但不幸的是损坏了。 就像订阅者一样,运算符也是单线程的。 这意味着在任何给定时间只能映射一个UUID ,此处也不允许并发。 更糟糕的是,我们从上游继承线程/工作者。 这有几个缺点。 如果上游使用某些专用的调度程序产生事件,我们将劫持该调度程序中的线程。 例如,许多操作符(例如interval() Schedulers.computation()透明地使用Schedulers.computation()线程池。 我们突然开始在完全不适合该池的池上执行I / O密集型操作。 此外,我们通过这一阻塞性顺序步骤降低了整个管道的速度。 非常非常糟糕。

您可能已经听说过这个subscribeOn()运算符,以及它如何启用并发。 确实,但是在应用它时必须非常小心。 以下示例(再次)是错误的

import io.reactivex.schedulers.Schedulers;Flowable<Person> people = ids.subscribeOn(Schedulers.io()).map(id -> slowLoadBy(id)); //BROKEN

上面的代码段仍然损坏。 observeOn() subscribeOn() (以及该对象的observeOn() )几乎不会将执行切换到其他工作程序(线程),而不会引入任何并发性。 流仍然按顺序处理所有事件,但是在不同的线程上。 换句话说,我们现在不是在从上游继承的线程上顺序使用事件,而是在io()线程上顺序使用事件。 那么,这个神话般的flatMap()运算符呢?

flatMap()运算符可以解救

flatMap()运算符通过将事件流分成子流来启用并发。 但首先,还有一个破碎的示例:

Flowable<Person> asyncLoadBy(UUID id) {return Flowable.fromCallable(() -> slowLoadBy(id));
}Flowable<Person> people = ids.subscribeOn(Schedulers.io()).flatMap(id -> asyncLoadBy(id)); //BROKEN

哦,天哪,这还是坏了flatMap()运算符在逻辑上做两件事:

  • 在每个上游事件上应用转换( id -> asyncLoadBy(id) )–这将产生Flowable<Flowable<Person>> 。 这是有道理的,对于每个上游UUID我们都有一个Flowable<Person>因此最终得到的是Person对象流
  • 然后flatMap()尝试一次订阅所有这些内部子流。 每当任何子流发出Person事件时,它将作为外部Flowable的结果透明地传递。

从技术上讲, flatMap()仅创建和预订前128个(默认情况下,可选的maxConcurrency参数)子流。 同样,当最后一个子流完成时, Person外部流也将完成。 现在,这到底为什么被打破? 除非明确要求,否则RxJava不会引入任何线程池。 例如,这段代码仍在阻塞:

log.info("Setup");
Flowable<String> blocking = Flowable.fromCallable(() -> {log.info("Starting");TimeUnit.SECONDS.sleep(1);log.info("Done");return "Hello, world!";});
log.info("Created");
blocking.subscribe(s -> log.info("Received {}", s));
log.info("Done");

仔细查看输出,特别是有关事件和线程的顺序:

19:57:28.847 | INFO  | main | Setup
19:57:28.943 | INFO  | main | Created
19:57:28.949 | INFO  | main | Starting
19:57:29.954 | INFO  | main | Done
19:57:29.955 | INFO  | main | Received Hello, world!
19:57:29.957 | INFO  | main | Done

没有任何并发​​,没有额外的线程。 仅将阻塞代码包装在Flowable中不会神奇地增加并发性。 您必须显式使用… subscribeOn()

log.info("Setup");
Flowable<String> blocking = Flowable.fromCallable(() -> {log.info("Starting");TimeUnit.SECONDS.sleep(1);log.info("Done");return "Hello, world!";}).subscribeOn(Schedulers.io());
log.info("Created");
blocking.subscribe(s -> log.info("Received {}", s));
log.info("Done");

这次的输出更有希望:

19:59:10.547 | INFO  | main | Setup
19:59:10.653 | INFO  | main | Created
19:59:10.662 | INFO  | main | Done
19:59:10.664 | INFO  | RxCachedThreadScheduler-1 | Starting
19:59:11.668 | INFO  | RxCachedThreadScheduler-1 | Done
19:59:11.669 | INFO  | RxCachedThreadScheduler-1 | Received Hello, world!

但是我们上次确实使用了subscribeOn() ,这是怎么回事? 嗯,外部流级别的subscribeOn()基本上说所有事件都应在此流中的不同线程上顺序处理。 我们并不是说应该同时运行许多子流。 并且由于所有子流都处于阻塞状态,因此当RxJava尝试订阅所有子流时,它会有效地依次依次订阅。 asyncLoadBy()并不是真正的async ,因此,当flatMap()运算符尝试对其进行订阅时,它会阻塞。 修复很容易。 通常,您会将subscribeOn()放在asyncLoadBy()但出于教育目的,我将其直接放置在asyncLoadBy()道中:

Flowable<Person> people = ids.flatMap(id -> asyncLoadBy(id).subscribeOn(Schedulers.io()));

现在它就像一种魅力! 默认情况下,RxJava将接收前128个上游事件( UUID ),将它们转换为子流并订阅所有这些事件。 如果子流是异步且高度可并行化的(例如,网络调用), asyncLoadBy()获得128个并发调用asyncLoadBy() 。 并发级别(128)可通过maxConcurrency参数配置:

Flowable<Person> people = ids.flatMap(id ->asyncLoadBy(id).subscribeOn(Schedulers.io()),10  //maxConcurrency);

那是很多工作,您不觉得吗? 并发性不应该更具声明性吗? 我们不再处理Executor和期货,但似乎这种方法太容易出错。 它难道不像Java 8流中的parallel()一样简单?

输入ParallelFlowable

让我们首先来看一下我们的示例,并通过添加filter()使它更加复杂:

Flowable<Person> people = ids.map(this::slowLoadBy)     //BROKEN.filter(this::hasLowRisk); //BROKEN

hasLowRisk()慢速谓词:

boolean hasLowRisk(Person p) {//slow...
}

我们已经知道解决此问题的惯用方法是使用flatMap()两次:

Flowable<Person> people = ids.flatMap(id -> asyncLoadBy(id).subscribeOn(io())).flatMap(p -> asyncHasLowRisk(p).subscribeOn(io()));

asyncHasLowRisk()相当模糊-谓词通过时返回单元素流,失败则返回空流。 这是使用flatMap()模拟filter() flatMap() 。 我们可以做得更好吗? 从RxJava 2.0.5开始,有一个新的运算符叫做… parallel() ! 令人惊讶的是,由于许多误解和滥用,在RxJava成为1.0之前已删除了同名的运算符。 2.x中的parallel()似乎最终以一种安全和声明性的方式解决了惯用并发问题。 首先,让我们看一些漂亮的代码!

Flowable<Person> people = ids.parallel(10).runOn(Schedulers.io()).map(this::slowLoadBy).filter(this::hasLowRisk).sequential();

就这样! parallel()sequential()之间的代码块parallel()运行。 我们有什么在这里? 首先,新的parallel()运算符将Flowable<UUID>转换为ParallelFlowable<UUID> ,该API的API比Flowable小得多。 您将在第二秒看到原因。 可选的int参数(在我们的示例中为10 )定义并发性,或者(如文档所述)定义创建多少个并发的“ rails”。 因此,对于我们来说,我们将单个Flowable<Person>分成10个并发的独立轨道(认为是thread )。 来自UUID原始流的事件被拆分( modulo 10 )为不同的轨,子流彼此独立。 将它们视为将上游事件发送到10个单独的线程中。 但是首先,我们必须使用方便的runOn()运算符定义这些线程的来源。 这比Java 8流上的parallel()好得多,在Java 8流上,您无法控制并发级别。

至此,我们有了一个ParallelFlowable 。 当事件出现在上游( UUID )中时,它将委派给10个“轨道”,并发,独立的管道之一。 管道提供了可以安全地并行运行的运算符的有限子集,例如map()filter() ,还包括reduce() 。 没有buffer()take()等,因为一次在多个子流上调用它们的语义尚不清楚。 我们的阻塞slowLoadBy()hasLowRisk()仍按顺序调用,但仅在单个“ rail”中。 因为我们现在有10个并发的“ rails”,所以我们无需花费太多精力就可以有效地并行化它们。

当事件到达子流(“ rail”)的末尾时,它们会遇到sequential()运算符。 该运算符将ParallelFlowableFlowable 。 只要我们的映射器和过滤器是线程安全的, parallel() / sequential()对就提供了非常简单的并行化流的方法。 一个小警告-您将不可避免地使邮件重新排序。 顺序map()filter()始终保留顺序(就像大多数运算符一样)。 但是,一旦在parallel()块中运行它们,顺序就会丢失。 这允许更大的并发性,但是您必须牢记这一点。

您是否应该使用parallel()而不是嵌套的flatMap()来并行化代码? 这取决于您,但是parallel()似乎更容易阅读和掌握。

翻译自: https://www.javacodegeeks.com/2017/09/idiomatic-concurrency-flatmap-vs-parallel-rxjava-faq.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/349195.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python%20开发工具_20招让你的Python飞起来!

今天分享的这篇文章&#xff0c;文字不多&#xff0c;代码为主。绝对干货&#xff0c;童叟无欺&#xff0c;主要分享了提升 Python 性能的 20 个技巧&#xff0c;教你如何告别慢Python。原文作者 开元&#xff0c;全栈程序员&#xff0c;使用 Python, Java, PHP和C。1. 优化算法…

c语言万能预编译,Objective-C学习笔记

import 指令(将文件的内容在预编译的时候拷贝到写指令的地方)import Foundation 框架NSLog 函数 NSLog("Hello, World!");NSString语法: NSString *str "jack";字符串占位符: %数据类型C 语言支持的数据类型基本数据类型int double float char构造类型数组…

Java命令行界面(第19部分):jClap

本系列中第19篇文章的重点是从Java代码解析命令行参数是jClap &#xff08; Java命令行参数解析器 &#xff09;&#xff0c;不应将它与称为JCLAP的库相混淆&#xff0c;而JCLAP库是我本系列先前文章的重点。 在以前的帖子覆盖JCLAP 1.4加尔斯吉尔温斯坦利&#xff08; snaq.ne…

使用Arquillian和LocalStack脱机测试AWS云堆栈

在AWS云堆栈 &#xff08;例如DynamoDB&#xff0c;S3等&#xff09;上构建应用程序时&#xff0c;需要针对这些组件编写测试。 您可能首先想到的是拥有一个用于生产的环境和一个用于测试的环境&#xff0c;然后针对该环境运行测试。 这对于集成测试&#xff0c;部署测试&…

python取文本中间_Python读取两个字符串之间的特定文本行

我无法让python读取特定的行。我正在做的事情是这样的&#xff1a;lines of data not neededlines of data not neededlines of data not needed--------------------------------------***** REPORT 1 *****--------------------------------------[key] lines of interest ar…

c语言7.5return的值是,这个真心搞不懂了。求助

该楼层疑似违规已被系统折叠 隐藏此楼查看此楼回复 15楼. 假如你每天签到拿4经验&#xff0c;300000/475000天&#xff0c;如果从1岁开始签到&#xff0c;那100年36500天&#xff0c;你差不多要活200年保持每天签到(谁知道200年后还有没有签到这玩意)&#xff0c;如果你每天再水…

Java命令行界面(第22部分):argparser

John Lloyd的argparser是本系列的第二十二篇有关基于Java的命令行参数解析的文章中介绍的库。 该库的主页除了提供单个源代码示例外&#xff0c;还提供了指向基于Javadoc的API文档 &#xff0c;JAR文件&#xff0c;ZIP文件和TAR文件的链接。 本帖子中使用的示例与本系列的前二十…

python中布尔类型的值包括0和1_Python中布尔型变量的值为0和1。( )

【单选题】以下哪个不能作为字典的键。【多选题】以下哪些元素是可变序列。【单选题】已知 x[1,2,3,4,5,6,7],执行语句x.pop()的结果是()。【多选题】以下哪几个可以作为字典的键。【单选题】以下哪个属于列表的定界符。【多选题】以下哪几个选项类型属于元组。【单选题】下面代…

如何粗暴地下载huggingface_hub指定数据文件

参考这里&#xff1a; https://huggingface.co/docs/huggingface_hub/guides/download 可见下载单个文件&#xff0c;下载整个仓库文件都是可行的。 这是使用snapshot_download下载的一个例子&#xff1a; https://qq742971636.blog.csdn.net/article/details/135150482 sn…

顺序表输入栈元素c语言,C语言数据结构之栈简单操作

C语言数据结构之栈简单操作实验&#xff1a;编写一个程序实现顺序栈的各种基本运算&#xff0c;并在此基础上设计一个主程序&#xff0c;完成如下功能&#xff1a;(1)初始化顺序栈(2)插入元素(3)删除栈顶元素(4)取栈顶元素(5)遍历顺序栈(6)置空顺序栈分析:栈的顺序存储结构简称…

rete_Rete之外的生活– RIP Rete 2013 :)

rete我只是对我的新算法做最后的修改。 它融合了Leaps &#xff0c; 面向集合的Match和Left / Right取消链接的概念 &#xff0c;以及我自己的一些想法。 该代码已提交&#xff0c;但我正在积累工作并编写更多测试。 我将在一周左右的时间内写一个完整的博客&#xff0c;详细介…

25q64存储多个数据_一篇文章看懂,存储虚拟化在不同用例中的实践与优势

存储虚拟化是一种对物理存储资源进行抽象的技术&#xff0c;使其看起来像是一个集中的资源。虚拟化掩盖了管理内存、网络、服务器和存储中资源的复杂性。存储虚拟化运行在多个存储设备上&#xff0c;使它们看起来就像一个单一的存储池。这些池化的存储设备可以来自不同的供应商…

android代码画出波浪球,Android绘制波浪曲线,效果很赞的。

github地址&#xff1a;https://github.com/sddyljsx/Android-SurfView-WaveViewpackage neal.canvas;import android.content.Context;import android.graphics.Canvas;import android.graphics.Color;import android.graphics.Paint;import android.graphics.Path;import and…

Java命令行界面(第14部分):google-options

google-options的GitHub页面指出google-options是“来自Google&#xff08;java&#xff09;的人们的命令行参数解析库。” 该页面继续说&#xff1a;“这是Bazel Project中的命令行参数解析器。 com.google.devtools.common.options程序包已拆分为一个单独的jar&#xff0c;用…

python自动化工具哪个好用_10款好用的自动化测试工具推荐

当我们功能测试干的时间比较久了,或者想要学习更多的技术,提升自己的时候,基本上第一时间就会想到的是自动化测试。而在自动化测试领域&#xff0c;自动化工具的核心地位毋庸置疑&#xff0c;下面为大家推荐10款常见常用的自动化测试工具&#xff1a;1、SeleniumWEB自动化测试S…

android 输入法文本选择功能,Android的文本和输入---创建输入法(一)

输入法编辑器(IME)是让用户输入文本的控件。Android提供了一个可扩展的的输入法的框架&#xff0c;它允许应用程序给用户提供另外的输入法&#xff0c;如软键盘或语音输入。这些输入法一旦安装&#xff0c;用户就可以从系统的设置中选择他们想要使用的IME&#xff0c;并且这个设…

python基础list_python基础操作---list

1 #coding:utf-82 list1 [physics, chemistry, 1997, 2000];3 list2 [1, 2, 3, 4, 5 ];4 list3 ["a", "b", "c", "d"];56 #切片功能跟str一样7 print "list1[0]: ", list1[0]8 print "list2[1:5]: ", list2[1:…

华为mate40RS能升级鸿蒙,mate40Pro和40RS能用上鸿蒙系统吗

[分享交流]mate40Pro和40RS能用上鸿蒙系统吗8886电梯直达huafen210861086新学乍练发表于 2020-12-18 12:30:08来自&#xff1a;HUAWEI Mate 40 Pro最新回复 2020-12-19 09:50:21如题好多人都说不能用上鸿蒙系统林泽徐独步江湖发表于 2020-12-18 12:30:52来自&#xff1a;HUAWEI…

在JShell中尝试Java9 HTTP客户端和Process API

这篇文章继续了My My Java 9 Features博客文章中对Java9功能的探索。 在这里&#xff0c;我们用在Java9 HTTP / 2客户端和进程API试验JShell HTTP / 2客户端 HTTP / 2客户端是Java9中的孵化器项目。 这意味着该API尚未最终确定&#xff0c;因此在将来的版本中仍有一定的更改范…

python怎么读取pdf文件_Python解析并读取PDF文件内容的方法

本文实例讲述了Python解析并读取PDF文件内容的方法。分享给大家供大家参考&#xff0c;具体如下&#xff1a;一、问题描述利用python&#xff0c;去读取pdf文本内容。二、效果三、运行环境python2.7四、需要安装的库pip install pdfminer五、实现源代码代码1(win64)# codingutf…