rxjava 背压_背压加载文件– RxJava常见问题解答

rxjava 背压

事实证明,将文件作为流进行处理非常有效且方便。 许多人似乎忘记了,自Java 8(3年以上!)以来,我们可以很容易地将任何文件变成一行代码:

String filePath = "foobar.txt";
try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {reader.lines().filter(line -> !line.startsWith("#")).map(String::toLowerCase).flatMap(line -> Stream.of(line.split(" "))).forEach(System.out::println);
}

reader.lines()返回Stream<String> ,您可以对其进行进一步转换。 在此示例中,我们丢弃以"#"开头的行,并通过将其拆分为单词来爆炸每行。 这样,我们就可以实现单词流而不是行流。 使用文本文件几乎与使用普通Java集合一样简单。 在RxJava中, 我们已经学习了generate()运算符。 它也可以在这里用于从文件创建健壮的行流:

Flowable<String> file = Flowable.generate(() -> new BufferedReader(new FileReader(filePath)),(reader, emitter) -> {final String line = reader.readLine();if (line != null) {emitter.onNext(line);} else {emitter.onComplete();}},reader -> reader.close()
);

在上述示例中, generate()运算符稍微复杂一些。 第一个参数是状态工厂。 每次有人订阅此流时,都会调用工厂并创建有状态的BufferedReader 。 然后,当下游运营商或订户希望接收某些数据时,将调用第二个lambda(带有两个参数)。 此lambda表达式尝试从文件中精确提取一行,然后将其发送到下游( onNext() )或在遇到文件结尾时完成。 这很简单。 generate()的第三个可选参数是一个lambda表达式,可以对state进行一些清理。 在我们的情况下这非常方便,因为我们不仅必须在到达文件末尾时关闭文件,而且还必须在使用者过早取消订阅时关闭文件。

认识Flowable.using()运算符

这似乎需要做很多工作,尤其是当我们已经有了来自JDK 8的一行代码时。事实证明,有一个类似的工厂运算符using()很方便。 的翻译的所有最简单的方法首先Stream从Java到Flowable是通过转换StreamIterator (checked异常处理忽略):

Flowable.fromIterable(new Iterable<String>() {@Overridepublic Iterator<String> iterator() {final BufferedReader reader = new BufferedReader(new FileReader(filePath));final Stream<String> lines = reader.lines();return lines.iterator();}
});

可以简化为:

Flowable.<String>fromIterable(() -> {final BufferedReader reader = new BufferedReader(new FileReader(filePath));final Stream<String> lines = reader.lines();return lines.iterator();
});

但是我们忘记了关闭BufferedReader从而关闭FileReader从而关闭了文件句柄。 因此,我们引入了资源泄漏。 在这种情况下, using()运算符的作用就像一个超级按钮。 在某种程度上,它类似于try-with-resources语句。 您可以基于某些外部资源创建流。 当有人订阅或取消订阅时,将为您管理此资源的生命周期(创建和处置):

Flowable.using(() -> new BufferedReader(new FileReader(filePath)),reader -> Flowable.fromIterable(() -> reader.lines().iterator()),reader -> reader.close()
);

