【C#】并行编程实战:使用并发集合

        在上一章的并行编程实现里,为了保护资源,我们对共享资源加锁(各种同步原语)来进行保护,避免多线程同时访问(主要是写入)。但一般来说,共享资源是一个可以由多个线程读写的集合,即便多线程也应该能够同时写入。因此,使用同步原语对于这种数据集合来说,就不是很合适。

        本章将学习线程安全(Thread-Safe)的集合。本章的内容还是比较简单,主要是代码示例较多。

        本教程对应学习工程:魔术师Dix / HandsOnParallelProgramming · GitCode


1、并发集合详解

        从 .NET Framework 4 开始,许多线程安全集合被添加到 .NET 库中,例如:

  • IProducerConsumerCollection<T>

  • BlockingCollection<T>

  • ConcurrentDictionary<TKey, TValue>

        这些都包含在了命名空间 System.Collections.Concurrent 之中。

System.Collections.Concurrent 命名空间 | Microsoft Learn提供多个线程安全集合类,只要多个线程同时访问集合,就应使用这些类来代替 System.Collections 和 System.Collections.Generic 命名空间中的相应类型。 但是,不保证通过扩展方法或通过显式接口实现访问集合对象是线程安全的,可能需要由调用方进行同步。 icon-default.png?t=N6B9https://learn.microsoft.com/zh-cn/dotnet/api/system.collections.concurrent?view=netstandard-2.1

        当使用这些结构时,不需要任何同步,并且读取和更新都将以原子方式完成。并发集合(Concurrent Collection)包装轻量级的 Slim 同步原语,在内核上的负担更轻。

1.1、IProducerConsumerCollection<T>

        顾名思义 IProducerConsumerCollection 就是生产者和消费者的集合,为通用同类对提供有效的无锁替代集合。通过继承接口以实现功能:

IProducerConsumerCollection定义供制造者/使用者用来操作线程安全集合的方法。 此接口提供一个统一的表示(为生产者/消费者集合),从而更高级别抽象如 BlockingCollection<T> 可以使用集合作为基础的存储机制。 icon-default.png?t=N6B9https://learn.microsoft.com/zh-cn/dotnet/api/system.collections.concurrent.iproducerconsumercollection-1?view=netstandard-2.1        首先,我这里先写一个测试用例:

        public static void AddRangeItem(this IProducerConsumerCollection<CaseItem> cases){for (int i = 0; i < 10; i++){var item = new CaseItem();item.Index = i;item.Value = $"{cases.GetType().Name}_{i}";cases.TryAdd(item);}}public static void ParallelDebugCases(this IProducerConsumerCollection<CaseItem> cases){int length = cases.Count;Parallel.For(0, length, x =>{CaseItem item;if (cases.TryTake(out item))Debug.Log($"[{item.Index}] : {item.Value}");});Debug.Log("执行打印完成");}

        这里逻辑就很简单,依次添加10个CaseItem,然后并行取出打印出来。我这里直接写一个类,以不做任何并行处理的方式实现,然后直接运行查看结果:

         可以看到,打印的结果就比较诡异了,没有打印出 1 和 7 。也就是有 2 个 CaseItem 在并行过程中丢失了。如果要想在多线程中正常使用,显然是要我们自己提供线程同步的方案。

        如果我们使用传统的队列(Queue),那么就需要像这样写类似的代码:

        public bool TryTake(out CaseItem item){lock (m_LockObject){if (m_Queue.Count > 0){item = m_Queue.Dequeue();return true;}item = null;return false;}}

        我这里是直接加锁了。当然也可以使用 Monitor 的写法,或者其他同步原语。但是我们发现这样确实不方便,毕竟重复的代码很多。

        那么下面作者介绍了几个内置的类可供使用:

1.2、并发队列 ConcurrentQueue<T>

        并发队列 (ConcurrentQueue<T>) 是队列(Queue<T>)的线程安全版本,而不必自己写同步原语。

ConcurrentQueue表示线程安全的先进先出 (FIFO) 集合。 icon-default.png?t=N6B9https://learn.microsoft.com/zh-cn/dotnet/api/system.collections.concurrent.concurrentqueue-1?view=netstandard-2.1        而且,ConcurrentQueue<T> 是已经继承了 IProducerConsumerCollection<T>:

