Corda服务的异步流调用

如何使流程更快? 如果您已经与Corda合作了一段时间,那么您很有可能已经考虑过这一点。 您可以通过以下几方面进行合理的调整来提高性能:事务大小,优化查询并减少整个Flow执行过程中所需的网络跃点数。 在某种程度上,还有另一种可能也让您无所适从。 多线程。

更具体地说,从已经执行的流程异步启动流程/子流程。 这样做有可能极大地改善您的CorDapps性能。

如果您尝试此操作,则可能会遇到与我得到的类似的例外。 此外,到目前为止,Corda还不支持子流的线程化。 但是,仍然可以做到。 我们只需要对此保持聪明。 那就是Corda Services中多线程进入的地方。它们可以在Flow中调用,但不会妨碍Flow对其施加的严格规则,因为正在执行的Flow不会在服务中挂起或检查点。

在本文中,我将重点介绍从服务内部以多线程方式启动流程。 在Corda中还可以使用其他线程,但这是我想更深入研究的有趣领域。 另一方面,从服务启动流程也充满了一些陷阱。 这些需要考虑并遍历。 否则,您将有一天醒来,想知道为什么一切都没有明显的原因停止了。

幸运的是,我在这里为您提供帮助。 对我来说,我必须直面这个问题。

对我来说幸运的是,R3能够提供帮助。

作为参考,我将在本文中使用Corda Enterprise 3.1 。 要真正从本文的内容中受益,您将需要使用Enterprise。 这是由于Enterprise支持多个异步执行的流。 开源目前不允许这样做。

我还建议您查看我以前的文章Corda Services 101,因为我们将在此基础上建立基础。

情境

让我们从概述本帖子将要使用的场景开始。

  • 随着时间的推移,甲方向甲方发送一些消息。 每个消息来自一个流。
  • 甲方回应所有发送给他们的消息。 每个消息都来自单个Flow,但是它们希望在单个位置执行该过程。

可以快速组合一系列流程来满足此要求。 按顺序进行此操作应该证明绝对是零问题(在解决了我们所有犯下的愚蠢错误之后)。

尽管这种情况对于需要性能的情况很不利,但是它很容易理解,因此我们可以专注于异步运行。

慢速同步解决方案

在研究异步解决方案之前,快速浏览一下将要使用的代码将是有益的。 下面是ReplyToMessagesFlow的代码。 我不想遍历所有底层代码,而只想专注于与此帖子相关的代码:

@InitiatingFlow
@StartableByRPC
class ReplyToMessagesFlow : FlowLogic<List>() {@Suspendableoverride fun call(): List {return messages().map { reply(it) }}private fun messages() =repository().findAll(PageSpecification(1, 100)).states.filter { it.state.data.recipient == ourIdentity }private fun repository() = serviceHub.cordaService(MessageRepository::class.java)@Suspendableprivate fun reply(message: StateAndRef) = subFlow(SendMessageFlow(response(message), message))private fun response(message: StateAndRef): MessageState {val state = message.state.datareturn state.copy(contents = "Thanks for your message: ${state.contents}",recipient = state.sender,sender = state.recipient)}
}

如果您确实阅读过Corda Services 101,那么您可能已经认识到此类。 正如我之前提到的,为提出的问题组合解决方案非常容易。 从Vault检索MessageState ,然后启动子subFlowsubFlow进行回复。

这段代码将愉快地逐个传递消息。

那么,我们可以采用此代码并使其更快吗?

异步尝试失败

让我们尝试通过引入线程来使当前代码更快! 我们将使用CompletableFutures来做到这一点:

@InitiatingFlow
@StartableByRPC
class ReplyToMessagesBrokenAsyncFlow : FlowLogic<List>() {@Suspendableoverride fun call(): List {return messages().map { CompletableFuture.supplyAsync { reply(it) }.join() }}// everything else is the same as before
}

大多数代码与以前相同,因此已从示例中排除。

对代码的唯一更改是添加了CompletableFuture及其supplyAsync方法(来自Java)。 它尝试在单独的线程上开始为每个消息执行reply功能。

那么为什么将本节命名为“一次失败的尝试”? 我指的是执行以上代码时获得的堆栈跟踪:

