我最近一直在熟悉.net Core中引入的新Channel<T>类型。我想在它第一次发布的时候我了解过它,但是有关文章非常非常少,我不能理解它们与其他队列有什么不同。
在使用了一段时间后,我终于看到了它们的吸引力和真正的力量。最值得注意的是大型异步后台操作,这些操作几乎需要双向通信来同步它们正在做的事情。这句话有点拗口,但希望在本系列文章结束时,你会清楚什么时候应该使用Channel<T>,什么时候应该使用一些更基本的东西,比如Queue<T>。
Channel是什么?
从核心来说,Channel本质上是.net中的一种新的集合类型,它与现有的Queue<T>类型非常相似,但有额外的好处。在真正尝试研究这个主题时,我发现的问题是,许多现有的外部队列技术(IBM MQ、Rabbit MQ等)都有“channel”的概念,它们的范围从完全抽象的思维过程,到系统中实际的物理类型。
现在也许我大错特错,但如果你认为.net中的Channel就好比是允许等待新消息的一个队列,并告诉生产者要保持队列越来越大,消费者无法跟上,我认为这很难出错。
这里我提到了一个关键词,生产者/消费者。你可能还听说过Pub/Sub。但它们是不可互换的。
Pub/Sub描述的是某人发布信息,一个或多个“订阅者”监听该信息并对其采取一定的响应行为。这里不存在负载平衡,因为当添加订阅服务器时,它们本质上与其他所有人获得相同消息的副本。
在图表形式中,Pub/Sub看起来有点像这样:
生产者/消费者描述生产者发布消息的行为,并且有一个或多个消费者可以对该消息进行操作,但是每个消息只读取一次。它不会分发到每个订阅者。
当然,用图表的形式:
另一种思考生产者/消费者的方式是想象你去超市结账。当顾客想结帐时,排队的队伍变长了,你可以简单地打开更多的收银台来处理这些顾客。这个小小的思考过程实际上是很重要的,因为如果你不能打开更多的收银台怎么办?排队的队伍应该越来越长吗?如果收银台操作员坐在那里,但没有顾客怎么办?他们是应该当天就打包回家呢,还是应该被告知坐着等客人来了再说。
这通常被称为生产者-消费者问题,这是Channel要解决的问题。
基础Channel示例
与Channel有关的所有东西都在System.Threading.Channels中。在以后的版本中,这似乎是与标准的.net Core项目捆绑在一起的,但如果不是,这里有一个nuget包:https://www.nuget.org/packages/System.Threading.Channels。
一个极其简单的Channel示例是这样的:
static async Task Main(string[] args)
{var myChannel = Channel.CreateUnbounded();for (int i = 0; i < 10; i++){await myChannel.Writer.WriteAsync(i);}while (true){var item = await myChannel.Reader.ReadAsync();Console.WriteLine(item);}
}
这里没有太多可谈的。我们创建了一个“无限的”通道(这意味着它可以容纳无限项,但在本系列的后续内容中会有更多内容)。我们写10项,读10项,在这一点上,它与我们在.net中见过的任何其他队列没有太大区别。
Channel是线程安全的
没错,通道是线程安全的。这意味着多个线程可以读写同一个通道而不会出现问题。如果我们看一下这里的Channel源代码,我们可以看到它是线程安全的,因为它使用锁和内部“队列”的组合来同步读/写器,一个接一个地读/写。
实际上,Channel的预期用例是多线程场景。例如,如果我们使用上面的基本代码,当我们实际上不需要线程安全性时,维护线程安全性实际上会有一些开销。所以在那个例子中,我们可能只使用Queue<T>更好。但是这段代码呢?
static async Task Main(string[] args)
{var myChannel = Channel.CreateUnbounded();_ = Task.Factory.StartNew(async () =>{for (int i = 0; i < 10; i++){await myChannel.Writer.WriteAsync(i);await Task.Delay(1000);}});while (true){var item = await myChannel.Reader.ReadAsync();Console.WriteLine(item);}
}
在这里,我们有一个单独的线程写入消息,而我们的主线程读取消息。你会注意到有趣的事情是,我们添加了消息之间的延迟。怎么能调用ReadAsync()?没有TryDequeue或Dequeue,如果队列中没有消息,它就运行null,对吗?
答案是Channel Reader的“ReadAsync()”方法实际上会“等待”一个消息。因此,不需要在等待消息时执行一些荒谬的循环,也不需要在等待时完全阻塞线程。我们将在以后的文章中进一步讨论这个问题,但是你要知道你可以使用ReadAsync来等待新的消息,而不是编写一些自定义的代码来做同样的事情。
接下来是什么?
现在你已经掌握了基础知识,下一篇让我们看看使用Channel一些更高级的场景。
原文链接:https://dotnetcoretutorials.com/2020/11/24/using-channels-in-net-core-part-1-getting-started/