using System.Collections.Generic;namespace System.Collections.Concurrent
{public class ConcurrentQueue<T> : IProducerConsumerCollection<T>, IEnumerable<T>, IEnumerable, ICollection, IReadOnlyCollection<T>{public ConcurrentQueue();public ConcurrentQueue(IEnumerable<T> collection);public int Count { get; }public bool IsEmpty { get; }public void Clear();public void CopyTo(T[] array, int index);public void Enqueue(T item);public IEnumerator<T> GetEnumerator();public T[] ToArray();public bool TryDequeue(out T result);public bool TryPeek(out T result);}
}

        在以下应用场景中,考虑使用并发队列:

  • 每个项目的处理时间都很短。

  • 只有一个专用生产者线程或只有一个专用消费者线程。

  • 处理速度在 500 FLOPS 或以上。

        上面是书上的说法,确实比较晦涩。这里提供一个参考文章:

C#线程 ConcurrentQueue安全队列介绍_Paddy Pan的博客-CSDN博客c#安全线程队列ConcurrentQueus介绍及方法实现原理和使用 _concurrentqueuehttps://blog.csdn.net/qq_41230604/article/details/126305068

        这个就讲得比较清楚了。例如在添加项目进队列时,不是直接Lock而是使用了一次自旋锁。总的来说,只要是高并发需求,使用 ConcurrentQueue<T> 是更优的选择。但是如果没有并发,ConcurrentQueue<T> 的性能应该是不如 Queue<T> 的。

        之前做过一个性能测试,可以看下具体差异:

【学习积累】Queue 与 ConcurrentQueue性能测试_concurrentqueue和queue的区别_魔术师Dix的博客-CSDN博客在 C# 中,关于队列(Queue)有两种,一种就是我们普通使用的队列,另一种是线程安全的队列ConcurrentQueue 。本文将对这两个队列进行一个简单的性能测试,同时讨论一种特殊情况:一个线程入队,一个线程出队时使用 Queue 的情况。_concurrentqueue和queue的区别https://blog.csdn.net/cyf649669121/article/details/130162599

        这里写一个测试用例:

        private void RunWithConcurrentQueue(){ConcurrentQueue<CaseItem> caseItems = new ConcurrentQueue<CaseItem>();caseItems.AddRangeItem();caseItems.ParallelDebugCases();}      

        执行结果如下:

         可见10次打印刚好就是 0~9 ,没有任何重复项。

1.3、并发堆栈 ConcurrentStack<T>

        ConcurrentStack<T> 是 Stack<T> 的并发版本,同样也继承了 IProducerConsumerCollection 接口。当然,与队列的区别,就是堆栈是 LIFO(后进先出),而队列是FIFO(先进先出)。同样的,并发堆栈不涉及内存锁定,靠自旋(Spinning)和比较交换(Compare-And_Swap,CAS)无锁算法来消除竞争。

ConcurrentStack表示线程安全的后进先出 (LIFO) 集合。 icon-default.png?t=N6B9https://learn.microsoft.com/zh-cn/dotnet/api/system.collections.concurrent.concurrentstack-1?view=netstandard-2.1        这里我们用 1.2 的代码改造一下进行示例:

     private void RunWithConcurrentStack(){ConcurrentStack<CaseItem> caseItems = new ConcurrentStack<CaseItem>();caseItems.AddRangeItem();caseItems.ParallelDebugCases();}

        运行结果也是相同的:

1.4、ConcurrentBag<T>

        这个和前两个类似,但是是无序集合。

ConcurrentBag表示对象的线程安全的无序集合。 icon-default.png?t=N6B9https://learn.microsoft.com/zh-cn/dotnet/api/system.collections.concurrent.concurrentbag-1?view=netstandard-2.1

        ConcurrentBag<T> 已针对同一线程既充当生产者又充当消费者的情况进行了优化。而且还支持工作窃取(详见 任务并行性:11、工作窃取队列)算法,为每一个线程维护一个本地队列。

        似乎比较抽象,这里写一段代码测试一下:

        private void RunWtihConcurrentBag(){ConcurrentBag<int> caseItems = new ConcurrentBag<int>();ManualResetEventSlim resetEventSlim = new ManualResetEventSlim(false);Task t1 = Task.Run(() =>{for (int i = 0; i < 5; i++){caseItems.Add(i);}resetEventSlim.Wait();int length = caseItems.Count;for (int i = 0; i < length; i++){int val;caseItems.TryTake(out val);Debug.Log($"case item : {val}");}});Task t2 = Task.Run(() =>{for (int i = 5; i < 10; i++){caseItems.Add(i);}resetEventSlim.Set();});}

        运行结果如下所示:

         可见在 Bag 内部明显分了 2 个部分,一个部分是 0~4,一部分是 5~9,也就是分别由 t1 和 t2 添加的事项。0~4 的事项总是排列在一起,但相互之间是无序的;5~9 的事项也是如此。按照书上说法,ConcurrentBag<T>为每一个线程维护了一个本地队列,都是优先完成自己线程的队列,再处理其他线程的,所以打印结果才会如上图所示。