java.util.concurrent.CompletionException: java.lang.IllegalArgumentException: Required value was null.at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_172]at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_172]at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) ~[?:1.8.0_172]at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[?:1.8.0_172]
Caused by: java.lang.IllegalArgumentException: Required value was null.at net.corda.node.services.statemachine.FlowStateMachineImpl.checkDbTransaction(FlowStateMachineImpl.kt:201) ~[corda-node-3.1.jar:?]at net.corda.node.services.statemachine.FlowStateMachineImpl.processEventImmediately(FlowStateMachineImpl.kt:192) ~[corda-node-3.1.jar:?]at net.corda.node.services.statemachine.FlowStateMachineImpl.subFlow(FlowStateMachineImpl.kt:271) ~[corda-node-3.1.jar:?]at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:312) ~[corda-core-3.1.jar:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.reply(ReplyToMessagesBrokenAsyncFlow.kt:57) ~[classes/:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.access$reply(ReplyToMessagesBrokenAsyncFlow.kt:19) ~[classes/:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$poop$$inlined$map$lambda$1.get(ReplyToMessagesBrokenAsyncFlow.kt:46) ~[classes/:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$poop$$inlined$map$lambda$1.get(ReplyToMessagesBrokenAsyncFlow.kt:19) ~[classes/:?]

您将获得它,以及Corda正在打印的一长串检查点日志行。 此外,只是为了掩盖我的屁股并向您证明这不是由于CompletableFuture的问题引起的,这是使用Executor线程池时出现的另一个错误:

Exception in thread "pool-29-thread-1" Exception in thread "pool-29-thread-2" java.lang.IllegalArgumentException: Required value was null.at net.corda.node.services.statemachine.FlowStateMachineImpl.checkDbTransaction(FlowStateMachineImpl.kt:201)at net.corda.node.services.statemachine.FlowStateMachineImpl.processEventImmediately(FlowStateMachineImpl.kt:192)at net.corda.node.services.statemachine.FlowStateMachineImpl.subFlow(FlowStateMachineImpl.kt:271)at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:312)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.reply(ReplyToMessagesBrokenAsyncFlow.kt:48)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.access$reply(ReplyToMessagesBrokenAsyncFlow.kt:19)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$call$$inlined$map$lambda$1.run(ReplyToMessagesBrokenAsyncFlow.kt:29)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)
java.lang.IllegalArgumentException: Required value was null.at net.corda.node.services.statemachine.FlowStateMachineImpl.checkDbTransaction(FlowStateMachineImpl.kt:201)at net.corda.node.services.statemachine.FlowStateMachineImpl.processEventImmediately(FlowStateMachineImpl.kt:192)at net.corda.node.services.statemachine.FlowStateMachineImpl.subFlow(FlowStateMachineImpl.kt:271)at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:312)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.reply(ReplyToMessagesBrokenAsyncFlow.kt:48)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.access$reply(ReplyToMessagesBrokenAsyncFlow.kt:19)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$call$$inlined$map$lambda$1.run(ReplyToMessagesBrokenAsyncFlow.kt:29)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)

希望您在这一点上相信我。 如果不是,请参考我一开始所说的内容。 Corda当前不支持从正在执行的流程异步启动新流程。 我相信他们正在努力。 但是,截至目前。 不要使用此解决方案。

可行的异步解决方案

我们已经看到,在Flow内部执行线程是行不通的。 为了继续追求性能,我们现在来看一下Corda服务中的线程。 这并不奇怪,因为标题和开头的段落已经讨论了这一点……

抛开讽刺的评论。 委派服务将需要对原始解决方案进行一些重做,但是大部分代码将保持不变。 大部分内容将被复制并粘贴到另一个类中。 从流中获取代码并将其放入服务中。

以下是新的MessageService ,其中包含原始ReplyToMessagesFlow的代码,但进行了一些更改和添加了线程代码:

@CordaService
class MessageService(private val serviceHub: AppServiceHub) : SingletonSerializeAsToken() {private companion object {val executor: Executor = Executors.newFixedThreadPool(8)!!}fun replyAll() {messages().map {executor.execute {reply(it)}}}private fun messages() =repository().findAll(PageSpecification(1, 100)).states.filter { it.state.data.recipient == serviceHub.myInfo.legalIdentities.first() }private fun repository() = serviceHub.cordaService(MessageRepository::class.java)private fun reply(message: StateAndRef) =serviceHub.startFlow(SendMessageFlow(response(message), message))private fun response(message: StateAndRef): MessageState {val state = message.state.datareturn state.copy(contents = "Thanks for your message: ${state.contents}",recipient = state.sender,sender = state.recipient)}
}

