flatMap()与concatMap()与concatMapEager()– RxJava常见问题解答

RxJava 2.x中共有三个无缝相似的运算符: flatMap()concatMap()concatMapEager() 。 它们都接受相同的参数-从原始流的单个项目到任意类型的(子)流的函数。 换句话说,如果您有Flowable<T>则可以为任意R类型提供从TFlowable<R>的函数。 应用任何这些运算符后,您最终得到Flowable<R> 。 那么它们有何不同?

样例项目

首先,让我们构建一个示例应用程序。 我们将使用Retrofit2 HTTP客户端包装器,该包装器具有RxJava2的内置插件。 我们的任务是利用GeoNames API来查找世界上任何城市的人口。 该界面如下所示:

public interface GeoNames {Flowable<Long> populationOf(String city);}

该接口的实现由Retrofit自动生成,向下滚动以查看胶粘源代码。 暂时假设我们有一个函数,该函数采用具有城市名称的String并异步返回具有该城市人口的单元素流。 还要假设我们有固定的城市要查找:

Flowable<String> cities = Flowable.just("Warsaw", "Paris", "London", "Madrid"
);

我们的目标是获取每个城市的人口。

带有concatMap()的示例应用程序如下所示:

cities.concatMap(geoNames::populationOf).subscribe(response -> log.info("Population: {}", response));

在看到结果之前,让我们研究一下concatMap()在做什么。 对于每个上游事件( 城市 ),它都调用一个函数,该函数用(子)流替换该事件。 在我们的情况下,它是Long的一元流( Flowable<Long> )。 因此,与所有运算符进行比较之后,我们最终得到的是Long流( Flowable<Flowable<Long>> )流。 当我们分析操作员为展平此类嵌套流所做的操作时,就会出现真正的区别。

concatMap()将首先订阅第一concatMap()流( Flowable<Long>代表华沙的人口)。 订阅实际上是指进行物理HTTP调用。 仅当第一concatMap()流完成时(在我们的情况下发出单个Long并发出完成信号), concatMap()才会继续。 继续意味着订阅第二个子流并等待其完成。 最后一个子流完成时,结果流完成。 这导致了随后的潮流:1702139,2138551,7556900和3255944。因此,恰好是华沙,巴黎,伦敦和马德里的人口。 输出顺序完全可以预测。 但是它也是完全顺序的。 完全没有并发发生,只有在第一个HTTP结束时才进行第二个HTTP调用。 RxJava所增加的复杂性根本没有回报:

23:33:33.531 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
23:33:33.656 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (123ms)
23:33:33.674 | Rx-1 | Population: 1702139
23:33:33.676 | Rx-1 | --> GET .../searchJSON?q=Paris http/1.1
23:33:33.715 | Rx-1 | <-- 200 OK .../searchJSON?q=Paris (38ms)
23:33:33.715 | Rx-1 | Population: 2138551
23:33:33.716 | Rx-1 | --> GET .../searchJSON?q=London http/1.1
23:33:33.754 | Rx-1 | <-- 200 OK .../searchJSON?q=London (37ms)
23:33:33.754 | Rx-1 | Population: 7556900
23:33:33.755 | Rx-1 | --> GET .../searchJSON?q=Madrid http/1.1
23:33:33.795 | Rx-1 | <-- 200 OK .../searchJSON?q=Madrid (40ms)
23:33:33.796 | Rx-1 | Population: 3255944

如您所见,没有多线程发生,请求是顺序的,彼此等待。 从技术上讲,并非所有这些都必须在同一线程中发生,但是它们绝不会重叠并且可以利用并发性。 最大的好处是可以保证结果事件的顺序,一旦我们进入flatMap() ,就不会那么明显了……

flatMap()代码几乎完全相同:

cities.flatMap(geoNames::populationOf).subscribe(response -> log.info("Population: {}", response));

