RxJava:从未来到可观察

大约4年前,我第一次在Matthew Podwysocki的博客上遇到了Reactive Extensions ,但是直到我几周前看到Matthew在Code Mesh上发表演讲之后,我才对它有所了解。

它似乎最近变得越来越流行,我注意到,现在有一个由Netflix编写的Java版本RxJava 。

我以为可以尝试通过更改在探索cypher的MERGE函数时暴露的Observable而不是Future的代码来尝试一下。

回顾一下,我们有50个线程,我们进行了100次迭代,在这些迭代中我们创建了随机(用户,事件)对。 我们最多创建10个用户和50个事件,并且目标是同时发送相同对的请求。

在另一篇文章的示例中,我丢弃了每个查询的结果,而在这里我返回了结果,因此我有一些要订阅的内容。

代码的轮廓如下所示:

public class MergeTimeRx
{public static void main( final String[] args ) throws InterruptedException, IOException{String pathToDb = "/tmp/foo";FileUtils.deleteRecursively( new File( pathToDb ) );GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase( pathToDb );final ExecutionEngine engine = new ExecutionEngine( db );int numberOfThreads = 50;int numberOfUsers = 10;int numberOfEvents = 50;int iterations = 100;Observable<ExecutionResult> events = processEvents( engine, numberOfUsers, numberOfEvents, numberOfThreads, iterations );events.subscribe( new Action1<ExecutionResult>(){@Overridepublic void call( ExecutionResult result ){for ( Map<String, Object> row : result ){}}} );....}}

使用RxJava的好处是,没有提到我们如何获取ExecutionResult的集合,这并不重要。 我们只有它们的流,并且通过在Observable上调用订阅函数,只要有另一个函数可用,我们就会得到通知。

我发现的大多数示例都显示了如何从单个线程生成事件,但是我想使用线程池,以便可以同时触发许多请求。 processEvents方法最终看起来像这样:

private static Observable<ExecutionResult> processEvents( final ExecutionEngine engine, final int numberOfUsers, final int numberOfEvents, final int numberOfThreads, final int iterations ){final Random random = new Random();final List<Integer> userIds = generateIds( numberOfUsers );final List<Integer> eventIds = generateIds( numberOfEvents );return Observable.create( new Observable.OnSubscribeFunc<ExecutionResult>(){@Overridepublic Subscription onSubscribe( final Observer<? super ExecutionResult> observer ){final ExecutorService executor = Executors.newFixedThreadPool( numberOfThreads );List<Future<ExecutionResult>> jobs = new ArrayList<>();for ( int i = 0; i < iterations; i++ ){Future<ExecutionResult> job = executor.submit( new Callable<ExecutionResult>(){@Overridepublic ExecutionResult call(){Integer userId = userIds.get( random.nextInt( numberOfUsers ) );Integer eventId = eventIds.get( random.nextInt( numberOfEvents ) );return engine.execute("MERGE (u:User {id: {userId}})\n" +"MERGE (e:Event {id: {eventId}})\n" +"MERGE (u)-[:HAS_EVENT]->(e)\n" +"RETURN u, e",MapUtil.map( "userId", userId, "eventId", eventId ) );}} );jobs.add( job );}for ( Future<ExecutionResult> future : jobs ){try{observer.onNext( future.get() );}catch ( InterruptedException | ExecutionException ignored ){}}observer.onCompleted();executor.shutdown();return Subscriptions.empty();}} );}

我不确定这是否是使用Observable的正确方法,因此如果我记错了,请在评论中让我知道。

我不确定处理错误的正确方法是什么。 我最初在catch块中调用了observer#onError ,但这意味着不会再产生不是我想要的事件。

如果您想使用它,该代码可以作为要点 。 我添加了以下依赖关系以获得RxJava库:

<dependency><groupId>com.netflix.rxjava</groupId><artifactId>rxjava-core</artifactId><version>0.15.1</version></dependency>

参考: RxJava 从未来到我们的JCG合作伙伴 Mark Needham在Mark Needham Blog博客上均可观察到。

翻译自: https://www.javacodegeeks.com/2014/01/rxjava-from-future-to-observable.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/365997.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