它与上一个generate()示例非常相似,但是中间最重要的lambda表达式却大不相同。 我们获得一个资源( reader )作为参数,并假设返回一个Flowable (而不是单个元素)。 该lambda仅被调用一次,而不是在下游每次请求新项时都被调用。 using()运算符给我们的是管理BufferedReaders的生命周期。 当我们有一个状态(可以一次生成整个Flowable ,而不是一次generate()一个using()时, using()很有用。

流XML文件

…或JSON。 假设您有一个非常大的XML文件,其中包含以下条目,成千上万个条目:

<trkpt lat="52.23453" lon="21.01685"><ele>116</ele>
</trkpt>
<trkpt lat="52.23405" lon="21.01711"><ele>116</ele>
</trkpt>
<trkpt lat="52.23397" lon="21.0166"><ele>116</ele>
</trkpt>

这是标准GPS交换格式的片段,可以描述任意长度的地理路线。 每个<trkpt>是具有纬度,经度和海拔的单个点。 我们希望有一个跟踪点流(为简单起见忽略高程),以便可以部分使用文件,而不是一次加载所有文件。 我们有三个选择:

  • DOM / JAXB –必须将所有内容加载到内存中并映射到Java对象。 不适用于无限长的文件(甚至非常大的文件)
  • SAX –基于推送的库,一旦发现XML标签打开或关闭,就会调用回调。 似乎好一点,但可能无法支持背压–由库决定何时调用回调,并且无法减慢其速度
  • StAX –与SAX相似,但是我们必须积极地从XML文件中提取数据。 这对于支持背压至关重要-我们决定何时读取下一个数据块

让我们尝试使用StAX和RxJava实现可能非常大的XML文件的解析和流传输。 首先,我们必须首先学习如何使用StAX 。 该解析器称为XMLStreamReader ,它是按照以下咒语和诅咒序列创建的:

XMLStreamReader staxReader(String name) throws XMLStreamException {final InputStream inputStream = new BufferedInputStream(new FileInputStream(name));return XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
}

只需闭上眼睛,并确保您始终有一个地方可以复制粘贴上面的代码片段。 情况变得更糟。 为了读取第一个<trkpt>标记及其属性,我们必须编写一些复杂的代码:

import lombok.Value;@Value
class Trackpoint {private final BigDecimal lat;private final BigDecimal lon;
}Trackpoint nextTrackpoint(XMLStreamReader r) {while (r.hasNext()) {int event = r.next();switch (event) {case XMLStreamConstants.START_ELEMENT:if (r.getLocalName().equals("trkpt")) {return parseTrackpoint(r);}break;case XMLStreamConstants.END_ELEMENT:if (r.getLocalName().equals("gpx")) {return null;}break;}}return null;
}Trackpoint parseTrackpoint(XMLStreamReader r) {return new Trackpoint(new BigDecimal(r.getAttributeValue("", "lat")),new BigDecimal(r.getAttributeValue("", "lon")));
}

API是低级报价,并且几乎是古董。 一切都发生在一个巨大的循环中,该循环读取... int类型的东西 。 此int可以是START_ELEMENTEND_ELEMENT或我们不感兴趣的其他一些东西。请记住,我们正在读取XML文件,但不是逐行或逐字符,而是通过逻辑XML标记(标记)。 因此,如果发现<trkpt>元素的打开,我们将对其进行解析,否则我们将继续。 第二个重要条件是当我们发现关闭</gpx> ,这应该是GPX文件中的最后一件事。 在这种情况下,我们返回null ,表示XML文件结束。

感觉复杂吗? 实际上,这是读取具有恒定内存使用量的大型XML(与文件大小无关)的最简单方法。 所有这些与RxJava有何关系? 在这一点上,我们可以很容易地构建Flowable<Trackpoint> 。 是的, Flowable ,没有Observable (见: ObsevableObservable )。 这样的流将完全支持背压,这意味着它将以适当的速度读取文件:

Flowable<Trackpoint> trackpoints = generate(() -> staxReader("track.gpx"),this::pushNextTrackpoint,XMLStreamReader::close);void pushNextTrackpoint(XMLStreamReader reader, Emitter<Trackpoint> emitter) {final Trackpoint trkpt = nextTrackpoint(reader);if (trkpt != null) {emitter.onNext(trkpt);} else {emitter.onComplete();}
}

哇,如此简单,如此反压! [1]我们首先创建一个XMLStreamReader ,并确保在文件结束或有人取消订阅时将其关闭。 请记住,每个订阅者将一次又一次打开并开始解析相同的文件。 中间的lambda表达式仅使用状态变量( XMLStreamReader )并发出另一个跟踪点。 所有这些似乎都很晦涩,事实是! 但是,现在我们有了一个使用很少的资源就可以从一个可能很大的文件中获取的反向压力感知流。 我们可以同时处理跟踪点,也可以将它们与其他数据源组合在一起。 在下一篇文章中,我们将学习如何以非常相似的方式加载JSON。

翻译自: https://www.javacodegeeks.com/2017/09/loading-files-backpressure-rxjava-faq.html

rxjava 背压

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/334794.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux 卸载nfs device is busy,umount.nfs: device is busy解决办法

&period;NET Core全新的配置管理&lbrack;共9篇&rsqb;提到“配置”二字,我想绝大部分.NET开发人员脑海中会立马浮现出两个特殊文件的身影,那就是我们再熟悉不过的app.config和web.config,多年以来我们已经习惯了将结构化的配置信息定义在这两 ...Windows无法安装到G…

php 时间加法函数_php 时间加减

date_default_timezone_set(PRC); //默认时区echo"今天:",date("Y-m-d",time()),"";echo "今天:",date("Y-m-d",strtotime("18 june2008")),"";echo "昨天:",date("Y-m-d",strtoti…

如何用xapmm测试php_如何在Xampp中运行PHP程序?

成为经过认证的专业PHP是最流行的web后端编程语言。PHP代码将作为web服务器模块或命令行界面运行。要运行PHP for the web&#xff0c;您需要安装像Apache这样的web服务器&#xff0c;还需要像MyS成为经过认证的专业PHP是最流行的web后端编程语言。PHP代码将作为web服务器模块或…

中文标点符号大全

文章目录常见的中文标点符号标点符号的位置中文的标点符号包括句号&#xff0c;逗号&#xff0c;感叹号&#xff0c;问号&#xff0c;引号&#xff0c;冒号等等&#xff0c;接下来分享常见的中文标点符号名称。常见的中文标点符号 句号 。 用于句子末尾&#xff0c;表示陈述语气…

linux 查看链接最终目标,linux学习笔记7-链接

hard link and soft link硬链接&#xff1a;一个文件两个不同的进入&#xff0c;相当于一个教室两个门&#xff0c;从哪个门进都进到同一个教室硬链接特征&#xff1a;1、拥有相同的 i节点 和相同的存储block快&#xff0c;可以看做是同一个文件2、可通过i节点识别&#xff0c;…

apache.camel_Apache Camel 2.20发布–新增功能

apache.camelApache Camel 2.20已于今天发布&#xff0c;并且像往常一样&#xff0c;我受命撰写有关此出色新版本及其亮点的博客。 该版本具有以下重点。 1&#xff09;Java 9技术预览支持 我们已经开始支持Java 9的工作&#xff0c;此版本称为技术预览。 源代码在Java 9上…

操作无法完成(错误 0x000006ba),Windows 11 PDF打印机无法使用解决办法

操作无法完成(错误 0x000006ba)&#xff0c;Windows 11 PDF打印机无法使用解决办法 解决方式一 先重启一次电脑&#xff0c;看看是否可以解决问题。 解决方式二 重新启动 Printer Spooler 服务

java 头像 微信群_java怎么生成带用户微信头像的图片,并把这张图片发送给用户。...

展开全部这个是要一个图片中嵌套另外一张图片你可以62616964757a686964616fe59b9ee7ad9431333431336163试试下面这段代码import java.awt.Color;import java.awt.Font;import java.awt.Graphics2D;import java.awt.image.BufferedImage;import java.io.File;import java.io.IOE…

MyEclipse 的 TCP/IP Monitor 的使用

文章目录步骤 1&#xff1a;新建 web02 工程步骤 2&#xff1a;在 WebRoot 下创建 add_employee.html步骤 3&#xff1a;部署项目步骤 4&#xff1a;启动服务器步骤 5&#xff1a;打开 TCP/IP Monitor步骤 6&#xff1a;增加新的监视器步骤 7&#xff1a;启动监视器步骤 8&…

ubuntu linux mac地址,Ubuntu下修改mac地址

说明&#xff0c;本文翻译自man macchanger&#xff0c;若遇到不能理解的地方请参考man文档概述macchanger是linux下用于查看和修改网络接口mac地址的工具使用方法macchanger [options] device选项-e, --ending不要修改vendor-a, --another设置为一个同类型的MAC&#xff0c;同…

openwrt固定速率_固定速率与固定延迟– RxJava常见问题解答

openwrt固定速率如果您使用的是纯Java&#xff0c;从版本5开始&#xff0c;我们有一个方便的调度程序类&#xff0c;该类允许以固定速率或固定延迟运行任务&#xff1a; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService;Schedu…

如何处理表单中的中文(中文编码/解码问题)

浏览器会如何对表单中的数据进行编码? 当表单采用 post 方式提交时&#xff0c;浏览器会按照打开该表单所在的页面的编码来对表单中的数据进行编码。 在 html 文件当中设置字符编码集&#xff08;即字符编码格式&#xff09; <meta http-equiv"content-type" …

linux 进城 管道丢数据,linux – 使用命名管道与bash – 数据丢失的问题

有人在线搜索,发现简单的“教程”使用命名管道.但是,当我做任何后台工作时,我似乎失去了大量的数据.[[编辑&#xff1a;找到一个更简单的解决方案,看到回复帖子.所以我提出的问题现在是学术性的 – 万一有人想要一个工作服务器]]使用Ubuntu 10.04与Linux 2.6.32-25-generic#45-…

wso2 安装_WSO2注册表安装简介

wso2 安装这篇文章基于有关注册表安装及其工作原理等常见问题。以下是人们提出的主要问题&#xff1a; 1&#xff09;。 安装如何工作&#xff1f; 2&#xff09;。 Config Registry和Governance Registry有什么区别&#xff1f; 3&#xff09;。 可以将H2以外的数据库用于本…

Servlet配置错误处理页面/配置错误页面

写一个错误处理页面 error.html <html> <head> <meta http-equiv"Content-Type" content"text/html; charsetUTF-8"> <title>Insert title here</title> </head> <body style"font-size:30px;color:red;&quo…

azdb文件怎么打开_AZDBExplorerSvcs.dll

我该如何安装从金山毒霸下载的DLL文件&#xff1f;一&#xff1a;1、从金山毒霸下载压缩文件。2、将DLL文件解压到电脑上的某个地方。3、把该文件跟要求使用它的程序放在同一路径上。注意32位程序需要使用32位的DLL文件&#xff0c;64位程序需要使用64位的DLL文件。否则会出现0…

linux添加windows网络打印机,Linux Mint如何添加windows分享的网络打印机?

1.安装sambasudo apt-get install samba2.找到系统打印机选项通过 Menu-->>控制中心-->>系统管理找到 Printers选项&#xff0c;双击打开。3.核对windows打印机名并添加在如上图的画面中&#xff0c;点击 “Network Printer”&#xff0c;再点击“windows Printer …

5菜鸟教程_excel图文教程:应用PQ工具进行数据整理

编按&#xff1a;哈喽&#xff0c;大家好&#xff01;在日常工作中&#xff0c;我们经常会与数据打交道&#xff0c;那整理数据自然也是一件习以为常的事。但就是这么一件天天都会做的事&#xff0c;却让很多人压力山大&#xff0c;这不&#xff0c;又有一位小伙伴遇到问题了&a…

MyEclipse 如何将 jar 包导入项目中

步骤 1 项目右键->build path( 构建路径 )->configure build path(配置构建路径) 步骤 2 Java Build Path --> libraries(库) --> Add External JARs --> 选择 ojdbc14_11g.jar 在项目中出现 Referenced Libraries , 则完成

redis lettuce_Redis Client Lettuce 5 GA发布

redis lettuce经过13个月的开发阶段和208张已解决的故障单&#xff0c;我很高兴宣布Lettuce 5.0全面上市。 这是一个主要发行版&#xff0c;带有一些重大更改&#xff0c;新的有趣功能以及Java 9兼容性。 从Maven Central获取发行版 <dependency><groupId>io.let…