Corda服务的异步流调用

如何使流程更快? 如果您已经与Corda合作了一段时间,那么您很有可能已经考虑过这一点。 您可以通过以下几方面进行合理的调整来提高性能:事务大小,优化查询并减少整个Flow执行过程中所需的网络跃点数。 在某种程度上,还有另一种可能也让您无所适从。 多线程。

更具体地说,从已经执行的流程异步启动流程/子流程。 这样做有可能极大地改善您的CorDapps性能。

如果您尝试此操作,则可能会遇到与我得到的类似的例外。 此外,到目前为止,Corda还不支持子流的线程化。 但是,仍然可以做到。 我们只需要对此保持聪明。 那就是Corda Services中多线程进入的地方。它们可以在Flow中调用,但不会妨碍Flow对其施加的严格规则,因为正在执行的Flow不会在服务中挂起或检查点。

在本文中,我将重点介绍从服务内部以多线程方式启动流程。 在Corda中还可以使用其他线程,但这是我想更深入研究的有趣领域。 另一方面,从服务启动流程也充满了一些陷阱。 这些需要考虑并遍历。 否则,您将有一天醒来,想知道为什么一切都没有明显的原因停止了。

幸运的是,我在这里为您提供帮助。 对我来说,我必须直面这个问题。

对我来说幸运的是,R3能够提供帮助。

作为参考,我将在本文中使用Corda Enterprise 3.1 。 要真正从本文的内容中受益,您将需要使用Enterprise。 这是由于Enterprise支持多个异步执行的流。 开源目前不允许这样做。

我还建议您查看我以前的文章Corda Services 101,因为我们将在此基础上建立基础。

情境

让我们从概述本帖子将要使用的场景开始。

  • 随着时间的推移,甲方向甲方发送一些消息。 每个消息来自一个流。
  • 甲方回应所有发送给他们的消息。 每个消息都来自单个Flow,但是它们希望在单个位置执行该过程。

可以快速组合一系列流程来满足此要求。 按顺序进行此操作应该证明绝对是零问题(在解决了我们所有犯下的愚蠢错误之后)。

尽管这种情况对于需要性能的情况很不利,但是它很容易理解,因此我们可以专注于异步运行。

慢速同步解决方案

在研究异步解决方案之前,快速浏览一下将要使用的代码将是有益的。 下面是ReplyToMessagesFlow的代码。 我不想遍历所有底层代码,而只想专注于与此帖子相关的代码:

@InitiatingFlow
@StartableByRPC
class ReplyToMessagesFlow : FlowLogic<List>() {@Suspendableoverride fun call(): List {return messages().map { reply(it) }}private fun messages() =repository().findAll(PageSpecification(1, 100)).states.filter { it.state.data.recipient == ourIdentity }private fun repository() = serviceHub.cordaService(MessageRepository::class.java)@Suspendableprivate fun reply(message: StateAndRef) = subFlow(SendMessageFlow(response(message), message))private fun response(message: StateAndRef): MessageState {val state = message.state.datareturn state.copy(contents = "Thanks for your message: ${state.contents}",recipient = state.sender,sender = state.recipient)}
}

如果您确实阅读过Corda Services 101,那么您可能已经认识到此类。 正如我之前提到的,为提出的问题组合解决方案非常容易。 从Vault检索MessageState ,然后启动子subFlowsubFlow进行回复。

这段代码将愉快地逐个传递消息。

那么,我们可以采用此代码并使其更快吗?

异步尝试失败

让我们尝试通过引入线程来使当前代码更快! 我们将使用CompletableFutures来做到这一点:

@InitiatingFlow
@StartableByRPC
class ReplyToMessagesBrokenAsyncFlow : FlowLogic<List>() {@Suspendableoverride fun call(): List {return messages().map { CompletableFuture.supplyAsync { reply(it) }.join() }}// everything else is the same as before
}

大多数代码与以前相同,因此已从示例中排除。

对代码的唯一更改是添加了CompletableFuture及其supplyAsync方法(来自Java)。 它尝试在单独的线程上开始为每个消息执行reply功能。

那么为什么将本节命名为“一次失败的尝试”? 我指的是执行以上代码时获得的堆栈跟踪:

