给初学者的 RxJava2.0 教程 (八)

Outline

[TOC]

前言

在上一节中, 我们学习了FLowable的一些基本知识, 同时也挖了许多坑, 这一节就让我们来填坑吧.

正题

在上一节中最后我们有个例子, 当上游一次性发送128个事件的时候是没有任何问题的, 一旦超过128就会抛出MissingBackpressureException异常, 提示你上游发太多事件了, 下游处理不过来, 那么怎么去解决呢?

我们先来思考一下, 发送128个事件没有问题是因为FLowable内部有一个大小为128的水缸, 超过128就会装满溢出来, 那既然你水缸这么小, 那我给你换一个大水缸如何, 听上去很有道理的样子, 来试试:

Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {for (int i = 0; i < 1000; i++) {Log.d(TAG, "emit " + i);emitter.onNext(i);}}}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");mSubscription = s;}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: " + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});复制代码

这次我们直接让上游发送了1000个事件,下游仍然不调用request去请求, 与之前不同的是, 这次我们用的策略是BackpressureStrategy.BUFFER, 这就是我们的新水缸啦, 这个水缸就比原来的水缸牛逼多了,如果说原来的水缸是95式步枪, 那这个新的水缸就好比黄金AK , 它没有大小限制, 因此可以存放许许多多的事件.

所以这次的运行结果就是:

zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo D/TAG: emit 0
zlc.season.rxjava2demo D/TAG: emit 1
zlc.season.rxjava2demo D/TAG: emit 2
...
zlc.season.rxjava2demo D/TAG: emit 997
zlc.season.rxjava2demo D/TAG: emit 998
zlc.season.rxjava2demo D/TAG: emit 999复制代码

不知道大家有没有发现, 换了水缸的FLowable和Observable好像是一样的嘛...

不错, 这时的FLowable表现出来的特性的确和Observable一模一样, 因此, 如果你像这样单纯的使用FLowable, 同样需要注意OOM的问题, 例如下面这个例子:

Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {for (int i = 0; ; i++) {emitter.onNext(i);}}}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");mSubscription = s;}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: " + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});复制代码

按照我们以前学习Observable一样, 让上游无限循环发送事件, 下游一个也不去处理, 来看看运行结果吧:

flowable.gif

同样可以看到, 内存迅速增长, 直到最后抛出OOM. 所以说不要迷恋FLowable, 它只是个传说.

可能有朋友也注意到了, 之前使用Observable测试的时候内存增长非常迅速, 几秒钟就OOM, 但这里增长速度却比较缓慢, 可以翻回去看之前的文章中的GIF图进行对比, 这也看出FLowable相比Observable, 在性能方面有些不足, 毕竟FLowable内部为了实现响应式拉取做了更多的操作, 性能有所丢失也是在所难免, 因此单单只是说因为FLowable是新兴产物就盲目的使用也是不对的, 也要具体分场景,

那除了给FLowable换一个大水缸还有没有其他的办法呢, 因为更大的水缸也只是缓兵之计啊, 动不动就OOM给你看.

想想看我们之前学习Observable的时候说到的如何解决上游发送事件太快的, 有一招叫从数量上取胜, 同样的FLowable中也有这种方法, 对应的就是BackpressureStrategy.DROPBackpressureStrategy.LATEST这两种策略.

从名字上就能猜到它俩是干啥的, Drop就是直接把存不下的事件丢弃,Latest就是只保留最新的事件, 来看看它们的实际效果吧.

先来看看Drop:

public static void request() {mSubscription.request(128);}public static void demo3() {Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {for (int i = 0; ; i++) {emitter.onNext(i);}}}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");mSubscription = s;}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: " + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});}复制代码

我们仍然让上游无限循环发送事件, 这次的策略选择了Drop, 同时把Subscription保存起来, 待会我们在外部调用request(128)时, 便可以看到运行的结果.

我们先来猜一下运行结果, 这里为什么request(128)呢, 因为之前不是已经说了吗, FLowable内部的默认的水缸大小为128, 因此, 它刚开始肯定会把0-127这128个事件保存起来, 然后丢弃掉其余的事件, 当我们request(128)的时候,下游便会处理掉这128个事件, 那么上游水缸中又会重新装进新的128个事件, 以此类推, 来看看运行结果吧:

