开发进阶:Dotnet Core多路径异步终止

今天用一个简单例子说说异步的多路径终止。我尽可能写得容易理解吧,但今天的内容需要有一定的编程能力。

今天这个话题,来自于最近对gRPC的一些技术研究。

话题本身跟gRPC没有太大关系。应用中,我用到了全双工数据管道这样一个相对复杂的概念。

我们知道,全双工连接是两个节点之间的连接,但不是简单的“请求-响应”连接。任何一个节点都可以在任何时间发送消息。概念上,还是有客户端和服务端的区分,但这仅仅是概念上,只是为了区分谁在监听连接尝试,谁在建立连接。实际上,做一个双工的API比做一个“请求-响应”式的API要复杂得多。

由此,延伸出了另一个想法:做个类库,在库内部构建双工管道,供给消费者时,只暴露简单的内容和熟悉的方式。


一、开始

假设我们有这样一个API:

  • 客户端建立连接

  • 有一个SendAsync消息从客户端发送到服务器

  • 有一个TryReceiveAsync消息,试图等待来自服务器的消息(服务器有消息发送为True,返之为False)

  • 服务器控制数据流终止,如果服务器发送完最后一条消息,则客户端不再发送任何消息。

接口代码可以写成这样:

interface ITransport<TRequest, TResponse> : IAsyncDisposable
{ValueTask SendAsync(TRequest request, CancellationToken cancellationToken);ValueTask<(bool Success, TResponse Message)> TryReceiveAsync(CancellationToken cancellationToken);
}

忽略连接的部分,代码看起来并不复杂。

下面,我们创建两个循环,并通过枚举器公开数据:

ITransport<TRequest, TResponse> transport;
public async IAsyncEnumerable<TResponse> ReceiveAsync([EnumeratorCancellation] CancellationToken cancellationToken)
{while (true){var (success, message) =await transport.TryReceiveAsync(cancellationToken);if (!success) break;yield return message;}
}public async ValueTask SendAsync(IAsyncEnumerable<TRequest> data, CancellationToken cancellationToken)
{await foreach (var message in data.WithCancellation(cancellationToken)){await transport.SendAsync(message, cancellationToken);}
}

这里面用到了异步迭代器相关的概念。如果不明白,可以去看我的另一篇专门讨论异步迭代器的文章,【传送门】。

二、解决终止标志

好像做好了,我们用循环接收和发送,并传递了外部的终止标志给这两个方法。

真的做好了吗?

还没有。问题出在终止标志上。我们没有考虑到这两个流是相互依赖的,特别是,我们不希望生产者(使用SendAsync的代码)在任何连接失败的场景中仍然运行。

实际上,会有比我们想像中更多的终止路径:

  • 我们可能已经为这两个方法提供了一个外部的终止令牌,并且这个令牌可能已经被触发

  • ReceiveAsync的消费者可能已经通过WithCancellation提供了一个终止令牌给GetAsyncEnumerator,并且这个令牌可能已经被触发

  • 我们的发送/接收代码可能出错了

  • ReceiveAsync的消费者在数据获取到中途,要终止获取了 - 一个简单的原因是处理收到的数据时出错了

  • SendAsync中的生产者可能发生了错误

这只是一些可能的例子,但实际的可能会更多。

本质上,这些都表示连接终止,因此我们需要以某种方式包含所有这些场景,进而允许发送和接收路径之间传达问题。换句话说,我们需要自己的CancellationTokenSource

显然,这种需求,用库来解决是比较完美的。我们可以把这些复杂的内容放在一个消费者可以访问的单一API中:

public IAsyncEnumerable<TResponse> Duplex(IAsyncEnumerable<TRequest> request, CancellationToken cancellationToken = default);

这个方法:

  • 允许它传入一个生产者

  • 通话它传入一个外部的终止令牌

  • 有一个异步的响应返回

使用时,我们可以这样做:

await foreach (MyResponse item in client.Duplex(ProducerAsync()))
{// ... todo
}
async IAsyncEnumerable<MyRequest> ProducerAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{for (int i = 0; i < 100; i++){yield return new MyRequest(i);await Task.Delay(100, cancellationToken);}
}

上面这段代码中,我们ProducerAsync还没有实现太多内容,目前只是传递了一个占位符。稍后我们可以枚举它,而枚举行为实际上调用了代码。