java.util.concurrent.CompletionException: java.lang.IllegalArgumentException: Required value was null.at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_172]at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_172]at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) ~[?:1.8.0_172]at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[?:1.8.0_172]
Caused by: java.lang.IllegalArgumentException: Required value was null.at net.corda.node.services.statemachine.FlowStateMachineImpl.checkDbTransaction(FlowStateMachineImpl.kt:201) ~[corda-node-3.1.jar:?]at net.corda.node.services.statemachine.FlowStateMachineImpl.processEventImmediately(FlowStateMachineImpl.kt:192) ~[corda-node-3.1.jar:?]at net.corda.node.services.statemachine.FlowStateMachineImpl.subFlow(FlowStateMachineImpl.kt:271) ~[corda-node-3.1.jar:?]at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:312) ~[corda-core-3.1.jar:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.reply(ReplyToMessagesBrokenAsyncFlow.kt:57) ~[classes/:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.access$reply(ReplyToMessagesBrokenAsyncFlow.kt:19) ~[classes/:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$poop$$inlined$map$lambda$1.get(ReplyToMessagesBrokenAsyncFlow.kt:46) ~[classes/:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$poop$$inlined$map$lambda$1.get(ReplyToMessagesBrokenAsyncFlow.kt:19) ~[classes/:?]

您将获得它,以及Corda正在打印的一长串检查点日志行。 此外,只是为了掩盖我的屁股并向您证明这不是由于CompletableFuture的问题引起的,这是使用Executor线程池时出现的另一个错误:

Exception in thread "pool-29-thread-1" Exception in thread "pool-29-thread-2" java.lang.IllegalArgumentException: Required value was null.at net.corda.node.services.statemachine.FlowStateMachineImpl.checkDbTransaction(FlowStateMachineImpl.kt:201)at net.corda.node.services.statemachine.FlowStateMachineImpl.processEventImmediately(FlowStateMachineImpl.kt:192)at net.corda.node.services.statemachine.FlowStateMachineImpl.subFlow(FlowStateMachineImpl.kt:271)at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:312)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.reply(ReplyToMessagesBrokenAsyncFlow.kt:48)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.access$reply(ReplyToMessagesBrokenAsyncFlow.kt:19)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$call$$inlined$map$lambda$1.run(ReplyToMessagesBrokenAsyncFlow.kt:29)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)
java.lang.IllegalArgumentException: Required value was null.at net.corda.node.services.statemachine.FlowStateMachineImpl.checkDbTransaction(FlowStateMachineImpl.kt:201)at net.corda.node.services.statemachine.FlowStateMachineImpl.processEventImmediately(FlowStateMachineImpl.kt:192)at net.corda.node.services.statemachine.FlowStateMachineImpl.subFlow(FlowStateMachineImpl.kt:271)at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:312)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.reply(ReplyToMessagesBrokenAsyncFlow.kt:48)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.access$reply(ReplyToMessagesBrokenAsyncFlow.kt:19)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$call$$inlined$map$lambda$1.run(ReplyToMessagesBrokenAsyncFlow.kt:29)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)

希望您在这一点上相信我。 如果不是,请参考我一开始所说的内容。 Corda当前不支持从正在执行的流程异步启动新流程。 我相信他们正在努力。 但是,截至目前。 不要使用此解决方案。

可行的异步解决方案

我们已经看到,在Flow内部执行线程是行不通的。 为了继续追求性能,我们现在来看一下Corda服务中的线程。 这并不奇怪,因为标题和开头的段落已经讨论了这一点……

抛开讽刺的评论。 委派服务将需要对原始解决方案进行一些重做,但是大部分代码将保持不变。 大部分内容将被复制并粘贴到另一个类中。 从流中获取代码并将其放入服务中。

以下是新的MessageService ,其中包含原始ReplyToMessagesFlow的代码,但进行了一些更改和添加了线程代码:

@CordaService
class MessageService(private val serviceHub: AppServiceHub) : SingletonSerializeAsToken() {private companion object {val executor: Executor = Executors.newFixedThreadPool(8)!!}fun replyAll() {messages().map {executor.execute {reply(it)}}}private fun messages() =repository().findAll(PageSpecification(1, 100)).states.filter { it.state.data.recipient == serviceHub.myInfo.legalIdentities.first() }private fun repository() = serviceHub.cordaService(MessageRepository::class.java)private fun reply(message: StateAndRef) =serviceHub.startFlow(SendMessageFlow(response(message), message))private fun response(message: StateAndRef): MessageState {val state = message.state.datareturn state.copy(contents = "Thanks for your message: ${state.contents}",recipient = state.sender,sender = state.recipient)}
}