就像之前一样,我们从Long流开始( Flowable<Flowable<Long>> )。 但是, flatMap()运算符不是一次又一次地订阅每个子流,而是急切地一次订阅所有子流。 这意味着我们看到在不同线程中同时启动多个HTTP请求:

00:10:04.919 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
00:10:04.919 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
00:10:04.919 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
00:10:04.919 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
00:10:05.449 | Rx-3 | <-- 200 OK .../searchJSON (529ms)
00:10:05.462 | Rx-3 | Population: 7556900
00:10:05.477 | Rx-1 | <-- 200 OK .../searchJSON (557ms)
00:10:05.478 | Rx-1 | Population: 1702139
00:10:05.751 | Rx-4 | <-- 200 OK .../searchJSON (831ms)
00:10:05.752 | Rx-4 | Population: 3255944
00:10:05.841 | Rx-2 | <-- 200 OK .../searchJSON (922ms)
00:10:05.843 | Rx-2 | Population: 2138551

当任何基础子流中的任何一个发出任何值时,它将立即向下游传递给订户。 这意味着我们现在可以在事件发生时即时处理事件。 请注意,结果流是乱序的。 我们收到的第一个事件是7556900,恰好是伦敦的人口,在第一流中排名第二。 与concatMap()相反, flatMap()无法保留顺序,因此以“随机”顺序发出值。 好吧,不是真正随机的,我们只是在它们可用时立即接收值。 在此特定执行中,首先是针对伦敦的HTTP响应,但绝对不能保证。 这导致一个有趣的问题。 我们有各种各样的人口价值流和最初的城市流。 但是,输出流可以是事件的任意排列,并且我们不知道哪个人口对应哪个城市。 我们将在后续文章中解决此问题。

concatMapEager()似乎带来了两全其美:并发性和输出事件的有保证顺序:

cities.concatMapEager(geoNames::populationOf).subscribe(response -> log.info("Population: {}", response));

在了解了concatMap()flatMap()功能之后,了解concatMapEager()相当简单。 急切地让流concatMapEager()流( duh! )同时预订所有子流。 但是,此运算符可确保首先传播第一个子流的结果,即使它不是要完成的第一个子流也是如此。 一个示例将Swift揭示这意味着什么:

00:34:18.371 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
00:34:18.371 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
00:34:18.371 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
00:34:18.371 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
00:34:18.517 | Rx-3 | <-- 200 OK .../searchJSON?q=London (143ms)
00:34:18.563 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (189ms)
00:34:18.565 | Rx-1 | Population: 1702139
00:34:20.460 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (2086ms)
00:34:20.460 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (2086ms)
00:34:20.461 | Rx-2 | Population: 2138551
00:34:20.462 | Rx-2 | Population: 7556900
00:34:20.462 | Rx-2 | Population: 3255944

我们立即启动四个HTTP请求。 从日志输出中,我们可以清楚地看到伦敦的居民首先被返回。 但是,订户没有收到它,因为华沙尚未到来。 巧合的是,华沙排名第二,因此华沙人口可以在下游传递给订户。 不幸的是,伦敦人口必须等待更多,因为首先我们需要巴黎人口。 巴黎(紧随其后是马德里)完成后,所有剩余结果都将传递到下游。

