多线程并发如何高效实现生产者/消费者?

【导读】无需引入第三方消息队列组件,我们如何利用内置C#语法高效实现生产者/消费者对数据进行处理呢?

在.NET Core共享框架(Share Framework)引入了通道(Channel),也就是说无需额外通过NuGet包安装,若为.NET Framework则需通过NuGet安装,前提是版本必须是4.6+(包含4.6),查询网上资料少的可怜,估计也有部分童鞋都没听说这玩意,所以接下来将通过几篇文章详细介绍其使用和底层具体实现原理

生产者/消费者概念

生产者/消费者这一概念,相信我们大家都不陌生,在日常生活无处不在、随处可见,其本质可用一句话概括:具有多个连续步骤的工作流程。比如美团外卖、再比如工厂里面的流水作业线、又比如线下实体快餐店等等

整个过程如同一条链,在这个链中每个步骤必须被完全隔离执行,生产者产生“东西”,然后对其交由下一步骤进行处理,最终到达消费者。

上述叙述为一切抽象,我们回到软件领域,在软件中每一块都在对应的线程中执行,以确保数据能得到正确处理,当然,这也就包括跨线程共享数据可能引起的并发问题。

此前我们利用内置BlockingCollection实现生产者/消费者机制(详见:.NET/.NET Core实现简单消息队列),但依然无法解决我们所面临的两个问题:其一:阻塞问题,其二:无任何基于Task的异步APi执行异步操作

通过引入System.Threading.Channel库则可以完美解决生产者/消费者问题,毫无疑问,线程安全是前提,性能测试有保证,异步提高吞吐量,配置选项够灵活。目前来看,利用通道可能将是实现生产者/消费者的最终手段

通道(Channel)概念

名为通道还是比较形象,如同管道一样,说到底就是线程安全的队列,既然是队列,那么势必涉及边界问题,通道类型分为有界通道和无界通道

有界通道(Bounded Channel):对传入数据具有指定容量,这也就意味着,若生产者产生的数据一旦达到容量空间,将不得不等待消费者执行完为生产者推送数据腾出额外可用空间

无界通道:(Unbounded Channel):对传入数据无上限,这也就意味着生产者可以持续不断发布数据,以此希望消费者能跟上生产者的节奏

到这里我们完全可得出一结论:因通道提供有界和无界选项,所以内置不可能利用并发队列来实现,一定是通过链表数据结构实现队列机制。

那么问题来了,全部指定为无界通道岂不万事大吉,这个问题想想就有问题,虽说无界通道为毫无上限,但计算机的系统内存不是,无论是有界通道抑或是无界通道都会通过缓存区来存储数据。所以选择正确的通道类型,取决于业务上下文。

那么问题又来了,若创建有界通道,一旦达到容量限制,通道应该如何处理呢?别担心,这个事情则交由我们根据实际业务情况来处理,边界通道容量满模式(BoundedChannelFullMode)枚举

???? Wait: 等待可用空间以完成写操作

???? DropNewest: 直接删除并忽略通道中的最新数据,以便为待写入数据腾出空间

???? DropOldest: 直接删除并忽略通道中的最旧数据,以便为待写入数据腾出空间

???? DropWrite: 直接删除要写入的数据

我们通过如下简单3个步骤实现生产者/消费者

创建通道类型

//创建通道类型
public static class Channel
{//有界通道(指定容量)public static Channel<T> CreateBounded<T>(int capacity);//有界通道(指定容量、配置通道满模式选项、配置读(是否单个读取)、写(是否单个写入)、是否允许延续同步操作)public static Channel<T> CreateBounded<T>(BoundedChannelOptions options);//无界通道public static Channel<T> CreateUnbounded<T>();//无界通道(配置读(是否单个读取)、写(是否单个写入)、是否允许延续同步操作)public static Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options);
}

创建生产者


//向通道写入数据(生产者)
public abstract class ChannelWriter<T>
{  protected ChannelWriter();  //标识写入通道完成,不再有数据写入public void Complete(Exception error = null);  //尝试向通道写入数据,若被写入则返回true,否则为falsepublic abstract bool TryWrite(T item);//异步返回通道是否有可写入空间public abstract ValueTask<bool> WaitToWriteAsync(CancellationToken cancellationToken = default);//异步写入数据到通道public virtual ValueTask WriteAsync(T item, CancellationToken cancellationToken = default);
}

创建消费者