如您所见,大多数代码与ReplyToMessagesFlow中的代码相同。

我要强调的第一点是使用Executor线程池。 我这里没有使用CompletableFutures ,原因是我们稍后将要讨论的原因。

那么,这一切如何运作? replyAll函数在新的系统线程上对从Vault检索到的每条消息执行reply 。 该新线程又调用startFlow 。 触发将新的流程放入“流程工作器”队列中。 这是所有乐趣发生的地方,一切开始变得混乱。

Flow Worker队列负责执行Flow执行的顺序,并将在Flow添加和完成时填充并为空。 该队列对于协调节点内流的执行至关重要。 当涉及到多线程Flows本身时,它也是痛苦的根源。

下图显示了队列的简化视​​图:

流进入队列并在处理后离开

我为什么要谈论这个队列? 好吧,我们需要格外小心,不要将无法完成的流程填满队列。

怎么会这样 通过在正在执行的流程中启动流程,然后流程等待其完成。 直到队列的线程池中的所有线程都遇到这种情况,这才不会引起问题。 一旦发生,它将使队列陷入僵局。 没有流程可以完成,因为它们都依赖于许多排队的流程来完成。

流留在队列中,等待它们调用的流完成

这很可能在多次触发相同流量的高吞吐量系统上发生。 现在,队列中充满了等待其他流完成的流的机会增加了。

这不是很好,使事情变得有些困难。 但是,只要我们意识到这一点,我们就可以适应它。

这也是Executor线程池而不是CompletableFuture的原因。 通过启动新流程而不等待其完成,可以避免死锁。 这也是该解决方案的缺点。 没有新Flow的结果,其功能将极为有限。

话虽如此,如果您的用例适合上面显示的结构,那么我绝对建议您使用此解决方案。

在下一节中,我将讨论如何使用CompletableFuture

CompletableFutures的危险解决方案

这很危险的原因很简单。 僵局。 我建议不要使用此解决方案。 除非您的节点有权访问足够的线程,否则要减少用无法完成的线程填充队列的机会。 另一方面,这是一个更为理想的解决方案,因为您可以等待启动的流程的结果并对其进行处理。 这使解决方案更加有用。

以下是带有CompletableFuturesMessageService外观:

@CordaService
class MessageService(private val serviceHub: AppServiceHub) : SingletonSerializeAsToken() {fun replyAll(): List =messages().map { reply(it).returnValue.toCompletableFuture().join() }// everything else is the same as before
}

除了replyAll函数外,代码replyAll 。 返回的CordaFuture提供的toCompletableFuture函数,调用join以等待所有期货的结果并返回总体结果。

如前所述,此解决方案可能导致死锁。 但是,对于您的情况,也许并非如此。 由您决定发生这种情况的可能性。 如果不利于您,最好走开。 选择坚持使用类似于上一节中详述的同步或异步解决方案。

我真的需要这样做吗?

现在,是的,我相信你会的。

展望未来,我怀疑您是否需要依靠我在本文中提出的解决方案。

我相信Corda正在努力消除从Flow内部启动Flow时甚至不必考虑线程的需求。 取而代之的是,您可以简单地调用带有选项的subFlow来异步运行它。 这本可以使我们保留原始的同步解决方案,但可以选择使每个subFlow在单独的线程上运行。

将各部分结合在一起

总之,在Corda Enterprise 3中,可以在正在执行的流程中异步启动新流程。 根据您的用例,这可以提供良好的性能优势。 有缺点。 您不能等待异步流的结果,而不会用死锁的威胁来威胁您的节点。 节点的基础队列无法处理它所处的情况。因此,在将线程引入到Flow调用中时,需要格外小心。 值得庆幸的是,随着Corda的发展,您甚至不必担心自己这样做。 它甚至可能像添加布尔函数参数一样简单。 那是梦想!

这篇文章中使用的代码可以在我的GitHub上找到 。

如果您认为这篇文章有帮助,可以在Twitter上@LankyDanDev关注我,以跟上我的新文章。

