初探System.Threading.Channels

640?wx_fmt=jpeg



System.Threading.Channels是.Net Core基础类库中实现的一个多线程相关的库,专门处理数据流相关的操作,用来在生产者和订阅者之间传递数据(不知道可不可以理解为线程间传递数据,我把它类比成了Go语言中的Channel),使用时需要通过NuGet安装。

这个库的前身是System.Threading.Tasks.Channels,来自实验性质的核心类库项目https://github.com/dotnet/corefxlab,但是在2017年9月就不再更新了,目前使用的话需要用到最新的System.Threading.Channels库,如果你也是第一次接触的话,就直接上手研究System.Threading.Channels就可以了。

Channel API操作基于Channel对象,其操作主要由ChannelReaderChannelWriter两部分组成,由Channelt提供的工厂方法创建一个有容量限制(或者无限制、最大容量限制)的channel。这点类似于Go语言中的chan的容量,二者在这里有很多的类似的地方,也有不同的地方。

1.1. 和Go语言channel的一些比较

Go语言中的channel默认是没有容量的,在使用这个没有容量的channel时,生产者和消费者必须“流动”起来,否则将会阻塞,也就是当生产者写入channel一个数据时,必须同时有一个接收者接收,否则写入操作会停止,等待有一个消费者取走channel中的数据,写入操作才会继续。

System.Threading.Channels库中,没有类似Go语言的默认容量的机制,需要按需调用不同的Channel对象:

  • public static Channel<T> CreateBounded<T>(int capacity); :可以创建一个带有容量限制的Channel实例对象。

  • public static Channel<T> CreateBounded<T>(BoundedChannelOptions options) :创建一个自定义配置的Channel实例对象,可配置容量、以及在接收到新数据时的操作模式等等:

    • BoundedChannelFullMode.Wait:等待当前写入完成

    • BoundedChannelFullMode.DropNewest:删除并忽略管道中写入的最新的数据

    • BoundedChannelFullMode.DropOldest:删除并忽略管道中最旧的数据

    • BoundedChannelFullMode.DropWrite:删除当前正在写的数据,以写入管道中的新数据

  • public static Channel<T> CreateUnbounded<T>(); :创建一个没有容量限制的Channel实例对象,在实际使用时应当谨慎使用该创建方式,因为可能会发生OutOfMemoryException

  • public static Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options):创建一个自定义配置的没有容量限制的Channel实例对象。该配置选项因为没有容量限制所以不会有写入等待操作模式,只有默认的一些配置:

    • public bool SingleWriter { get; set; }:是否需要一个一个读

    • public bool SingleReader { get; set; }:是否需要一个一个写

    • public bool AllowSynchronousContinuations { get; set; }:是否需要异步连续操作(我个人理解为异步操作时同时进行读写)

Go语言的channel机制和System.Threading.Channels的不同之处有两个:

  1. Go语言没有无限容量的channel,而且就我个人的想法而言,无限容量并不“无限”,因为内存是有限的。

  2. System.Threading.Channels没有单向的channel类型。在Go中可以创建“只读”或者“只写”的channel,但是System.Threading.Channels中没有提供这种操作。

1.2. 生产者、消费者需要的方法

生产者需要使用的一些方法:TryWrite/WriteAsync/WaitToWriteAsync/Complete 消费者需要使用的一些方法:TryRead/ReadAsync/WaitToReadAsync/Completion

方法介绍:

  • TryRead/TryWrite:尝试使用同步方式读取或写入一项数据,返回读取或者写入是否成功。TryRead同时会以out的形式返回读取到的数据。

  • ReadAsync/WriteAsync:使用异步方式写入或者读取一项数据。

  • TryComplete/Completion:可以将channel标记为完成状态,这样就不会写入多余的错误数据,如果从已完成状态的channel中ReadAsync时会抛出异常,所以在不需要异步读取时建议经常使用TryRead

  • WaitToReadAsync/WaitToWriteAsync:在尝试读取或者写入数据之前,调用该方法可获得一个Task<bool>表示读取或者写入操作能否进行。

创建一个控制台程序演示channel的用法:

 1 2 3 4 5 6 7 8 9101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
