检测和测试停滞的流– RxJava常见问题解答

假设您有一个流以不可预测的频率发布事件。 有时您可以预期每秒会有数十条消息,但是偶尔几秒钟都看不到任何事件。 如果您的流是通过Web套接字,SSE或任何其他网络协议传输的,则可能会出现问题。 静默时间过长(停顿)可以解释为网络问题。 因此,我们经常不时发送人工事件( ping ),以确保:

  • 客户还活着
  • 让客户知道我们还活着

举一个更具体的例子,假设我们有一个Flowable<String>流,它会产生一些事件。 如果没有事件超过一秒钟,则应发送占位符"PING"消息。 当静默时间更长时,应该每秒发出一个"PING"消息。 我们如何在RxJava中实现这样的要求? 最明显但不正确的解决方案是将原始流与ping合并:

Flowable<String> events = //...
Flowable<String> pings = Flowable.interval(1, SECONDS).map(x -> "PING");Flowable<String> eventsWithPings = events.mergeWith(pings);

mergeWith()运算符至关重要:它接受真正的events ,并将它们与恒定的ping流合并。 当然,当不存在真实事件时,将显示"PING"消息。 不幸的是,它们与原始流完全无关。 这意味着即使有很多正常事件,我们也会继续发送ping命令。 而且,当静默开始时,我们不会在一秒钟后精确发送"PING" 。 如果您对这种机制感到满意,则可以在此处停止阅读。

一种更复杂的方法需要发现持续超过1秒的静音。 我们可以使用timeout()运算符。 不幸的是,它会产生TimeoutException并从上游退订-行为过于激进。 我们只想收到某种通知。 事实证明,可以使用debounce()运算符。 通常,此操作员会推迟新事件的发出,以防万一有新事件出现,从而覆盖了旧事件。 所以,如果我说:

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);

这意味着delayed流仅在1秒内跟随其他事件时才会发出事件。 如果events流保持足够快的速度产生事件,那么技术上delayed可能永远不会发出任何东西。 我们将使用delayed流通过以下方式发现沉默:

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed.map(ev -> "PING");
Flowable<String> eventsWithPings = Flowable.merge(events, pings);

请记住, mergeWith()和它的static merge()对应物之间没有区别。 所以我们到了某个地方。 如果流繁忙,则delayed流将永远不会收到任何事件,因此不会发送"PING"消息。 但是,当原始流不发送任何事件超过1秒时, delayed接收到最后一次看到的事件,将其忽略并转换为"PING" 。 聪明,但坏了。 此实现仅在发现停顿后才发送一个"PING" ,而不是每秒发送一次定期ping。 很容易修复! 除了将最后一次看到的事件转换为单个"PING"我们还可以将其转换为周期性ping序列:

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed.flatMap(x -> Flowable.interval(0, 1, SECONDS).map(e -> "PING"));
Flowable<String> eventsWithPings = Flowable.merge(events, pings);

您能看到缺陷在哪里吗? 每当原始流中出现一点沉默时,我们就会每秒发出一次ping 。 但是,一旦出现真正的事件,我们应该停止这样做。 我们没有。 上游的每个停顿都会导致新的无限ping流出现在最终的合并流中。 我们必须以某种方式告诉pings流,因为原始流发出了真正的事件,所以它应该停止发出ping 。 猜猜是什么,有takeUntil()运算符可以做到这一点!

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed.flatMap(x -> Flowable.interval(0, 1, SECONDS).map(e -> "PING").takeUntil(events));
Flowable<String> eventsWithPings = Flowable.merge(events, pings);

花一点时间完全掌握上面的代码片段。 每当原始流上超过1秒没有任何反应时, delayed流就会发出一个事件。 pings流发射的序列"PING"每秒从发射每个事件的事件delayed 。 但是,一旦事件出现在events流上,便会终止pings流。 您甚至可以将所有这些定义为单个表达式:

Flowable<String> events = //...
Flowable<String> eventsWithPings = events.mergeWith(events.debounce(1, SECONDS).flatMap(x1 -> Flowable.interval(0, 1, SECONDS).map(e -> "PING").takeUntil(events)));

