rxjava 背压
事实证明,将文件作为流进行处理非常有效且方便。 许多人似乎忘记了,自Java 8(3年以上!)以来,我们可以很容易地将任何文件变成一行代码:
String filePath = "foobar.txt";
try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {reader.lines().filter(line -> !line.startsWith("#")).map(String::toLowerCase).flatMap(line -> Stream.of(line.split(" "))).forEach(System.out::println);
}
reader.lines()
返回Stream<String>
,您可以对其进行进一步转换。 在此示例中,我们丢弃以"#"
开头的行,并通过将其拆分为单词来爆炸每行。 这样,我们就可以实现单词流而不是行流。 使用文本文件几乎与使用普通Java集合一样简单。 在RxJava中, 我们已经学习了generate()
运算符。 它也可以在这里用于从文件创建健壮的行流:
Flowable<String> file = Flowable.generate(() -> new BufferedReader(new FileReader(filePath)),(reader, emitter) -> {final String line = reader.readLine();if (line != null) {emitter.onNext(line);} else {emitter.onComplete();}},reader -> reader.close()
);
在上述示例中, generate()
运算符稍微复杂一些。 第一个参数是状态工厂。 每次有人订阅此流时,都会调用工厂并创建有状态的BufferedReader
。 然后,当下游运营商或订户希望接收某些数据时,将调用第二个lambda(带有两个参数)。 此lambda表达式尝试从文件中精确提取一行,然后将其发送到下游( onNext()
)或在遇到文件结尾时完成。 这很简单。 generate()
的第三个可选参数是一个lambda表达式,可以对state进行一些清理。 在我们的情况下这非常方便,因为我们不仅必须在到达文件末尾时关闭文件,而且还必须在使用者过早取消订阅时关闭文件。
认识Flowable.using()运算符
这似乎需要做很多工作,尤其是当我们已经有了来自JDK 8的一行代码时。事实证明,有一个类似的工厂运算符using()
很方便。 的翻译的所有最简单的方法首先Stream
从Java到Flowable
是通过转换Stream
成Iterator
(checked异常处理忽略):
Flowable.fromIterable(new Iterable<String>() {@Overridepublic Iterator<String> iterator() {final BufferedReader reader = new BufferedReader(new FileReader(filePath));final Stream<String> lines = reader.lines();return lines.iterator();}
});
可以简化为:
Flowable.<String>fromIterable(() -> {final BufferedReader reader = new BufferedReader(new FileReader(filePath));final Stream<String> lines = reader.lines();return lines.iterator();
});
但是我们忘记了关闭BufferedReader
从而关闭FileReader
从而关闭了文件句柄。 因此,我们引入了资源泄漏。 在这种情况下, using()
运算符的作用就像一个超级按钮。 在某种程度上,它类似于try-with-resources
语句。 您可以基于某些外部资源创建流。 当有人订阅或取消订阅时,将为您管理此资源的生命周期(创建和处置):
Flowable.using(() -> new BufferedReader(new FileReader(filePath)),reader -> Flowable.fromIterable(() -> reader.lines().iterator()),reader -> reader.close()
);
它与上一个generate()
示例非常相似,但是中间最重要的lambda表达式却大不相同。 我们获得一个资源( reader
)作为参数,并假设返回一个Flowable
(而不是单个元素)。 该lambda仅被调用一次,而不是在下游每次请求新项时都被调用。 using()
运算符给我们的是管理BufferedReaders
的生命周期。 当我们有一个状态(可以一次生成整个Flowable
,而不是一次generate()
一个using()
时, using()
很有用。
流XML文件
…或JSON。 假设您有一个非常大的XML文件,其中包含以下条目,成千上万个条目:
<trkpt lat="52.23453" lon="21.01685"><ele>116</ele>
</trkpt>
<trkpt lat="52.23405" lon="21.01711"><ele>116</ele>
</trkpt>
<trkpt lat="52.23397" lon="21.0166"><ele>116</ele>
</trkpt>
这是标准GPS交换格式的片段,可以描述任意长度的地理路线。 每个<trkpt>
是具有纬度,经度和海拔的单个点。 我们希望有一个跟踪点流(为简单起见忽略高程),以便可以部分使用文件,而不是一次加载所有文件。 我们有三个选择:
- DOM / JAXB –必须将所有内容加载到内存中并映射到Java对象。 不适用于无限长的文件(甚至非常大的文件)
- SAX –基于推送的库,一旦发现XML标签打开或关闭,就会调用回调。 似乎好一点,但可能无法支持背压–由库决定何时调用回调,并且无法减慢其速度
- StAX –与SAX相似,但是我们必须积极地从XML文件中提取数据。 这对于支持背压至关重要-我们决定何时读取下一个数据块
让我们尝试使用StAX和RxJava实现可能非常大的XML文件的解析和流传输。 首先,我们必须首先学习如何使用StAX 。 该解析器称为XMLStreamReader
,它是按照以下咒语和诅咒序列创建的:
XMLStreamReader staxReader(String name) throws XMLStreamException {final InputStream inputStream = new BufferedInputStream(new FileInputStream(name));return XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
}
只需闭上眼睛,并确保您始终有一个地方可以复制粘贴上面的代码片段。 情况变得更糟。 为了读取第一个<trkpt>
标记及其属性,我们必须编写一些复杂的代码:
import lombok.Value;@Value
class Trackpoint {private final BigDecimal lat;private final BigDecimal lon;
}Trackpoint nextTrackpoint(XMLStreamReader r) {while (r.hasNext()) {int event = r.next();switch (event) {case XMLStreamConstants.START_ELEMENT:if (r.getLocalName().equals("trkpt")) {return parseTrackpoint(r);}break;case XMLStreamConstants.END_ELEMENT:if (r.getLocalName().equals("gpx")) {return null;}break;}}return null;
}Trackpoint parseTrackpoint(XMLStreamReader r) {return new Trackpoint(new BigDecimal(r.getAttributeValue("", "lat")),new BigDecimal(r.getAttributeValue("", "lon")));
}
API是低级报价,并且几乎是古董。 一切都发生在一个巨大的循环中,该循环读取... int
类型的东西 。 此int
可以是START_ELEMENT
, END_ELEMENT
或我们不感兴趣的其他一些东西。请记住,我们正在读取XML文件,但不是逐行或逐字符,而是通过逻辑XML标记(标记)。 因此,如果发现<trkpt>
元素的打开,我们将对其进行解析,否则我们将继续。 第二个重要条件是当我们发现关闭</gpx>
,这应该是GPX文件中的最后一件事。 在这种情况下,我们返回null
,表示XML文件结束。
感觉复杂吗? 实际上,这是读取具有恒定内存使用量的大型XML(与文件大小无关)的最简单方法。 所有这些与RxJava有何关系? 在这一点上,我们可以很容易地构建Flowable<Trackpoint>
。 是的, Flowable
,没有Observable
(见: Obsevable
与Observable
)。 这样的流将完全支持背压,这意味着它将以适当的速度读取文件:
Flowable<Trackpoint> trackpoints = generate(() -> staxReader("track.gpx"),this::pushNextTrackpoint,XMLStreamReader::close);void pushNextTrackpoint(XMLStreamReader reader, Emitter<Trackpoint> emitter) {final Trackpoint trkpt = nextTrackpoint(reader);if (trkpt != null) {emitter.onNext(trkpt);} else {emitter.onComplete();}
}
哇,如此简单,如此反压! [1]我们首先创建一个XMLStreamReader
,并确保在文件结束或有人取消订阅时将其关闭。 请记住,每个订阅者将一次又一次打开并开始解析相同的文件。 中间的lambda表达式仅使用状态变量( XMLStreamReader
)并发出另一个跟踪点。 所有这些似乎都很晦涩,事实是! 但是,现在我们有了一个使用很少的资源就可以从一个可能很大的文件中获取的反向压力感知流。 我们可以同时处理跟踪点,也可以将它们与其他数据源组合在一起。 在下一篇文章中,我们将学习如何以非常相似的方式加载JSON。
翻译自: https://www.javacodegeeks.com/2017/09/loading-files-backpressure-rxjava-faq.html
rxjava 背压