使用Flowable.generate()生成可感知背压的流– RxJava常见问题解答

RxJava缺少创建无限自然数流的工厂。 这样的流很有用,例如,当您想通过压缩两个事件的唯一序列号给可能的无限事件流时:

Flowable<Long> naturalNumbers = //???Flowable<Event> someInfiniteEventStream = //...
Flowable<Pair<Long, Event>> sequenced = Flowable.zip(naturalNumbers,someInfiniteEventStream,Pair::of
);

实现naturalNumbers令人惊讶地复杂。 在RxJava 1.x中,您可以短暂地放弃不遵守背压的Observable

import rx.Observable;  //RxJava 1.xObservable<Long> naturalNumbers = Observable.create(subscriber -> {long state = 0;//poor solution :-(while (!subscriber.isUnsubscribed()) {subscriber.onNext(state++);}
});

这样的流没有背压是什么意思? 好吧,基本上,流可以轻松地以CPU内核允许的速度生成事件( state变量不断增加),每秒数百万。 但是,当使用者无法快速使用事件时,未处理事件的积压开始出现:

naturalNumbers
//      .observeOn(Schedulers.io()).subscribe(x -> {//slooow, 1 millisecond});

上面的程序(带有observeOn()运算符的注释掉)可以正常运行,因为它具有意外的反压。 默认情况下,所有内容在RxJava中都是单线程的,因此生产者和使用者在同一个线程中工作。 实际上,调用subscriber.onNext()会阻止,因此while循环会自动对其进行限制。 但是,尝试取消注释observeOn() ,灾难会在几毫秒后发生。 订阅回调在设计上是单线程的。 对于每个元素,它至少需要1毫秒,因此该流每秒可以处理不超过1000个事件。 我们有些幸运。 RxJavaSwift发现这种灾难性状况,并因MissingBackpressureException而快速失败

我们最大的错误是在生成事件时没有考虑消费者的速度。 顺便说一下,这是响应流背后的核心思想:不允许生产者发出比消费者请求更多的事件。 在RxJava 1.x中,即使实现最简单的流(从头开始考虑背压)也不是一件容易的事。 RxJava 2.x带来了一些便利的运算符,这些运算符建立在先前版本的经验基础之上。 首先RxJava 2.x时不允许你实现Flowable (背压-aware)的相同的方式,你可以与Observable 。 创建Flowable会使消费者使消息过载是不可能的:

Flowable<Long> naturalNumbers = Flowable.create(subscriber -> {long state = 0;while (!subscriber.isCancelled()) {subscriber.onNext(state++);}
}, BackpressureStrategy.DROP);

您是否发现了这个额外的DROP参数? 在解释之前,让我们看一下使用慢速用户订阅时的输出:

0
1
2
3
//...continuous numbers...
126
127
101811682
//...where did my 100M events go?!?
101811683
101811684
101811685
//...continuous numbers...
101811776
//...17M events disappeared again...
101811777
//...

你的旅费可能会改变。 怎么了? observeOn()运算符在调度程序(线程池)之间切换。 从未决事件队列中合并的线程池。 此队列是有限的,容量为128个元素。 知道此限制的observeOn()运算符仅从上游请求128个元素(我们的自定义Flowable )。 此时,它使我们的订户可以处理事件,每毫秒1次。 因此,大约100毫秒后, observeOn()发现其内部队列几乎为空,并要求更多。 会得到128、129、130…吗? 没有! 我们的Flowable在这0.1秒内产生了疯狂的事件,并且(令人惊讶地)在该时间范围内成功产生了超过1亿个数字。 他们去哪了 好吧, observeOn()并没有要求它们,因此DROP策略(强制性参数)只是丢弃了不需要的事件。

BackpressureStrategy

