使用RxJava和SseEmitter进行服务器发送的事件

Spring Framework 4.2 GA即将发布,让我们看一下它提供的一些新功能。 引起我注意的一个事件是一个简单的新类SseEmitter ,它是对Spring MVC控制器中易于使用的发送事件的抽象。 SSE是一项技术,使您可以在一个HTTP连接内沿一个方向将数据从服务器流式传输到浏览器。 听起来像是websocket可以做什么的子集。 但是,由于它是一个简单得多的协议,因此可以在不需要全双工的情况下使用,例如,实时推动股价变化或显示长时间运行的进程。 这将是我们的例子。

假设我们有一个具有以下API的虚拟硬币矿工:

public interface CoinMiner {BigDecimal mine() {//...}
}

每次调用mine()我们都必须等待几秒钟,才能获得大约1个硬币的回报(平均)。 如果要挖掘多个硬币,我们必须多次调用此方法:

@RestController
public class MiningController {//...@RequestMapping("/mine/{count}")void mine(@PathVariable int count) {IntStream.range(0, count).forEach(x -> coinMiner.mine());}}

这项工作有效,我们可以请求/mine/10mine()方法将执行10次。 到目前为止,一切都很好。 但是挖掘是一项占用大量CPU的任务,将计算分散到多个内核将是有益的。 此外,即使使用并行化,我们的API端点也相当慢,我们必须耐心等待直到所有工作完成而没有任何进度通知。 让我们首先修复并行性–但是,由于并行流无法控制底层线程池,因此我们来使用显式的ExecutorService

@Component
class CoinMiner {CompletableFuture<BigDecimal> mineAsync(ExecutorService executorService) {return CompletableFuture.supplyAsync(this::mine, executorService);}//...}

客户端代码必须显式提供ExecutorService (只是设计选择):

@RequestMapping("/mine/{count}")
void mine(@PathVariable int count) {final List<CompletableFuture<BigDecimal>> futures = IntStream.range(0, count).mapToObj(x -> coinMiner.mineAsync(executorService)).collect(toList());futures.forEach(CompletableFuture::join);
}

首先多次调用mineAsync ,然后(作为第二阶段)等待所有mineAsync完成并join这非常重要。 很容易写:

IntStream.range(0, count).mapToObj(x -> coinMiner.mineAsync(executorService)).forEach(CompletableFuture::join);

但是,由于Java 8中流的惰性,该任务将按顺序执行! 如果您还不习惯流的懒惰,请始终从下至上阅读它们:我们要求join一些将来的内容,以便流上升并只调用一次mineAsync() (惰性!),并将其传递给join() 。 当join()完成时,它再次上升并要求另一个Future 。 通过使用collect()我们强制所有mineAsync()执行,开始所有异步计算。 稍后我们等待每一个。

介绍

现在该变得更具反应性了(我说过了)。 控制器可以返回SseEmitter的实例。 从处理程序方法return后,容器线程将被释放并可以处理更多即将到来的请求。 但是连接没有关闭,客户端一直在等待! 我们应该做的是保留对SseEmitter实例的引用,并在以后从另一个线程调用其send()complete方法。 例如,我们可以启动一个长时间运行的进程,并保持send()从任意线程进行进度。 完成该过程后,我们complete() SseEmitter ,最后关闭HTTP连接(至少从逻辑SseEmitter ,请记住Keep-alive )。 在下面的示例中,我们有一堆CompletableFuture ,当每个CompletableFuture完成时,我们只需将1发送给客户端( notifyProgress() )。 当所有期货都完成后,我们完成流( thenRun(sseEmitter::complete) ),关闭连接:

@RequestMapping("/mine/{count}")
SseEmitter mine(@PathVariable int count) {final SseEmitter sseEmitter = new SseEmitter();final List<CompletableFuture<BigDecimal>> futures = mineAsync(count);futures.forEach(future ->future.thenRun(() -> notifyProgress(sseEmitter)));final CompletableFuture[] futuresArr = futures.toArray(new CompletableFuture[futures.size()]);CompletableFuture.allOf(futuresArr).thenRun(sseEmitter::complete);return sseEmitter;
}private void notifyProgress(SseEmitter sseEmitter) {try {sseEmitter.send(1);} catch (IOException e) {throw new RuntimeException(e);}
}private List<CompletableFuture<BigDecimal>> mineAsync(@PathVariable int count) {return IntStream.range(0, count).mapToObj(x -> coinMiner.mineAsync(executorService)).collect(toList());
}

调用此方法将产生以下响应(注意Content-Type ):

< HTTP/1.1 200 OK
< Content-Type: text/event-stream;charset=UTF-8
< Transfer-Encoding: chunked
< 
data:1data:1data:1data:1* Connection #0 to host localhost left intact

稍后我们将学习如何在客户端解释这种响应。 现在暂时让我们整理一下设计。

与引进RxJava

上面的代码有效,但是看起来很凌乱。 实际上,我们有一系列事件,每个事件都代表计算的进度。 计算最终完成,因此流也应发出信号结束。 听起来就像是Observable ! 我们从重构CoinMiner开始,以返回Observable<BigDecimal

Observable<BigDecimal> mineMany(int count, ExecutorService executorService) {final ReplaySubject<BigDecimal> subject = ReplaySubject.create();final List<CompletableFuture<BigDecimal>> futures = IntStream.range(0, count).mapToObj(x -> mineAsync(executorService)).collect(toList());futures.forEach(future ->future.thenRun(() -> subject.onNext(BigDecimal.ONE)));final CompletableFuture[] futuresArr = futures.toArray(new CompletableFuture[futures.size()]);CompletableFuture.allOf(futuresArr).thenRun(subject::onCompleted);return subject;
}

每当mineMany()返回的事件出现在Observable ,我们就mineMany()那么多硬币。 当所有期货都完成后,我们也完成了交易。 在实现方面,这看起来还没有改善,但是从控制器的角度来看,它有多干净:

@RequestMapping("/mine/{count}")
SseEmitter mine(@PathVariable int count) {final SseEmitter sseEmitter = new SseEmitter();coinMiner.mineMany(count, executorService).subscribe(value -> notifyProgress(sseEmitter),sseEmitter::completeWithError,sseEmitter::complete);return sseEmitter;
}

调用coinMiner.mineMany()我们只需订阅事件。 事实证明ObservableSseEmitter方法匹配1:1。 这里发生的事情很不言自明:启动异步计算,每当后台计算发出任何进度信号时,将其转发给客户端。 好的,让我们回到实现。 看起来很乱,因为我们将CompletableFutureObservable混合使用。 我已经描述了如何仅使用一个元素将CompletableFuture转换为Observable 。 这是一个概述,包括rx.Single从RxJava 1.0.13开始发现的rx.Single抽象(此处未使用):

public class Futures {public static <T> Observable<T> toObservable(CompletableFuture<T> future) {return Observable.create(subscriber ->future.whenComplete((result, error) -> {if (error != null) {subscriber.onError(error);} else {subscriber.onNext(result);subscriber.onCompleted();}}));}public static <T> Single<T> toSingle(CompletableFuture<T> future) {return Single.create(subscriber ->future.whenComplete((result, error) -> {if (error != null) {subscriber.onError(error);} else {subscriber.onSuccess(result);}}));}}

将这些实用程序运算符放在某个地方,我们可以改善实现并避免混合使用两个API:

Observable<BigDecimal> mineMany(int count, ExecutorService executorService) {final List<Observable<BigDecimal>> observables = IntStream.range(0, count).mapToObj(x -> mineAsync(executorService)).collect(toList());return Observable.merge(observables);
}Observable<BigDecimal> mineAsync(ExecutorService executorService) {final CompletableFuture<BigDecimal> future = CompletableFuture.supplyAsync(this::mine, executorService);return Futures.toObservable(future);
}

RxJava有一个内置的运算符,用于将多个Observable合并为一个,我们的每个基础Observable发出一个事件,这无关紧要。

深入研究RxJava运算符

让我们使用RxJava的功能来稍微改善流式传输。

scan()

当前,每次我们开采一枚硬币时,我们都会send(1)客户端send(1)事件。 这意味着每个客户都必须跟踪其已经收到的硬币数量,以便计算总的计算数量。 如果服务器始终发送总金额而不是增量,那就太好了。 但是,我们不想更改实现。 事实证明,使用Observable.scan()运算符非常简单:

@RequestMapping("/mine/{count}")
SseEmitter mine(@PathVariable int count) {final SseEmitter sseEmitter = new SseEmitter();coinMiner.mineMany(count, executorService).scan(BigDecimal::add).subscribe(value -> notifyProgress(sseEmitter, value),sseEmitter::completeWithError,sseEmitter::complete);return sseEmitter;
}private void notifyProgress(SseEmitter sseEmitter, BigDecimal value) {try {sseEmitter.send(value);} catch (IOException e) {e.printStackTrace();}
}

scan()运算符接收上一个事件和当前事件,并将它们组合在一起。 通过应用BigDecimal::add我们只需将所有数字相加即可。 例如1、1 +1,(1 +1)+1等。 scan()类似于flatMap() ,但保留中间值。

sample()采样

可能是因为我们的后端服务产生了太多的进度更新,我们无法使用。 我们不想给客户端增加不相关的更新并饱和带宽。 每秒最多发送两次更新听起来很合理。 幸运的是,RxJava也有一个内置的运算符:

Observable<BigDecimal> obs = coinMiner.mineMany(count, executorService);
obs.scan(BigDecimal::add).sample(500, TimeUnit.MILLISECONDS).subscribe(//...);

sample()将定期查看底层流,并仅发出最新的项,并丢弃中间的项。 幸运的是,我们使用scan()即时聚合了项目,因此我们不会丢失任何更新。

window() –恒定的发射间隔

不过有一个陷阱。 如果在选定的500毫秒内没有新内容出现, sample()将不会两次发出相同的项目。 很好,但是请记住我们正在通过TCP / IP连接推送这些更新。 最好是定期向客户端发送更新,即使在此期间什么也没发生–只是为了保持连接的正常运行,就像ping 。 可能有多种方法可以满足此要求,例如,涉及timeout()运算符。 我选择使用window()运算符每500毫秒对所有事件进行分组:

Observable<BigDecimal> obs = coinMiner.mineMany(count, executorService);
obs.window(500, TimeUnit.MILLISECONDS).flatMap(window -> window.reduce(BigDecimal.ZERO, BigDecimal::add)).scan(BigDecimal::add).subscribe(//...);

这是一个棘手的问题。 首先,我们将所有进度更新分组在500毫秒窗口中。 然后,我们使用reduce来计算在此时间段内开采的硬币的总数(类似于scan() )。 如果在此期间未开采任何硬币,我们只需返回ZERO 。 最后,我们使用scan()汇总每个窗口的小计。 我们不再需要sample()因为window()确保每500毫秒发出一个事件。

客户端

JavaScript中有很多SSE用法的示例,因此为您提供一种快速的解决方案,称为我们的控制器:

var source = new EventSource("/mine/10");
source.onmessage = function (event) {console.info(event);
};

我相信SseEmitter是Spring MVC的一项重大改进,它将使我们能够编写更健壮和更快的Web应用程序,需要即时的单向更新。

翻译自: https://www.javacodegeeks.com/2015/08/server-sent-events-with-rxjava-and-sseemitter.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/357471.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

常用正则表达式整理【总结】

平时不太喜欢记忆这些东西,开发的时候需要拿过来直接用就好,还有面试的时候直接让你敲的,这里记录一下。 目录 一、校验数字的表达式 二、校验字符的表达式

cmstop中实例化controller_admin_content类传递$this,其构造方法中接收到的是--名为cmstop的参数--包含cmstop中所有属性...

主程序cmstop类,实例化controller_admin_content类(接收请求后拼接的).传递cmstop所有属性过去.controller_admin_content.构造方法中接收到名称为cmstop,已经内容为cmstop所有属性 class cmstop extends object{public $app, $controller, $action, $args, $class } 设置好属性…

XPS 15 9530使用Windows10频繁发生Intel HD Graphics 4600驱动奔溃的一种解决方法

本人使用XPS 15 9530、集成显卡为Intel HD Graphics 4600、操作系统Windows 10 Pro&#xff0c;使用过程当中经常会发生集成显卡奔溃的问题&#xff0c;错误提示如下&#xff1a; Display driver stopped responding and has recovered Display driver Intel HD Graphics Drive…

安徽阜阳计算机高中学校排名,安徽阜阳排名靠前的三大高中,有争议?2020年高考成绩说话!...

安徽省阜阳市&#xff0c;古称汝阴&#xff0c;阜阳历史悠久、文化璀璨、人才辈出&#xff0c;有阜阳剪纸等国家非物质文化遗产&#xff0c;也是管仲、鲍叔牙等历史名人的故乡&#xff1b;阜阳风景秀丽&#xff0c;辖区内有诸多知名景点&#xff0c;其中八里河风景区为国家AAAA…

WAF安恒

http://wenku.baidu.com/view/c242927f581b6bd97e19ea1a.html?fromsearch转载于:https://www.cnblogs.com/diyunpeng/p/5317630.html

java se和java_Java:改进了Java SE 6和Java SE 7的客户端和桌面部分!

java se和javaJava 6和Java 7中的客户端改进 了解有关Java SE 6和Java SE 7的客户端和桌面部分的改进&#xff0c;包括新的applet插件&#xff0c;Java Deployment Toolkit&#xff0c;成形和半透明的窗口&#xff0c;重量级-轻量级混合以及Java Web Start。 介绍 自2006年12月…

【vue系列】小白入门篇,一天就能掌握vue开发技巧及规则

摘要:简单做了一个简单的vue入门,了解最基础的知识点,环境的配置,创建脚手架项目,创建uni-app项目。 vue官网文档:https://cn.vuejs.org/ uni-app官网文档:https://uniapp.dcloud.io/ HBuilderX:https://www.dcloud.io/hbuilderx.html 文章中涉及的代码下载 vue:https:/…

液位单闭环实验计算机控制,过程控制实验指导书

内容简介&#xff1a;过程控制实验指导书目 录第一章 前言............... ..........................................3第二章 过程控制仪表实验1&#xff0e; 压力、液位变送器的认识和校验.....................................52&#xff0e;调节器的认识和校验&#xff…

数组练习2

结对开发&#xff1a;张哲 张晓菲 题目&#xff1a;返回一个数组中子数组最大和&#xff0c;数组可以首尾相连。 一、实验思路 本次实验在第一次的基础上增加了一些难度&#xff0c;数组可以首尾相连组成一个环&#xff0c;我们两个经过思考和讨论后得到一个方法&#xff1a; …

Java 8中最快的垃圾收集器是什么?

OpenJDK 8具有几种垃圾收集器算法&#xff0c;例如Parallel GC &#xff0c; CMS和G1 。 哪一个最快&#xff1f; 如果默认的GC从Java 8中的并行GC更改为Java 9中的G1&#xff08;当前建议&#xff09;&#xff0c;将会发生什么&#xff1f; 让我们对其进行基准测试。 基准方法…

如何做好内容策划并完成一篇合格的深度文?

目录 常见的策划方案有哪些类型? 如何思考策划方向? 如何确定内容形式?

在计算机硬件中mo是指,计算机导论 - [课件]第2章 计算机系统的硬件.ppt

计算机导论 - [课件]第2章 计算机系统的硬件微操作控制部件(MOCU)可有下列两种实现方案&#xff1a; 组合逻辑控制 微程序控制 CPU 主存储器 I/O接口 DMA控制器 I/O设备 总线 交换数据 ① ② ④ ③ 3. 直接存储器存取方式DMA P83 上一页 返 回 下一页 * 教学小结 常见的输入输出…

System.Timers.Timer 嵌套 System.Windows.Forms.Timer的问题

如题“System.Timers.Timer 嵌套 System.Windows.Forms.Timer的问题”&#xff0c;最近在项目中在类uc_Map中启用了System.Timers.Timer&#xff0c;并在Timer的Timer_Elapsed方法中需要启动或停止GMapMarkerDirection markerPlane类中的System.Windows.Forms.Timer&#xff0c…

【粉丝需求】如何把一个前端网页都搞下来?

一般比较简单的就是展示型网站,这类网站 最好仿制,如果带后台的不太好获取完整后台代码,但是搞一下前端代码还是可以的。一般前端不管用什么框架,基础元素由html+css+javaScript组成。 声明:本文仅仅提供一种思路,如有对站点侵权的地方,请联系博主删除。 我用的是HB-X,…

韦冬雪计算机应用,捕获效应下RFID防碰撞算法的研究与应用

摘要&#xff1a;作为物联网核心技术之一的射频识别(Radio Frequency Identification,RFID)技术,其应用市场正随着物联网的普及而拓宽.阅读器和标签是RFID系统的重要组成部分,阅读器负责发出查询命令,标签负责响应命令.当多个标签同时向同一个阅读器发送响应命令时会发生标签碰…

九、其他常用命令

一、挂载命令 可以理解为windows当中的分配盘符操作 1.查询与自动挂载 [rootlocalhost ~]# mount #查询系统中已经挂载的设备 [rootlocalhost ~]# mount –a #依据配置文件/etc/fstab的内容&#xff0c;自动挂载一遍 只要按照对应的格式将相应的分区添加到list中&#xff0c;那…

王凯1987计算机系,计算机科学与技术系王凯:付出总有回报

首先&#xff0c;我真的很高兴能拿到这个奖&#xff0c;毕竟&#xff0c;这是对我自己大一这半年付出的一种肯定&#xff0c;也是对今后学习的很大鼓励。我想&#xff0c;学校之所以设立奖学金&#xff0c;真正的目的也正如此吧。听到这个信息时&#xff0c;我感到惊喜又意外&a…

【ECharts系列|01入门】 从入门到天黑【入门级教程实战】

ECharts 是一个使用 JavaScript 实现的开源可视化库&#xff0c;涵盖各行业图表&#xff0c;满足各种需求。 ECharts 遵循 Apache-2.0 开源协议&#xff0c;免费商用。 ECharts 兼容当前绝大部分浏览器&#xff08;IE8/9/10/11&#xff0c;Chrome&#xff0c;Firefox&#xff0…

【LeetCode】1. Two Sum

题目&#xff1a; Given an array of integers, return indices of the two numbers such that they add up to a specific target. You may assume that each input would have exactly one solution. 从给定的一个整数数组中找出两个数&#xff0c;使得它们的和为target&…