请注意,即使人口充足,伦敦的人口也必须等待休眠,直到华沙和巴黎完成。 那么concatMapEager()是最好的并发运算符吗? 不完全的。 想象一下,我们有一个数千个城市的列表,每一个城市我们都获取一张1MB的图片。 使用concatMap()我们可以依次(即缓慢concatMap()下载图片。 使用flatMap()可以同时下载图片,并在图片到达时尽快进行处理。 现在, concatMapEager()呢? 在最坏的情况下,我们可以使用concatMapEager()缓存999张图片,因为来自第一个城市的图片恰好是最慢的。 即使我们已经拥有99.9%的结果,但由于我们执行严格的排序,因此我们无法对其进行处理。

使用哪个运算符?

flatMap()应该是您的首选武器。 它允许与流行为进行有效的并发。 但是要准备好接收乱序的结果。 仅当提供的转换速度如此之快,顺序处理不是问题时, concatMap()才能很好地工作。 concatMapEager()非常方便,但是要注意内存消耗。 同样在最坏的情况下,您可能最终会闲置,等待很少的响应。

附录:配置Retrofit2客户端

实际上,我们在本文中始终使用的GeoNames服务接口如下所示:

public interface GeoNames {@GET("/searchJSON")Single<SearchResult> search(@Query("q") String query,@Query("maxRows") int maxRows,@Query("style") String style,@Query("username") String username);default Flowable<Long> populationOf(String city) {return search(city, 1, "LONG", "s3cret").map(SearchResult::getGeonames).map(g -> g.get(0)).map(Geoname::getPopulation).toFlowable();}}

非默认方法的实现由Retrofit2自动生成。 请注意,为简单起见, populationOf()返回一个元素的Flowable<Long> 。 但是,要完全拥抱此API的本质,在现实世界中,其他实现将更为合理。 首先, SearchResult类返回结果的有序列表(省略了获取器/设置器):

class SearchResult {private List<Geoname> geonames = new ArrayList<>();
}class Geoname {private double lat;private double lng;private Integer geonameId;private Long population;private String countryCode;private String name;
}

毕竟,世界上有许多华沙和伦敦 。 我们默默假设列表将包含至少一个元素,而第一个是正确的匹配。 更合适的实现应返回所有匹配,甚至返回更好的Maybe<Long>类型以反映没有匹配项:

default Maybe<Long> populationOf(String city) {return search(city, 1, "LONG", "nurkiewicz").flattenAsFlowable(SearchResult::getGeonames).map(Geoname::getPopulation).firstElement();
}

粘合代码如下所示。 首先Jackson的设置,以便解析来自API的响应:

import com.fasterxml.jackson.databind.ObjectMapper;private ObjectMapper objectMapper() {return new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
}

FAIL_ON_UNKNOWN_PROPERTIES通常是您想要的。 否则,您必须映射JSON响应中的所有字段,并且当API生产者引入新的或向后兼容的字段时,代码将中断。 然后我们设置OkHttpClient ,由Retrofit在下面使用:

import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;private OkHttpClient client() {HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);return new OkHttpClient.Builder().addInterceptor(interceptor).build();
}

有时您可以跳过OkHttp客户端的配置,但是我们添加了日志拦截器。 默认情况下,OkHttp使用java.util.logging日志记录,因此为了使用体面的日志记录框架,我们必须在开始时就安装网桥:

import org.slf4j.bridge.SLF4JBridgeHandler;static {SLF4JBridgeHandler.removeHandlersForRootLogger();SLF4JBridgeHandler.install();
}

最后进行改造:

import io.reactivex.schedulers.Schedulers;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
import retrofit2.converter.jackson.JacksonConverterFactory;GeoNames createClient() {return new Retrofit.Builder().client(client()).baseUrl("http://api.geonames.org").addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io())).addConverterFactory(JacksonConverterFactory.create(objectMapper())).build().create(GeoNames.class);
}

调用createClient()将产生GeoNames接口的动态实现。 我们使用了以下依赖项:

compile 'io.reactivex.rxjava2:rxjava:2.0.6'compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
compile 'com.squareup.retrofit2:converter-jackson:2.0.1'
compile 'com.squareup.okhttp3:logging-interceptor:3.8.0'compile 'ch.qos.logback:logback-classic:1.1.7'
compile 'org.slf4j:slf4j-api:1.7.21'
compile 'org.slf4j:jul-to-slf4j:1.7.21'

翻译自: https://www.javacodegeeks.com/2017/08/flatmap-vs-concatmap-vs-concatmapeager-rxjava-faq.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/349167.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

android的padding属性,以编程方式获取android:padding属性