drop.gif

从运行结果中我们看到的确是如此, 第一次request的时候, 下游的确收到的是0-127这128个事件, 但第二次request的时候就不确定了, 因为上游一直在发送事件. 内存占用也很正常, drop的作用相信大家也很清楚了.

再来看看Latest吧:

public static void request() {mSubscription.request(128);}public static void demo4() {Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {for (int i = 0; ; i++) {emitter.onNext(i);}}}, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");mSubscription = s;}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: " + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});}复制代码

同样的, 上游无限循环发送事件, 策略选择Latest, 同时把Subscription保存起来, 方便在外部调用request(128).来看看这次的运行结果:

latest.gif

诶, 看上去好像和Drop差不多啊, Latest也首先保存了0-127这128个事件, 等下游把这128个事件处理了之后才进行之后的处理, 光从这里没有看出有任何区别啊...

古人云,师者,所以传道受业解惑也。人非生而知之者,孰能无惑?惑而不从师,其为惑也,终不解矣.复制代码

作为初学者的入门导师, 是不能给大家留下一点点疑惑的, 来让我们继续揭开这个疑问.

我们把上面两段代码改良一下, 先来看看DROP的改良版:

 Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {for (int i = 0; i < 10000; i++) {  //只发1w个事件emitter.onNext(i);}}}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");mSubscription = s;s.request(128);  //一开始就处理掉128个事件}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: " + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});复制代码

这段代码和之前有两点不同, 一是上游只发送了10000个事件, 二是下游在一开始就立马处理掉了128个事件, 然后我们在外部再调用request(128)试试, 来看看运行结果:

drop_1.gif

这次可以看到, 一开始下游就处理掉了128个事件, 当我们再次request的时候, 只得到了第3317的事件, 后面的事件直接被抛弃了.

再来看看Latest的运行结果吧:

latest_1.gif

从运行结果中可以看到, 除去前面128个事件, 与Drop不同, Latest总是能获取到最后最新的事件, 例如这里我们总是能获得最后一个事件9999.

好了, 关于FLowable的策略我们也讲完了, 有些朋友要问了, 这些FLowable是我自己创建的, 所以我可以选择策略, 那面对有些FLowable并不是我自己创建的, 该怎么办呢? 比如RxJava中的interval操作符, 这个操作符并不是我们自己创建的, 来看下面这个例子吧:

Flowable.interval(1, TimeUnit.MICROSECONDS).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Long>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");mSubscription = s;s.request(Long.MAX_VALUE);}@Overridepublic void onNext(Long aLong) {Log.d(TAG, "onNext: " + aLong);try {Thread.sleep(1000);  //延时1秒} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});复制代码

interval操作符发送Long型的事件, 从0开始, 每隔指定的时间就把数字加1并发送出来, 在这个例子里, 我们让它每隔1毫秒就发送一次事件, 在下游延时1秒去接收处理, 不用猜也知道结果是什么:

zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo W/TAG: onError: io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requestsat io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:87)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428)at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:278)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:273)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)at java.lang.Thread.run(Thread.java:761)复制代码

一运行就抛出了MissingBackpressureException异常, 提醒我们发太多了, 那么怎么办呢, 这个又不是我们自己创建的FLowable啊...

别慌, 虽然不是我们自己创建的, 但是RxJava给我们提供了其他的方法:

  • onBackpressureBuffer()
  • onBackpressureDrop()
  • onBackpressureLatest()

熟悉吗? 这跟我们上面学的策略是一样的, 用法也简单, 拿刚才的例子现学现用:

Flowable.interval(1, TimeUnit.MICROSECONDS).onBackpressureDrop()  //加上背压策略.observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Long>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");mSubscription = s;s.request(Long.MAX_VALUE);}@Overridepublic void onNext(Long aLong) {Log.d(TAG, "onNext: " + aLong);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}});复制代码

其余的我就不一一列举了.

好了, 今天的教程就到这里吧, 这一节我们学习了如何使用内置的BackpressureStrategy来解决上下游事件速率不均衡的问题. 这些策略其实之前我们将Observable的时候也提到过, 其实大差不差, 只要理解了为什么会上游发事件太快, 下游处理太慢这一点, 你就好处理了, FLowable无非就是给你封装好了, 确实对初学者友好一点, 但是很多初学者往往只知道How, 却不知道Why, 最重要的其实是知道why, 而不是How.