//从通道读取数据(消费者)
public abstract class ChannelReader<T>
{protected ChannelReader();public virtual Task Completion { get; }//异步读取通道所有数据public virtual IAsyncEnumerable<T> ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default);//异步读取通道每一项数据public virtual ValueTask<T> ReadAsync(CancellationToken cancellationToken = default);//尝试向通道读取数据public abstract bool TryRead(out T item);//异步返回通道是否有可读取数据public abstract ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default);
}

有界通道(Channel)示例

一切已就绪,接下来我们通过示例重点演示有界通道,然后无界通道只不过是通道类型不同,额外增加选项配置而已

首先我们创建消息数据类

public class Message
{public Message(string data){Data = data;}public string Data { get; }
}

然后为方便观察生产者和消费者数据打印情况,在控制台中通过不同字体颜色来进行区分,简单来个日志类

public static class Logger
{private static readonly object obj = new object();public static void Log(string text, ConsoleColor color = ConsoleColor.White){lock (obj){Console.ForegroundColor = color;Console.WriteLine($"[{DateTime.Now:yyyy-MM-dd hh:mm:ss.ff}] - {text}");}}
}

接下来定义生产者发布数据

public class Producer
{private readonly ChannelWriter<Message> _writer;private readonly int _msgId;public Producer(ChannelWriter<Message> writer, int msgId){_writer = writer;_msgId = msgId;}public async Task PublishAsync(Message message, CancellationToken cancellationToken = default){await _writer.WriteAsync(message, cancellationToken);Logger.Log($"生产者 {_msgId} > 发布消息 【{message.Data}】", ConsoleColor.Yellow);}
}

消费者接收数据,为模拟演示,延迟50毫秒作为消息处理时间

public class Consumer
{private readonly ChannelReader<Message> _reader;private readonly int _msgId;public Consumer(ChannelReader<Message> reader, int msgId){_reader = reader;_msgId = msgId;}public async Task BeginConsumeAsync(CancellationToken cancellationToken = default){Logger.Log($"消费者 {_msgId} > 等待处理消息", ConsoleColor.Green);try{await foreach (var message in _reader.ReadAllAsync(cancellationToken)){Logger.Log($"消费者 ({_msgId})> 接收消息: 【{message.Data}】", ConsoleColor.Green);await Task.Delay(50, cancellationToken);}}catch (Exception ex){Logger.Log($"消费者 {_msgId} > 被强迫停止:{ex}", ConsoleColor.Green);}Logger.Log($"消费者 {_msgId} > 完成处理消息", ConsoleColor.Green);}
}

然后定义启动初始化生产者和消费者任务数量

//启动指定数量的消费者
private static Task[] StartConsumers(Channel<Message> channel, int consumersCount, CancellationToken cancellationToken)
{var consumerTasks = Enumerable.Range(1, consumersCount).Select(i => new Consumer(channel.Reader, i).BeginConsumeAsync(cancellationToken)).ToArray();return consumerTasks;
}//启动指定数量的生产者
private static async Task ProduceAsync(Channel<Message> channel,int messagesCount,int producersCount,CancellationTokenSource tokenSource)
{var producers = Enumerable.Range(1, producersCount).Select(i => new Producer(channel.Writer, i)).ToArray();int index = 0;var tasks = Enumerable.Range(1, messagesCount).Select(i =>{index = ++index % producersCount;var producer = producers[index];var msg = new Message($"{i}");return producer.PublishAsync(msg, tokenSource.Token);}).ToArray();await Task.WhenAll(tasks);Logger.Log("生产者发布消息完成,结束写入");channel.Writer.Complete();Logger.Log("等待消费者处理");await channel.Reader.Completion;Logger.Log("消费者正在处理");
}

最后一步则是创建通道类型(有界通道),启动生产者和消费者线程任务并运行

private static async Task Run(int maxMessagesToBuffer, int messagesToSend, int producersCount, int consumersCount)
{Logger.Log("*** 开始执行 ***");Logger.Log($"生产者数量 #: {producersCount}, 容量大小: {maxMessagesToBuffer}, 消息数量: {messagesToSend}, 消费者数量 #: {consumersCount}");var channel = Channel.CreateBounded<Message>(maxMessagesToBuffer);var tokenSource = new CancellationTokenSource();var cancellationToken = tokenSource.Token;var tasks = new List<Task>(StartConsumers(channel, consumersCount, cancellationToken)){ProduceAsync(channel, messagesToSend, producersCount, tokenSource)};await Task.WhenAll(tasks);Logger.Log("*** 执行完成 ***");
}

接下来我们在主方法中调用上述Run方法,指定有界通道容量为100,消费数量为10,生产者和消费者数量各为1,如下:

static async Task Main(string[] args)
{await Run(100, 10, 1, 1);Console.ReadLine();
}