如您所见,大多数代码与ReplyToMessagesFlow中的代码相同。

我要强调的第一点是使用Executor线程池。 我这里没有使用CompletableFutures ,原因是我们稍后将要讨论的原因。

那么,这一切如何运作? replyAll函数在新的系统线程上对从Vault检索到的每条消息执行reply 。 该新线程又调用startFlow 。 触发将新的流程放入“流程工作器”队列中。 这是所有乐趣发生的地方,一切开始变得混乱。

Flow Worker队列负责执行Flow执行的顺序,并将在Flow添加和完成时填充并为空。 该队列对于协调节点内流的执行至关重要。 当涉及到多线程Flows本身时,它也是痛苦的根源。

下图显示了队列的简化视​​图:

流进入队列并在处理后离开

我为什么要谈论这个队列? 好吧,我们需要格外小心,不要将无法完成的流程填满队列。

怎么会这样 通过在正在执行的流程中启动流程,然后流程等待其完成。 直到队列的线程池中的所有线程都遇到这种情况,这才不会引起问题。 一旦发生,它将使队列陷入僵局。 没有流程可以完成,因为它们都依赖于许多排队的流程来完成。

流留在队列中,等待它们调用的流完成

这很可能在多次触发相同流量的高吞吐量系统上发生。 现在,队列中充满了等待其他流完成的流的机会增加了。

这不是很好,使事情变得有些困难。 但是,只要我们意识到这一点,我们就可以适应它。

这也是Executor线程池而不是CompletableFuture的原因。 通过启动新流程而不等待其完成,可以避免死锁。 这也是该解决方案的缺点。 没有新Flow的结果,其功能将极为有限。

话虽如此,如果您的用例适合上面显示的结构,那么我绝对建议您使用此解决方案。

在下一节中,我将讨论如何使用CompletableFuture

CompletableFutures的危险解决方案

这很危险的原因很简单。 僵局。 我建议不要使用此解决方案。 除非您的节点有权访问足够的线程,否则要减少用无法完成的线程填充队列的机会。 另一方面,这是一个更为理想的解决方案,因为您可以等待启动的流程的结果并对其进行处理。 这使解决方案更加有用。

以下是带有CompletableFuturesMessageService外观:

@CordaService
class MessageService(private val serviceHub: AppServiceHub) : SingletonSerializeAsToken() {fun replyAll(): List =messages().map { reply(it).returnValue.toCompletableFuture().join() }// everything else is the same as before
}

除了replyAll函数外,代码replyAll 。 返回的CordaFuture提供的toCompletableFuture函数,调用join以等待所有期货的结果并返回总体结果。

如前所述,此解决方案可能导致死锁。 但是,对于您的情况,也许并非如此。 由您决定发生这种情况的可能性。 如果不利于您,最好走开。 选择坚持使用类似于上一节中详述的同步或异步解决方案。

我真的需要这样做吗?

现在,是的,我相信你会的。

展望未来,我怀疑您是否需要依靠我在本文中提出的解决方案。

我相信Corda正在努力消除从Flow内部启动Flow时甚至不必考虑线程的需求。 取而代之的是,您可以简单地调用带有选项的subFlow来异步运行它。 这本可以使我们保留原始的同步解决方案,但可以选择使每个subFlow在单独的线程上运行。

将各部分结合在一起

总之,在Corda Enterprise 3中,可以在正在执行的流程中异步启动新流程。 根据您的用例,这可以提供良好的性能优势。 有缺点。 您不能等待异步流的结果,而不会用死锁的威胁来威胁您的节点。 节点的基础队列无法处理它所处的情况。因此,在将线程引入到Flow调用中时,需要格外小心。 值得庆幸的是,随着Corda的发展,您甚至不必担心自己这样做。 它甚至可能像添加布尔函数参数一样简单。 那是梦想!

这篇文章中使用的代码可以在我的GitHub上找到 。

如果您认为这篇文章有帮助,可以在Twitter上@LankyDanDev关注我,以跟上我的新文章。