手机站CSS

手机web——自适应网页设计&#xff08;html/css控制&#xff09;内核&#xff1a;-ms- /* IE 9 */-moz- /* Firefox */-webkit- /* Safari and Chrome */-o- /* Opera */一. 网页宽度自动适应手机屏幕的宽度&#xff0c;在head标签内加上以下内容&#xff1a;<meta na…

linux的kerne启动过程,linux

333.6Khttp://www.vuse.vanderbilt.edu/~srs/linux-papers-talks/iee-2002.pdfvuse.vanderbilt.edu全网免费399.3Khttp://www.stillhq.com/pdfdb/000478/data.pdfstillhq.com全网免费333.6Khttp://www.researchgate.net/profile/Gillian_Heller/publication/220386856_Maintain…

设计模式(五)--工厂模式汇总

LZ想把简单工厂模式、工厂方法模式和抽象工厂模式整理到一篇博文当中&#xff0c;由浅入深&#xff0c;应该能方便理解和记忆&#xff0c;话不多说&#xff0c;进入正题。 一、简单工厂模式 定义&#xff1a;从设计模式的类型上来说&#xff0c;简单工厂模式是属于创建型模式&a…

如何估算内存消耗?

这个故事可以追溯到至少十年之前&#xff0c;当时我第一次接触PHB时遇到一个问题&#xff1a;“在生产部署中&#xff0c;我们需要购买多大服务器”。 我们正在构建的新的&#xff0c;闪亮的系统距离生产开始还有9个月的时间&#xff0c;显然该公司已承诺提供包括硬件在内的整个…

python爬取b站403_Python如何爬取b站热门视频并导入Excel

代码如下 #encoding:utf-8 import requests from lxml import etree import xlwt import os # 爬取b站热门视频信息 def spider(): video_list [] url "https://www.bilibili.com/ranking?spm_id_from333.851.b_7072696d61727950616765546162.3" html requests.g…

url文件的格式

[DEFAULT]BASEURL[InternetShortcut]URLWorkingDirectoryShowCommandIconIndexIconFileModifiedHotKey  其中BASEURL、URL和WorkingDirectory这3项的含义是不言而明的。ShowCommand规定Internet Explorer启动后窗口的初始状态&#xff1a;7表示最小化&#xff0c;3表示最大化…

vscode 编辑器常用快捷键

最近&#xff0c;打算换个编辑器&#xff0c;而 vscode 是一个不错的选择。大部分快捷键和 sublime 还是很像的&#xff0c;但有些也不一样。特此整理一份小笔记。 参考&#xff1a; vscode: Visual Studio Code 常用快捷键非常全的VsCode快捷键常用快捷键 主命令&#xff1a;c…

linux sudo永久免密码,linux 免密码 使用sudo 直接使用root权限执行命令

1.切换到root用户下,怎么切换就不用说了吧,不会的本身百度去.百度2.添加sudo文件的写权限,命令是:权限chmod uw /etc/sudoers密码3.编辑sudoers文件文件vi /etc/sudoersvi找到这行 root ALL(ALL) ALL,在他下面添加xxx ALL(ALL) ALL (这里的xxx是你的用户名)sudops:这里说下你能…

Derek解读Bytom源码-创世区块

作者&#xff1a;Derek 简介 Github地址&#xff1a;https://github.com/Bytom/bytom Gitee地址&#xff1a;https://gitee.com/BytomBlockchain/bytom 本章介绍Derek解读-Bytom源码分析-创世区块 作者使用MacOS操作系统&#xff0c;其他平台也大同小异 Golang Version: 1.8 创…

使用调试器进行事后跟踪

我最近一直在使用的大多数调试器的好功能是能够在断点上记录信息。 这对理解代码而无需修改是非常有用的&#xff0c;它涉及字节码修改。 让我们考虑一下这种非常琐碎且效率低下的函数实现&#xff0c;以返回斐波那契数列中的第n个数字。 public class Fib {public long fib(…

链表排序c++代码_[链表面试算法](一) 链表的删除-相关题型总结(6题)