可测性

好的,我们已经编写了所有这些内容,但是我们应该如何测试事件驱动代码的这个三层嵌套的Blob? 我们如何确保ping在正确的时间出现并在静音结束后停止? 如何模拟各种与时间相关的场景? RxJava具有许多杀手级功能,但是测试时间流逝可能是最大的功能。 首先,让我们的ping代码更具可测试性和通用性:

<T> Flowable<T> withPings(Flowable<T> events, Scheduler clock, T ping) {return events.mergeWith(events.debounce(1, SECONDS, clock).flatMap(x1 -> Flowable.interval(0, 1, SECONDS, clock).map(e -> ping).takeUntil(events)));}

此实用程序方法采用任意的T流并添加ping ,以防该流在较长时间内不产生任何事件。 我们在测试中像这样使用它:

PublishProcessor<String> events = PublishProcessor.create();
TestScheduler clock = new TestScheduler();
Flowable<String> eventsWithPings = withPings(events, clock, "PING");

哦,男孩, PublishProcessorTestSchedulerPublishProcessor是一个有趣的类,它是一个亚型Flowable (所以我们可以使用它作为一个普通的流)。 另一方面,我们可以使用其onNext()方法强制发出事件:

events.onNext("A");

如果有人收听events流,他将立即收到"A"事件。 这clock是怎么回事? RxJava中以任何方式处理时间的每个运算符(例如debounce debounce()interval()timeout()window() )都可以采用可选的Scheduler参数。 它充当时间的外部来源。 特殊的TestScheduler是我们完全控制的人为时间来源。 也就是说,只要我们不显式调用advanceTimeBy()时间就保持静止:

clock.advanceTimeBy(999, MILLISECONDS);

999毫秒不是巧合。 Ping在1秒钟后开始精确显示,因此在999毫秒后将不可见。 现在是时候揭示完整的测试用例了:

@Test
public void shouldAddPings() throws Exception {PublishProcessor<String> events = PublishProcessor.create();final TestScheduler clock = new TestScheduler();final Flowable<String> eventsWithPings = withPings(events, clock, "PING");final TestSubscriber<String> test = eventsWithPings.test();events.onNext("A");test.assertValues("A");clock.advanceTimeBy(999, MILLISECONDS);events.onNext("B");test.assertValues("A", "B");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B");clock.advanceTimeBy(1, MILLISECONDS);test.assertValues("A", "B", "PING");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING");events.onNext("C");test.assertValues("A", "B", "PING", "C");clock.advanceTimeBy(1000, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING");clock.advanceTimeBy(1, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING");events.onNext("D");test.assertValues("A", "B", "PING", "C", "PING", "PING", "D");clock.advanceTimeBy(999, MILLISECONDS);events.onNext("E");test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E");clock.advanceTimeBy(1, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING");clock.advanceTimeBy(3_000, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING", "PING", "PING", "PING");
}

看起来像一堵墙,但这实际上是我们逻辑的完整测试方案。 它可以确保ping在1000毫秒后精确显示,在寂静时间很长的情况下会重复执行,而在出现真正的事件时会重复执行。 但最重要的部分是:该测试是100%可预测的并且非常快。 没有Awaitility ,忙等待,轮询,间歇性测试失败和缓慢。 我们完全控制的人工时钟可确保所有这些组合流均按预期工作。

翻译自: https://www.javacodegeeks.com/2017/09/detecting-testing-stalled-streams-rxjava-faq.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/349069.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux 远程挂载摄像头_如何实现嵌入式Linux下USB摄像头视频采集

展开全部在linux下所e5a48de588b662616964757a686964616f31333337613134有设备都是文件。所以对摄像头的操作其实就是对文件的操作。USB摄像头的设备文件就是在/dev目录下的video0(假如只有一个摄像头)。在linux下操作摄像头就是使用v4l2对摄像头进行视频的操作&#xff0c;操作…

Effective Java第三版有哪些新功能?