回到Duplex。这个方法,至少需要考虑两种不同的终止方式:

  • 通过cancellationToken传入的外部令牌

  • 使用过程中可能传递给GetAsyncEnumerator()的潜在的令牌

这儿,为什么不是之前列出的更多种终止方式呢?这儿要考虑到编译器的组合方式。我们需要的不是一个CancellationToken,而是一个CancellationTokenSource

public IAsyncEnumerable<TResponse> Duplex(IAsyncEnumerable<TRequest> request, CancellationToken cancellationToken = default) => DuplexImpl(transport, request, cancellationToken);private async static IAsyncEnumerable<TResponse> DuplexImpl(ITransport<TRequest, TResponse> transport, IAsyncEnumerable<TRequest> request, CancellationToken externalToken, [EnumeratorCancellation] CancellationToken enumeratorToken = default)
{using var allDone = CancellationTokenSource.CreateLinkedTokenSource(externalToken, enumeratorToken);// ... todo
}

这里,DuplexImpl方法允许枚举终止,但又与外部终止标记保持分离。这样,在编译器层面不会被合并。在里面,CreateLinkedTokenSource反倒像编译器的处理。

现在,我们有一个CancellationTokenSource,需要时,我们可能通过它来终止循环的运行。

using var allDone = CancellationTokenSource.CreateLinkedTokenSource(externalToken, enumeratorToken);
try
{// ... todo
}
finally
{allDone.Cancel();
}

通过这种方式,我们可以处理这样的场景:消费者没有获取所有数据,而我们想要触发allDone,但是我们退出了DuplexImpl。这时候,迭代器的作用就很大了,它让程序变得更简单,因为用了using,最终里面的任何内容都会定位到Dispose/DisposeAsync

下一个是生产者,也就是SendAsync。它也是双工的,对传入的消息没有影响,所以可以用Task.Run作为一个独立的代码路径开始运行,而如果生产者出现错误,则终止发送。上边的todo部分,可以加入:

var send = Task.Run(async () =>
{try{await foreach (var message in request.WithCancellation(allDone.Token)){await transport.SendAsync(message, allDone.Token);}}catch{allDone.Cancel();throw;}
}, allDone.Token);// ... todo: receiveawait send;

这里启动了一个生产者的并行操作SendAsync。注意,这里我们用标记allDone.Token把组合的终止标记传递给生产者。延迟await是为了允许ProducerAsync方法里可以使用终止令牌,以满足复合双工操作的生命周期要求。

这样,接收代码就变成了:

while (true)
{var (success, message) = await transport.TryReceiveAsync(allDone.Token);if (!success) break;yield return message;
}allDone.Cancel();

最后,把这部分代码合在一起看看:

private async static IAsyncEnumerable<TResponse> DuplexImpl(ITransport<TRequest, TResponse> transport, IAsyncEnumerable<TRequest> request, CancellationToken externalToken, [EnumeratorCancellation] CancellationToken enumeratorToken = default)
{using var allDone = CancellationTokenSource.CreateLinkedTokenSource(externalToken, enumeratorToken);try{var send = Task.Run(async () =>{try{await foreach (var message in request.WithCancellation(allDone.Token)){await transport.SendAsync(message, allDone.Token);}}catch{allDone.Cancel();throw;}}, allDone.Token);while (true){var (success, message) = await transport.TryReceiveAsync(allDone.Token);if (!success) break;yield return message;}allDone.Cancel();await send;}finally{allDone.Cancel();}
}

三、总结

相关的处理就这么多。这里实现的关键点是:

  • 外部令牌和枚举器令牌都对allDone有贡献

  • 传输中发送和接收代码使用allDone.Token

  • 生产者枚举使用allDone.Token

  • 任何情况下退出枚举器,allDone都会被终止

  • 如果传输接收错误,则allDone被终止

  • 如果消费者提前终止,则allDone被终止

  • 当我们收到来自服务器的最后一条消息后,allDone被终止

  • 如果生产者或传输发送错误,allDone被终止

最后多说一点,关于ConfigureAwait(false)

默认情况下,await包含一个对SynchronizationContext.Current的检查。除了表示额外的上下文切换之外,在UI应用程序的情况下,它也意味着在UI线程上运行不需要在UI线程上运行的代码。库代码通常不需要这样做。因此,在库代码中,通常应该在所有用到await的地方使用. configureawait (false)来绕过这个检查。而在一般应用程序的代码中,应该默认只使用await而不使用ConfigureAwait,除非你知道你在做什么。

