RxJava + Java8 + Java EE 7 + Arquillian =幸福

消防反应管理_DigitalStorm_226x150

服务是一种体系结构样式,其中每个服务都实现为一个独立的系统。 他们可以使用自己的持久性系统(尽管不是强制性的),部署,语言等。

由于系统由一个以上的服务组成,因此每个服务将与其他服务通信,通常使用轻量级协议(如HTTP)并遵循Restful Web方法。 您可以在这里阅读有关微服务的更多信息: http : //martinfowler.com/articles/microservices.html

让我们看一个非常简单的例子。 假设我们有一家预订商店,用户可以在其中导航目录,当他们找到想要查看更多信息的书时,他们单击isbn,然后打开一个新屏幕,其中包含该书的详细信息和有关该书的评论由读者撰写。

该系统可能由两个服务组成:

  • 一种获取书籍详细信息的服务。 可以从任何传统系统(如RDBMS)中检索它们。
  • 一种将所有评论写在书中的服务,在这种情况下,信息可以存储在文档库中。

这里的问题是,对于用户的每个请求,我们需要打开两个连接,每个服务一个。 当然,我们需要一种并行执行这些工作的方法来提高性能。 这是一个问题,我们如何处理异步请求? 第一个想法是使用Future类。 对于两个服务可能很好,但是如果您需要四个或五个服务,则代码将变得越来越复杂,例如,您可能需要从一个服务获取数据并将其用于另一服务或将一个服务的结果改编为输入另一个。 因此,存在线程和同步管理的成本。

有一种干净整洁的方式来解决这个问题的方法真是太棒了。 这正是RxJava所做的。 RxJava是Reactive Extensions的Java VM实现:该库用于通过使用可观察的序列来组成异步和基于事件的程序。

使用RxJava而不是从结构中提取数据,而是将数据推送到它,该数据与订阅者侦听的事件做出反应并一致地起作用。 您可以在https://github.com/Netflix/RxJava中找到更多信息。

因此,在这种情况下,我们将实现的示例是此处描述的示例,该示例使用RxJavaJava EE 7Java 8Arquillian进行测试。

这篇文章假定您知道如何使用Java EE规范编写Rest服务。

因此,让我们从两个服务开始:

@Singleton
@Path("bookinfo")
public class BookInfoService {@GET@Path("{isbn}")@Produces(MediaType.APPLICATION_JSON)@Consumes(MediaType.APPLICATION_JSON)public JsonObject findBookByISBN(@PathParam("isbn") String isbn) {return Json.createObjectBuilder().add("author", "George R.R. Martin").add("isbn", "1111").add("title", "A Game Of Thrones").build();}}
@Singleton
@Path("comments")
public class CommentsService {@GET@Path("{isbn}")@Produces(MediaType.APPLICATION_JSON)public JsonArray bookComments(@PathParam("isbn") String isbn) {return Json.createArrayBuilder().add("Good Book").add("Awesome").build();}}
@ApplicationPath("rest")
public class ApplicationResource extends Application {
}

最后是时候创建第三个外观服务,该服务从客户端接收通信,并行向两个服务发送请求,最后压缩两个响应。 zip是将通过指定函数发出的一组项目组合在一起并将其发送回客户端的过程(不要与压缩混淆!)。