翻译自: https://www.javacodegeeks.com/2018/09/asynchronous-flow-invocations-corda-services.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/345199.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【渝粤题库】陕西师范大学164117 企业组网技术 作业 (高起专)

《企业组网技术》作业 一、单选题&#xff08;每题4分&#xff0c;共60分&#xff09; 1. 下列关于网络操作系统叙述错误的是&#xff08; &#xff09;。 A. 是计算机和用户之间的接口 B. 只能在相同的系统间进行连接和操作 C. 具有网络通信和网络服务的功能 D. 管理计算机的硬…

【渝粤题库】陕西师范大学165209 组织职业生涯管理 作业(专升本)

《组织职业生涯管理》作业答案 一、单选题 1、人力资源供过于求的表现不包括_________ A、职业岗位缺乏 B、社会就业不足 C、政府促进就业政策 D、职业选择余地较大 2、职业的个人功能不包括___________ A、社会稳定的安全阀 B、获取利益的手段 C、个人发挥才能的手段 D、社会贡…

【渝粤题库】陕西师范大学191203 法理学导论 作业

《法理学导论》作业 一、单项选择题 1、下列关于法理学属性的说法&#xff0c;正确的是&#xff1a;法理学属于&#xff08; &#xff09; A理论法学 B法律史学 C国内应用法学。 D比较法学。 2、下列说法不正确的是&#xff08; &#xff09; A先有法、法律&#xff0c;后有法学…

jsf 后台参数到页面_JSF:直接从页面将参数传递给JSF操作方法,这是JavaEE 6+的一个不错的功能...

jsf 后台参数到页面其中一个JSF 2不错的特点出现在Java企业版的JavaEE 6 &#xff0c;是你可以传递参数给喜欢的commandButton和commandLink组件的任何动作元件操作方法。 基于此&#xff0c;您可以最小化托管bean中的方法数量。 另外&#xff0c;为了最小化在bean内部设置的…

mysql odbc ado性能差异_ODBC、OLEDB和ADO之间的关系 ,以及性能比较

学习了.net视频之后&#xff0c;对里面涉及到的数据库连接部分中的一些概念表示很无语。网上很多相关资料&#xff0c;但除了网站不一样外&#xff0c;基本上内容都神一样的一致。现在&#xff0c;我就通过结合看到的一些资料再加上自己的理解试图去解释一下&#xff0c;有不对…

【渝粤题库】陕西师范大学200731 计算机组成原理

《计算机组成原理》作业 一、填空 1&#xff0e;电子数字计算机从1946年诞生至今&#xff0c;按其工艺和器件特点&#xff0c;大致经历了四代变化。第一代从  年开始&#xff1b;第二代从  年开始&#xff1b;第三代从   年开始&#xff0c;采用  &#xff1b;第四代从…

轻松与外来客户进行REST通信

在这个例子中&#xff0c;我们将向您展示如何使用Feign客户端开发一个简单的Spring Boot Application&#xff0c;以使用Weather REST服务。 Spring Boot是基于Java的框架&#xff0c;可简化Web和企业应用程序的构建。 Spring Boot具有嵌入式Tomcat&#xff0c;提供“启动器”…

【渝粤题库】陕西师范大学201701 高等数学(二)作业 (高起本、专升本)

《高等数学(二)》作业 一、填空题 1&#xff0e;点A&#xff08;2&#xff0c;3&#xff0c;-4&#xff09;在第 卦限。 2&#xff0e;设 . 3&#xff0e;函数 。 4&#xff0e;设 。 5&#xff0e;设共域D由直线所围成&#xff0c;则将二重积分化为累次积分得 。 6&#xff0e…

【渝粤题库】陕西师范大学202101 公共政策学

《公共政策学》作业 一、单项选择题 1、“公共政策是对全社会的价值做有权威的分配”&#xff0c;提出这一命题的学者是&#xff08;  &#xff09; A拉斯韦尔    B伊斯顿       C戴伊       D安德森 2、美国学者林德布洛姆提出的公共政策模型是&#xff08;  …

unzip不能解压mysql的zip_Linux中zip压缩和unzip解压缩命令详解

1、把/home目录下面的mydata目录压缩为mydata.zipzip -r mydata.zip mydata #压缩mydata目录2、把/home目录下面的mydata.zip解压到mydatabak目录里面unzip mydata.zip -d mydatabak3、把/home目录下面的abc文件夹和123.txt压缩成为abc123.zipzip -r abc123.zip abc 123.txt4、…