自从听说即将出版的有效Java 第三版以来&#xff0c;我一直想知道其中有什么新内容。 我假设将涵盖自Java 6以来引入Java的功能&#xff0c;的确如此。 但是&#xff0c;第三版Java开发人员经典版也有一些其他更改。 在本文中&#xff0c;我提供了有关在第三版中添加&#xff0…

es管理kabina_小白学ES 05 - 通过Kibana管理集群服务

目录前述步骤:① 启动Kibana;② 通过浏览器访问Kibana;③ 进入Dev Tools(开发者工具)界面.1 检查集群的健康状况ES提供了一套_cat API, 可以查看ES中的各类数据.# 查询API:GET /_cat/health?v# 响应信息如下:epoch timestamp cluster status node.total node.data shards pri …

+h eclipse中ctrl_Eclipse 常用的快捷键都有哪些?

今天&#xff0c;小编大概整理了 几 组 Eclipse 的快捷键&#xff0c;希望对你有帮助。1、打开资源CTRL SHIFT R&#xff1a;打开所有类型文件&#xff0c;不包括 JAR 包&#xff1b; CTRL SHIFT T&#xff1a;打开 Java 类型文件&#xff0c;包括 JAR 包&#xff1b;2、查…

apache.camel_Apache Camel 2.11发布

apache.camel上周Apache Camel 2.11发布了。 这篇博客文章总结了最引人注目的新功能和改进。 有关详细说明&#xff0c;请参见Camel 2.11发行说明 。 1&#xff09;新组件 与往常一样&#xff0c;每个新版本都包含许多新组件&#xff0c;这些组件是由我们庞大的用户群贡献的。…

c向文件中插入数据_Redis从文件中批量插入数据

简介在redis中&#xff0c;有时候需要批量执行某些命令&#xff0c;但是在redis的redis-cli下&#xff0c;只能一条条的执行指令&#xff0c;实在太麻烦了&#xff01;想到这&#xff0c;你是不是蓝瘦香菇&#xff1f; 如果能将要执行的指令一行行存储到文件中&#xff0c;然后…

用杰克逊流式传输大型JSON文件– RxJava常见问题解答

在上一篇文章中&#xff0c;我们学习了如何解析过大的XML文件并将其转换为RxJava流。 这次让我们看一个大的JSON文件。 我们的示例将基于微小的colors.json&#xff0c;其中包含将近150种这种格式的记录&#xff1a; {"aliceblue": [240, 248, 255, 1],"antiqu…

python多级目录import_你真的会用Python模块与工具包吗?

在开发过程中&#xff0c;我们无法把所有代码、资源都放在同一个文件中。因此&#xff0c;模块导入在编码中是很常见的。无论是C、Java&#xff0c;还是Python、Go。可以把不同功能、不同模块进行分离&#xff0c;当使用的时候&#xff0c;可以通过import关键字在一个模块中使用…

八边形点坐标数的lisp_图形学入门第五课:齐次坐标

齐次坐标(Homegeneous Coordinates)在学习齐次坐标之前&#xff0c;我们要先好奇的问一下&#xff0c;为什么要学习齐次坐标。上一节课&#xff0c;我们学习了变换的三种基本形式&#xff1a;旋转&#xff0c;缩放&#xff0c;和切变。但是还有一种特殊的变换&#xff1a;Trans…

spring java配置_Spring Java配置

spring java配置我发现许多我认识的Spring开发人员仍然不了解或使用Spring Java Configuration&#xff08;aka JavaConfig&#xff09;。 Spring 3.0引入了此功能&#xff0c;该功能使Spring可以完全用Java进行配置-不再需要XML&#xff01; 我真的很喜欢使用JavaConfig&#…

分段概率密度矩估计_考研数学:高数、线代、概率3科目知识框架梳理

该楼层疑似违规已被系统折叠 隐藏此楼查看此楼首先要确保常考题型&#xff0c;常考知识点非常熟练。下面从高等数学、线性代数、概率统计三个模块进行阐述。高等数学部分1.函数的极 限;数列的极 限;无穷小及阶的问题;2.微分中值定理的证明;不等式的证明;方程根的存在性及个数问…