听起来不对,还有其他策略吗? 是的,很多:

  • BackpressureStrategy.BUFFER :如果上游产生太多事件,则将它们缓冲在无界队列中。 没有任何事件丢失,但是您的整个应用程序很可能会丢失。 如果幸运的话, OutOfMemoryError将拯救您。 我停留在5秒以上的GC暂停中。
  • BackpressureStrategy.ERROR :如果发现事件的过度产生,将抛出MissingBackpressureException 。 这是一个理智(安全)的策略。
  • BackpressureStrategy.LATEST :类似于DROP ,但是记住上次删除的事件。 万一要求提供更多数据,但我们只是丢弃了所有内容–至少具有最后看到的价值。
  • BackpressureStrategy.MISSING :没有安全措施,请加以处理。 下游运算符之一(如observeOn() )最有可能抛出MissingBackpressureException
  • BackpressureStrategy.DROP :删除未请求的事件。

顺便说一句,当您将Observable变为Flowable还必须提供BackpressureStrategy 。 RxJava必须知道如何限制过量产生的Observable 。 好的,那么简单的序列自然数流的正确实现是什么?

认识

create()generate()之间的区别在于责任。 假设Flowable.create()会在不考虑背压的情况下完整地生成流。 它只是在需要时才产生事件。 另一方面,仅允许Flowable.generate()一次生成一个事件(或完成流)。 背压机制透明地计算出当前需要多少个事件。 generate()调用适当的次数,例如,在observeOn()情况下, observeOn() 128次。

因为此运算符一次生成一个事件,所以通常需要某种状态来确定上次出现的时间1 。 这就是generate()含义:(im)可变状态的持有者和基于该状态生成下一个事件的函数:

Flowable<Long> naturalNumbers =Flowable.generate(() -> 0L, (state, emitter) -> {emitter.onNext(state);return state + 1;});

generate()的第一个参数是初始状态(工厂),在本例中为0L 。 现在,每当订户或任何下游操作员要求一定数量的事件时,都会调用lambda表达式。 它的职责是根据提供的状态最多调用一次onNext() (最多发出一个事件)。 首次调用lambda时, state等于初始值0L 。 但是,我们可以修改状态并返回其新值。 在此示例中,我们增加了long以便随后的lambda表达式调用收到state = 1L 。 显然,这种情况不断发生,产生连续的自然数。

这样的编程模型显然比while循环难。 它还从根本上改变了实现事件源的方式。 与其在任何时候都想推送事件,不如只是被动地等待请求。 下游运营商和订户正在从您的流中提取数据。 这种转变可在管道的所有级别上产生背压。

generate()有一些风格。 首先,如果您的状态是可变对象,则可以使用不需要返回新状态值的重载版本。 尽管功能较少,但可变状态往往会产生较少的垃圾。 这假设您的状态不断变化,并且每次都传递相同的状态对象实例。 例如,您可以轻松地将Iterator (也是基于pull的!)变成具有反压奇观的流:

Iterator<Integer> iter = //...Flowable<String> strings = Flowable.generate(() -> iter, (iterator, emitter) -> {if (iterator.hasNext()) {emitter.onNext(iterator.next().toString());} else {emitter.onComplete();}
});

注意,流的类型( <String> )不必与状态类型( Iterator<Integer> )相同。 当然,如果您有Java Collection并想将其转换为流,则不必先创建迭代器。 使用Flowable.fromIterable()足够了。 甚至更简单的generate()版本都假定您根本没有任何状态。 例如随机数流:

Flowable<Double> randoms = Flowable.generate(emitter -> emitter.onNext(Math.random()));

但老实说,您可能最终将需要一个Random实例:

Flowable.generate(Random::new, (random, emitter) -> {emitter.onNext(random.nextBoolean());
});

摘要

如您所见,RxJava 1.x中的Observable.create()和Flowable.create Flowable.create()有一些缺点。 如果您真的在乎大量并发系统的可伸缩性和运行状况(否则您将不会读到这篇文章!),则必须了解背压。 如果您真的需要从头开始创建流,而不是使用from*()系列方法或繁重工作的各种库,请熟悉generate() 。 本质上,您必须学习如何将某些类型的数据源建模为奇特的迭代器。 期待有更多文章解释如何实现更多现实生活流。

这类似于无状态HTTP协议,该协议在服务器上使用称为session *的小状态来跟踪过去的请求。

