使用RxNetty访问Meetup的流API

本文将涉及多个主题:响应式编程,HTTP,解析JSON以及与社交API集成。 完全在一个用例中:我们将通过非夸张的RxNetty库实时加载和处理新的metup.com事件,结合Netty框架的强大功能和RxJava库的灵活性。 Meetup提供了公开可用的流API ,可实时推送世界各地注册的每一个Meetup。 只需浏览至stream.meetup.com/2/open_events并观察JSON块如何缓慢地出现在屏幕上。 每当有人创建新事件时,自包含的JSON就会从服务器推送到您的浏览器。 这意味着这样的请求永无止境,相反,只要需要,我们就会不断接收部分数据。 我们已经在将Twitter4J变成RxJava的Observable中研究了类似的情况。 每个新的Meetup事件都会发布一个独立的JSON文档,与此类似(省略很多细节):

{ "id" : "219088449","name" : "Silver Wings Brunch","time" : 1421609400000,"mtime" : 1417814004321,"duration" : 900000,"rsvp_limit" : 0,"status" : "upcoming","event_url" : "http://www.meetup.com/Laguna-Niguel-Social-Networking-Meetup/events/219088449/","group" : { "name" : "Former Flight Attendants South Orange and North San Diego Co","state" : "CA"...},"venue" : { "address_1" : "26860 Ortega Highway","city" : "San Juan Capistrano","country" : "US"...},"venue_visibility" : "public","visibility" : "public","yes_rsvp_count" : 1...
}

每当我们长时间轮询的HTTP连接(带有Transfer-Encoding: chunked响应标头)推送此类JSON时,我们都希望对其进行解析并以某种方式进一步传递。 我们讨厌回调,因此RxJava似乎是一个合理的选择(认为: Observable<Event> )。

步骤1:使用RxNetty接收原始数据

我们不能使用普通的HTTP客户端,因为它们专注于请求-响应语义。 这里没有任何响应,我们只是永远保持打开的连接,并在数据到达时使用它们。 RxJava具有开箱即用的RxApacheHttp库,但它假定为text/event-stream内容类型 。 相反,我们将使用底层的通用RxNetty库。 它是Netty(duh!)的包装,并且能够实现任意的 TCP / IP(包括HTTP)以及UDP客户端和服务器。 如果您不了解Netty,则它是基于数据包的,而不是面向流的,因此我们可以预期每次Meetup推送都会有一个Netty事件。 该API当然不是简单明了的,但是一旦您使用它,它就会变得有意义:

HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder("stream.meetup.com", 443).pipelineConfigurator(new HttpClientPipelineConfigurator<>()).withSslEngineFactory(DefaultFactories.trustAll()).build();final Observable<HttpClientResponse> responses = httpClient.submit(HttpClientRequest.createGet("/2/open_events"));
final Observable byteBufs = responses.flatMap(AbstractHttpContentHolder::getContent);
final Observable chunks = byteBufs.map(content -> content.toString(StandardCharsets.UTF_8));

首先,我们创建HttpClient并设置SSL(请注意,关于服务器证书的trustAll()可能不是最佳的生产设置)。 稍后,我们submit() GET请求并接收Observable<HttpClientResponse<ByteBuf>>作为回报。 ByteBuf是Netty对通过网络发送或接收的一堆字节的抽象。 该观察结果将立即告诉我们从Meetup收到的每条数据。 从响应中提取ByteBuf ,我们将其转换为包含上述JSON的String 。 到目前为止,一切正常。

步骤2:将数据包与JSON文档对齐

Netty非常强大,因为它不会掩盖泄漏抽象所固有的复杂性。 每次通过TCP / IP线路接收到某些内容时,都会通知我们。 您可能会相信,当服务器发送100字节时,客户端的Netty会通知我们有关这100字节的信息。 但是,TCP / IP堆栈可以自由地拆分和合并您通过有线发送的数据,特别是因为它假定是流,因此如何将其拆分为数据包无关紧要。 Netty的文档中对此警告做了很大的解释。 对我们意味着什么? 当Meetup发送单个事件时,我们可能仅收到一个可观察到的chunks String 。 但是同样可以将其划分为任意数量的数据包,因此chunks将发出多个String 。 更糟糕的是,如果Meetup接连发送两个事件,则它们可能适合一个数据包。 在这种情况下, chunks将发出一个带有两个独立JSON文档的String 。 事实上,我们不能假设JSON字符串和收到的网络数据包之间有任何对齐。 我们所知道的是,代表事件的各个JSON文档由换行符分隔。 令人惊讶的是, RxJavaString官方附加组件RxJavaString提供了一种精确的方法:

Observable jsonChunks = StringObservable.split(chunks, "\n");

实际上,甚至还有更简单的StringObservable.byLine(chunks) ,但它使用的是平台相关的行尾。 最好在官方文档中解释split()作用:

圣分裂

现在我们可以安全地解析jsonChunks发出的每个String了:

步骤3:解析JSON

有趣的是,这一步骤并不是那么简单。 我承认,我排序的享受WSDL时间,因为我很容易,可预见生成如下web服务的合同Java模型。 JSON,特别是在JSON模式的边缘市场渗透方面,基本上是集成的“狂野西部”。 通常,您会得到非正式的文档或请求和响应的样本。 没有类型信息或格式,无论字段是否为必填项,等等。此外,由于我不情愿使用地图映射 (在那里,Clojure程序员),为了使用基于JSON的REST服务,我必须自己编写映射POJO。 好吧,有解决方法。 首先,我举了一个由Meetup流API生成的JSON的代表性示例,并将其放在src/main/json/meetup/event.json 。 然后,我使用jsonschema2pojo-maven-plugin ( 也存在Gradle和Ant版本)。 插件的名称令人困惑,它还可以与JSON示例(不仅是架构)一起使用以生成Java模型:

<plugin><groupId>org.jsonschema2pojo</groupId><artifactId>jsonschema2pojo-maven-plugin</artifactId><version>0.4.7</version><configuration><sourceDirectory>${basedir}/src/main/json/meetup</sourceDirectory><targetPackage>com.nurkiewicz.meetup.generated</targetPackage><includeHashcodeAndEquals>true</includeHashcodeAndEquals><includeToString>true</includeToString><initializeCollections>true</initializeCollections><sourceType>JSON</sourceType><useCommonsLang3>true</useCommonsLang3><useJodaDates>true</useJodaDates><useLongIntegers>true</useLongIntegers><outputDirectory>target/generated-sources</outputDirectory></configuration><executions><execution><id>generate-sources</id><phase>generate-sources</phase><goals><goal>generate</goal></goals></execution></executions>
</plugin>

此时,Maven将创建与Jackson兼容的Event.javaVenue.javaGroup.java等:

private Event parseEventJson(String jsonStr) {try {return objectMapper.readValue(jsonStr, Event.class);} catch (IOException e) {throw new UncheckedIOException(e);}
}

很好,它很好:

final Observableevents = jsonChunks.map(this::parseEventJson);

步骤5:获利!!!

有了Observable<Event>我们可以实现一些非常有趣的用例。 是否要查找刚刚创建的波兰所有聚会的名称? 当然!

events.filter(event -> event.getVenue() != null).filter(event -> event.getVenue().getCountry().equals("pl")).map(Event::getName).forEach(System.out::println);

寻找统计信息每分钟创建多少个事件? 没问题!

events.buffer(1, TimeUnit.MINUTES).map(List::size).forEach(count -> log.info("Count: {}", count));

或者,也许您想继续搜索将来最远的聚会,而不是寻找比已发现的聚会更近的聚会?

events.filter(event -> event.getTime() != null).scan(this::laterEventFrom).distinct().map(Event::getTime).map(Instant::ofEpochMilli).forEach(System.out::println);//...private Event laterEventFrom(Event first, Event second) {return first.getTime() > second.getTime() ?first :second;
}

此代码过滤掉未知时间的事件,发出当前事件或前一个事件( scan() ),具体取决于后面的事件,过滤出重复事件并显示时间。 这个运行了几分钟的小程序已经发现一个计划于2015年11月创建的聚会,而在撰写本文时,聚会是2014年12月。 可能性是无止境的。