【渝粤题库】陕西师范大学202891 基于web的程序设计

填空题 1、执行完a"6"语句后&#xff0c;a是 类型。 2、程序段 <% a3 aa5 %> 执行完毕后a的值是 。 3、VBSCRIPT函数 可以将数值转换为字符串。 4、Mid&#xff08;“八千里路云和月”&#xff0c;3&#xff0c;2&#xff09;的返回值是什么&#xff1f; 5、Se…

通过Spring Boot了解H2 InMemory数据库

介绍 基本上&#xff0c;数据库设置涉及几个步骤&#xff0c;然后才能在应用程序中通过已配置的数据源使用它。 在实际项目实施中&#xff0c;这实际上是必需的。 但是&#xff0c;在某些情况下&#xff0c;我们只需要为某些事情完成POC&#xff0c;而整个数据库设置工作仍然是…

【渝粤题库】陕西师范大学210004幼儿园美术教育作业(高起专)

《幼儿园美术教育》作业 一、名词解释题 1、美术 2、曼陀罗 3、艺术 4、表现目标 5、意愿画 6、DBAE 7、夸张式表现 8、最近发展区 9、自由画 10、情节画 11、雕塑 12、艺术起源理论“巫术论” 13、儿童美术 14、物体画 15、工艺美术 16、过程目标 二、简答题 1、简述里德的艺术…

core identity mysql_Microsoft.AspNetCore.Identity 使用 mysql 报错处理

1.使用mysql 首先要确定mysql connector 支的版本&#xff0c;正面是链接https://dev.mysql.com/doc/connector-net/en/connector-net-entityframework-core.htmlTable 9.2 Supported versions of Entity Framework CoreConnector/NETEF Core 1.1EF Core 2.0EF Core 2.16.10.4.…

【渝粤题库】陕西师范大学291003综合英语(三)作业(高起专、高起本)

《综合英语三》作业 Matching. Read the following words and match them with the explanations in the right column. ( ) 1. inaudible A. to talk proudly ( ) 2. fragrance B. angry ( ) 3. boast C. the quality of being new ( ) 4. survive D. easily seen; standing o…

【渝粤题库】陕西师范大学300005 中国历史文选

《中国历史文选》作业 一、解释下列句子中黑体加线的字或词 1&#xff0e;三月丙午&#xff0c;入曹。数之&#xff0c;以其不用僖负羁而乘轩者三百人也&#xff0c;且曰&#xff1a;“献状。” 2&#xff0e;微楚之惠不及此&#xff0c;退三舍辟之&#xff0c;所以报也。 3&am…

mysql 数据路由_node-路由操作mysql数据库

node大部分方法都是异步的&#xff0c;在操作数据库方法后面紧接着输出结果&#xff0c;输出的结果只会为空值&#xff0c;使用promise及其方便的解决这个问题&#xff0c;接下来看看node如何使用路由来处理不同请求&#xff0c;进而操作mysql数据库一、引入相关依赖node中默认…

行为设计模式:中介者

以前我们看过迭代器模式。 中介者模式在实现目标上有很大的不同。 它是行为模式之一&#xff0c;其目的是改变对象之间的通信方式。 中介器将代替对象之间的直接通信&#xff0c;而不是直接相互通信。 例如&#xff0c;想象一下金融交易的场景。 您确实想交易和购买&#xff…

【渝粤题库】陕西师范大学500006 算法语言 作业

《算法语言》作业 一、填空题 1、13/2的运算结果为 &#xff0c;’A’2的运算结果是 。 2、C 语言源程序需经过 、 两个过程生成可执行文件。 3、如果表示16进制常量45&#xff0c;在C 中应写为 。 4、C中变量从其作用域上分为 、 5、表达式X12的值为 。 6、3(12>0)的值为 。…

mysql多数据源事务_多数据源一致性事务解决方案

spring 多数据源配置spring 多数据源配置一般有两种方案&#xff1a;1、在spring项目启动的时候直接配置两个不同的数据源&#xff0c;不同的sessionFactory。在dao 层根据不同业务自行选择使用哪个数据源的session来操作。2、配置多个不同的数据源&#xff0c;使用一个session…