根据业务上下文我们可指定有界通道满模式以及其他对应参数

var channel = Channel.CreateBounded<Message>(new BoundedChannelOptions(maxMessagesToBuffer)
{FullMode = BoundedChannelFullMode.Wait,SingleReader = true,SingleWriter = true,AllowSynchronousContinuations = false
});

关于无界通道没啥太多要讲解的地方,配置选项如下:

var channel = Channel.CreateUnbounded<Message>(new UnboundedChannelOptions()
{SingleReader = true,SingleWriter = true,AllowSynchronousContinuations = false
});

相比阻塞模型,通道提供异步支持以及灵活配置,更适合在实际业务场景中使用

关于通道大概就讲解这么多,后续我们将分析通道实现原理,更详细介绍请参看如下外链

An Introduction to System.Threading.Channels

https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/306676.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

js-cookie 无法设置cookie_php操作 cookie

1&#xff0c;设置cookie<?phpsetcookie(key);setcookie(key1,value1);setcookie(key2,value2,time()1*24*60*60);setcookie(key4, value4, time() 1 * 24 * 60 * 60, , , false, true); //一旦cookie的httponly为真&#xff0c;那么只能在服务端获取&#xff0c;js无法操…

.Net orm 开源项目 FreeSql 2.0.0

写在开头2018年11月头脑发热到今天&#xff0c;一晃已经两年&#xff0c;当初从舒服区走向一个巨大的坑&#xff0c;回头一看后背一凉。两年时间从无到有&#xff0c;经历数不清的日夜奋斗&#xff08;有人问花了多长时间投入&#xff0c;答案&#xff1a;全职x2 两年无休息&a…

c语言函数库——ispunct函数 判断字符是否为标点符号或特殊字符

c语言函数库——ispunct函数 判断字符是否为标点符号或特殊字符 头文件&#xff1a;#inlude <ctype.h> spunct() 函数用来检测一个字符是否为标点符号或特殊字符&#xff0c;其原型为&#xff1a; int ispunct(int c); 【参数】c 为需要检测的字符。 【返回值】若 c 为标…

js重新渲染div_前端工程师必备:从浏览器的渲染到性能优化

文章来自&#xff1a;华为云开发者社区摘要&#xff1a;本文主要讲谈及浏览器的渲染原理、流程以及相关的性能问题。问题前瞻1. 为什么css需要放在头部&#xff1f;2. js为什么要放在body后面&#xff1f;3. 图片的加载和渲染会阻塞页面DOM构建吗&#xff1f;4. dom解析完才出现…

做架构也得讲武德

这里是Z哥的个人公众号每周五11&#xff1a;45 按时送达当然了&#xff0c;也会时不时加个餐&#xff5e;我的第「169」篇原创敬上大家好&#xff0c;我是Z哥。今天分享一篇对「架构」这件事的随想。我想&#xff0c;做「架构」是每个热爱技术的技术人在不断追求想进入的领域。…

c++随机数函数rand()

c 语言rand()生成随机数 c语言中rand()函数生成随机数的用法&#xff1a; 详细介绍&#xff1a; (1)使用该函数首先应在开头包含头文件stdlib.h #include<stdlib.h>(C建议使用#include&#xff0c;下同) (2)在标准的C库中函数rand()可以生成0~RAND_MAX之间的一个随机数…

杂牌手柄模拟xboxone手柄_手机就能玩Switch游戏,蛋蛋模拟器+盖世小鸡X2手柄体验...

最近收到一个很爆炸的消息&#xff0c;国外大神开发出了EGG模拟器(蛋蛋模拟器)&#xff0c;让手机也能玩Switch游戏&#xff0c;一直垂涎Switch游戏体验的我怎么能错过呢&#xff0c;必须一探究竟。据悉&#xff0c;EGG模拟器支持100多款Switch游戏&#xff0c;而且游戏还在持续…

api-hook,更轻量的接口测试工具

前言在网站的开发过程中&#xff0c;接口联调和测试是至关重要的一环&#xff0c;其直接影响产品的核心价值&#xff0c;而目前也有许多技术方案和工具加持&#xff0c;让我们的开发测试工作更加便捷。接口作为数据传输的重要载体&#xff0c;数据格式和内容具有多样性&#xf…

C++11的for循环使用auto的新用法

C11的for循环使用auto的新用法 for(auto a:vec) { cout<<a<<" "; } #include<bits/stdc.h> using namespace std; int main() {vector<int> vec;for(int i0; i<10; i){vec.push_back(i);}for(auto a:vec){cout<<a<<" …