(其余的教程大多数到这里就结束了, 但是, 你以为FLowable就这么点东西吗, 骚年, Too young too simple, sometimes naive! 这仅仅是开始, 真正牛逼的还没来呢. 敬请关注下一节, 下节见 ! )

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/283158.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql数据库的目录_了解MySQl数据库目录

数据库目录是MySQL数据库服务器存放数据文件的地方&#xff0c;不仅包括有关表的文件&#xff0c;还包括数据文件和MySQL的服务器选项文件。不同的分发&#xff0c;数据库目录的缺省位置是不同的。数据目录的位置缺省的数据库位置缺省数据库的位置编译在服务器中。◆如果您是在…

修改docker的默认存储位置及镜像存储位置

2019独角兽企业重金招聘Python工程师标准>>> 方法一、软链接 默认情况下Docker的存放位置为&#xff1a;/var/lib/docker 可以通过下面命令查看具体位置&#xff1a; sudo docker info | grep "Docker Root Dir" 解决这个问题&#xff0c;最直接的方法当然…

微信小程序第三方服务公司有哪些

虽然微信小程序还没有正式推出&#xff0c;但围绕着微信小程序第三方服务公司之间的战争早已经开始。他们在小程序生成工具&#xff08;一键生成小程序&#xff0c;无需开发&#xff09;、微信小程序开发工具、小程序数据统计等领域展开激烈竞争&#xff0c;我们一起来看看微信…

python maketrans_Python maketrans()方法 - Python 教程 - 自强学堂

Python maketrans()方法描述Python maketrans() 方法用于创建字符映射的转换表&#xff0c;对于接受两个参数的最简单的调用方式&#xff0c;第一个参数是字符串&#xff0c;表示需要转换的字符&#xff0c;第二个参数也是字符串表示转换的目标。注&#xff1a;两个字符串的长度…

Blazor University (49)依赖注入 —— 比较依赖范围

原文链接&#xff1a;https://blazor-university.com/dependency-injection/dependency-lifetimes-and-scopes/comparing-dependency-scopes/比较依赖范围源代码[1]在本节中&#xff0c;我们将创建一个 Blazor 应用程序来演示各种依赖注入作用域的不同生命周期。为此&#xff0…

CSS选择器的权重与优先规

我们把特殊性分为4个等级&#xff0c;每个等级代表一类选择器&#xff0c;每个等级的值为其所代表的选择器的个数乘以这一等级的权值&#xff0c;最后把所有等级的值相加得出选择器的特殊值。 4个等级的定义如下&#xff1a; 第一等&#xff1a;代表内联样式&#xff0c;如: st…

设计模式概论

此文转载于 http://blog.csdn.net/hguisu/article/details/74968191. 设计模式设计模式&#xff08;Design pattern&#xff09;是一套被反复使用、多数人知晓的、经过分类编目的、代码设计经验的总结。使用设计模式是为了可重用代码、让代码更容易被他人理解、保证代码可靠性…

只需要2个工具,百度云盘大文件就能用迅雷和IDM下载

不会代码&#xff0c;不懂脚本&#xff0c;没关系 &#xff0c;能找到一座通往它们的桥梁&#xff0c;照样能到达彼岸。 这里以360极速浏览器为例。 在浏览器地址框输入以下地址直接到达浏览器安装扩展插件的地方&#xff08;偷个懒&#xff0c;复制网址吧&#xff09;&#xf…

rsync服务器的配置

一、rsync 简介Rsync&#xff08;remote synchronize&#xff09;是一个远程数据同步工具&#xff0c;可通过LAN/WAN快速同步多台主机间的文件&#xff0c;也可以使用 Rsync 同步本地硬盘中的不同目录。 Rsync 是用于取代rcp的一个工具&#xff0c;Rsync使用所谓的 “Rsync 算法…

用ajax连接mysql_页面用ajax实现简单的连接数据库