从一个角度来看&#xff0c;如何以编程方式获取android&#xff1a;padding属性的值&#xff1f; 我目前正在使用&#xff1a;private static final String ANDROID_NAMESPACE "http://schemas.android.com/apk/res/android"; private static final String ATTRIBUT…

Java命令行界面(第25部分):JCommando

JCommando 网站 上将JCommando描述为“命令行参数的Java参数解析器”。 JCommando读取XML配置以生成一个Java类&#xff0c;该类处理Java应用程序中的解析。 在提供XML配置的 Java命令行解析库的本系列文章中&#xff0c;以前涵盖的唯一基于Java的库是JSAP &#xff0c;但这是该…

xss 全编码两次_XSS进阶

原标题&#xff1a;XSS进阶1、XSS常用语句及编码绕过XSS常用的测试语句有&#xff1a;●<>alert(1)>●●●常见的XSS的绕过编码有JS编码、HTML实体编码和URL编码。(1)JS编码JS提供了四种字符编码的策略&#xff0c;如下所示。●三个八进制数字&#xff0c;如果个数不够…

鸿蒙系统6月可升级,华为鸿蒙2.0系统大规模升级从6月开始?

你现在使用的智能手机是什么&#xff0c;苹果的iPhone手机&#xff0c;华为智能手机&#xff0c;小米手机还是其他智能手机。从操作系统来看目前的智能手机基本上可以分成iPhone手机跟安卓智能手机&#xff0c;其中iPhone手机使用的是iOS系统&#xff0c;而其他智能手机操作系统…

rstudio安装后如何打开_请问Rstudio安装后无法运行怎么弄?