如何使用 C# 中的 HashSet

译文链接&#xff1a;https://www.infoworld.com/article/3586972/how-to-use-hashset-in-csharp.htmlHashSet 是一个优化过的无序集合&#xff0c;提供对元素的高速查找和高性能的set集合操作&#xff0c;而且 HashSet 是在 .NET 3.5 中被引入的&#xff0c;在 System.Collect…

python装饰器源代码_13-Python-装饰器

1、装饰器的定义 装饰器的本质就是函数&#xff0c;用来装饰其它函数&#xff0c;就是为其它函数添加附加功能。 装饰器原则如下&#xff1a; 不能修改被装饰的函数的源代码 不能修改被装饰的函数的调用方式 2、实现装饰器知识储备 函数即变量 1 defbar():2 print("in the…

算法设计与分析——分治与递归策略——hanoi问题

**汉诺塔问题&#xff1a;**古代有一个梵塔&#xff0c;塔内有三个座A、B、C&#xff0c;A座上有64个盘子&#xff0c;盘子大小不等&#xff0c;大的在下&#xff0c;小的在上&#xff08;如图&#xff09;。有一个和尚想把这64个盘子从A座移到B座&#xff0c;但每次只能允许移…

post多个参数_关于HTTP GET和POST的区别

Photo by Luca Bravo on UnsplashGET还是POST&#xff1f; 考虑将浏览器作为客户端&#xff0c;可以缓存哪种方法&#xff1f; 哪个是"安全"方法&#xff1f; 哪一个不是幂等的&#xff1f; 如果我将端点URL复制并粘贴到浏览器的地址栏中&#xff0c;然后按Enter&…

小试YARP

.net core下&#xff0c;一个轻量组反向代理库&#xff0c;由微软发起。做了一个简单的带验证的反向代理&#xff0c;应用结构如上图&#xff0c;一个验证服务&#xff0c;两个业务服务和一个YARP服务。源码https://github.com/axzxs2001/Asp.NetCoreExperiment/tree/master/As…

Entity Framework Core 5中实现批量更新、删除

本文介绍了一个在EntityFramework Core 5中不需要预先加载数据而使用一句SQL语句批量更新、删除数据的开发包&#xff0c;并且分析了其实现原理&#xff0c;并且与其他实现方案做了比较。一、背景随着微软全面拥抱开源&#xff0c;.Net开源社区百花开放&#xff0c;涌现了非常多…

篮子里拿鸡蛋问题

一个一个拿&#xff0c;正好拿完。两个两个拿&#xff0c;还剩一个。三个三个拿&#xff0c;正好拿完。 四个四个拿&#xff0c;还剩一个。五个五个拿&#xff0c;还差一个。六个六个拿&#xff0c;还剩三个。 七个七个拿&#xff0c;正好拿完。八个八个拿&#xff0c;还剩一个…

一套标准的ASP.NET Core容器化应用日志收集分析方案

点击上方蓝字给一个关注吧讲故事关注我公众号的朋友&#xff0c;应该知道我写了一些云原生应用日志收集和分析相关的文章&#xff0c;其中内容大多聚焦某个具体的组件&#xff1a;超级有用的TraceId&#xff0c;快点用起来吧&#xff01;如何利用NLog输出结构化日志&#xff0c…

算法设计与分析——递归与分治策略——棋盘覆盖

问题描述 棋盘覆盖问题要求在2^k * 2^k 个方格组成的棋盘中&#xff0c;你给定任意一个特殊点&#xff0c;用一种方案实现对除该特殊点的棋盘实现全覆盖。 建立模型如图&#xff1a; 解决方案就是利用分治法&#xff0c;将方形棋盘分成4部分&#xff0c;如果该特殊点在某一部…

函数求值需要运行所有线程_JavaScript函数式编程(二)

纯函数就是&#xff0c;对于相同的输入&#xff0c;永远会得到相同的输出&#xff0c;而且没有任何可观察的副作用&#xff0c;也不依赖外部环境的状态但是实际的编程中&#xff0c;特别是前端的编程范畴里&#xff0c;“不依赖外部环境”这个条件是根本不可能的&#xff0c;我…

算法设计与分析——递归与分治——归并排序

归并排序采用的是一种分治的思想&#xff0c;如下图&#xff0c;先将要排序的元素分为两块&#xff0c;每个块又开始分裂&#xff0c;然后逐个按照特定顺序合并&#xff0c;合成最后我们需要的数组。 归并排序的复杂度&#xff1a; 时间复杂度&#xff1a;O(nlogn) 空间复杂度&…