希望我能对如何轻松地将各种技术融合在一起有一个很好的了解:反应式编程以编写超快速的网络代码,无需样板代码的类型安全JSON解析,以及RxJava来快速处理事件流。 请享用!

翻译自: https://www.javacodegeeks.com/2014/12/accessing-meetups-streaming-api-with-rxnetty.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/360951.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

js、react对象名和对象属性赋值

const resValue {}; resValue[standards${standardsNumber}] ""; Console.log(:test"&#xff0c;resValue )//

TIBCO产品的微服务和DevOps

如今&#xff0c;每个人都在谈论微服务。 您可以在数百篇文章和博客文章中读到很多有关微服务的信息。 马丁福勒 &#xff08; Martin Fowler &#xff09;的文章是一个很好的起点&#xff0c;该文章引发了有关这种新架构概念的大量讨论。 另一个很棒的资源是独立于供应商的分…

使用Degraph管理软件包依赖关系

软件开发领域的很大一部分是使系统的复杂性尽可能地低。 但是复杂性到底是什么&#xff1f; 虽然确切的语义有很大不同&#xff0c;但取决于您询问的人&#xff0c;大多数人可能都认为这与系统中部件的数量及其交互有很大关系。 考虑太空中的大理石&#xff0c;即行星&#xf…

[转载] 应急管理体系及其业务流程研究

转载于:https://www.cnblogs.com/6DAN_HUST/archive/2013/03/04/2942337.html

WP8手机上的图标

一直不清楚WP8手机上两个圆的标志是什么意思&#xff0c;今天看到下面的链接&#xff0c;终于搞明白了&#xff0c;原来是打开了GPS就有。 http://www.windowsphone.com/en-us/how-to/wp8/basics/what-do-the-icons-on-my-phone-mean 转载于:https://www.cnblogs.com/wonderow/…

ASIHTTPRequest类库简介和使用说明

官方网站&#xff1a; http://allseeing-i.com/ASIHTTPRequest/ 。可以从上面下载到最新源码&#xff0c;以及获取到相关的资料。 使用iOS SDK中的HTTP网络请求API&#xff0c;相当的复杂&#xff0c;调用很繁琐&#xff0c;ASIHTTPRequest就是一个对CFNetwork API进行了封装&a…

UltraESB的首选IDE – IntelliJ IDEA

在AdroitLogic&#xff0c;我们长期以来一直在使用IntelliJ IDEA进行开发。 它是Java和相关语言/技术的最佳IDE&#xff08;它可能也是许多其他语言的选择&#xff0c;但我的经验主要是Java和相关技术&#xff09;。 Groovy和IDEA的Grails的集成很棒。 通过自动发现JDBC驱动程…

跟我一步一步开发自己的Openfire插件

这篇是简单插件开发&#xff0c;下篇聊天记录插件。 开发环境&#xff1a; System&#xff1a;Windows WebBrowser&#xff1a;IE6、Firefox3 JavaEE Server&#xff1a;tomcat5.0.2.8、tomcat6 IDE&#xff1a;eclipse、MyEclipse 8开发依赖库&#xff1a; Jdk1.6、jasper-com…

Apache FOP与Eclipse和OSGi的集成

Apache FOP是由XSL格式化对象&#xff08; XSL-FO &#xff09;驱动的开源打印处理器。 例如&#xff0c;将数据对象转换为PDF可能非常有用。 但是&#xff0c;将其集成到PDE中并最终以OSGi Service的形式运行并最终显得有些麻烦。 因此&#xff0c;我提供了一个P2存储库&…

不删除侦听器–使用ListenerHandles

听一个可观察的实例并对它的变化做出反应很有趣。 做一些必要的事情来打断或结束这种聆听会变得很有趣。 让我们看看问题的根源和解决方法。 总览 这篇文章将首先讨论这种情况&#xff0c;然后再讨论常见的方法和问题所在。 然后&#xff0c;它将提供解决大多数问题的简单抽象…

使用Google Guava Cache进行本地缓存

很多时候&#xff0c;我们将不得不从数据库或另一个Web服务获取数据或从文件系统加载数据。 在涉及网络呼叫的情况下&#xff0c;将存在固有的网络等待时间&#xff0c;网络带宽限制。 解决此问题的方法之一是在应用程序本地拥有一个缓存。 如果您的应用程序跨越多个节点&…

JAX-RS 2.0:服务器端处理管道

这篇文章的灵感来自JAX-RS 2.0规范文档 &#xff08;附录C&#xff09;中的Processing Pipeline部分。 我喜欢它是因为它提供了JAX-RS中所有模块的漂亮快照-以准备好吞咽的胶囊形式&#xff01; 礼貌– JAX-RS 2.0规范文档 因此&#xff0c;我想到了使用此图简要概述不同的JA…

基于TCP/IP的文件服务器编程一例

来源&#xff0c;华清远见嵌入式学院实验手册&#xff0c;代码来源&#xff1a;华清远见曾宏安 实现的功能&#xff1a; 编写TCP文件服务器和客户端。客户端可以上传和下载文件 客户端支持功能如下&#xff1a; 1.支持一下命令 help 显示客户端所有命令和说明 list 显示服务器…

【Linux系统基础】(2)在Linux上部署MySQL、RabbitMQ、ElasticSearch、Zookeeper、Kafka、NoSQL等各类软件

实战章节&#xff1a;在Linux上部署各类软件 前言 为什么学习各类软件在Linux上的部署 在前面&#xff0c;我们学习了许多的Linux命令和高级技巧&#xff0c;这些知识点比较零散&#xff0c;同学们跟随着课程的内容进行练习虽然可以基础掌握这些命令和技巧的使用&#xff0c;…

JDK 7和JDK 8中大行读取速度较慢的原因

我之前发布了博客文章“使用JDK 7和JDK 8读取慢速行”&#xff0c;并且在该问题上有一些有用的评论来描述该问题。 这篇文章提供了更多解释&#xff0c;说明为何该文章中演示的文件读取&#xff08;并由Ant的LineContainsRegExp使用 &#xff09;在Java 7和Java 8中比在Java 6中…

Spring Stateless State Security第3部分:JWT +社会认证

我的Stateless Spring Security系列文章的第三部分也是最后一部分是关于将基于JWT令牌的身份验证与spring-social-security混合在一起的。 这篇文章直接建立在此基础上&#xff0c;并且主要集中在已更改的部分上。 想法是使用基于OAuth 2的“使用Facebook登录”功能来替换基于用…

nyoj239 月老的难题 二分图 匈牙利算法

月老的难题 时间限制&#xff1a;1000 ms | 内存限制&#xff1a;65535 KB难度&#xff1a;4描述月老准备给n个女孩与n个男孩牵红线&#xff0c;成就一对对美好的姻缘。 现在&#xff0c;由于一些原因&#xff0c;部分男孩与女孩可能结成幸福的一家&#xff0c;部分可能不会结…

Web应用程序体系结构– Spring MVC – AngularJs堆栈

Spring MVC和AngularJs共同为构建表单密集型Web应用程序提供了一个真正高效且吸引人的前端开发堆栈。在这篇博客文章中&#xff0c;我们将看到如何使用这些技术构建表单密集型Web应用程序&#xff0c;并将这种方法与其他方法进行比较可用选项。 可以在此github 存储库中找到功能…

antd Datepicker组件报错 ——date.clone is not a function或者date1.isAfter is not a function

问题描述&#xff1a; antd Datepicker组件报错 ——date.clone is not a function或者date1.isAfter is not a function 原因分析&#xff1a; 在From中渲染默认值&#xff0c;一般数据请求拿到返回值存在异步&#xff0c;会晚于渲染&#xff0c;因此日期转换不能放在DatePi…

集成CDI和WebSockets

考虑尝试一个简单的Java EE 7原型应用程序&#xff0c;该应用程序涉及JAX-RS&#xff08;REST&#xff09;&#xff0c;WebSockets和CDI。 注意 &#xff1a;不想让它成为破坏者-但这篇文章主要讨论了我在尝试使用Web套接字和使用CDI作为“胶水”的REST&#xff08;在Java EE应…