1.5、BlockingCollection<T>

        BlockingCollection<T> 可以限制生产者线程生成的最大数目,然后生产者线程将睡眠,并进入阻塞。当消费者线程使用了项目时,生产者线程将接触阻塞;当集合被耗尽时,消费者线程将被阻塞。

BlockingCollection为实现 IProducerConsumerCollection<T> 的线程安全集合提供阻塞和限制功能。 icon-default.png?t=N6B9https://learn.microsoft.com/zh-cn/dotnet/api/system.collections.concurrent.blockingcollection-1?view=netstandard-2.1

        综上 BlockingCollection<T> 有两个概念:

  • 限制:集合达到最大值时,不能添加任何新对象,生产者线程进入睡眠模式。

  • 阻塞:当集合为空时,可以阻塞消费者线程。

    这里写个代码来进行示例:
        //创建  BlockingCollection 并设定容量为 5,填0代表无上限private BlockingCollection<CaseItem> m_BlockingCaseItems = new BlockingCollection<CaseItem>(5);// 生产者线程private void RunWithBlockingCollectionProduce(){Task.Run(() =>{for (int i = 0; i < 10; i++){CaseItem item = new CaseItem();item.Index = i;item.Value = $"[{i}]_BlockCollection";m_BlockingCaseItems.Add(item);//如果当前容量 大于5 ,则会在这里阻塞;Debug.Log($"添加一个事项:{item.Value} / {m_BlockingCaseItems.Count}");}Debug.Log("全部添加完成 ! ");});}//消费者线程private void RunWtithBlockingCollectionConsumer(){Task.Run(async () =>{for (int i = 0; i < 5; i++){CaseItem item = m_BlockingCaseItems.Take();//如果当前容量为0,则会在在这里阻塞Debug.LogWarning($"消费一个事项:{item.Value}");await Task.Delay(1000);}Debug.LogWarning("全部消费完成 ! ");});}

        这两个方法我们有两个顺序调用,我们先调用生产者线程,再调用消费者线程,效果如下:

         可以看到,在添加了5个事项之后,达到容量上限之后不再继续添加,线程直接阻塞。当消费者线程每取出一个事项,生产者线程则解除一次阻塞并继续执行。

        现在我们反过来,先调用消费者线程,再调用生产者线程,结果如下:

         当开始调用消费者线程时,由于容量为空,所以直接阻塞了消费者线程。开始生产者线程之后,便开始唤醒并进行消费,从上图的 Log 中可以清楚地看到两个线程的相互唤醒作用。

 

2、多生产者-消费者应用场景

        这一节其实说的就是 BlockingCollection<T> 的两个语法糖:

namespace System.Collections.Concurrent
{public class BlockingCollection<T>:IEnumerable<T>, IEnumerable, IReadOnlyCollection<T>, ICollection, IDisposable{public static int AddToAny(BlockingCollection<T>[] collections, T item);public static int TakeFromAny(BlockingCollection<T>[] collections, out T item);}
}

也就是可以从一个集合中获取、添加事项。

3、ConcurrentDictionary<TKey,TValue>

        很显然了,ConcurrentDictionary<TKey,TValue> 就是用于多线程并行的字典,可以消除键的重复问题。这个字典读是无锁的,而写入是上锁的。

ConcurrentDictionary表示可由多个线程同时访问的键/值对的线程安全集合。 icon-default.png?t=N6B9https://learn.microsoft.com/zh-cn/dotnet/api/system.collections.concurrent.concurrentdictionary-2?view=netstandard-2.1        这里写个代码示例:

        private void RunWithConcurrentDictionary(){ConcurrentDictionary<int, CaseItem> dictCaseItems = new ConcurrentDictionary<int, CaseItem>();var t1 = Task.Run(() =>{Parallel.For(0, 5, i =>{CaseItem item = new CaseItem();item.Index = i;item.Value = $"T1 : {i}";dictCaseItems.AddOrUpdate(i, item, (k, v) =>{Debug.Log($"T1 冲突: {v.Value} |");return v;});});});var t2 = Task.Run(() =>{Parallel.For(4, 8, i =>{CaseItem item = new CaseItem();item.Index = i;item.Value = $"T2 : {i}";dictCaseItems.AddOrUpdate(i, item, (k, v) =>{Debug.Log($"T2 冲突: {v.Value} |");return v;});});});var t3 = Task.Run(() =>{Parallel.For(6, 10, i =>{CaseItem item = new CaseItem();item.Index = i;item.Value = $"T3 : {i}";dictCaseItems.AddOrUpdate(i, item, (k, v) =>{Debug.Log($"T3 冲突: {v.Value} |");return v;});});});Task.WaitAll(t1, t2, t3);Debug.Log("最后结果");foreach (var item in dictCaseItems.Values){Debug.Log(item.Value);}}

        执行打印结果如下:

         可以看到,我们遇到了3次写出冲突,会要求我们给一个方法进行冲突处理。可以选择保留,或者更新,或者合并,这个就看我们自己传入的方法如何处理。


