大约4年前,我第一次在Matthew Podwysocki的博客上遇到了Reactive Extensions ,但是直到我几周前看到Matthew在Code Mesh上发表演讲之后,我才对它有所了解。
它似乎最近变得越来越流行,我注意到,现在有一个由Netflix编写的Java版本RxJava 。
我以为可以尝试通过更改在探索cypher的MERGE函数时暴露的Observable而不是Future的代码来尝试一下。
回顾一下,我们有50个线程,我们进行了100次迭代,在这些迭代中我们创建了随机(用户,事件)对。 我们最多创建10个用户和50个事件,并且目标是同时发送相同对的请求。
在另一篇文章的示例中,我丢弃了每个查询的结果,而在这里我返回了结果,因此我有一些要订阅的内容。
代码的轮廓如下所示:
public class MergeTimeRx
{public static void main( final String[] args ) throws InterruptedException, IOException{String pathToDb = "/tmp/foo";FileUtils.deleteRecursively( new File( pathToDb ) );GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase( pathToDb );final ExecutionEngine engine = new ExecutionEngine( db );int numberOfThreads = 50;int numberOfUsers = 10;int numberOfEvents = 50;int iterations = 100;Observable<ExecutionResult> events = processEvents( engine, numberOfUsers, numberOfEvents, numberOfThreads, iterations );events.subscribe( new Action1<ExecutionResult>(){@Overridepublic void call( ExecutionResult result ){for ( Map<String, Object> row : result ){}}} );....}}
使用RxJava的好处是,没有提到我们如何获取ExecutionResult的集合,这并不重要。 我们只有它们的流,并且通过在Observable上调用订阅函数,只要有另一个函数可用,我们就会得到通知。
我发现的大多数示例都显示了如何从单个线程生成事件,但是我想使用线程池,以便可以同时触发许多请求。 processEvents方法最终看起来像这样:
private static Observable<ExecutionResult> processEvents( final ExecutionEngine engine, final int numberOfUsers, final int numberOfEvents, final int numberOfThreads, final int iterations ){final Random random = new Random();final List<Integer> userIds = generateIds( numberOfUsers );final List<Integer> eventIds = generateIds( numberOfEvents );return Observable.create( new Observable.OnSubscribeFunc<ExecutionResult>(){@Overridepublic Subscription onSubscribe( final Observer<? super ExecutionResult> observer ){final ExecutorService executor = Executors.newFixedThreadPool( numberOfThreads );List<Future<ExecutionResult>> jobs = new ArrayList<>();for ( int i = 0; i < iterations; i++ ){Future<ExecutionResult> job = executor.submit( new Callable<ExecutionResult>(){@Overridepublic ExecutionResult call(){Integer userId = userIds.get( random.nextInt( numberOfUsers ) );Integer eventId = eventIds.get( random.nextInt( numberOfEvents ) );return engine.execute("MERGE (u:User {id: {userId}})\n" +"MERGE (e:Event {id: {eventId}})\n" +"MERGE (u)-[:HAS_EVENT]->(e)\n" +"RETURN u, e",MapUtil.map( "userId", userId, "eventId", eventId ) );}} );jobs.add( job );}for ( Future<ExecutionResult> future : jobs ){try{observer.onNext( future.get() );}catch ( InterruptedException | ExecutionException ignored ){}}observer.onCompleted();executor.shutdown();return Subscriptions.empty();}} );}
我不确定这是否是使用Observable的正确方法,因此如果我记错了,请在评论中让我知道。
我不确定处理错误的正确方法是什么。 我最初在catch块中调用了observer#onError ,但这意味着不会再产生不是我想要的事件。
如果您想使用它,该代码可以作为要点 。 我添加了以下依赖关系以获得RxJava库:
<dependency><groupId>com.netflix.rxjava</groupId><artifactId>rxjava-core</artifactId><version>0.15.1</version></dependency>
翻译自: https://www.javacodegeeks.com/2014/01/rxjava-from-future-to-observable.html