using System;using System.Collections.Generic;using System.Threading.Channels;using System.Threading.Tasks;namespace ConsoleApp1{    class Program    {        static void Main(string[] args)        {            Task.Run(async () =>            {                await ChannelRun(0,0, 1, 50, 5);            });            Console.WriteLine("运行开始...");            Console.ReadLine();        }        /// <summary>        /// channel运行        /// </summary>        /// <param name="readDelayMs">读取器每次读取完等待时间</param>        /// <param name="writeDelayMs">写入器每次写入完等待时间</param>        /// <param name="finalNumberOfReaders">几个读取器同时读取</param>        /// <param name="howManyMessages">写入器总共写入多少消息</param>        /// <param name="maxCapacity">channel最大容量</param>        /// <returns></returns>        public static async Task ChannelRun(int readDelayMs, int writeDelayMs, int finalNumberOfReaders,int howManyMessages, int maxCapacity )        {            // 创建channel            var channel = Channel.CreateBounded<string>(maxCapacity);            var reader = channel.Reader;            var writer = channel.Writer;            var tasks = new List<Task>();            // 读取器执行读取任务,可以设置多个读取器同时读取            for (var i = 0; i < finalNumberOfReaders; i++)            {                var idx = i;                tasks.Add(Task.Run(() => Read(reader, idx + 1,readDelayMs)));            }            // 写入器执行写入操作            for (var i = 0; i < howManyMessages; i++)            {                Console.WriteLine($"写入器在{DateTime.Now.ToLongTimeString()}写入:{i}");                await writer.WriteAsync($"发布消息:'{i}");                // 写入完等待片刻                await Task.Delay(writeDelayMs);            }            // 写入器标记完成状态            writer.Complete();            // 等待读取器读取完成            await reader.Completion;            // 等待读取器所有的Task完成            await Task.WhenAll(tasks);        }        /// <summary>        /// 读取数据任务        /// </summary>        /// <param name="theReader">读取器</param>        /// <param name="readerNumber">读取器编号</param>        /// <param name="delayMs">读取完等待时间</param>        /// <returns>任务</returns>        public static async Task Read(ChannelReader<string> theReader, int readerNumber, int delayMs)        {            // 循环判断读取器是否完成状态            while (await theReader.WaitToReadAsync())            {                // 尝试读取数据                while (theReader.TryRead(out var theMessage))                {                    Console.WriteLine($"线程{readerNumber}号读取器在{DateTime.Now.ToLongTimeString()}读取到了消息:'{theMessage}'");                    // 读取完等待片刻                    await Task.Delay(delayMs);                }            }        }    }}

借助代码中的注释应当可以理解示例代码的作用,对其中的关键点做个说明:

  • 写入器只有一个,写入的容量由channel的容量控制。

  • 读取器可以设置多个,由Task调度同时读取。

2.1. 写入器、读取器无等待

写入器和读取器不等待,不停的读写数据,有一个读取器,总共写入50个数据,channel的容量为5,调用传参如下:

1234
Task.Run(async () =>{    await ChannelRun(0,0, 1, 50, 5);});

结果: 640?wx_fmt=gif

写入读取操作在一秒内完成了,观察输出可以发现,写入和读取交替进行,写入的数据会立刻被读取器读取出来打印在终端内。

2.2. 读取器阻塞(等待)

将读取器的等待时间设置长一些,观察一下写入器是否会被阻塞,调用传参如下:

1234
Task.Run(async () =>{    await ChannelRun(10000,0, 1, 50, 5);});

结果: 640?wx_fmt=gif

从输出的结果可以看见,在程序开始时写入器写入了6个数据(但是调试的时候capacity的值时5,这里的机制有待考证),然后每过10秒读取器读取一个数据后,写入器才能写入一个数据,由于读取器的速度限制相当于将写入器也进行了阻塞。

2.3. 多个读取器同时读取

读取器还是每读取一次暂停10秒,但是有5个Task同时读取,调用传参如下:

1234
Task.Run(async () =>{    await ChannelRun(10000,0, 1, 50, 5);});

结果: 640?wx_fmt=gif

从输出可以看出来,5个读取器Task可以每10秒钟同时读取5个数据,而写入器也同样的几乎是每次写入5个数据。

System.Threading.Channels作为一个线程间通信的库,用来当作发布者/订阅者组件使用非常方便。但是比起Go语言中的channel还是有些区别的,因为c#的Async/Await从某中意义上讲,并不是真正的多线程。

 1 2 3 4 5 6 7 8 91011121314151617181920212223242526272829303132333435363738394041424344454647
package mainimport (	"fmt"	"time")func main() {	fmt.Println("运行开始...")	channelRun(2000, 0, 5, 50, 10)}// channel运行实例// readDelayMs, writeDelayMs分别是读取器需要暂停的时间和写入器需要暂停的时间// finalNumberOfReaders是读取器个个数// howManyMessages是消息的总数// maxCapacity是channel的容量func channelRun(readDelayMs, writeDelayMs, finalNumberOfReaders, howManyMessages, maxCapacity int) {	// 创建channel	channel := make(chan string, maxCapacity)	for i := 0; i < finalNumberOfReaders; i++ {		go func(i int) {			read(channel, i, readDelayMs)		}(i)	}	for i := 0; i < howManyMessages; i++ {		fmt.Printf("写入器在%v写入:%v\n", time.Now().Format("2006-01-02 15:04:05.0000"), i)		channel <- fmt.Sprintf("发布消息:%v", i)		time.Sleep(time.Duration(writeDelayMs) * time.Millisecond)	}}// 读取器从channel中读取数据// channel时chan的实例// readerNumner代表了当前Goroutine的编号// delayMs表示当前的Goroutine读取完需要等待的时间func read(channel chan string, readerNumber, delayMs int) {	// 不断循环读取	for {		select {		case msg := <-channel:			fmt.Printf("%v号Gouroutine 读取器在%v读取到了消息:'%s'\n", readerNumber, time.Now().Format("2006-01-02 15:04:05.0000"), msg)			time.Sleep(time.Duration(delayMs) * time.Millisecond)		}	}}

结果: 640?wx_fmt=png

https://medium.com/@alexyakunin/go-vs-c-part-1-goroutines-vs-async-await-ac909c651c11

https://github.com/dotnet/corefx/blob/master/src/System.Threading.Channels/tests/ChannelTests.cs

https://sachabarbs.wordpress.com/2018/11/28/system-threading-channels/

640?wx_fmt=png


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/315120.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Codeforces Round #715 (Div. 1) B. Almost Sorted 找规律

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; 思路&#xff1a; 找规律yydsyydsyyds。 一看没什么想法&#xff0c;所以打了个表&#xff0c;好家伙&#xff0c;不打不知道&#xff0c;一打不得了&#xff0c;下面是n6n6n6的符合要求的情况&#xff1a; …

SQL Server之索引解析(二)

、堆表堆表通过IAM连接一起&#xff0c;查询时全表扫描。1、1 非聚集索引结构叶子节点数据结构&#xff1a;行数据结构Rid&#xff08;8字节&#xff09;中间节点数据结构&#xff1a; &#xff08;非聚集非唯一索引&#xff09;行数据结构Page&#xff08;4&#xff09;2 Rid&…

纠正一个错误,分布式系统关注点第17篇

这里是Z哥的个人公众号每周五早8点 按时送达当然了&#xff0c;也会时不时加个餐&#xff5e;我的第「78」篇原创敬上今天来加个餐&#xff0c;紧急纠正一个错误。先和大家说一声抱歉&#xff1a;D昨晚睡觉前&#xff0c;惯例打开「订阅号助手」回复一些留言。有一位小伙伴提了…

【NOI2016】国王饮水记【贪心】【斜率优化】【决策单调性】

传送门 首先比h1h_1h1​小的肯定没用&#xff0c;直接无视 然后考虑合并的顺序 ①在无限制的情况下&#xff0c;合并多个不如一个一个合并 a<b<ca<b<ca<b<c时&#xff0c;ab2c2>abc3{{ab \over 2}c\over 2}>{{abc}\over 3}22ab​c​>3abc​ ②先…

CF946D Timetable 背包dp + 思维转换

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; n,m,k≤500n,m,k\le500n,m,k≤500 思路&#xff1a; 将其转换成背包的模型&#xff0c;就可以想出来一个很明显的dpdpdp状态&#xff1a;f[i][j]f[i][j]f[i][j]表示前iii行花费了jjj的最小代价&#xff0c;…

.NET开发框架(三)-高可用服务器端设计

我们对框架功能作了简述&#xff0c;演示视频请点击 这里查看 &#xff0c;本章节&#xff0c;我们专门讲解一下&#xff0c;如何在Window服务器下&#xff0c;设计高可用的框架。我们的框架设计采用的是Window版本的服务端设计&#xff1a;整体框架图如下&#xff0c;为什么我…

P1537 弹珠 背包可行性dp

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; 思路&#xff1a; 疯狂水文章。 这个很明显是个背包&#xff0c;我们开一个布尔数组&#xff0c;之后枚举每组的个数&#xff0c;让后枚举1−61-61−6&#xff0c;再枚举容量kkk&#xff0c;注意顺序不能错了…

.NET Core 3.0中的WinForms创建集中式拉取请求中心

Windows 窗体&#xff08;或简称 WinForms&#xff09;&#xff0c;多年来被用于开发具有丰富和交互式界面的基于 Windows 的强大应用程序。各类企业对这些桌面应用程序的投入量非常巨大&#xff0c;每月有大约 240 万开发人员使用 Visual Studio 创建桌面式应用。利用和扩展现…

CF296B dp\容斥

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; n≤1e5n\le1e5n≤1e5 思路&#xff1a; 求方案数基本就是考虑dpdpdp了&#xff0c;看到nnn这么大可以考虑一下分情况讨论的dpdpdp状态。 设f[i][j]f[i][j]f[i][j]表示到了第iii个&#xff0c;状态为jjj的方…

asp.net core 自定义异常处理中间件

Intro在 asp.net core 中全局异常处理&#xff0c;有时候可能不能满足我们的需要&#xff0c;可能就需要自己自定义一个中间件处理了&#xff0c;最近遇到一个问题&#xff0c;有一些异常&#xff0c;不希望记录错误日志&#xff0c;目前主要是用户请求取消导致的 TaskCanceled…

CF786B Legacy 线段树优化建图

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; 实现如下连边后跑最短路。 思路&#xff1a; 优化建图板子题&#xff0c;优化思路就是将区间分割成若干个线段树上的线段&#xff0c;与线段树分治有点类似&#xff0c;由于有点向区间也有区间向点的边&a…

【ZJOI2015】幻想乡 Wi-Fi 搭建计划【几何】【贪心】【dp】

传送门 题意&#xff1a;一个x∈(−∞,∞),y∈[0,R]x\in(-\infin,\infin),y\in[0,R]x∈(−∞,∞),y∈[0,R]的矩形中有nnn个点&#xff0c;矩形外有mmm个半径均为RRR的圆&#xff0c;有独立的代价cic_ici​。求覆盖最多的点所需的最小代价。 n,m≤100n,m\leq100n,m≤100 显然先…

.NET架构开发应知应会

.NET程序是基于.NET framework、.NET Core、Mono、UWP【.NET实现】开发和运行的 &#xff0c;定义以上【.NET实现】的标准规范称为.NET StandardL1&#xff1a;.NET Standard.NET标准是一组API集合&#xff0c;由上层三种【.NET实现】的Basic Class Library实现&#xff0c;更正…

几个冷门字符串算法的学习笔记(最小表示法,exKMP,Lyndon Word)

所有下标均从1开始 最小表示法 给定一个串&#xff0c;求字典序最小的循环同构。 我们把串复制一遍接在后面&#xff0c;然后求出[1,N][1,N][1,N]开始的长为NNN的子串中最小的 先设i1,j2i1,j2i1,j2 然后暴力找出iii和jjj往后匹配的第一个不同的位置&#xff0c;记为ikikik…

.NET Core IdentityServer4实战 第Ⅴ章-单点登录

OiDc可以说是OAuth的改造版&#xff0c;在最初的OAuth中&#xff0c;我们需要先请求一下认证服务器获取下Access_token&#xff0c;然后根据Access_token去Get资源服务器, 况且OAuth1 和 2 完全不兼容&#xff0c;易用性差&#xff0c;而OIDC可以在登陆的时候就把信息返回给你&…

【CF594E】Cutting the Line 【贪心】【Lyndon Word】【扩展kmp】

传送门 题意&#xff1a;给一个字符串SSS和正整数kkk&#xff0c;将SSS分成最多kkk段&#xff0c;每段不变或翻转&#xff0c;使得最后的字典序最小。 ∣S∣≤5106|S|\leq5\times10^6∣S∣≤5106 发现不翻转可以看成拆成若干单字符分别翻转&#xff0c;所以先分析一下必须翻转…

一份好的工作总结才能帮你升职加薪

这里是Z哥的个人公众号每周五早8点 按时送达当然了&#xff0c;也会时不时加个餐&#xff5e;我的第「79」篇原创敬上最近有点忙&#xff0c;搬出之前攒的一篇文章来应急一下。一篇能助你挣更多钱的文章。好了&#xff0c;下面开始。我的读者们大部分是互联网行业的&#xff0c…

腾讯开源软件镜像站上线

腾讯开源软件镜像站(Tencent Open Source Mirror Site)已于近日上线&#xff0c;其官方名称为「腾讯云软件源」&#xff0c;由腾讯云提供支持。官方表示搭建此开源镜像站的目的在于宣传自由软件的价值&#xff0c;提高自由软件社区文化氛围&#xff0c;推广自由软件在国内的应用…

ASP.NET Core on K8S学习初探(2)

“ [LOG] ASP.NET Core on K8S Starting...”在上一篇《单节点环境搭建》中&#xff0c;通过Docker for Windows在Windows开发机中搭建了一个单节点的K8S环境&#xff0c;接下来就是动人心弦的部署ASP.NET Core API到K8S了。但是&#xff0c;在部署之前&#xff0c;我还是把基本…

Educational Codeforces Round 96 E. String Reversa 线段树模拟序列交换

传送门 文章目录题意&#xff1a;思路&#xff1a;题意&#xff1a; 思路&#xff1a; 与上一篇题解大同小异&#xff0c;无非就是不需要枚举排列了。 // Problem: E. String Reversal // Contest: Codeforces - Educational Codeforces Round 96 (Rated for Div. 2) // URL:…