喜欢就来个三连,让更多人因你而受益

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/305618.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

android 活动切换动画,android – 在使用ChangeImageTransform共享元素转换的两个活动之间动画化ImageView...

要在具有共享元素的两个活动之间进行屏幕转换动画&#xff0c;您可以阅读this article并按照上述步骤&#xff1a;Enable window content transitions in your theme.Specify a shared elements transition in your style.Define your transition as an XML resource.Assign a …

怎样开始学习ERP?

最近我们公司的一个事业部来了一个新员工&#xff0c;行政主管把它领到我的面前&#xff0c;说他只接触过一些ERP&#xff0c;以后工作的主要内容就是负责该事业部内部的ERP系统。我也没有多问&#xff0c;因为他至于怎么被招聘的&#xff0c;已经不是我的事情了。我简单地问了…

win10右键一直转圈_Win10电脑开机一直转圈无法进入系统的解决方法

相信很多用户在安装Win10系统之后&#xff0c;常常会出现一些问题&#xff0c;其中就有电脑开不了机一直在转圈的情况&#xff0c;那么遇到电脑开机一直在转圈怎么办呢&#xff1f;下面笔者就针对这一情况和大家介绍Win10系统电脑开不了机一直在转圈的解决方法。解决方法开机马…

跨平台导PDF,结合wkhtmltopdf很顺手

前言好东西要分享&#xff0c;之前一直在使用wkhtmltopdf进行pdf文件的生成&#xff0c;常用的方式就是先安装wkhtmltopdf&#xff0c;然后在程序中用命令的方式将对应的html生成pdf文件&#xff0c;简单而且方便&#xff1b;但重复的编码使得想在wkhtmltopdf基础上进行封装&am…

女人15个秘密武器疼爱男人(女人必看)

一、成为他的空气 男人有时是更犹豫的&#xff0c;更孩子气的&#xff0c;比女人更少有勇气去承担爱的。他就像是那条在水里游来游去的鱼&#xff0c;一直在高唱着“我要自由”的歌曲。所以你只有慢慢***他的生活里&#xff0c;令他身在其中&#xff0c;舒适而不自觉&#xff0…

ueditor如何设置上传图片的高度宽度_怎么设置天猫主图

天猫主图很多人都设置过&#xff0c;不过&#xff0c;新手第一长主图往往不是很在行&#xff0c;所以也不懂怎么设置&#xff1f;那今天&#xff0c;麦顶电商就给大家科普下关于天猫长主图的知识&#xff0c;想要学习设置长主图的店主&#xff0c;这期的内容也正好适合你们。用…

鸿蒙系统2020正式版,鸿蒙2.0来了!华为开发者大会HDC 2020宣布

原标题&#xff1a;鸿蒙2.0来了&#xff01;华为开发者大会HDC 2020宣布华为官方宣布&#xff0c;2020年度华为开发者大会“HDC Together”将于9月10日至9月12日在东莞松山湖举办&#xff0c;早鸟门票明天起开售。根据预告&#xff0c;本次大会将带来主题演讲、技术论坛、行业大…

洋哥!我要进大厂!

阅读本文大概需要4分钟。最近不少读者咨询洋哥如何进大厂&#xff0c;回答的多了就想着要不整理出一个系列文章出来。说干就干&#xff0c;第一期先写写应届毕业和刚工作不久的计算机专业的童鞋如何进大厂。先说说必须要做到的几件事&#xff1a;第一&#xff0c;学校内的课程&…

战神II导演 首席程序员访谈(转自www.npc6.com )

当乔治卢卡斯在星球大战系列中取得了那样巨大的成功之后&#xff0c;伊文克什纳接手帝国反击战导演的任务肯定不是个轻松的担子&#xff0c;不过结果大家还是普遍的认为这是系列里最好一部。类似的&#xff0c;战神的导演David Jaffe也是一个难以追随的榜样。当他宣布了把导演的…

调整png的不透明度_TGA与PNG的优劣对比