对速度的需求,访问现有数据的速度提高了1000倍

了解如何通过使用标准Java 8流和Speedment的In-JVM-Memory加速器将分析数据库应用程序加速1000倍。 Web和移动应用程序有时会很慢&#xff0c;因为后备数据库很慢和/或与数据库的连接施加了延迟。 现代UI和交互式应用程序需要快速后端&#xff0c;并且理想情况下没有可观察到的…

mysqls压力测试怎么用_用 Swagger 测试接口,怎么在请求头中携带 Token?

松哥周末抽空给 Spring Security 系列也录制了一套视频&#xff0c;目录如下&#xff1a;感兴趣的小伙伴戳这里-->Spring BootVue微人事视频教程今天的话题来自一个小伙伴在微信上的提问&#xff1a;看到这个问题&#xff0c;松哥忽然想到我自己之前写过 Spring BootSwagger…

disruptor3_发布Disruptor 3.0.0

disruptor3我决定对整个版本的Disruptor都放置一个beta标签感到无聊&#xff0c;所以我决定将Disruptor 3.0.0发行到全世界。 此版本的最大挑战是清理代码并提出更好的算法来处理多个生产者。 如果我很幸运&#xff0c;可以更快。 在发布此版本时&#xff0c;我最初走了几个阴暗…

安卓手机背景变黑色怎么改_别着急扔掉旧手机 你的电脑可能需要它

PC玩家中&#xff0c;不少人都会有在玩游戏时观测电脑硬件状态的习惯。比如查看游戏帧数、CPU频率、GPU频率或是温度等。大多数人都是通过第三方软件&#xff0c;在游戏内把监测数据显示到电脑显示屏角落。可就算是在角落&#xff0c;这些数据依旧会阻挡游戏画面&#xff0c;在…

JDeps入门–分析项目的依赖关系

JDeps是Java依赖关系分析工具 &#xff0c;这是一个命令行工具&#xff0c;它处理Java字节码&#xff08;意味着.class文件或包含它们的JAR&#xff09;&#xff0c;并分析类之间静态声明的依赖关系。 可以用各种方式过滤结果&#xff0c;并可以将其汇总到包或JAR级别。 JDeps还…

禅道开源版用户手册_Docker搭建开源版禅道以及项目基本流程介绍

对于自学软件测试的同学来说&#xff0c;经常会遇到这样的困惑&#xff1a;测试用例怎么写&#xff1f;有啥好的模板&#xff1f;缺陷提交的模板是什么样的&#xff1f;bug的生命周期是啥&#xff1f;项目的流程是啥&#xff1f;以上这些困惑&#xff0c;在你仔细看完这篇文章后…

f12 卡 谷歌浏览器_抢券第二课:利用浏览器F12获取优惠券请求链接

抢券第二课为什么迟迟不来呢&#xff1f;因为最近京东没有那种神券需要定点抢购的&#xff0c;我也没法测试我的理论。现在京东的券随时可以领取到&#xff0c;我多没法测试的东西不想就这样欺骗你们。所以今天的第二课我们讲一讲神奇的谷歌浏览器F1201 工具准备一、浏览器这里…

Java命令行界面(第5部分):JewelCli

细算在Java命令行处理与Apache的百科全书CLI &#xff0c; args4j &#xff0c; jbock和命令行中先前的文章&#xff0c;我把注意力转向在这个岗位使用JewelCli完成的命令行参数相似的处理Java中。 几个Java命令行处理库使用批注来定义命令行选项。 到目前为止&#xff0c;本系…

dnf用虚拟机会被制裁吗_DNF: 神豪奶妈扬言, 战斗力没有超过他的, 都不配被加buff!...

要说到现在的年轻人们的交友方式绝对少不了游戏交友&#xff0c;以前的人们只要不出门那就是与世隔绝&#xff0c;而现在就算是不出门也可以在网络上结交一大帮朋友&#xff0c;游戏就是现在的年轻人们交友最多的地方之一。科技的发展让游戏进入了一个繁荣的春天&#xff0c;不…