---------------------------RStudio---------------------------The R session had a fatal error.ERROR system error 5 (&#xfffd;ܾ&#xfffd;&#xfffd;&#xfffd;&#xfffd;ʡ&#xfffd;) [pathC:/Users/&#xfffd;Ű&#xfffd;/AppData/Local/Temp/Rtm…

wso2 esb_WSO2 ESB的一种消息传递方式

wso2 esb正如我之前在WSO2 ESB工作时所发布的那样。 为了更好地理解此ESB&#xff0c;我一直在浏览示例 &#xff08;尚未完成所有示例 &#xff09;。 示例12是关于与ESB的单向消息传递&#xff0c;并使用TCP监视器使其可见。 我之前已经介绍过如何设置类似的工具“ TcpTunnel…

android 录像机,android 录像机

一直都做camera 录像功能其实知道的很少&#xff0c;以前也是迷迷糊糊知道怎么写个video&#xff0c;今天测试了一下&#xff0c;各种问题。问题来源首先是对于SDK的阅读不够仔细。 实践的比较少。 其实所谓的录像 就是两个类的结合 一个是Camera 一个是MediaRecorder 这两个类…

vue 圆形百分比进度条_uniapp Vue 圆环进度条

mode"aspectFill">export default {name: ,props: {},data() {return {animationData: {},audioCoverImg: ../../static/player/normal.png,}},created() {this.drawProgressbg()console.log("动画插件 已经onLoad");},mounted() {},methods: {updateIn…

Spring中的异步和事务性事件侦听器

内置的事件发布功能从Spring的早期版本开始存在&#xff0c;对于处理同一应用程序上下文中Spring组件之间的基本通信仍然有用。 通常&#xff0c;应用程序可以生成应用程序事件&#xff08;可以是任意对象&#xff09;并侦听它们。 整个机制非常简单&#xff1a;使用Applicatio…

html5 canvas获取坐标,HTML5 canvas坐标

在canvas当中有一个特殊的东西叫做“坐标”&#xff01;没错&#xff0c;就是平时所熟知的坐标体系。canvas拥有自己的坐标体系&#xff0c;从最上角0, 0开始&#xff0c;X向右是增大&#xff0c;Y向下是增大。也可以借助CSS当中的盒子模型的概念来帮助理解。尽管canvas元素功能…

linux u盘 慢_u盘加载较慢 建议优化 - 卡饭网

U盘加载速度十分缓慢的原因及解决方法U盘加载速度十分缓慢的原因及解决方法 很多朋友在使用U盘的时候都遇到过电脑接入U盘后&#xff0c;加载读取文件的速度十分的缓慢&#xff0c;总是要等上一段时间才能完全读取&#xff0c;这是怎么回事呢&#xff1f;该怎么处理&#xff1f…

Java命令行界面(第21部分):航空公司2

本系列文章的第21篇关于Java中解析命令行参数的文章的重点是Airline 2库。 Airline 2的GitHub项目页面描述了该库&#xff0c;“ Airline是一个Java库&#xff0c;提供了基于注释的框架来解析命令行界面。” 该页面进入状态&#xff1a;航空公司“既支持简单的单个命令&#xf…

android中进度条的使用,android的进度条使用

android的进度条1、实现的效果2、布局代码先写一个my_browser.xml文件 存放WebViewandroid:layout_width"fill_parent"android:layout_height"fill_parent"android:orientation"vertical" >android:id"id/webView"android:layout_w…

taro 重新加载小程序_Taro开发微信小程序的初体验

了解Taro听说Taro是从几个星期前开始的&#xff0c;在一次饭桌上&#xff0c;一个小伙伴说&#xff1a;“Hey&#xff0c; 你听说了Taro么&#xff0c;听说只需要写一套程序就可以生成H5&#xff0c;小程序以及RN的代码模板&#xff0c;并且类似于React的语法。”“哦&#xff…

应用服务器web服务器_最受欢迎的应用服务器

应用服务器web服务器这是本系列的第二篇文章&#xff0c;我们将发布有关Java安装的统计数据。 使用的数据集来自免费的Plumbr安装&#xff0c;在过去六个月中&#xff0c;我们总共收集了1,024个不同的环境。 该系列的第一篇文章分析了基础-运行JVM的操作系统&#xff0c;是32位…

python 类中定义列表_Python-从类定义中的列表理解访问类变量

小编典典类范围和列表&#xff0c;集合或字典的理解以及生成器表达式不混合。为什么&#xff1b;或者&#xff0c;官方用词在Python 3中&#xff0c;为列表理解赋予了它们自己的适当范围(本地名称空间)&#xff0c;以防止其局部变量渗入周围的范围内(即使在理解范围之后&#x…

mvc html 生成图片,asp.net mvc5 cs代码中获取视图生成后的HTML

public static class ViewExtensions { /// /// 在控制器内获取指定视图生成后的HTML /// /// 当前控制器的上下文 /// 视图名称 /// 视图所需要的参数 /// 视图生成的HTML public static string GetViewHtml(this ControllerContext context, string viewName, Object param) …

如何在Java 8中将Lambda表达式转换为方法引用?

如果您使用Java 8进行编码&#xff0c;那么您会知道使用方法引用代替lambda表达式会使您的代码更具可读性&#xff0c;因此建议尽可能使用方法引用替换lambda表达式&#xff0c;但是&#xff0c;最大的问题是&#xff0c;您如何查找是否可以用方法引用替换lambda&#xff1f; 是…

最大化窗口设置_打开表格总是默认窗口最小化?适用Word、PPT等其他应用

今天有小哥哥说每天早上上班打开第一个表格时&#xff0c;这样显示&#xff1b;打开第二个表时是这样显示&#xff1b;每次打开第二个表后点最大化再打开其他的表才正常显示为最大化的状态。其实&#xff0c;这只是表格的默认打开方式改变了&#xff0c;我们改正过来就好了。在…

html5块元素代码,html5 区块与内联div 与span html块级元素(示例代码)

HTML 和 可以通过 和 将 HTML 元素组合起来。HTML 块元素大多数 HTML 元素被定义为块级元素或内联元素。编者注&#xff1a;"块级元素"译为 block level element&#xff0c;"内联元素"译为 inline element。块级元素在浏览器显示时&#xff0c;通常会以新…