翻译自: https://www.javacodegeeks.com/2018/09/asynchronous-flow-invocations-corda-services.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/345199.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

jsf 后台参数到页面_JSF:直接从页面将参数传递给JSF操作方法,这是JavaEE 6+的一个不错的功能...

jsf 后台参数到页面其中一个JSF 2不错的特点出现在Java企业版的JavaEE 6 &#xff0c;是你可以传递参数给喜欢的commandButton和commandLink组件的任何动作元件操作方法。 基于此&#xff0c;您可以最小化托管bean中的方法数量。 另外&#xff0c;为了最小化在bean内部设置的…

mysql odbc ado性能差异_ODBC、OLEDB和ADO之间的关系 ,以及性能比较

学习了.net视频之后&#xff0c;对里面涉及到的数据库连接部分中的一些概念表示很无语。网上很多相关资料&#xff0c;但除了网站不一样外&#xff0c;基本上内容都神一样的一致。现在&#xff0c;我就通过结合看到的一些资料再加上自己的理解试图去解释一下&#xff0c;有不对…

轻松与外来客户进行REST通信

在这个例子中&#xff0c;我们将向您展示如何使用Feign客户端开发一个简单的Spring Boot Application&#xff0c;以使用Weather REST服务。 Spring Boot是基于Java的框架&#xff0c;可简化Web和企业应用程序的构建。 Spring Boot具有嵌入式Tomcat&#xff0c;提供“启动器”…

通过Spring Boot了解H2 InMemory数据库

介绍 基本上&#xff0c;数据库设置涉及几个步骤&#xff0c;然后才能在应用程序中通过已配置的数据源使用它。 在实际项目实施中&#xff0c;这实际上是必需的。 但是&#xff0c;在某些情况下&#xff0c;我们只需要为某些事情完成POC&#xff0c;而整个数据库设置工作仍然是…

mysql多数据源事务_多数据源一致性事务解决方案

spring 多数据源配置spring 多数据源配置一般有两种方案&#xff1a;1、在spring项目启动的时候直接配置两个不同的数据源&#xff0c;不同的sessionFactory。在dao 层根据不同业务自行选择使用哪个数据源的session来操作。2、配置多个不同的数据源&#xff0c;使用一个session…

www.how2j.com_HOW-TO:快速开始使用Spring 4.0,以构建简单的REST-Like API(演练)

www.how2j.comHOW-TO&#xff1a;快速开始使用Spring 4.0&#xff0c;以构建简单的REST-Like API&#xff08;演练&#xff09; 关于使用Spring MVC创建Web API的另一篇教程。 不太复杂。 只是一个演练。 生成的应用程序将提供简单的API&#xff0c;将Mongo作为其持久性&#x…

flutter 返回指定界面_Flutter页面路由导航及传参

转载请注明出处: https://learnandfish.com/概述 每个应用都有很多个页面&#xff0c;在flutter中同样也有很多页面&#xff0c;被称之为路由(Router)&#xff0c;页面之间的跳转通过导航器(Navigator)进行管理。其中 Navigator.push 和 Navigator.pop 是最简单的跳转到新页面和…

正确的工作流程:我应该使用哪个OAuth 2.0流程?

什么是OAuth 2.0 OAuth 2.0是一个已被广泛采用的委托授权框架&#xff0c;已经存在了很多年&#xff0c;并且似乎已经存在。 如果您不熟悉OAuth 2.0的基本概念&#xff0c;可以使用 川崎孝彦写的优秀文章 。 这只是OAuth 2.0各方的简要提醒&#xff1a; 资源所有者–受保护资…

mysql workbench入门_5分钟入门MySQL Workbench

接下来进入下一步&#xff0c;使用Workbench执行sql文件&#xff1a;1.打开Workbench&#xff0c;主页面上点击要connect的连接。2.注意系统偏好设置里&#xff0c;MySQL是running的状态&#xff0c;否则无法执行。创建数据库&#xff1a;点击创建数据库按钮&#xff0c;输入数…

使用Spring Boot和Vue进行有益的开发

“我喜欢编写身份验证和授权代码。” 〜从来没有Java开发人员。 厌倦了一次又一次地建立相同的登录屏幕&#xff1f; 尝试使用Okta API进行托管身份验证&#xff0c;授权和多因素身份验证。 Vue是一个Web框架&#xff0c;由于它的精简和刻薄&#xff0c;最近引起了很多关注。 …