(1) 写发送代码var myXmlHttpRequest "";myXmlHttpRequest getXmlHttpRequest();if (myXmlHttpRequest) { //xmlHttpRequest创建成功了&#xff0c;才能发送请求//地址一定要写正确var url "../zhuCe/zhuCeYanZheng.aspx?username" $(Text1).value;m…

Vue学习笔记入门篇——数据及DOM

本文为转载&#xff0c;原文&#xff1a;Vue学习笔记入门篇——数据及DOM 数据 data 类型 Object | Function 详细 Vue 实例的数据对象。Vue 将会递归将 data 的属性转换为 getter/setter&#xff0c;从而让 data 的属性能够响应数据变化。对象必须是纯粹的对象(含有零个或多个…

Thinkphp 3.2中控制页面不缓存

最近开发WAP网站时&#xff0c;最讨厌的就是back键&#xff0c;会造成些麻烦事。不过&#xff0c;问题总有办法解决。 有些页面&#xff0c;点击back键回退会加载缓存&#xff0c;这不是想要的&#xff0c;所以希望能够控制该页面不缓存&#xff0c;每次请求都需要从服务器获取…

能识别nvme的pe启动_PE系统纯净(可以识别nvme固态)

此前我一直用的微PE系统&#xff0c;纯净没有广告&#xff0c;但是我却发现不能识别nvme固态&#xff0c;于是就寻找可以识别nvme固态的PE&#xff0c;虽然知道老毛桃可以用&#xff0c;但是我并不想用。然后网上查到可以自动向PE系统中添加nvme的驱动&#xff0c;从而就可以识…

H3CNE认证

H3CNE&#xff08;H3C Certified Network Engineer&#xff0c;H3C认证网络工程师&#xff09;H3CNE认证主要定位于中小型网络的规划、设计、配置与维护。通过H3CNE认证&#xff0c;将证明您对数据通信网络有全面深入的了解&#xff0c;掌握面向中小型企业的网络通用技术&#…

BZOJ 3144 [Hnoi2013]切糕

3144: [Hnoi2013]切糕 Description Input 第一行是三个正整数P,Q,R&#xff0c;表示切糕的长P、 宽Q、高R。第二行有一个非负整数D&#xff0c;表示光滑性要求。接下来是R个P行Q列的矩阵&#xff0c;第z个 矩阵的第x行第y列是v(x,y,z) (1≤x≤P, 1≤y≤Q, 1≤z≤R)。 100%的数据…

java java 大端_Java 大小端转换

package nlp.nlp;/*** 小端数据&#xff0c;Byte转换**/public class ByteConvert {public static void main(String[] args) {ByteConvert c new ByteConvert();c.Int2Bytes_LE(126);}public static final int UNICODE_LEN 2;/*** int转换为小端byte[](高位放在高地址中)* p…

《ASP.NET Core 6框架揭秘》实例演示[18]:HttpClient处理管道

在《《ASP.NET Core 6框架揭秘》实例演示[17]&#xff1a;利用IHttpClientFactory工厂来创建HttpClient》之后&#xff0c;我们将关注点放到HttpClient对象上。我们知道ASP.NET的核心就是由中间件组成的请求处理管道&#xff0c;HttpClient也采用了类似的设计。HttpClient管道由…

腾讯云副总裁答治茜:移动互联网破局要借助“三张网”

5月24日&#xff0c;2018腾讯云未来峰会在广州召开。在互联网专场上&#xff0c;腾讯云副总裁答治茜就泛互联网行业云化的主题发表演讲。在演讲中答治茜表示&#xff0c;过去移动互联网的高速增长到现在已经遇到了一个天花板&#xff0c;需要借助马化腾提到的“人联网、物联网、…

雅诗兰黛天猫超级品牌日:未央唇膏、红装小棕瓶“当红不让”

随着年末圣诞季的临近&#xff0c;各大美妆品牌陆续推出了圣诞套装&#xff0c;红红火火的超豪华套装&#xff0c;算是对用户最实在的回馈。高端美妆品牌的“领头羊”雅诗兰黛&#xff0c;当然也“当红不让”&#xff0c;趁着圣诞季&#xff0c;与天猫超级品牌日联手打造了一场…

JAVA常见算法题(三十一)---冒泡排序

package com.jege.spring.boot.hello.world;/*** java算法之冒泡排序<br>* 将数组按照从大到小的顺序排列<br>* * * author Administrator**/ public class BubbleSort{public static void main(String[] args){int score[] {67, 69, 75, 87, 89, 90, 99, 100};fo…