翻译自: https://www.javacodegeeks.com/2017/08/generating-backpressure-aware-streams-flowable-generate-rxjava-faq.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/349324.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java字符串构造函数的应用_StringTokenizer类的使用

StringTokenizer是一个用来分隔String的应用类&#xff0c;相当于VB的split函数。1.构造函数public StringTokenizer(String str)public StringTokenizer(String str, String delim)public StringTokenizer(String str, String delim, boolean returnDelims)第一个参数就是要分…

linkedhashmap获取第n个元素_机试真题分享——交换链表前后第K个元素

题目描述给定一个编码链表和一个加密条件K&#xff0c;对编码进行加密。加密规则&#xff1a;把编码从前往后开始数第K个元素和从后往前数第K个元素进行交换。注意&#xff1a;编码的长度为0.第一个编码的序号是1.示例&#xff1a;输入&#xff1a;[1 2 3 4 5 6] 2输出&#xf…

maven java 参数_将Maven参数注入Java类

我想将settings.xml配置文件参数注入Java类.我尝试使用maven-annotation-plugin,但值为null.我想知道这是不是因为这个插件是为Mojo设计的Setting.xml片段APP_NAMEUSER_EMAILUSER_PASSWORD在班上Parameter(defaultValue "test.email", readonly true)private Strin…

Spring Boot Admin –用于管理Spring Boot应用程序的Admin UI

作为微服务开发的一部分&#xff0c;我们许多人都将Spring Boot与Spring Cloud功能一起使用。 在微服务领域&#xff0c;我们将有许多Spring Boot应用程序将在相同/不同的主机上运行。 如果将Spring Actuator添加到Spring Boot应用程序中&#xff0c;我们将获得很多现成的端点来…

md5与des算法有何不同_Python算法详解:为什么说算法是程序的灵魂?

算法是程序的灵魂,只有掌握了算法,才能轻松地驾驭程序开发。软件开发工作不是按部就班,而是选择一种最合理的算法去实现项目功能。算法能够引导开发者在面对一个项目功能时用什么思路去实现,有了这个思路后,编程工作只需要遭循这个思路去实现即可。本章将详细讲解计算机算法的基…

java json 解析null_解析包含null的原始json数组

Json数组遇新是直朋能到&#xff1a;{"userName":null," msgArr":[null],"numrow":0} //String resultGson&#xff1a;new Gson().fromJson(result, MyClass.class);MyCl作一新求抖直微圈ass:public String userName;public int[] msgArr;...错…

python运维脚本部署jdk_基于Java/Python搭建Web UI自动化环境

Java搭建UI自动化测试环境下载JDK8https://www.cnblogs.com/thloveyl/p/12378124.html配置Java环境1.解压Jdk压缩包2.配置环境变量计算机->属性->高级->环境变量->系统变量->Path3.添加根目录下的bin与lib目录、jre下的bin目录(近期我发现只将bin目录加入Path就…

eclipse JAVA用户注册_如何eclipse编写一个简单实用的登陆界面

花了点时间写了一个简单的页面&#xff1a;package test;import java.awt.*;import java.awt.event.*;import javax.swing.*;public class UserInterface extends JFrame implements WindowListener{JFrame jf;JPanel jpan1,jpan2,jpan3;JLabel username,password,lspace;JText…

java内存泄漏案例_寻找内存泄漏:一个案例研究

java内存泄漏案例一周前&#xff0c;我被要求修复一个有内存泄漏问题的webapp。 考虑到过去两年左右的时间里我已经看到并修复了数百个泄漏&#xff0c;我想这有多难。 但是事实证明这是一个挑战。 12小时后&#xff0c;我发现该应用程序中不少于5个漏洞&#xff0c;并设法修复…

doc无法编译java文件_java编译成jar文件.doc

java编译成jar文件Java程序打包成jar包(2012-06-08 10:28:23)转载▼标签&#xff1a;分类&#xff1a; 方法一&#xff1a;通过jar命令jar命令的用法&#xff1a;下面是jar命令的帮助说明&#xff1a;用法&#xff1a;jar {ctxui}[vfm0Me] [jar-file] [manifest-file] [entry-p…

