并发查询parallel_惯用并发:flatMap()与parallel()– RxJava常见问题解答

并发查询parallel

简单,有效和安全的并发是RxJava的设计原则之一。 然而,具有讽刺意味的是,它可能是该库中最容易被误解的方面之一。 让我们举一个简单的例子:假设我们有一堆UUID并且对于每个UUID ,我们必须执行一组任务。 第一个问题是每个UUID都要执行I / O密集型操作,例如,从数据库加载对象:

Flowable<UUID> ids = Flowable.fromCallable(UUID::randomUUID).repeat().take(100);ids.subscribe(id -> slowLoadBy(id));

首先,为了测试,我将生成100个随机UUID。 然后,对于每个UUID,我想使用以下方法加载记录:

Person slowLoadBy(UUID id) {//...
}

slowLoadBy()的实现是无关紧要的,请记住它是缓慢且阻塞的。 使用subscribe()调用slowLoadBy()有许多缺点:

  • subscribe()根据设计是单线程的,无法解决。 每个UUID顺序加载
  • 当您调用subscribe() ,无法进一步转换Person对象。 这是终端操作

一种更健壮,甚至更残破的方法是map()每个UUID

Flowable<Person> people = ids.map(id -> slowLoadBy(id));  //BROKEN

这是非常可读的,但不幸的是损坏了。 就像订阅者一样,运算符也是单线程的。 这意味着在任何给定时间只能映射一个UUID ,此处也不允许并发。 更糟糕的是,我们从上游继承线程/工作者。 这有几个缺点。 如果上游使用某些专用的调度程序产生事件,我们将劫持该调度程序中的线程。 例如,许多操作符(例如interval() Schedulers.computation()透明地使用Schedulers.computation()线程池。 我们突然开始在完全不适合该功能的池上执行I / O密集型操作。 此外,我们通过这一阻塞性顺序步骤降低了整个管道的速度。 非常非常糟糕。

您可能已经听说过这个subscribeOn()运算符,以及它如何启用并发。 确实,但是在应用它时必须非常小心。 以下示例(再次)是错误的

import io.reactivex.schedulers.Schedulers;Flowable<Person> people = ids.subscribeOn(Schedulers.io()).map(id -> slowLoadBy(id)); //BROKEN

上面的代码段仍然损坏。 observeOn() subscribeOn() (以及该事件的observeOn() )几乎不会将执行切换到其他工作程序(线程),而不会引入任何并发性。 流仍然按顺序处理所有事件,但是在不同的线程上。 换句话说,我们现在不是在从上游继承的线程上顺序使用事件,而是在io()线程上顺序使用事件。 那么,这个神话般的flatMap()运算符呢?

flatMap()运算符可以进行救援

flatMap()运算符通过将事件流分成子流来启用并发。 但首先,还有一个破碎的示例:

Flowable<Person> asyncLoadBy(UUID id) {return Flowable.fromCallable(() -> slowLoadBy(id));
}Flowable<Person> people = ids.subscribeOn(Schedulers.io()).flatMap(id -> asyncLoadBy(id)); //BROKEN

哦,天哪,这还是坏了flatMap()运算符在逻辑上做两件事:

  • 在每个上游事件上应用转换( id -> asyncLoadBy(id) )–这将产生Flowable<Flowable<Person>> 。 这是有道理的,对于每个上游UUID我们都有一个Flowable<Person>因此最终得到的是Person对象流
  • 然后flatMap()尝试一次订阅所有这些内部子流。 每当任何子流发出Person事件时,它都会作为外部Flowable的结果透明传递。

从技术上讲, flatMap()仅创建并预订前128个(默认情况下,可选的maxConcurrency参数)子流。 同样,当最后一个子流完成时, Person外部流也将完成。 现在,这到底为什么被打破? 除非明确要求,否则RxJava不会引入任何线程池。 例如,这段代码仍在阻塞:

log.info("Setup");
Flowable<String> blocking = Flowable.fromCallable(() -> {log.info("Starting");TimeUnit.SECONDS.sleep(1);log.info("Done");return "Hello, world!";});
log.info("Created");
blocking.subscribe(s -> log.info("Received {}", s));
log.info("Done");

仔细查看输出,特别是涉及的事件和线程的顺序:

19:57:28.847 | INFO  | main | Setup
19:57:28.943 | INFO  | main | Created
19:57:28.949 | INFO  | main | Starting
19:57:29.954 | INFO  | main | Done
19:57:29.955 | INFO  | main | Received Hello, world!
19:57:29.957 | INFO  | main | Done

没有任何并发​​,没有额外的线程。 仅将阻塞代码包装在Flowable中不会神奇地增加并发性。 您必须显式使用… subscribeOn()

log.info("Setup");
Flowable<String> blocking = Flowable.fromCallable(() -> {log.info("Starting");TimeUnit.SECONDS.sleep(1);log.info("Done");return "Hello, world!";}).subscribeOn(Schedulers.io());
log.info("Created");
blocking.subscribe(s -> log.info("Received {}", s));
log.info("Done");

这次的输出更有希望:

19:59:10.547 | INFO  | main | Setup
19:59:10.653 | INFO  | main | Created
19:59:10.662 | INFO  | main | Done
19:59:10.664 | INFO  | RxCachedThreadScheduler-1 | Starting
19:59:11.668 | INFO  | RxCachedThreadScheduler-1 | Done
19:59:11.669 | INFO  | RxCachedThreadScheduler-1 | Received Hello, world!

但是我们上次确实使用了subscribeOn() ,这是怎么回事? 嗯,外部流级别的subscribeOn()基本上说所有事件都应在此流中的不同线程上顺序处理。 我们并没有说应该同时运行许多子流。 并且由于所有子流都处于阻塞状态,因此当RxJava尝试订阅所有子流时,它会有效地依次依次订阅。 asyncLoadBy()并不是真正的async ,因此当flatMap()运算符尝试对其进行订阅时,它会阻塞。 修复很容易。 通常,您会将subscribeOn()放在asyncLoadBy()但出于教育目的,我将其直接放置在asyncLoadBy()道中:

Flowable<Person> people = ids.flatMap(id -> asyncLoadBy(id).subscribeOn(Schedulers.io()));

现在它就像一个魅力! 默认情况下,RxJava将接收前128个上游事件( UUID ),将它们转换为子流并订阅所有这些事件。 如果子流是异步且高度可并行化的(例如,网络调用), asyncLoadBy()获得128个并发调用asyncLoadBy() 。 并发级别(128)可通过maxConcurrency参数配置:

Flowable<Person> people = ids.flatMap(id ->asyncLoadBy(id).subscribeOn(Schedulers.io()),10  //maxConcurrency);

那是很多工作,您不觉得吗? 并发不应该更具声明性吗? 我们不再处理Executor和期货,但似乎这种方法太容易出错。 它不能像Java 8流中的parallel()一样简单吗?

输入ParallelFlowable

让我们首先来看一下我们的示例,并通过添加filter()使它更加复杂:

Flowable<Person> people = ids.map(this::slowLoadBy)     //BROKEN.filter(this::hasLowRisk); //BROKEN

hasLowRisk()慢速谓词:

boolean hasLowRisk(Person p) {//slow...
}

我们已经知道,针对此问题的惯用方法是使用flatMap()两次:

Flowable<Person> people = ids.flatMap(id -> asyncLoadBy(id).subscribeOn(io())).flatMap(p -> asyncHasLowRisk(p).subscribeOn(io()));

asyncHasLowRisk()相当模糊-谓词通过时返回单元素流,失败则返回空流。 这是使用flatMap()模拟filter() flatMap() 。 我们可以做得更好吗? 从RxJava 2.0.5开始,有一个新的运算符叫做… parallel() ! 令人惊讶的是,由于许多误解和滥用,在RxJava成为1.0之前已删除了同名的运算符。 2.x中的parallel()似乎最终以一种安全且声明性的方式解决了惯用并发问题。 首先,让我们看一些漂亮的代码!

Flowable<Person> people = ids.parallel(10).runOn(Schedulers.io()).map(this::slowLoadBy).filter(this::hasLowRisk).sequential();

就这样! parallel()sequential()之间的代码块parallel()运行。 我们有什么在这里? 首先,新的parallel()运算符将Flowable<UUID>转换为ParallelFlowable<UUID> ,该API的API比Flowable小得多。 您将在第二秒看到原因。 可选的int参数(在我们的例子中为10 )定义并发性,或者(如文档所述)定义创建并发“ rails”的数量。 因此,对于我们来说,我们将单个Flowable<Person>分成10个并发的独立轨道(认为是thread )。 来自UUID原始流的事件被拆分( modulo 10 )为彼此独立的不同轨,子流。 将它们视为将上游事件发送到10个单独的线程中。 但是首先我们必须使用方便的runOn()运算符定义这些线程的来源。 这比Java 8流上的parallel()好得多,在Java 8流上,您无法控制并发级别。

至此,我们有了一个ParallelFlowable 。 当事件出现在上游( UUID )中时,它将委派给10个“轨道”,并发,独立的管道之一。 管道提供了可以安全地同时运行的运算符的有限子集,例如map()filter() ,还包括reduce() 。 没有buffer()take()等,因为一次在多个子流上调用它们的语义尚不清楚。 我们的阻塞slowLoadBy()hasLowRisk()仍按顺序调用,但仅在单个“ rail”内部。 因为我们现在有10个并发的“ rails”,所以我们无需花费太多精力就可以有效地并行化它们。

当事件到达子流(“轨道”)的末尾时,它们会遇到sequential()运算符。 该运算符将ParallelFlowableFlowable 。 只要我们的映射器和过滤器是线程安全的, parallel() / sequential()对就提供了非常简单的并行化流的方法。 一个小警告-您将不可避免地使邮件重新排序。 顺序map()filter()始终保留顺序(就像大多数运算符一样)。 但是,一旦在parallel()块中运行它们,顺序就会丢失。 这允许更大的并发性,但是您必须牢记这一点。

是否应该使用parallel()而不是嵌套的flatMap()来并行化代码? 这取决于您,但是parallel()似乎更容易阅读和掌握。

翻译自: https://www.javacodegeeks.com/2017/09/idiomatic-concurrency-flatmap-vs-parallel-rxjava-faq.html

并发查询parallel

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/334538.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java正则表达式 匹配()_学习Java正则表达式(匹配、替换、查找)

import java.util.ArrayList;import java.util.regex.Matcher;import java.util.regex.Pattern;public class test {public static void main(String[] args) {getStrings(); //用正则表达式获取指定字符串内容中的指定内容System.out.println("********************"…

Linux 命令之 chown -- 用来变更文件或目录的拥有者或所属群组

命令介绍 Linux/Unix 属于多用户多任务操作系统&#xff0c;所有的文件皆有拥有者。利用 chown 命令可以将指定文件的拥有者改为指定的用户或组&#xff0c;用户可以是用户名或者用户ID&#xff0c;组可以是组名或者组ID&#xff0c;文件是以空格分开的要改变权限的文件列表&a…

在excel中如何筛选重复数据_Excel中12招筛选使用大全,小白也能秒变高手

【温馨提示】亲爱的朋友&#xff0c;阅读之前请您点击【关注】&#xff0c;您的支持将是我最大的动力&#xff01;在日常使用Excel处理数据时&#xff0c;相信小伙伴们对于筛选功能已经是不陌生了&#xff0c;Excel筛选功能可以快速有效的帮助我们处理大量的数据&#xff0c;将…

静态内部类实现mysql连接_Java - 静态内部类

Java语言允许在类中再定义类&#xff0c;这种在其它类内部定义的类就叫内部类。内部类又分为&#xff1a;常规内部类、局部内部类、匿名内部类和静态嵌套类四种。1、静态内部类定义静态内部类&#xff0c;定义在类中&#xff0c;任何方法外&#xff0c;用static定义&#xff1b…

Linux 命令之 ll -- 列出当前文件或目录的详细信息

文章目录命令介绍常用选项参考示例&#xff08;一&#xff09;查看当前目录下所有文件的详细信息&#xff0c;且按创建时间排序&#xff08;二&#xff09;按时间倒序&#xff0c;分页排列文件&#xff08;三&#xff09;查看某个目录的详细信息&#xff08;四&#xff09;查看…

cli命令行界面 demo_Java命令行界面(第24部分):MarkUtils-CLI

cli命令行界面 demo本系列的第一篇 有关使用Java解析命令行参数的文章介绍了Apache Commons CLI库。 这是本系列中介绍的基于Java的命令行解析库中最古老的&#xff0c;而且可能是最常用的之一。 Apache Commons CLI确实显示了它的时代&#xff0c;特别是与一些更现代的基于Jav…

python打包成exe_【Python基础】一篇文件教你py文件打包成exe

场景:如果要将我们编写好的代码给别人使用,如果要他们直接使用我们的代码,就需要安装各种编译软件以及第三方模块,还要对软件操作,编程有一定的了解,这对使用者的要求比较高,不是很方便,为了解决这一问题,我们可以选择将我们编写的代码,编译成一个可执行文件,这样,就可以实现跨…

mongodb启用身份验证_为您的Web应用程序启用两因素身份验证

mongodb启用身份验证支持两因素身份验证&#xff08;2FA&#xff09;几乎总是一个好主意&#xff0c;尤其是对于后台系统。 2FA有许多不同的形式&#xff0c;其中一些包括SMS&#xff0c;TOTP甚至是硬件令牌 。 启用它们需要类似的流程&#xff1a; 用户转到其个人资料页面&a…

MySQL 数据库命令之 mysqlshow -- 显示 MySQL 数据库相关信息

文章目录介绍语法格式常用选项参考示例&#xff08;一&#xff09;显示指定数据库中的所有表的记录数和列数&#xff08;二&#xff09;显示指定数据库中所有的数据表的额外信息&#xff08;三&#xff09;查看指定数据库中的所有数据表&#xff08;四&#xff09;显示所有的数…

java中的gui_java gui快速入门教程

JCheckBox和JRadioButton使用示例import java.awt.*;import javax.swing.*;class Hobby extends JPanel {JCheckBox c1 new JCheckBox("写作",false);JCheckBox c2 new JCheckBox("音乐",false);JCheckBox c3 new JCheckBox("跑步",false);JR…

cad图标注释大全_CAD源泉插件快捷键使用教程(全集)

点击直达全集教程地址​www.bilibili.com此插件和海龙工具箱功能相似&#xff01;不建议同时安装&#xff0c;快捷命令冲突。插件工具箱 图文介绍平面空间布置 jj这个命令把我们常规用到的家装空间都已经用上了&#xff0c;除了切换不同空间布置格局&#xff0c;而且图块的样式…

java登录界面命令_Java命令行界面(第30部分):观察

java登录界面命令这个有关Java命令行参数解析的系列文章由四个月来发表的29篇帖子组成&#xff0c;涵盖了28个不同的开放源代码库&#xff0c;可用于解析Java命令行参数。 这篇文章收集了可以从本系列的前29篇文章中得出的一些观点&#xff0c;并提供了选择28个库中的一个或决定…

MySQL 数据库命令之 mysqladmin -- MySQL 服务器管理客户端

文章目录一、介绍二、语法格式三、命令参数&#xff08;一&#xff09;参数默认值&#xff08;二&#xff09;默认参数四、支持的管理命令五、参考示例&#xff08;一&#xff09;每隔两秒查看一次服务器的状态&#xff0c;总共重复 5 次&#xff08;二&#xff09;修改 root 密…

分支限界法 tsp java_基于分支限界法的旅行商问题(TSP)一

//分支限界法#include#include#include#includeconst int INF 100000;const int MAX_N 22;using namespacestd;//n*n的一个矩阵intn;int cost[MAX_N][MAX_N];//最少3个点&#xff0c;最多MAX_N个点structNode{bool visited[MAX_N];//标记哪些点走了int s;//第一个点int s_p;/…

苹果录屏功能没有声音_其实苹果手机也有录屏功能!简单操作几步,就能轻松开启...

现在手机中的娱乐方式越来越多了&#xff0c;大家遇到有趣的事情就想分享给朋友&#xff0c;但是一些视频不能直接分享链接&#xff0c;还是挺麻烦的。不过我们可以通过录屏的方式来进行分享的&#xff0c;其实苹果手机就自带录屏工具&#xff0c;简单操作几步&#xff0c;就能…

Linux 启动/重启/停止 MySQL 数据库的命令

文章目录一、启动 MySQL 数据库的命令&#xff08;一&#xff09;使用命令 service 启动&#xff08;二&#xff09;使用命令 systemctl 启动二、停止 MySQL 数据库的命令&#xff08;一&#xff09;使用命令 service 停止&#xff08;二&#xff09;使用命令 systemctl 停止&a…

natty的异步通信框架_OpenHub框架进行的异步通信

natty的异步通信框架在本系列的前一部分中&#xff0c;我们介绍了OpenHub框架 。 这部分显示了框架最强大的功能之一- 异步消息传递模型 。 当源系统无法等待目标系统的响应时&#xff0c;将使用系统之间的异步通信。 有以下几个原因&#xff1a; 源系统必须尽可能地响应 &am…

java大文件解析_java大文件(百M以上)的上传下载实例解析

javaweb上传文件上传文件的jsp中的部分上传文件同样可以使用form表单向后端发请求&#xff0c;也可以使用 ajax向后端发请求1.通过form表单向后端发送请求Save改进后的代码不需要form标签&#xff0c;直接由控件来实现。开发人员只需要关注业务逻辑即可。JS中已经帮我们封闭好了…

zip直链生成网站_安装网站程序

一、选择网站程序搭建网站的程序有很多博客类&#xff1a; WordPress、 Typecho 、Hexo 等商城类&#xff1a;EcShop、DBShop、NiuShop 等论坛类&#xff1a;Discuz 还有 苹果CMS-影视建站&#xff1b;Tipask-问答程序&#xff1b;可道云KodExplorer-强大易用的私有云/在线文档…

java jni开发_Java JNI开发实践记录

当使用到JNI的时候&#xff0c;基本可以肯定Java的平台移植性注定减弱&#xff0c;接下来记录一次使用Java JNI开发的经历。关于Java JNI的相关资料参见&#xff1a;下面是使用JNI常见三种场景:1.在Java应用中标准Java类库不支持平台相关的特性2.已经存在用其它语言写好的类库&…