mysql mydumper_系统运维|Mydumper-MySQL数据库备份工具

Mydumper 是 MySQL 数据库服务器备份工具&#xff0c;它比 MySQL 自带的 mysqldump 快很多。它还有在转储的同时获取远程服务器二进制日志文件的能力。Mydumper 的优势并行能力 (因此有高速度) 和性能 (高效的代码避免了耗费 CPU 处理能力的字符集转换过程)更容易管理输出 (每个…

vue调用手机相机相册_详解Vue调用手机相机和相册以及上传

组件选中{{imgList.length}}张文件&#xff0c;共{{bytesToSize(this.size)}}javaScript代码export default {name: "cameras-and-albums",data(){return{imgList: [],datas: new FormData(),files:0,size:0}},methods:{//调用相册&相机fileClick() {$(#upload_f…

红旗linux mysql_恢复 - 红旗Linux案例精选:Amanda集中备份实例详细讲解_数据库技术_Linux公社-Linux系统门户网站...

五、恢复假定我们需要恢复cp3上一些丢失的数据&#xff0c;首先用amandabackup帐号登录cp2机器&#xff0c;创建/etc/amanda/amanda-client.conf文件&#xff0c;内容如下&#xff1a;## amanda.conf - sample Amanda client configuration file.## This file normally goes in…

linuxpip安装python包_Windows+Linux安装Python包管理工具pip

WindowsLinux安装Python包管理工具pipWindows安装Python包管理工具pippip是一个Python包管理工具&#xff0c;主要是用于安装PyPI上的软件包&#xff0c;可以替代easy_install工具。一、前期准备首先确认windows机器上面是否已经安装好了python。在cmd中输入python --version和…

mysql 深胡_Mysql胡说八道

mysql索引今天看了一些关于MySQL相关的东西&#xff0c;来做一些碎碎念&#xff0c;写这些可能只是觉得自己看东西老爱忘23333.先来看一组MySQL数据&#xff0c;如图我们要查看最后id11的数据&#xff0c;如果我们不加索引的话会怎样呢&#xff1f;他会一条一条的比对&#xff…

java public main_实例分析Java中public static void main(String args[])是什么意思

本文实例讲述了Java中public static void main(String args[])的来龙去脉。分享给大家供大家参考&#xff0c;具体如下&#xff1a;public static void main(String[] args)这绝对不是凭空想出来的&#xff0c;也不是没有道理的死规定&#xff0c;而是java程序执行的需要。jvm在…

java浏览文件夹_一个用java实现简单的文件浏览器

[java]代码库import java.awt.*;import java.awt.event.*;import java.net.URL;import javax.swing.*;//文件浏览器public class HTTPBrowserDemo extends JFrame {private static final long serialVersionUID -5794029080886644211L;JTextField jtfAddress; // 输入文件地址…

java奥运会安排赛程问题_记录奥运-当今五大Java记录框架之间的竞赛

java奥运会安排赛程问题开发人员&#xff1a;Takipi会告诉您何时新代码在生产中中断– Log4J vs SLF4J简单vs Logback vs Java Util日志记录vs LOG4J2 日志记录实际上是每个服务器端应用程序中古老且固有的部分。 这是应用程序以持久且可读的方式输出实时状态的主要方法。 某些…

为什么在子类中不重写超类的实例变量

当我们在父类和子类中创建一个具有相同名称的变量&#xff0c;并尝试使用持有子类对象的父类引用访问它时&#xff0c;我们会得到什么&#xff1f; 为了理解这一点&#xff0c;让我们考虑下面的示例&#xff0c;其中在Parent和Child类中声明一个具有相同名称的变量x 。 class…

cocos lua调用java_【Tech-Lua】Cocos-2dx-Lua调用java的小白教程(三)

上周五下班前&#xff0c;打包成功了。我很高兴&#xff0c;周六去踢场足球&#xff0c;周日去现场看了最后一分钟掉球的恒大&#xff0c;度过了一个愉快的周末。然后&#xff0c;噩梦的周一开始了。我再次打包&#xff0c;打算打包就安装&#xff0c;但结果是失败的。为何&…