一、TGA与PNG的特点&#xff08;一&#xff09;PNG 解码混乱 不稳定 有时候不能识别 放在AE里呈像杂质&#xff0c;斑点极多。PNG相当于视频格式中AVI。而TGA 更接近原始格式&#xff0c;更安全稳定&#xff0c;两者质量单独的最优无损情况下效果差不多。图1TGA格式二值化后放到…

这难道不是.NET5的bug? 在线求锤?

hello&#xff0c;最近在对一个使用.NET5项目的认证授权系统进行重构&#xff0c;对.NET 5的授权中间件的源码有些看法。也希望同学们能帮我理解。一个朴素的需求这是一个api项目&#xff0c;默认所有的api都需要授权&#xff0c; 少量散落在Controller各处的api不需要授权访问…

网管心得:优化网络性能给局域网提速[好文章]

网管心得:优化网络性能给局域网提速 目前&#xff0c;几乎任何稍微大一点的企业和学校都会建立一个局域网供使用&#xff0c;网络已经无处不在了。作为局域网络的网管人员&#xff0c;对于网络速度是非常在乎的&#xff0c;如何有效的利用带宽&#xff0c;避免不必要的速度损失…

win2008r2用户账户控制什么意思_养老保险统筹账户是什么意思?有什么用?

我国《社会保险法》规定&#xff1a;基本养老保险实行统筹账户和个人账户相结合的方式。对于在职职工来说&#xff0c;养老保险费分为单位缴费和个人缴费两部分&#xff1b;单位一般按照缴费基数的20%划入到统筹账户中&#xff0c;个人按照缴费基数的8%计入到个人账户里&#x…

android手机拍摄权限,react-native 手机拍照权限

第一步 &#xff1a;在 android/app/src/main/AndroidManifest.xml添加然后运行项目在手机应用权限哪里查看这是添加前的这是添加后的第二步调用import React, {Component} from react;import {View,Text,Image,StyleSheet,Button,Modal,TouchableHighlight,TouchableOpacity,D…

NHibernate for .NET 1.2

NHibernate for .NET 1.2 支持很多Hibernate 3.0的特性,性能方面据说有很大提高.下面这篇文章可以好好看看NHibernate Best Practices with ASP.NET, 1.2nd Ed.

Halcon和Opencv的区别?

1.MVTec HALCONMVTec HALCON 是世界上最全能的机器视觉软件.世界各地的用户从HALCON为快速开发图像分析和机器视觉程序的灵活架构获益匪浅.HALCON 提供了超过1100多种具备突出性能控制器的库,如模糊分析,形态,模式匹配,3D校正等.HALCON支持多个操作系统,编程语言和截获设备从而…

idea 一直在build_让web开发部署提速 8 倍的一款 IDEA 插件,你有在用?

原文:http://suo.im/4on4JE 来源于云栖社区>中间件小哥作为一个 Java 程序员&#xff0c;我们大多数会在 Intellij IDEA 中基于 SpringBoot 来开发 WEB 应用&#xff0c;所以本文中的测评将会基于以下几个架构来构建&#xff1a;开发环境&#xff1a;IDEA项目组织方式&…

Android 调用12306接口,GitHub - AndroidyxChen/loading-12306: 仿PC端12306的刷新loading的自定义view...

loading-12306仿PC端12306的刷新loading的自定义view效果图&#xff1a;核心代码及实现逻辑如下&#xff1a;mPaint.setColor(mColor);mPaint.setTextSize(50);//1、动画开启前&#xff0c;theCircle的初始值为-1&#xff0c;所以初始化时只走canvas.drawCircle()方法&#xff…

FreeBSD 下的 MySQL 备份方案

作/译者&#xff1a;叶金荣&#xff08;Email: &#xff09;&#xff0c;来源&#xff1a;http://imysql.cn&#xff0c;转载请注明作/译者和出处&#xff0c;并且不能用于商业用途&#xff0c;违者必究。核心提示&#xff1a;如何在 FreeBSD 下实现 MySQL 的全量及增量备份&am…

IdentityServer4(八)使用EntityFramework Core对数据进行持久化

上几篇&#xff0c;我们创建了客户端&#xff0c;scope&#xff0c;启动时&#xff0c;IdentityServer把这些配置数据加载至内存&#xff0c;但是&#xff0c;如果我们想要更改配置&#xff0c;就必须停掉IdentityServer&#xff0c;然后重新启动。且IdentityServe在r运行过程中…