4、本章小结

        本章的内容还是比较简单的,主要就是介绍了几个并发集合的用法,以及常见的注意事项。其实我发现其实 .NET 提供的集合都还是很好使用的,性能和效果上都没有啥问题,大家可以根据自己的需求,在不同使用场景中使用合适的并发集合。

本教程对应学习工程:魔术师Dix / HandsOnParallelProgramming · GitCode

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1973.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JVM系列(8)——对象的内存布局

1、对象的创建过程 加载-验证-准备-解析-初始化-申请内存-成员变量赋初始值-加载构造方法。 前半段是JVM系列&#xff08;5&#xff09;——类加载过程&#xff0c;申请内存可参考&#xff1a;JVM系列&#xff08;3&#xff09;——内存分配与回收策略。 2、对象在内存中的存…

华为申请注册盘古大模型商标;京东推出言犀大模型,率先布局产业应用

7月14日科技新闻早知道&#xff0c;一分钟速览。 1.华为申请注册盘古大模型商标&#xff1a; 据天眼查 App 显示&#xff0c;7 月 7 日&#xff0c;华为技术有限公司申请注册“华为云盘古”、“Huawei Cloud Pangu Models”文字及图形商标&#xff0c;国际分类为网站服务、社…

常用API学习06(Java)

Biglnteger public BigInteger(int num, Random rnd) 获取随机大整数&#xff0c;范围&#xff1a;[0~2的num次方-1] public BigInteger(String val) 获取指定的大整数 public BigInteger(String val, int radix) 获取指定进制的大整数 public static BigInteg…

「深度学习之优化算法」(十四)麻雀搜索算法

1. 麻雀搜索算法简介 (以下描述,均不是学术用语,仅供大家快乐的阅读)   麻雀搜索算法(sparrow search algorithm)是根据麻雀觅食并逃避捕食者的行为而提出的群智能优化算法。提出时间是2020年,相关的论文和研究还比较少,有可能还有一些正在发表中,受疫情影响需要论…

C++:const修饰指针

const修饰符常常需要在c中使用到&#xff0c;需要注意到他对于指针修饰的时候的不同区别。 #include<iostream> using namespace std; int main() {//1.const修饰指针int a 10;int b 10;const int* p &a;//指针指向的值不可以改&#xff0c;指针的指向可以改// …

2023最新ChatGPT商业运营网站源码+支持ChatGPT4.0+新增GPT联网功能+支持ai绘画+实时语音识别输入+用户会员套餐+免费更新版本

2023最新ChatGPT商业运营网站源码支持ChatGPT4.0新增GPT联网功能支持ai绘画实时语音识别输入用户会员套餐免费更新版本 一、AI创作系统二、系统程序下载三、系统介绍四、安装教程五、主要功能展示六、更新日志 一、AI创作系统 提问&#xff1a;程序已经支持GPT3.5、GPT4.0接口…

vue2watch监听遇到的问题

1 vue 父组件里引入子组件 显示与隐藏是v-if控制时 父传入子的参数通过watch 监听请求接口时 watch 时而监听不到 请求接口的参数就不对 如图 父组件这么引入子组件v-show 和v-if 是有区别的 2 子组件通过watch 监听后 清空页面要展示的列表数据 重新从第一页加载数据&#x…

【雕爷学编程】Arduino动手做(164)---Futaba S3003舵机模块3

37款传感器与模块的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&#x…

长沙打造“全球研发中心城市”,智能网联产业如何交卷?

作者 | 魏启扬 来源 | 洞见新研社 知乎上有一个浏览超百万的热门问题——“大家怎么看待长沙这个城市&#xff1f;” 答主“星球研究所”的回答获得了高赞&#xff0c;“这是一个天性如火的城市”。 网红城市的外衣下&#xff0c;从湖南卫视的综艺节目&#xff0c;到网红美…