python 档案管理系统_Python 写入档案的 4 个方法

在 Python 写入档案内容跟读取档案差不多, 也很简单方便&#xff0c;以下会介绍用 Python 逐行读取档案内容的 4 种方法。在看例子前先要了解开启档案的参数, 一般上读取档案会用 “r”, 即唯读的意思, 如果要写入档案, 分别可以用 “w” (即 write 的意思) 或 “a” (即 appen…

高可用性(HA),会话复制,多VM Payara群集

抽象 在研究如何创建高可用性&#xff08;HA&#xff09;时&#xff0c;我发现了会话复制的多机Payara / GlassFish群集&#xff0c;无法在一个参考中找到所需的一切。 我认为这将是一个普遍的需求并且很容易找到。 不幸的是&#xff0c;我的假设是错误的。 因此&#xff0c;本…

java时间日期工具类_java日期处理工具类

java日期处理工具类import java.text.DecimalFormat;import java.text.ParsePosition;import java.text.SimpleDateFormat;import java.util.Calendar;import java.util.Date;import java.util.GregorianCalendar;import java.util.Locale;import java.util.Random;import java…

python运行结果闪退_Pyhton TestCase运行闪退与失败,原因不详。。。

把源码贴上来&#xff0c;希望某位大神可以指点迷津&#xff1a;"""Unit test for odbchelper.pyThis program is part of "Dive Into Python", a free Python book forexperienced programmers. Visit http://diveintopython.org/ for thelatest ver…

java.jsp.jdbc_Java-jsp使用JDBC访问数据库时显示乱码是怎么回事?

1.JSP页面编码你改成utf-8;2.servlet容器的编码格式你改成utf-8;以tomcat为例&#xff1a;找到你的安装目录tomcat下面 > conf > server.xml用记事本打开添加如下的代码&#xff1a;URIEncoding"utf-8"protocol"HTTP/1.1"port"8080"connec…

python实例讲解wxpythonhyh123_Python实例讲解 -- wxpython 基本的控件 (按钮)

使用按钮工作在wxPython 中有很多不同类型的按钮。这一节&#xff0c;我们将讨论文本按钮、位图按钮、开关按钮(toggle buttons )和通用(generic )按钮。如何生成一个按钮&#xff1f;在第一部分(part 1)中&#xff0c;我们已经说明了几个按钮的例子&#xff0c;所以这里我们只…

检测Java Web应用程序而无需修改其源代码

与其他系统进行交互时&#xff0c;大多数Java Web应用程序都使用标准Java接口。 使用接口javax.servlet.Servlet来实现基于HTTP的服务&#xff0c;例如网页或REST服务器。 使用JDBC接口java.sql.Statement和java.sql.Connection实现数据库交互。 这些标准几乎是通用的&#xff…

如何快速弄懂一个新模型_如何评估创业项目是否靠谱?一个新的模型 | 创创锦囊...

要判断一个创业项目是否靠谱&#xff0c;是否能拥有广阔的市场和巨大的增长潜力&#xff0c;不仅是投资人关心的话题&#xff0c;更是每一个创业者在创业过程中不断思考的问题。投资人关注大趋势、大机会&#xff0c;遵循自上而下的思维模型&#xff0c;在心仪的赛道上寻找合适…

java编译找不到符号 int age=in.nexint()_Java报错找不到符号,小白自学求大佬解决...

import java.util.*;public class guess_1{public static void main(String[] args){Scanner innew Scanner(System.in);System.out.println("--------猜拳游戏--------");System.out.println("请出拳(1.剪刀 2.石头 3.布)");int personin.nextInt();int c…

Java命令行界面(第24部分):MarkUtils-CLI

本系列中有关使用Java解析命令行参数的第一篇文章介绍了Apache Commons CLI库。 这是本系列中介绍的基于Java的命令行解析库中最古老的&#xff0c;而且可能是最常用的之一。 Apache Commons CLI确实显示了它的时代&#xff0c;特别是与一些更现代的基于Java的命令行处理库相比…