在数据结构的最高层抽象里&#xff0c;只有两种结构&#xff0c;数组和链表。这两种结构&#xff0c;是所有其他数据结构实现的基础。队列和栈&#xff0c;可以用链表和数组来实现。图&#xff0c;可以用邻接表和邻接矩阵来实现&#xff0c;其中&#xff0c;邻接表就是链表&…

JS原生方法实现jQuery的ready()

浏览器加载页面的顺序&#xff1a; 1、 解析HTML结构 2、 加载外部脚本和样式表文件 3、 解析并执行脚本代码 4、 构造HTML DOM模型ready() 5、 加载图片等组件 6、 页面加载完毕onload() ready事件是在DOM模型构造完毕时触发 load事件是在页面加载完毕后触发 function…

程序员高效工具列表

FQ必备 *** 文件管理器 wox 或者 Listary , everything 截图软件 Snipaste 下载器 Fish 冰点 Markdown 工具 Typora 图床工具 PicGo 思维导图 Xmind 抓包工具 Wireshark 协议工具 Fiddler 接口测试工具 PostMan 剪切板工具 Ditto 害怕截图丢失&#xff1f; 博客园…

c语言如何空格键返回主菜单,C语言中scanf函数与空格回车的用法说明

众所周知&#xff0c;C语言中的scanf函数的作用是从标准输入设备(通常是键盘)读取输入值&#xff0c;并存储到参数列表中指针所指向的内存单元。下面从几个方面说一下一些稍微细节的东西。下面的实验都在vc6.0中通过。1、scanf的返回值scanf通常返回的是成功赋值(从标准输入设备…

Linear_algebra_03_矩阵

1. 矩阵的线性运算&#xff1a; 2.1 矩阵的乘法&#xff1a;Xik * Ykj Zij 2.2 矩阵乘法性质&#xff1a; 3.1 矩阵的幂次方运算 3.2 矩阵转置的运算律 3.3 方阵运算 4 分块矩阵的运算 5. 矩阵的初等变换 5.1 单位矩阵I经过一次初等变换所得到的矩阵称为初等矩阵. 5.2 初等矩…

在Activiti中执行自定义查询

&#xff08;这可能最终会出现在Activiti 5.15版本的用户指南中&#xff0c;但是我已经想共享它了&#xff09; Activiti API允许使用高级API与数据库进行交互。 例如&#xff0c;对于检索数据&#xff0c;查询API和本机查询API的用法很强大。 但是&#xff0c;对于某些用例&a…

jquery ready方法实现原理

先看这两句代码&#xff1a;window.addEventListener(load,loaded,false);document.addEventListener(DOMContentLoaded,loaded,false);总结&#xff1a;load事件是在页面所有元素都加载完后触发;DOMContentLoaded&#xff0c;它是指dom tree加载完就触发;防止了页面加载被堵塞…

js转json工具_菜鸟丨Egert3D微信小游戏发布与Unity工具使用

本次教程将会为大家介绍Egret3D工具导出Unity场景对象的使用&#xff0c;以及发布微信小游戏流程。让大家对Egret 3D有更加熟悉的了解。需求工具&#xff1a;1、Unity场景导出插件&#xff1b;2、微信开发者工具。导出插件的使用一、打开需要导出的Unity场景&#xff0c;并且把…

MySQL----示例知识点整理

示例语句&#xff1a; select count(0),hour(c.created_at) from behavior_client_view c join behavior_share son c.share_uuids.uuidwhere s.agent_uuid(select uuid from user where mobile12606666333 and deleted0)and DATE_FORMAT(c.created_at,%Y-%m) >DATE_FORMAT(…

c语言c1变成e并输出,【图片】(原创)用纯C变了个变色输出字符的程序。。。【c语言吧】_百度贴吧...

该楼层疑似违规已被系统折叠 隐藏此楼查看此楼#include#include#includemain(){char c;int i,j,k,l,m,n,o;int x,y;char c1;int a,b,d,e;x35;y8;textcolor(13);gotoxy(35,10),cprintf("photoshop2014");textcolor(11);gotoxy(20,11);cprintf(" My name is zhou …