C# 8中的Async Streams

关键要点

  • 异步编程技术提供了一种提高程序响应能力的方法。

  • Async/Await模式在C# 5中首次亮相,但只能返回单个标量值。

  • C# 8添加了异步流(Async Streams),允许异步方法返回多个值,从而扩展了其可用性。

  • 异步流提供了一种用于表示异步数据源的绝佳方法。

  • 异步流是Java和JavaScript中使用的反应式编程模型的替代方案。

C# 5引入了Async/Await,用以提高用户界面响应能力和对Web资源的访问能力。换句话说,异步方法用于执行不阻塞线程并返回一个标量结果的异步操作。

微软多次尝试简化异步操作,因为Async/Await模式易于理解,所以在开发人员当中获得了良好的认可。

现有异步方法的一个重要不足是它必须提供一个标量返回结果(一个值)。比如这个方法async Task<int> DoAnythingAsync(),DoAnythingAsync的结果是一个整数(一个值)。

由于存在这个限制,你不能将这个功能与yield关键字一起使用,并且也不能将其与async IEnumerable<int>(返回异步枚举)一起使用。

如果可以将Async/Await特性与yield操作符一起使用,我们就可以使用非常强大的编程模型(如异步数据拉取或基于拉取的枚举,在F#中被称为异步序列)。

C# 8中新提出的Async Streams去掉了标量结果的限制,并允许异步方法返回多个结果。

这个变更将使异步模式变得更加灵活,这样就可以按照延迟异步序列的方式从数据库中获取数据,或者按照异步序列的方式下载数据(这些数据在可用时以块的形式返回)。

例如:

foreach await (var streamChunck in asyncStreams){Console.WriteLine($“Received data count = {streamChunck.Count});} 

Reactive Extensions(Rx)是解决异步编程问题的另一种方法。Rx越来越受到开发人员的欢迎。很多其他编程语言(如Java和JavaScript)已经实现了这种技术(RxJava、RxJS)。Rx基于推送式编程模型(Push Programming Model),也称为反应式编程。反应式编程是事件驱动编程的一种类型,它处理的是数据而不是通知。

通常,在推送式编程模型中,你不需要控制Publisher。数据被异步推送到队列中,消费者在数据到达时消费数据。与Rx不同,Async Streams可以按需被调用,并生成多个值,直到达到枚举的末尾。

在本文中,我将对拉取模型和推送模型进行比较,并演示每一种技术各自的适用场景。我将使用很多代码示例向你展示整个概念和它们的优点,最后,我将讨论Async Streams功能,并向你展示示例代码。

拉取式编程模型与推送式编程模型

640?wx_fmt=jpeg

图-1-拉取式编程模型与推送式编程模型

我使用的例子是著名的生产者和消费者问题,但在我们的场景中,生产者不是生成食物,而是生成数据,消费者消费的是生成的数据,如图-1所示。拉取模型很容易理解。消费者询问并拉取生产者的数据。另一种方法是使用推送模型。生产者将数据发布到队列中,消费者通过订阅队列来接收所需的数据。

拉取模型更合适“快生产者和慢消费者”的场景,因为消费者可以从生产者那里拉取其所需的数据,避免消费者出现溢出。推送模型更适合“慢生产者和快消费者”的场景,因为生产者可以将数据推送给消费者,避免消费者不必要的等待时间。

Rx和Akka Streams(流式编程模型)使用了回压技术(一种流量控制机制)。它使用拉取模型或推送模型来解决上面提到的生产者和消费者问题。

在下面的示例中,我使用了一个慢消费者从快生产者那里异步拉取数据序列。消费者在处理完一个元素后,会向生产者请求下一个元素,依此类推,直到到达序列的末尾。

动机和背景

要了解我们为什么需要Async Streams,让我们来看下面的代码。

// 对参数(count)进行循环相加操作 static int SumFromOneToCount(int count){ConsoleExt.WriteLine("SumFromOneToCount called!");var sum = 0;for (var i = 0; i <= count; i++){sum = sum + i;}return sum;}

方法调用:

const int count = 5;ConsoleExt.WriteLine($"Starting the application with count: {count}!");ConsoleExt.WriteLine("Classic sum starting.");ConsoleExt.WriteLine($"Classic sum result: {SumFromOneToCount(count)}");ConsoleExt.WriteLine("Classic sum completed.");ConsoleExt.WriteLine("################################################");ConsoleExt.WriteLine(Environment.NewLine);

输出:

640?wx_fmt=jpeg

我们可以通过使用yield运算符让这个方法变成惰性的,如下所示。

static IEnumerable<int> SumFromOneToCountYield(int count){ConsoleExt.WriteLine("SumFromOneToCountYield called!");var sum = 0;for (var i = 0; i <= count; i++){sum = sum + i;yield return sum;}}

调用方法:

const int count = 5;ConsoleExt.WriteLine("Sum with yield starting.");foreach (var i in SumFromOneToCountYield(count)){ConsoleExt.WriteLine($"Yield sum: {i}");}ConsoleExt.WriteLine("Sum with yield completed.");ConsoleExt.WriteLine("################################################");ConsoleExt.WriteLine(Environment.NewLine);

输出:

640?wx_fmt=jpeg

正如你在输出窗口中看到的那样,结果被分成几个部分返回,而不是作为一个值返回。以上显示的累积结果被称为惰性枚举。但是,仍然存在一个问题,即sum方法阻塞了代码的执行。如果你查看线程,可以看到所有东西都在主线程中运行。

现在,让我们将async应用于第一个方法SumFromOneToCount上(没有yield关键字)。

static async Task<int> SumFromOneToCountAsync(int count){ConsoleExt.WriteLine("SumFromOneToCountAsync called!");var result = await Task.Run(() =>{var sum = 0;for (var i = 0; i <= count; i++){sum = sum + i;}return sum;});return result;}

调用方法:

const int count = 5;ConsoleExt.WriteLine("async example starting.");// 相加操作是异步进行得!这样还不够,我们要求不仅是异步的,还必须是惰性的。var result = await SumFromOneToCountAsync(count);ConsoleExt.WriteLine("async Result: " + result);ConsoleExt.WriteLine("async completed.");ConsoleExt.WriteLine("################################################");ConsoleExt.WriteLine(Environment.NewLine);

输出:

640?wx_fmt=jpeg

我们可以看到计算过程是在另一个线程中运行,但结果仍然是作为一个值返回!

想象一下,我们可以按照命令式风格将惰性枚举(yield return)与异步方法结合起来。这种组合称为Async Streams。这是C# 8中新提出的功能。这个新功能为我们提供了一种很好的技术来解决拉取式编程模型问题,例如从网站下载数据或从文件或数据库中读取记录。

让我们尝试使用当前的C# 版本。我将async关键字添加到SumFromOneToCountYield方法中,如下所示。

640?wx_fmt=jpeg

图-2 组合使用async关键字和yield发生错误

我们试着将async添加到SumFromOneToCountYield,但直接出现错误,如上所示!

让我们试试别的吧。我们可以将IEnumerable放入任务中并删除yield关键字,如下所示:

static async Task<IEnumerable<int>> SumFromOneToCountTaskIEnumerable(int count){ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable called!");var collection = new Collection<int>();var result = await Task.Run(() =>{var sum = 0;for (var i = 0; i <= count; i++){sum = sum + i;collection.Add(sum);}return collection;});return result;}

调用方法:

const int count = 5;ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable started!");var scs = await SumFromOneToCountTaskIEnumerable(count);ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable done!");foreach (var sc in scs){// 这不是我们想要的,结果将作为块返回!!!!ConsoleExt.WriteLine($"AsyncIEnumerable Result: {sc}");}ConsoleExt.WriteLine("################################################");ConsoleExt.WriteLine(Environment.NewLine);

输出:

640?wx_fmt=jpeg

可以看到,我们异步计算所有的内容,但仍然存在一个问题。结果(所有结果都在集合中累积)作为一个块返回,但这不是我们想要的惰性行为,我们的目标是将惰性行为与异步计算风格相结合。

为了实现所需的行为,你需要使用外部库,如Ix(Rx的一部分),或者你必须使用新提出的C#特性Async Streams。

回到我们的代码示例。我使用了一个外部库来显示异步行为。

static async Task ConsumeAsyncSumSeqeunc(IAsyncEnumerable<int> sequence){ConsoleExt.WriteLineAsync("ConsumeAsyncSumSeqeunc Called");await sequence.ForEachAsync(value =>{ConsoleExt.WriteLineAsync($"Consuming the value: {value}");// 模拟延迟!Task.Delay(TimeSpan.FromSeconds(1)).Wait();});}static IEnumerable<int> ProduceAsyncSumSeqeunc(int count){ConsoleExt.WriteLineAsync("ProduceAsyncSumSeqeunc Called");var sum = 0;for (var i = 0; i <= count; i++){sum = sum + i;// 模拟延迟!Task.Delay(TimeSpan.FromSeconds(0.5)).Wait();yield return sum;}}

调用方法:

const int count = 5;ConsoleExt.WriteLine("Starting Async Streams Demo!");// 启动一个新任务,用于生成异步数据序列!IAsyncEnumerable<int> pullBasedAsyncSequence = ProduceAsyncSumSeqeunc(count).ToAsyncEnumerable();ConsoleExt.WriteLineAsync("X#X#X#X#X#X#X#X#X#X# Doing some other work X#X#X#X#X#X#X#X#X#X#");// 启动另一个新任务,用于消费异步数据序列!var consumingTask = Task.Run(() => ConsumeAsyncSumSeqeunc(pullBasedAsyncSequence));// 出于演示目的,等待任务完成!consumingTask.Wait();ConsoleExt.WriteLineAsync("Async Streams Demo Done!");

输出:

640?wx_fmt=jpeg

最后,我们实现了我们想要的行为!我们可以在枚举上进行异步迭代。

源代码在这里。

客户端/服务器端的异步拉取

我将使用一个更现实的例子来解释这个概念。客户端/服务器端架构是演示这一功能优势的绝佳方法。

客户端/服务器端同步调用

客户端向服务器端发送请求,客户端必须等待(客户端被阻塞),直到服务器端做出响应,如图-3所示。

640?wx_fmt=jpeg

图-3 同步数据拉取,客户端等待请求完成

异步数据拉取

客户端发出数据请求然后继续执行其他操作。一旦有数据到达,客户端就继续处理达到的数据。

640?wx_fmt=jpeg

图-4 异步数据拉取,客户端可以在请求数据时执行其他操作

异步序列数据拉取

客户端发出数据块请求,然后继续执行其他操作。一旦数据块到达,客户端就处理接收到的数据块并询问下一个数据块,依此类推,直到达到最后一个数据块为止。这正是Async Streams想法的来源。图-5显示了客户端可以在收到任何数据时执行其他操作或处理数据块。

640?wx_fmt=jpeg

图-5 异步序列数据拉取(Async Streams),客户端未被阻塞!

Async Streams

与IEnumerable<T>和IEnumerator<T>类似,Async Streams提供了两个新接口IAsyncEnumerable<T>和IAsyncEnumerator<T>,定义如下:

public interface IAsyncEnumerable<out T>{IAsyncEnumerator<T> GetAsyncEnumerator();}public interface IAsyncEnumerator<out T> : IAsyncDisposable    {Task<bool> MoveNextAsync();T Current { get; }}// Async Streams Feature可以被异步销毁public interface IAsyncDisposable{Task DiskposeAsync();}

Jonathan Allen已经在InfoQ网站上介绍过这个主题,我不想在这里再重复一遍,所以我建议你也阅读一下他的文章。

关键在于Task<bool> MoveNextAsync()的返回值(从bool改为Task<bool>,bool IEnumerator.MoveNext())。这样可以让整个计算和迭代都保持异步。大多数情况下,这仍然是拉取模型,即使它是异步的。IAsyncDisposable接口可用于进行异步清理。有关异步的更多信息,请点击此处。

语法

最终语法应如下所示:

foreach await (var dataChunk in asyncStreams){// 处理数据块或做一些其他的事情!}

如上所示,我们现在可以按顺序计算多个值,而不只是计算单个值,同时还能够等待其他异步操作结束。

重写微软的示例

我重写了微软的演示代码,你可以从我的GitHub下载相关代码。

这个例子背后的想法是创建一个大的MemoryStream(20000字节的数组),并按顺序异步迭代集合中的元素或MemoryStream。每次迭代从数组中拉取8K字节。

640?wx_fmt=jpeg

640?wx_fmt=jpeg

在(1)处,我们创建了一个大字节数组并填充了一些虚拟值。在(2)处,我们定义了一个叫作checksum的变量。我们将使用checksum来确保计算的总和是正确的。数组和checksum位于内存中,并通过一个元组返回,如(3)所示。

在(4)处,AsEnumarble(或者叫AsAsyncEnumarble)是一种扩展方法,用于模拟由8KB块组成的异步流( (6)处所示的BufferSize = 8000)。

通常,你不必继承IAsyncEnumerable,但在上面的示例中,微软这样做是为了简化演示,如(5)处所示。

(7)处是“foreach”,它从异步内存流中拉取8KB的块数据。当消费者(foreach代码块)准备好接收更多数据时,拉取过程是顺序进行的,然后它从生产者(内存流数组)中拉取更多的数据。最后,当迭代完成后,应用程序将’c’的校验和与checksum进行比较,如果它们匹配,就打印出“Checksums match!”,如(8)所示!

微软演示的输出窗口:

640?wx_fmt=jpeg

概要

我们已经讨论过Async Streams,它是一种出色的异步拉取技术,可用于进行生成多个值的异步计算。

Async Streams背后的编程概念是异步拉取模型。我们请求获取序列的下一个元素,并最终得到答复。这与IObservable<T>的推送模型不同,后者生成与消费者状态无关的值。Async Streams提供了一种表示异步数据源的绝佳方法,例如,当消费者尚未准备好处理更多数据时。示例包含了Web应用程序或从数据库中读取记录。

我已经演示了如何生成异步枚举数据,并使用外部异步序列库来消费枚举数据。我也演示了如何将这个功能用于从Web站点下载内容。最后,我们看到了新的Async Streams语法和一个完整的示例,该示例是基于微软的Build Demo Code(2018年5月7日至9日,西雅图,华盛顿州)。

关于作者

640?wx_fmt=jpegBassam Alugili 是STRATEC AG的高级软件专家和数据库专家。STRATEC是全球领先的全自动分析仪系统、实验室数据管理软件和智能耗材的合作伙伴。

原文地址:http://www.infoq.com/cn/articles/Async-Streams


.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com

640?wx_fmt=jpeg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/319828.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

线段树回顾

这个博主的线段树我觉得讲的很细了 文章目录建树区间查询&#xff0c;单点修改区间修改&#xff0c;单点查询区间修改&#xff0c;区间查询&#xff08;带pushdown&#xff09;乘法线段树根号线段树建树 struct node{ll l,r;ll sum,mlz,plz; }tree[4*maxn]; inline void bulid…

P3175-[HAOI2015]按位或【min-max容斥,FWT】

正题 题目链接:https://www.luogu.com.cn/problem/P3175 题目大意 开始有一个nnn位二进制数s0s0s0&#xff0c;每次有pip_ipi​概率选取数字iii让sss或上这个数字iii&#xff0c;求期望多少次能够让sss的nnn个位都变为111。 解题思路 因为是或所以我们只关心最后一个选中的数…

【DP】建学校问题(luogu 2803/2018 特长生 T3)

正题 luogu 2803 题目大意 给出n个点&#xff0c;和相邻的点的距离&#xff0c;每个点有一个权值&#xff0c;现在让你建k个特殊点&#xff0c;使所有点到其中一个特殊点的代价之和最小 解题思路 先预处理处一个区间到同一个特殊点的最小代价 然后DP即可 – 代码 #inclu…

[XSY3112] 接水果(树上包含路径,整体二分,扫描线)

传送门 给出一棵nnn个点的树。接下来给出PPP条树上路径ai→bia_i\to b_iai​→bi​&#xff0c;及其权值cic_ici​。最后有QQQ个询问&#xff0c;每个询问给出一条树上路径ui→viu_i\to v_iui​→vi​&#xff0c;问在包含ui→viu_i\to v_iui​→vi​的所有树上路径中&#xf…

Tr A HDU1575

文章目录题目&#xff1a;题解&#xff1a;代码&#xff1a;Tr A HDU1575题目&#xff1a; A为一个方阵&#xff0c;则Tr A表示A的迹&#xff08;就是主对角线上各项的和&#xff09;&#xff0c;现要求Tr(A^k)%9973。 Input 数据的第一行是一个T&#xff0c;表示有T组数据。 每…

asp.net core webapi项目配置全局路由

一、前言在开发项目的过程中&#xff0c;我新创建了一个controller&#xff0c;发现vs会给我们直接在controller头添加前缀&#xff0c;比如[Route("api/[controller]")],即在访问接口的时候会变成http://localhost:8000/api/values&#xff0c;但是如果控制器有很多…

CF622F-The Sum of the k-th Powers【拉格朗日插值】

正题 题目链接:https://www.luogu.com.cn/problem/CF622F 题目大意 给出n,kn,kn,k&#xff0c;求 ∑i1nik\sum_{i1}^ni^ki1∑n​ik 解题思路 很经典的拉格朗日差值问题 这个东西显然是可以化成一个k1k1k1次的多项式的&#xff0c;所以我可以直接代k2k2k2个点插出值来。看到顺…

平板游戏问题(luogu 2003/2018 特长生 T4)

正题 luogu 2003 题目大意 在平面上有若干块板子&#xff0c;每块板子的左右端分别向下连一条柱子&#xff0c;连到第一块板子&#xff0c;问你共要多少长度的柱子 解题思路 枚举一个板子中间的柱子&#xff08;即对这些柱子可能有贡献&#xff09; 然后取一个最高的立即可…

位运算及其性质

定义 运算名符号效果按位与&如果两个相应的二进制位都为1&#xff0c;则该位的结果值为1&#xff0c;否则为0按位或l两个相应的二进制位中只要有一个为1&#xff0c;该位的结果值为1按位异或^若参加运算的两个二进制位值相同则为0&#xff0c;否则为1取反~对一个二进制数按…

C Looooops POJ - 2115

C Looooops POJ - 2115 题目&#xff1a; A Compiler Mystery: We are given a C-language style for loop of type statement; I.e., a loop which starts by setting variable to value A and while variable is not equal to B, repeats statement followed by increasing …

IdentityServer4实战 - 谈谈 JWT Token 的安全策略

一.前言众所周知&#xff0c;IdentityServer4 默认支持两种类型的 Token&#xff0c;一种是 Reference Token&#xff0c;一种是 JWT Token 。前者的特点是 Token 的有效与否是由 Token 颁发服务集中化控制的&#xff0c;颁发的时候会持久化 Token&#xff0c;然后每次验证都需…

P4320-道路相遇,P5058-[ZJOI2004]嗅探器【圆方树,LCA】

两题差不多就一起写了 P4320-道路相遇 题目链接:https://www.luogu.com.cn/problem/P4320 题目大意 nnn个点mmm条边的一张图&#xff0c;qqq次询问两个点之间路径的必经点数量。 解题思路 建出圆方树然后问题就变为询问两个点之间路径的圆点数量&#xff0c;可以直接倍增L…

【Manacher】绿绿和串串(luogu 5446)

正题 luogu 5446 题目大意 定义对折为&#xff1a;以最后一个字符为对称轴翻转 问你一个字符串可能是由那些前缀对折若干次而来的 解题思路 先用Manacher求出回文长度 那么满足以下条件之一的前缀就是可行方案 最后一个字符的回文长度加上它的位置为n&#xff08;后面若干…

使用.Net Core实现FNV分布式hash一致性算法

说到FNV哈希算法不得不提Memcached&#xff0c;我们先简单介绍一下Memcached。MemcachedMemcached分为客户端与服务端&#xff0c;Memcached是服务端&#xff0c;服务端本身不提供分布式实现&#xff0c;只是一个单独的k-v缓存&#xff1b;Memcached的分布式是在客户端类库中实…

一起开心暑假集训第一周限时训练 2020/7/5

文章目录A - Goldbachs Conjecture POJ - 2262B - 同余方程 计蒜客 - T2010C - Tr A HDU - 1575D - C Looooops POJ - 2115vjudge试题集链接 A - Goldbach’s Conjecture POJ - 2262 试题链接&#xff1a; 线性筛先预处理&#xff0c;然后判断就行 #include<iostream>…

P5631-最小mex生成树【线段树,并查集】

正题 题目链接:https://www.luogu.com.cn/problem/P5631 题目大意 nnn个点mmm条边的一张图&#xff0c;求mexmexmex值最小的一棵生成树。 解题思路 考虑比较暴力的做法&#xff0c;枚举答案&#xff0c;然后判断其他边能否构成一棵生成树。 发现一条边会被重复加入多次&…

【Manacher】【贪心】字符串连接(金牌导航 Manacher-4)

正题 金牌导航 Manacher-4 题目大意 给出一个字符串&#xff0c;让你用最少的回文串连接得到该串&#xff08;这里连接是可以有重合的&#xff09; 解题思路 先用Manacher求出以x为左端点的回文串右端点最大的位置 然后在当前回文串中贪心求下一回文串的右端点 代码 #incl…

[XSY4197] Snow(树形DP)

我们在树上的每个点iii上放aia_iai​个小点&#xff0c;初始时先让每个点单独减&#xff0c;这样要花费aia_iai​之和的次数。 然后尝试把某些减合并。一个点上面的小点至多可以向两个相邻的小点连边&#xff08;这两个小点不能在同一个点上&#xff09;。每连一条边&#xff…

Followme Devops实践之路

引言天下武功,唯快不破想要提高开发团队效率&#xff0c;势必要有一套完整而成熟的开发流程方案&#xff0c;除了sprint迭代开发模式之外,还有近几年流行的devops流程,都是可以大幅度提高开发效率的工具. 我们团队也不断探索、实践&#xff0c;最终形成了现有的一套体系&#x…

【每日一题】6月30日 Growth

来源&#xff1a; 时间限制&#xff1a;C/C 1秒&#xff0c;其他语言2秒 空间限制&#xff1a;C/C 1048576K&#xff0c;其他语言2097152K 64bit IO Format: %lld文章目录题目描述题解&#xff1a;代码&#xff1a;题目描述 弱弱有两个属性a和b&#xff0c;这两个属性初始的时…