@Singleton
@Path("book")
public class BookService {private static final String BOOKSERVICE = "http://localhost:8080/bookservice";private static final String COMMENTSERVICE = "http://localhost:8080/bookcomments";@Resource(name = "DefaultManagedExecutorService")ManagedExecutorService executor;Client bookServiceClient;WebTarget bookServiceTarget;Client commentServiceClient;WebTarget commentServiceTarget;@PostConstructvoid initializeRestClients() {bookServiceClient = ClientBuilder.newClient();bookServiceTarget = bookServiceClient.target(BOOKSERVICE + "/rest/bookinfo");commentServiceClient = ClientBuilder.newClient();commentServiceTarget = commentServiceClient.target(COMMENTSERVICE + "/rest/comments");}@GET@Path("{isbn}")@Produces(MediaType.APPLICATION_JSON)public void bookAndComment(@Suspended final AsyncResponse asyncResponse, @PathParam("isbn") String isbn) {//RxJava code shown below}
}

基本上,我们创建一个新服务。 在这种情况下,我们将要连接的两个服务的URL都是硬编码的。 这样做是出于学术目的,但是您将在类似于生产的代码中从生产者类或属性文件或用于此目的的任何系统中注入它。 然后,我们创建javax.ws.rs.client.WebTarget来使用Restful Web Service

之后,我们需要使用RxJava API实现bookAndComment方法。

RxJava中使用的主要类是rx.Observabl e。 正如他的名字所暗示的那样,该类是可观察的,它是引发事件以推动对象的原因。 默认情况下,事件是同步的,开发人员有责任使它们异步。

因此,我们需要为每个服务提供一个异步可观察实例:

public Observable<JsonObject> getBookInfo(final String isbn) {return Observable.create((Observable.OnSubscribe<JsonObject>) subscriber -> {Runnable r = () -> {subscriber.onNext(bookServiceTarget.path(isbn).request().get(JsonObject.class));subscriber.onCompleted();};executor.execute(r);});
}

基本上,我们创建一个Observable ,当订户订阅它时将执行指定的功能。 该函数是使用lambda表达式创建的,以避免创建嵌套的内部类。 在这种情况下,由于调用bookinfo服务,我们将返回JsonObject 。 结果将传递到onNext方法,以便订阅者可以接收结果。 因为我们要异步执行此逻辑,所以代码被包装在Runnable块中。

完成所有逻辑后,还需要调用onCompleted方法。

注意,因为除了创建Runnable之外 ,我们还希望使可观察的异步发生,所以我们使用Executor在单独的线程中运行逻辑。 Java EE 7中的一项重大新增功能是在容器内创建线程的一种托管方式。 在这种情况下,我们使用容器提供的ManagedExecutorService在当前任务的不同线程中异步跨越任务。

public Observable<JsonArray> getComments(final String isbn) {return Observable.create((Observable.OnSubscribe<JsonArray>) subscriber -> {Runnable r = () -> {subscriber.onNext(commentServiceTarget.path(isbn).request().get(JsonArray.class));subscriber.onCompleted();};executor.execute(r);});
}

与之前的内容类似,但没有获取书籍信息,而是获得了一系列评论。

然后,当两个响应均可用时,我们需要创建一个负责将两个响应压缩的可观察对象。 这是通过在Observable类上使用zip方法完成的,该方法接收两个Observable并应用一个函数来合并两个结果。 在这种情况下,一个lambda表达式将创建一个新的json对象,附加两个响应。

@GET
@Path("{isbn}")
@Produces(MediaType.APPLICATION_JSON)
public void bookAndComment(@Suspended final AsyncResponse asyncResponse, @PathParam("isbn") String isbn) {//Calling previous defined functionsObservable<JsonObject> bookInfo = getBookInfo(isbn);Observable<JsonArray> comments = getComments(isbn);Observable.zip(bookInfo, comments, (JsonObject book, JsonArray bookcomments) ->Json.createObjectBuilder().add("book", book).add("comments", bookcomments).build()).subscribe(new Subscriber<JsonObject>() {@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable e) {asyncResponse.resume(e);}@Overridepublic void onNext(JsonObject jsonObject) {asyncResponse.resume(jsonObject);}});
}

让我们看一下以前的服务。 我们正在使用@Suspended批注来使用Java EE中的新增功能之一,即Jax-Rs 2.0异步REST端点。 基本上,我们正在做的是释放服务器资源,并使用resume方法在响应可用时生成响应。

最后是测试。 我们将Wildfly 8.1用作Java EE 7服务器和Arquillian 。 因为每个服务可以在不同的服务器上进行部署,我们将在不同的战争 ,而是同一个服务器内部署各项服务。

因此,在这种情况下,我们将部署三个战争文件,这在Arquillian中非常容易做到。

@RunWith(Arquillian.class)
public class BookTest {@Deployment(testable = false, name = "bookservice")public static WebArchive createDeploymentBookInfoService() {return ShrinkWrap.create(WebArchive.class, "bookservice.war").addClasses(BookInfoService.class, ApplicationResource.class);}@Deployment(testable = false, name = "bookcomments")public static WebArchive createDeploymentCommentsService() {return ShrinkWrap.create(WebArchive.class, "bookcomments.war").addClasses(CommentsService.class, ApplicationResource.class);}@Deployment(testable = false, name = "book")public static WebArchive createDeploymentBookService() {WebArchive webArchive = ShrinkWrap.create(WebArchive.class, "book.war").addClasses(BookService.class, ApplicationResource.class).addAsLibraries(Maven.resolver().loadPomFromFile("pom.xml").resolve("com.netflix.rxjava:rxjava-core").withTransitivity().as(JavaArchive.class));return webArchive;}@ArquillianResourceURL base;@Test@OperateOnDeployment("book")public void should_return_book() throws MalformedURLException {Client client = ClientBuilder.newClient();JsonObject book = client.target(URI.create(new URL(base, "rest/").toExternalForm())).path("book/1111").request().get(JsonObject.class);//assertions}
}

在这种情况下,客户将要求一本书提供所有信息。 在服务器部分中, zip方法将等待直到并行检索书和注释,然后将两个响应组合到一个对象中并发送回客户端。

这是RxJava的非常简单的示例。 实际上,在这种情况下,我们只看到了如何使用zip方法,但是RxJava提供了许多其他有用的方法,例如take()map()merge() ,…( https:// github .com / Netflix / RxJava / wiki / Alphabetical-Observable-Operators列表 )

此外,在此示例中,我们仅看到了连接两个服务并并行检索信息的示例,您可能想知道为什么不使用Future类。 在此示例中使用FutureCallbacks完全可以,但是在现实生活中,您的逻辑将不像压缩两个服务那样容易。 也许您将拥有更多服务,也许您需要从一项服务中获取信息,然后针对每个结果打开一个新的连接。 如您所见,您可能从两个Future实例开始,但以一堆Future.get()方法,超时等结束,因此,在这些情况下, RxJava确实简化了应用程序的开发。

此外,我们还看到了如何使用Java EE 7的一些新增功能,例如如何使用Jax-Rs开发异步Restful服务。

在这篇文章中,我们学习了如何处理服务之间的互连以及如何使它们可伸缩并减少资源消耗。 但是,我们没有谈论这些服务之一发生故障时发生的情况。 来电者怎么了? 我们有办法进行管理吗? 当其中一项服务不可用时,是否有一种方法可以不浪费资源? 我们将在下一篇关于容错的文章中对此进行介绍。

我们不断学习,

亚历克斯


邦迪亚,邦迪亚! Bon dia aldematí! Fem for a la mandra I saltem corrents del llit。 (Bon Dia!–DàmarisGelabert)

翻译自: https://www.javacodegeeks.com/2014/07/rxjava-java8-java-ee-7-arquillian-bliss.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/363086.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C# -- RSA加密与解密

1. RSA加密与解密 -- 使用公钥加密、私钥解密 public class RSATool{public string Encrypt(string strText, string strPublicKey){RSACryptoServiceProvider rsa new RSACryptoServiceProvider();rsa.FromXmlString(strPublicKey);byte[] byteText Encoding.UTF8.GetByt…

VMware Station NAT上网模式配置

转载于:https://www.cnblogs.com/MimiSnowing/p/10718235.html

JavaFX技巧10:自定义复合控件

用JavaFX编写自定义控件是一个简单直接的过程。 需要一个控件类来控制控件的状态&#xff08;因此命名&#xff09;。 外观需要控件的外观。 而且通常不是用于自定义外观CSS文件。 控件的常用方法是将其使用的节点隐藏在其外观类中。 例如&#xff0c; TextField控件使用javaf…

React后台管理系统-首页Home组件

1.Home组件要显示用户总数、商品总数和订单总数&#xff0c;数据请求后端的 /manage/statistic/base_count.do接口&#xff0c;返回的是 this.state { userCount : -, productCount : -, orderCount : - } //页面挂载之后请求数据componentDidMount(){ this.loadCount(); } lo…

JAX-RS 2.0的新功能– @BeanParam批注

至少可以说JAX-RS很棒&#xff0c;也是我的最爱之一&#xff01; 为什么&#xff1f; 功能丰富 直观&#xff08;因此学习曲线不那么陡峭&#xff09; 易于使用和开发 具有出色的RI – Jersey &#xff0c; RestEasy等 有足够的JAX-RS粉丝可以添加此内容&#xff01; JAX…

带有自定义模块的JBoss EAP上的骆驼

Apache Camel —最好的开源集成库 Apache Camel是一个很棒的开放源代码集成库&#xff0c;可以用作ESB的主干或在独立的应用程序中进行系统的路由&#xff0c;转换或中介&#xff08;请参阅&#xff1a;集成多个系统&#xff09;。 Camel非常通用&#xff0c;不会迫使用户部署到…

Java中的读写锁

一、读写锁 1、初识读写锁 a&#xff09;Java中的锁——Lock和synchronized中介绍的ReentrantLock和synchronized基本上都是排它锁&#xff0c;意味着这些锁在同一时刻只允许一个线程进行访问&#xff0c;而读写锁在同一时刻可以允许多个读线程访问&#xff0c;在写线程访问的时…

Flex4中的皮肤(2): Skin State

在上一篇 中&#xff0c;定义了一个最简单的SkinnableComponent并为其定义了两个Skin。 对于TransitionSkin&#xff0c;需要在enable时有不同的展现方式&#xff0c;这可以通过Skin State实现。 对自定义的SkinnableComponent的修改 首先在组件中定义isEnabled属性&#xff1a…

休眠自动冲洗的黑暗面

介绍 既然我已经描述了JPA和Hibernate刷新策略的基础知识 &#xff0c;我就可以继续阐明Hibernate的AUTO刷新模式的令人惊讶的行为。 并非所有查询都会触发会话刷新 许多人会认为Hibernate 总是在执行任何查询之前先刷新Session。 虽然这可能是一种更直观的方法&#xff0c;并…

vue项目中z-index不起作用(将vue实例挂在到window上面)

问题描述&#xff1a;由于原有项目&#xff08;传统项目&#xff09;中嵌入新的vue组件&#xff0c;dialog弹出框的z-index&#xff1a;999999&#xff1b;任然不起作用&#xff1b; 解决办法&#xff1a;将vue实例挂载到window 解决代码如下&#xff1a; 入口文件index.js中 i…

IDE:5个最喜欢的NetBeans功能

愉快的发展……。 NetBeans具有许多有趣的功能 &#xff0c;这些功能使开发非常容易&#xff0c;只需很少的步骤&#xff0c;并且可以在非常快速地将产品推向市场的情况下提供高产的环境 。 将我的谈话仅限于五个功能非常困难&#xff0c;而此IDE具有大量有趣的功能。 但是在…

flask总结之session,websocket,上下文管理

1.关于session flask是带有session的&#xff0c;它加密后存储在用户浏览器的cookie中&#xff0c;可以通过app.seesion_interface源码查看 from flask import Flask,sessionapp Flask(__name__)app.secret_key aptx4869 # 必须要指定这个参数app.route(/login)def login():…

深入了解Oracle IDM审核

在处理敏感信息的任何产品中&#xff0c; 报告都是至关重要的功能。 同样适用于身份和访问管理工具。 Oracle IDM的审核模块是其OOTB报告功能的基础。 让我们快速看一下审核引擎以及它如何促进OIM中的报告功能。 这里展示的用例很简单– 在OIM中更改为用户记录。 从审核的角度…

django批量form表单处理

1.应用说明 一般在表单信息录入中&#xff0c;如果存在许多重复提交的信息&#xff0c;我们就需要进行批量处理&#xff0c;比如学生信息的批量录入。 这里一种方式就是使用xlrd模块处理&#xff0c;把学生信息录入到系统内 另外一种方式就是采用我们from组件中提供的formset来…

ADF:弹出窗口,对话框和输入组件

在本文中&#xff0c;当我们有一个af&#xff1a;popup包含af&#xff1a;dialog并在其中包含输入组件时&#xff0c;我想着重介绍一个非常常见的用例。 在实现此用例时&#xff0c;需要注意一些陷阱。 让我们考虑一个简单的示例&#xff1a; <af:popup id"p1" …

ORM 开发环境之利器:MVC 中间件 FreeSql.AdminLTE

前言 这是一篇纯技术干货的分享文章&#xff0c;FreeSql 已经基本完成 .NETCore 最方便的 ORM 使命&#xff0c;我们正在筹备生态的建立&#xff0c;比如 ABP 中如何使用 FreeSql 的实现&#xff0c;需要各种各样的扩展包&#xff0c;好多好多工作量。有没有大神愿意无偿参与做…

django中间件及中间件实现的登录验证

1.定义 一个用来处理Django的请求和响应的框架级别的钩子&#xff08;函数&#xff09;&#xff0c;相对比较轻量级&#xff0c;并且在全局上改变django的输入与输出&#xff08;使用需谨慎&#xff0c;否则影响性能&#xff09; 直白的说中间件就是帮助我们在视图函数执行之前…

二进制和十进制的相互转换

十进制转二进制&#xff1a; 方法一&#xff1a;y…… 25 * x 24 * x 23 * x 22 * x 21 * x 20 * x&#xff0c;其中y是十进制数字&#xff0c;x是0或1。 方法二&#xff1a; 二进制转十进制&#xff1a; 10100125 * 1 24 * 0 23 * 1 22 * 0 21 * 0 20 * 141 更多专业前端知…

Hadoop开发工具简介

几天前&#xff0c; Apache Hadoop开发工具 &#xff08;又名HDT &#xff09;发布了。 这些项目旨在将插件引入eclipse中&#xff0c;以简化Hadoop平台上的开发。 该博客旨在概述HDT的一些重要功能。 单端点 该项目可以充当HDFS&#xff0c;Zookeeper和MR群集的单个端点。 您…

使用Spring MVC时的常见错误

当我大约10年前开始我的职业生涯时&#xff0c;Struts MVC就是市场上的常态。 但是&#xff0c;多年来&#xff0c;我观察到Spring MVC逐渐流行起来。 鉴于Spring MVC与Spring容器的无缝集成以及它提供的灵活性和可扩展性&#xff0c;这对我来说并不奇怪。 从到目前为止的Spri…