自上次发布以来已经有一段时间了,但我终于回来了! 由于我仍在我的项目中,因此我将再次撰写有关使用Corda的文章。 这次,我们将不再关注Corda,而是将Spring与Corda结合使用。 更具体地说,Spring WebFlux。 为什么这样 第一,因为我们可以。 第二,因为它允许我们流式传输来自Corda节点的事件。 这使我们能够跟踪流的进度或检索对Vault的更新,并将其发送给注册到相关端点的任何客户端。 将WebFlux与Corda结合使用确实会带来一些问题。 有些来自Corda,有些来自Spring。 虽然,Spring问题与我有关,但我期望Spring Boot + WebFlux组合默认对我有更多作用。
在本文中,我假设您对Corda有一定的经验,但是如果您确实需要有关此主题的更多信息,我建议您阅读我以前的文章: 什么是Corda和Corda 开发 。 此外,我还建议您看一下使用Spring WebFlux做事作为WebFlux的介绍。
本教程的内容将使用Corda的3.2
开源版本。 我实际上是根据3.1
开始撰写这篇文章的,但是在此期间发布了较新的版本。 因此,有一些基于在这些版本之间移动的注释。
我们还将在Kotlin中实现所有内容,但本文的内容也可以在Java中实现。
示例应用程序简介
我们将为一个非常简单的应用程序建模,该应用程序不会提供太多使用,因此,出于这篇博文的目的,我将其合并在一起。 该应用程序将由一方向另一方发送消息(由MessageState
表示)组成。 为此, SendMessageFlow
将运行,一旦运行,双方将拥有消息的副本,仅此而已。 简短而简单,但应向我们提供足够的知识来证明WebFlux如何与Corda配合使用。
结构体
通常,我从查看依赖关系开始。 尽管由于将代码分成了单独的模块,所以最好先查看小示例应用程序的结构。
+-- app
| +-- {spring code}
| +-- build.gradle
+-- cordapp
| +-- {flow code}
| +-- build.gradle
+-- contracts-and-states
| +-- {contracts and states code}
| +-- build.gradle
+-- build.gradle
这是应用程序结构的快速视图。 app
将包含所有Spring代码,并将通过RPC委托给Corda节点。 cordapp
模块包含流逻辑, contracts-and-states
按照名称的建议进行操作,并包含契约和状态代码。 cordapp
模块和contracts-and-states
模块都打包到Cordapp Jars中,并转储到Corda节点中。
这些模块中的每个模块都包含一个build.gradle
文件,该文件包含其相关的构建信息和相关性。 由于本文不是直接着眼于编写Corda代码,因此我们将不继续详细研究每个模块及其构建文件。 取而代之的是,我们仅在帖子末尾重新整理流代码,以便我们专注于Spring实现。
Spring模块的依赖
以下是app
模块的build.gradle
文件(包含Spring代码):
我不是Gradle的专家,因此该代码段中可能有一些可以做得更好的事情,但是它确实可以满足需要。
因此,我想强调一些事情。 使用Spring Boot 2.0.3.RELEASE
,使用kotlin-spring
插件向所有标有某些Spring注释的Kotlin类添加open
。 在很多情况下都需要这样做,因为Spring要求某些类是非最终的。 这在Java中不是问题,但对于Kotlin来说是有问题的,因为默认情况下所有类都是final。 有关该插件的更多信息,请访问kotlinlang.org 。
spring-boot-starter-webflux
了WebFlux依赖关系以及常规的Spring Web服务器代码,以使一切正常运行。
rxjava-reactive-streams
,这是一个有趣的内容,我们将在以后看到它。 由于Corda使用RxJava 1.xx
而不是较新的RxJava2,因此其Observable
不能实现Spring WebFlux用于返回反应流的Java 8 Publisher
接口。 这种依赖性将这些较旧的Observable
转换为Publisher
因此它们与WebFlux兼容。 稍后,当我们查看执行转换的代码时,将再次涉及到这一点。
最后, netty-all
版本被强制为4.1.25.Final
以解决依赖关系问题。
路由功能
WebFlux引入了一种功能性方法,用于将请求路由到处理请求的功能。 有关更多信息,请参见使用Spring WebFlux进行操作 。 我不想深入探讨WebFlux的工作方式,但我们将快速定义路由功能。 主要原因是由于使用Kotlin而不是Java。 Kotlin提供了一种使用DSL定义功能的不同方法。
下面是定义本教程路由的代码:
routes
bean接收MessageHandler
bean(我们将在后面进行介绍),并将两个URI映射到该MessageHandler
找到的函数。 与Java实现相比,DSL允许的版本略短。 此片段中有几个部分需要重点关注。
("/messages")
定义两个路由功能的基本请求路径。 DSL允许功能从此基本路径嵌套自己,并帮助传达路由的结构。
一个函数在发送请求后返回的响应中接受TEXT_EVENT_STREAM
( text/event-stream
),同时还将APPLICATION_JSON
( application/stream+json
)指定为正文的内容。 由于我们已经定义了Content-Type
,因此在大多数情况下,我们可以假设我们将发送一个POST
请求(就是这样)。 POST
从先前的配置中进一步嵌套,并添加了另一个MessageHandler
函数来接受请求。
第二个功能从Corda节点接收更新。 为此,它返回APPLICATION_STREAM_JSON
并期望将GET
请求发送到/messages/updates
。
处理函数
在本节中,我们将看一下上一节中几次提到的MessageHandler
。 此类包含执行实际业务逻辑的所有功能。 路由只是达到这一点的一种方法。
我以前的文章“用Spring WebFlux做事”将比我在本文中更深入地解释这些示例中更多WebFlux的特定部分。
下面是处理程序代码:
首先,我们应突出显示NodeRPCConnection
类及其类型为CordaRPCOps
的属性proxy
。 我从示例Corda和Spring应用程序 (由R3员工编写)中窃取了NodeRPCConnection
。 简而言之, NodeRPCConnection
创建到Corda节点的RPC连接,并且proxy
返回CordaRPCOps
。 CordaRPCOps
包含所有可用的RPC操作。 这就是Spring与Corda节点交互的方式。
让我们仔细看看updates
功能:
此功能返回新消息,将其保存到Vault中。 如果您有一个监视来自Corda节点的更新的应用程序,则这种端点会很好。
此代码段中与Corda相关的代码全部包含在trackNewMessages
函数中。 它使用CordaRPCOps
的vaultTrackBy
访问保管库服务,并开始跟踪对任何MessageState
的更新。 由于我们尚未将任何参数传递给该函数,因此它将仅跟踪UNCONSUMED
状态。 vaultTrackBy
返回一个DataFeed
对象, DataFeed
对象可以用于通过snapshot
属性检索保管库的snapshot
也可以通过访问updates
属性来返回一个Observable
以允许其订阅更新事件。 我们将使用此RxJava Observable
将数据流式传输回调用方。
这是我们需要使用我前面提到的rxjava-reactive-streams
的第一个实例。 toPublisher
方法接受一个Observable
并将其转换为Publisher
。 请记住,WebFlux需要兼容Java 8的反应性流库,这些库必须实现Publisher
。 例如,Spring倾向于使用提供Mono
和Flux
类的Reactor 。
创建Publisher
,需要将其馈送到ServerResponse
。 由于目前一切顺利,我们将通过ok
方法返回200
响应。 然后将Content-Type
设置为APPLICATION_STREAM_JSON
因为它包含流数据。 最后,响应的主体从trackNewMessages
中获取Publisher
trackNewMessages
。 现在,端点已准备好由发出请求的客户端进行订阅。
现在,完成了从节点到客户端的流更新功能。 实际保存新消息怎么办? 此外,是否有任何信息可以传递给发送者有关执行流程的信息? 因此,让我们回答这两个问题。 是的,我们可以使用WebFlux保存新消息。 是的,流程可以返回其当前进度。
下面是post
函数的代码,该函数在流的流进度时将新消息保存到发件人和收件人的节点上:
proxy.startTrackedFlow
启动一个流程,该流程的进度可以由添加到该流程的任何ProgressTracker
跟踪。 此类中定义的startTrackedFlow
委托给上述函数,并返回其progress
属性; 一个Observable<String>
其事件由ProgressTracker
的进度组成。
传递到流中的MessageState
是从请求传递的Message
对象创建的。 这是因为它包含的信息比MessageState
本身少,因此可以更轻松地将消息数据输入到端点。 parse
将Message
传递的字符串X500名称转换为CordaX500Name
,然后假定存在网络中,转换为网络中的Party
。
然后通过created
方法将其打包到响应中。 指定Content-Type
来告诉客户端它包含text/event-stream
。 消息的路径使用在执行流之前创建的UUID
。 例如,这可以用于检索特定的消息,但是您需要自己实现该消息,因为我太懒了,因此本文不做。
创建一个客户
现在已经设置了端点,我们应该创建一个可以发送请求并使用发送回它的流的客户端。 稍后,我们将简要地看一下流代码,以更全面地了解正在发生的事情。
为了将请求发送到响应式后端,Spring WebFlux提供了WebClient
类。 发送请求后, WebClient
可以对响应中发送的每个事件做出反应。 下面的MessageClient
就是这样做的:
MessageClient
包装并使用WebClient
将请求发送到WebClient
的构建器中指定的地址。 在该课程中,关于反序列化还有一些额外的配置,但是我想暂时重新介绍一下,因为还有一部分内容涉及该主题。
和以前一样, 使用Spring WebFlux做事提供了WebFlux特定方法的深入解释。
因此,让我们单独查看每个请求,首先将POST
请求发送到/messages
端点:
post
方法创建一个用于指定请求内容的构建器。 这应该与我们之前定义的端点匹配。 建立请求后,请调用exchange
方法将其发送到服务器。 然后,将响应的主体映射到Flux<String>
,以使其可以订阅。 那就是使用反应流的本质。 订阅响应后,客户端将决定对每个事件执行他们希望执行的任何处理。 在这种情况下,它只是打印出ProgressTracker
的当前步骤。
如果我们通过这段代码发送请求,我们将收到以下信息:
STEP: Verifying
STEP: Signing
STEP: Sending to Counterparty
STEP: Collecting signatures from counterparties.
STEP: Verifying collected signatures.
STEP: Done
STEP: Finalising
STEP: Requesting signature by notary service
STEP: Broadcasting transaction to participants
STEP: Done
STEP: Done
这些是SendMessageFlow
的ProgressTracker
定义的步骤。 是的,我知道我还没有向您显示该代码,但是请相信我。 真的没有太多其他。 如您所见,流返回的每个字符串值都将“ STEP”附加到自身
现在进入到/messages/update
端点的GET
请求:
同样,在这一点上没有什么可显示的。 但是,在幕后,实际上需要大量工作才能使它工作。 我需要打通电话才能解决的所有问题都围绕着序列化和反序列化。 我们将在下一部分中进行介绍。
对此请求的响应如下:
UPDATE: 0 consumed, 1 producedConsumed:Produced:
56781DF3CEBF2CDAFACE1C5BF04D4962B5483FBCD2C2E428352AD82BC951C686(0)
: TransactionState(data=MessageState(sender=O=PartyA, L=London, C=GB,
recipient=O=PartyB, L=London, C=GB, contents=hello there,
linearId=1afc6144-32b1-4265-a06e-73b6bb81aef3_b0fa8491-c9b9-418c-ba6e-8b7840faaf30,
participants=[O=PartyA, L=London, C=GB, O=PartyB, L=London, C=GB]),
contract=com.lankydanblog.tutorial.contracts.MessageContract,
notary=O=Notary, L=London, C=GB, encumbrance=null,
constraint=net.corda.core.contracts.WhitelistedByZoneAttachmentConstraint@4a1febb5)
关于此端点的好处是,它现在维持与该节点的连接,该节点将继续将所有相关更新发送回此客户端。 上面的请求是原始POST
消息的更新。 客户端收到的任何新事件都会在客户端上输出更新。 这就是使此类端点非常适合触发进程或仅在与Corda节点本身分开的前端上显示最新数据的理想之选。
序列化和反序列化
在本节中,我想集中精力正确设置序列化和反序列化。 从/messages/updates
端点检索的数据需要正确地序列化其数据,以传递给客户端,客户端也需要能够反序列化响应数据。
通常,Spring会为您执行很多操作,而且仍然可以执行,但是似乎WebFlux仍需要一些额外的步骤来正确设置它。 免责声明,这是根据我的经验,如果您知道执行此操作的更好方法,那么我很想听听您的意见。
Corda Jackson支持
Spring倾向于默认使用杰克逊,并且很方便地,Corda本身提供了很多杰克逊设置。 JacksonSupport.cordaModule
为诸如Party
和CordaX500Name
类的类提供了一些序列化和反序列化。 如果在某些基本情况下需要对Corda类进行序列化或反序列化,则这可能会满足您的需求。 在Spring中,您可以创建一个默认的ObjectMapper
将检索并添加到其自身的bean。
但是,此路线有一些警告。 由于模块依赖于ObjectMapper
可以访问节点信息,例如通过RPC客户端CordaRPCOps
访问节点信息,因此无法反序列化某些类。 否则,反序列化Party
, AbstractParty
或AnonymousParty
将会失败。 不仅如此,由于不安全,此功能现已从Corda 3.2
弃用。 JacksonSupport.cordaModule
也已移入其自己的类( CordaModule
)。
我下面提供的解决方案也是Corda从现在开始建议采用的解决方案。
以下是MessageClient
从/messages/updates
端点检索更新时引发的异常(对于本节的其余部分,将使用相同的端点):
com.fasterxml.jackson.databind.ObjectMapper cannot be cast to net.corda.client.jackson.JacksonSupport$PartyObjectMapper
由此,我们可以确定我们的ObjectMapper
类型错误,并且实际上需要是PartyObjectMapper
的子类型。 进一步介绍一下,我们可以看到在JacksonSupport
类中也找到了该映射器。 现在,剩下要做的就是创建这个映射器,并使用它而不是默认的ObjectMapper
。
因此,让我们看看如何做到这一点:
这将创建一个RpcObjectMapper
,它实现PartyObjectMapper
并利用RPC检索节点信息,从而可以反序列化各种参与方类。 在createDefaultMapper,
CordaModule
了之前的CordaModule
,感谢Spring,对于大多数需要序列化或反序列化的实例(以后要CordaModule
注意),它现在是默认的对象映射器。
一些更多的序列化和反序列化配置
现在……我实际上处于一个很奇怪的位置。 我想通过所有其他步骤来使端点正常工作。 但是,无论我做什么,我似乎都无法重新创建曾经遇到的所有错误,然后才能使其正常工作。 我不知道该说些什么……我的异常被吞没在某处,阻止我看到正在发生的事情。 无论如何,我们必须继续。 值得庆幸的是,我知道为什么我添加了其余的代码,但是我无法再为您提供每个更改都已修复的例外……
Soooo,让我们看一下我们早先开始研究的rpcObjectMapper
的最终产品:
这里有一些补充。 JsonComponentModule
作为bean添加,以便它拾取定义的@JsonSerializer
和@JsonDeserializer
自定义组件(在其他类中)。 似乎即使将它作为模块添加到映射器,如果要查找和注册自定义JSON组件,它仍然需要创建bean本身。
接下来是MixinModule
。 此类解决了在反序列化Vault.Update
和SecureHash
时出现的问题。 让我们仔细看看。
Mixin允许我们将Jackson注释添加到类上,而实际上没有访问类本身的权限,而这显然是我们无法控制的,因为这是Corda代码库中的一个对象。 另一个选择是将其添加到我们之前讨论的CordaModule
,但这是CordaModule
。
Vault.Update
需要这种方法,是因为它拥有一个名为isEmpty
的方法,该方法不能很好地与Jackson配合使用,后者感到困惑,并认为isEmpty
与一个名为empty
的布尔字段匹配。 因此,当将JSON反序列化回对象时,它将尝试为该字段传递一个值。
MixinModule
本身只是一个类,其构造函数将VaultUpdateMixin
和SecureHashMixin
添加到其自身中。 然后,映射器会像添加其他模块一样添加该模块。 任务完成。
添加到VaultUpdateMixin
的Jackson批注是@JsonIgnore
,这可以说明@JsonIgnore
。 序列化或反序列化时, isEmpty
函数将被忽略。
接下来是SecureHashMixin
:
从3.1
移到3.2
后,我已经添加了这个。 对我来说,似乎忘记了为SecureHash
添加Mixin。 CordaModule
包括用于SecureHash.SHA256
序列化和反序列化,但不包括SecureHash
。 上面的代码是从CordaModule
复制和粘贴的, CordaModule
一个类与Mixin绑定在一起。
包含此内容后,将解决3.1
和3.2
之间的差异。
我想我会为此提出一个问题!
定制序列化器和反序列器
要序列化Vault.Update
仅AttachmentConstraint
接口需要它自己的自定义序列化程序:
HashAttachmentConstraint
因为只有HashAttachmentConstraint
实际上有任何字段。 这与稍后反序列化器匹配,在反序列化器上读取type
JSON字段以确定创建哪个对象。
需要自定义反序列器的最后两个类是ContractState
和AttachmentContract
(与之前的序列化程序匹配):
ContractStateDeserialiser
是一个非常懒惰的实现,因为在本教程中仅使用一种状态。 AttachmentConstraintDeserialiser
使用序列化程序中定义的type
字段来确定应将其转换为AttachmentConstraint
哪种实现。
WebFlux特定的配置
由于使用了WebFlux,本小节将介绍额外的必需配置。 您已经在MessageClient
看到了一些配置,但是还需要做一些额外的工作:
客户端需要这个bean能够反序列化application/stream+json
以及响应中返回的对象。
要使用配置中定义的Jackson2JsonDecoder
,必须指定WebClient
的ExchangeStrategies
。 不幸的是,没有编写ExchangeStrategies
类来拾取我们已经创建的Jackson2JsonDecoder
。 我希望这种配置在默认情况下可以工作,但是,哦,很好。 要添加ExchangeStrategies
,必须使用WebClient
构建器。 一旦完成,我们终于到了。 完成打包响应的所有序列化以及从客户端使用响应序列的反序列化已完成。
总结了我希望在本文中讨论的所有与Spring相关的代码。
快速了解Flow代码
在结束之前,我将简要展示为完成本教程的目的而编写的流程:
这是一个非常简单的流程,增加了一个ProgressTracker
, /messages
请求用于跟踪流程的当前状态。 简而言之,该流程将MessageState
传递给它,并将其发送给交易对手。 在流程中移动时, ProgressTracker
将更新为相关步骤。 可以在Corda文档中找到有关使用ProgressTracker
更多文档。
关闭时间
老实说,这比我想象的要长得多,而且花了我的时间比我希望的要长得多。
总之,Spring WebFlux提供了使用响应流在响应事件到达时进行处理的功能。 当与Corda一起使用时,可以跟踪流程的进度,并可以保持持久的库更新流,随时准备在它们到达时采取行动。 为了充分利用带有Corda的WebFlux,我们还必须研究确保对象由服务器正确序列化,然后由客户端反序列化以便可以使用它们。 Lucky Corda确实提供了其中一些功能,但是缺少一两个类或功能,我们需要确保使用提供的对象映射器使用它们。 不幸的是,使用Spring模块时,WebFlux需要比我通常习惯的更多的配置,但是没有什么不能解决的。
这篇文章的其余代码可以在我的GitHub上找到
如果您喜欢这篇文章,可以在@LankyDanDev上的Twitter上关注我,我在其中发布新帖子的更新(尽管最近它们的速度有所放缓)。
翻译自: https://www.javacodegeeks.com/2018/07/streaming-data-corda-node-spring-webflux.html