qiankun:react18主应用 + 微应用 react18 + vue3

一&#xff1a;主应用 搭建react项目 npx create-react-app react-qiankun-main安装Antd npm install antd –save在 index.js中引入 import { ConfigProvider } from "antd"; import zhCN from "antd/locale/zh_CN"; import "antd/dist/reset.css…

DDOS百科:什么是 DDoS 攻击及如何防护DDOS攻击

一、什么是 DDoS 攻击&#xff1f; 当多台机器一起攻击一个目标&#xff0c;通过大量互联网流量淹没目标或其周围基础设施&#xff0c;从而破坏目标服务器、服务或网络的正常流量时&#xff0c;就会发生分布式拒绝服务(DDoS)攻击。 DDoS允许向目标发送指数级更多的请求&#…

【大数据之Hadoop】三十七、Hadoop HA高可用

1、HA概述 实现高可用最关键的策略是消除单点故障。HA分成各个组件的HA机制&#xff1a;HDFS的HA和YARN的HA。   Hadoop2.0之前&#xff0c;在HDFS集群中NameNode存在单点故障&#xff08;SPOF&#xff09;。 NameNode主要在以下两个方面影响HDFS集群&#xff1a; &#xff…

AI时代图像安全“黑科技”如何助力人工智能与科技发展?

〇、前言 7月7日下午&#xff0c;2023世界人工智能大会&#xff08;WAIC&#xff09;“聚焦大模型时代AIGC新浪潮—可信AI”论坛在上海世博中心红厅举行。人工智能等技术前沿领域的著名专家与学者、投资人和领军创业者汇聚一堂&#xff0c;共同探索中国科技创新的驱动力量。 在…

4. 设计(黑盒)测试用例 (一) 等价类 边界值 判定表

本篇文章我们将详细介绍如何来测试用例。 1. 设计测试用例的基本要素 1.1 测试用例概念 测试用例&#xff08;Test Case&#xff09;是为了实施测试而向被测试的系统提供的一组集合。 1.2 测试用例要素 测试环境、测试步骤、测试数据、预期结果。 1.3 测试用例的重要性 提…

【美团面试】软件测试面试题

一、设计登录界面测试用例 功能测试(Function test) 0. 什么都不输入&#xff0c;点击提交按钮&#xff0c;看提示信息。&#xff08;非空检查&#xff09; 1.输入正确的用户名和密码&#xff0c;点击提交按钮&#xff0c;验证是否能正确登录。&#xff08;正常输入&#xff0…

【启发式算法】灰狼优化算法【附python实现代码】

写在前面&#xff1a; 首先感谢兄弟们的订阅&#xff0c;让我有创作的动力&#xff0c;在创作过程我会尽最大能力&#xff0c;保证作品的质量&#xff0c;如果有问题&#xff0c;可以私信我&#xff0c;让我们携手共进&#xff0c;共创辉煌。 路虽远&#xff0c;行则将至&#…

江南大学轴承数据故障诊断(利用连续小波变换转换为二维图像,再利用CNN进行故障诊断)

1.江南大学轴承数据集介绍 采样频率&#xff1a;50khz&#xff0c;采样时间&#xff1a;10s 转速&#xff1a;600 800 1000/rpm 内圈&#xff1a;ib 外圈&#xff1a;ob 滚动体&#xff1a;tb 正常&#xff1a;N 以600转速下的内圈故障数据为例展示&#xff1a; 开始数据…

第46节:cesium 水面效果(含源码+视频)

结果示例: 完整源码: <template><div class="viewer"><vc-viewer @ready="ready" :logo="false"><!

C 知识积累 替换gets函数 Linux C 语法分析 switch和if else的比较

目录 替换gets函数gets()用处gets()的危险之处gets()的几种替代方法一、用%c循环输入直到遇到换行结束二、用getchar()循环输入直到遇到换行结束三、scanf的另一种用法四、c中的getline()方法五、解决方案使用fgets代替 回车与换行一.知其然二.知其所以然 关键字&#xff0c;操…

怎样优雅地增删查改(五):按组织架构查询

文章目录 原理实现应用测试 之前我们实现了Employee&#xff0c;Alarm管理模块以及通用查询应用层。 Employee的集合查询业务&#xff0c;是通过重写CreateFilteredQueryAsync方法&#xff0c;来实现按组织架构查询的过滤条件。 我们将这段逻辑代码提取到通用查询应用层中&…