C#中通道(Channels)的应用之(生产者-消费者模式)

一.生产者-消费者模式概述

生产者-消费者模式是一种经典的设计模式,它将数据的生成(生产者)和处理(消费者)分离到不同的模块或线程中。这种模式的核心在于一个共享的缓冲区,生产者将数据放入缓冲区,而消费者从缓冲区中取出数据进行处理。这种模式有助于提高系统的响应性和吞吐量,因为它允许生产者和消费者并行工作,互不干扰。

二.Channels 概念

Channels提供了一种通信机制,允许生产者和消费者之间安全、可靠地交换信息,即使它们在不同的执行线程上运行。自.NET Core 3.0引入以来,System.Threading.Channels命名空间为我们处理生产者-消费者模式等复杂场景提供了强大的支持。Channels已经完全集成到.NET的异步模型中,支持async/await关键字,提供了一种异步的消息传递机制。通道本质上是一个线程安全的队列,支持在生产者和消费者之间安全、可靠地传递数据。通道有两种类型:有限容量的bound Channel无限容量的unbound Channel。有限容量的通道在达到容量上限时会根据指定的策略处理新消息,而无限容量的通道则没有容量限制。

三.Channels 生产者-消费者模式实现

创建通道来作为生产者和消费者之间的共享缓冲区
  1. 无界通道
  • 无界容量的通道(即没有明确限制可以存储的项目数量的通道),使用 Channel.CreateUnbounded<T>() 方法,如:
// 创建一个无界通道
var unboundedChannel = Channel.CreateUnbounded<string>();
  1. 有界通道
  • 创建有界通道则需要指定通道的容量上限,对于有限容量的通道,当通道满时,生产者可能需要等待或丢弃新数据。同样,当通道空时,消费者可能需要等待新数据的到来。通道提供了多种策略BoundedChannelFullMode 枚举处理方式:Wait:当通道已满时,写操作会等待直到队列中有空间来写入新的数据。这种情况下如果 TryWrite 操作会返回 false。DropOldest:如果通道已满,会删除最旧的数据(也就是最早进入通道但还未被读取的数据),以便给新的数据腾出空间。DropNewest:与 DropOldest 相反,会删除最新写入但还未被读取的数据来让新数据容纳进来。DropWrite:直接删除当前正在尝试写入的数据。
    使用 Channel.CreateBounded<T>(int capacity) 方法。例如:
// 创建一个 有界通道
var boundedChannel = Channel.CreateBounded<string>(100);
实现生产者
  • 生产者负责生成数据并将其写入通道。通常使用循环,在该循环中,生产者生成数据并使用WriteAsync方法将其写入通道。
async Task ProducerAsync(ChannelWriter<string> writer)
{for (int i = 0; i < 100; i++){await writer.WriteAsync(i.ToString());await Task.Delay(100); // 模拟数据生成的时间间隔}writer.Complete(); // 标记通道为完成写入,不再接受新数据
}
实现消费者
  • 消费者负责从通道中读取数据并进行处理。通常使用循环,在该循环中,消费者使用ReadAsync或ReadAllAsync方法从通道中读取数据,并对其进行处理。

async Task ConsumerAsync(ChannelReader<string> reader)
{while (await reader.WaitToReadAsync()){if (reader.TryRead(out var msgstring)){Console.WriteLine($"Consumed: {msgstring}");// 在这里处理数据}}
}

下面展示一个完整的生产者和消费者示例

  1. 启动 Program
// See https://aka.ms/new-console-template for more informationusing System.Threading.Channels;
using System.Threading.Tasks;
using TestChannels;Console.WriteLine("选择运行的模式?例如:1");
Console.WriteLine("1. 单生产单消费");
Console.WriteLine("2. 多生产单消费");
Console.WriteLine("3. 单生产多消费");
Console.WriteLine("4. 多生产多消费");
Console.WriteLine("请输入编号:");
var key = Console.ReadKey();switch (key.KeyChar)
{case '1':await SingleProducerSingleConsumer();break;case '2':await MultiProducerSingleConsumer();break;case '3':await SingleProduceMultipleConsumers();break;case '4':await MultiProducerMultipleConsumers();break;default:Console.WriteLine("请先选择运行模式!");break;
}// 单生产单消费
static async Task SingleProducerSingleConsumer()
{var channel = Channel.CreateUnbounded<string>();var producer1 = new Producer(channel.Writer, 1, 2000);var consumer1 = new Consumer(channel.Reader, 1, 1500);Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费Task producerTask1 = producer1.ProducerAsync(); // 开始生产await producerTask1.ContinueWith(_ => channel.Writer.Complete());await consumerTask1;
}// 多生产单消费
static async Task MultiProducerSingleConsumer()
{var channel = Channel.CreateUnbounded<string>();List<Task> producerTasks = new List<Task>();for (int i = 1; i <= 3; i++){producerTasks.Add(Task.Run(async () => {var producer = new Producer(channel.Writer, i, 2000);await producer.ProducerAsync();}));await Task.Delay(500); // 暂停500毫秒,启动另外一个生产}var consumer1 = new Consumer(channel.Reader, 1, 250);Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());await consumerTask1;
}// 单生产多消费
static async Task SingleProduceMultipleConsumers()
{var channel = Channel.CreateUnbounded<string>();var producer1 = new Producer(channel.Writer, 1, 100);List<Task> consumerTasks = new List<Task>();for (int i = 1; i <= 3; i++){consumerTasks.Add(Task.Run(async () => {var consumer = new Consumer(channel.Reader, 1, 1500);await consumer.ConsumerAsync();}));}Task producerTask1 = producer1.ProducerAsync();await producerTask1.ContinueWith(_ => channel.Writer.Complete());await Task.WhenAll(consumerTasks.ToArray());
}// 多生产多消费
static async Task MultiProducerMultipleConsumers()
{var channel = Channel.CreateUnbounded<string>();List<Task> producerTasks = new List<Task>();for (int i = 1; i <=3; i++){Console.WriteLine("线程"+i.ToString());producerTasks.Add(Task.Run(async () => {var producer = new Producer(channel.Writer, i, 100);await producer.ProducerAsync();}));await Task.Delay(500); // 暂停500毫秒,启动另外一个生产}List<Task> consumerTasks = new List<Task>();for (int i = 1; i < 3; i++){consumerTasks.Add(Task.Run(async () => {var consumer = new Consumer(channel.Reader, 1, 1500);await consumer.ConsumerAsync();}));}await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());await Task.WhenAll(consumerTasks.ToArray());
}
  1. 生产者Producer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;namespace TestChannels
{internal class Producer{private readonly ChannelWriter<string> _writer;private readonly int _identifier;private readonly int _delay;public Producer(ChannelWriter<string> writer, int identifier, int delay){_writer = writer;_identifier = identifier;_delay = delay;}public async Task ProducerAsync(){Console.WriteLine($"开始 ({_identifier}): 发布消息");for (var i = 0; i < 10; i++){await Task.Delay(_delay); // 停顿一下,方便观察数据var msg = $"P{_identifier} - {DateTime.Now:G}-{i}";Console.WriteLine($"发布 ({_identifier}): 消息成功 {msg}");await _writer.WriteAsync(msg);}Console.WriteLine($"发布 ({_identifier}): 完成");}}
}
  1. 消费者Consumer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;namespace TestChannels
{/// <summary>/// 消费/// </summary>internal class Consumer{private readonly ChannelReader<string> _reader;private readonly int _identifier;private readonly int _delay;public Consumer(ChannelReader<string> reader, int identifier, int delay){_reader = reader;_identifier = identifier;_delay = delay;}public async Task ConsumerAsync(){Console.WriteLine($" 开始({_identifier}):消费 ");while (await _reader.WaitToReadAsync()){if (_reader.TryRead(out var timeString)){await Task.Delay(_delay); // 停顿一下,方便观察数据Console.WriteLine($"消费 ({_identifier}): 成功 {timeString}");}}Console.WriteLine($"消费 ({_identifier}): 完成");}}
}

运行

  • [ 参考] : https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels?view=netcore-3.0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/66868.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【excel】VBA简介(Visual Basic for Applications)

文章目录 一、基本概念二、语法2.1 数据类型2.11 基本数据类型2.12 常量2.13 数组 2.2 控制语句2.21 条件语句2.22 循环语句2.23 错误处理&#xff1a;On Error2.24 逻辑运算 2.3 其它语句2.31 注释2.32 with语句 2.4 表达式2.41 常见表达式类型2.42 表达式的优先级 2.5 VBA 的…

Git:Cherry-Pick 的使用场景及使用流程

前面我们说了 Git合并、解决冲突、强行回退等解决方案 >> 点击查看 这里再说一下 Cherry-Pick功能&#xff0c;Cherry-Pick不是merge&#xff0c;只是把部分功能代码Cherry-Pick到远程的目标分支 git cherry-pick功能简介&#xff1a; git cherry-pick 是用来从一个分…

mysql本地安装和pycharm链接数据库操作

MySQL本地安装和相关操作 Python相关&#xff1a;基础、函数、数据类型、面向、模块。 前端开发&#xff1a;HTML、CSS、JavaScript、jQuery。【静态页面】 Java前端&#xff1b; Python前端&#xff1b; Go前端 -> 【动态页面】直观&#xff1a; 静态&#xff0c;写死了…

数据结构与算法之二叉树: LeetCode 654. 最大二叉树 (Ts版)

最大二叉树 https://leetcode.cn/problems/maximum-binary-tree/ 描述 给定一个不重复的整数数组 nums 。 最大二叉树 可以用下面的算法从 nums 递归地构建: 创建一个根节点&#xff0c;其值为 nums 中的最大值递归地在最大值 左边 的 子数组前缀上 构建左子树递归地在最大值…

【Unity高级】一文了解Unity 中的条件编译(附所有指令)

一、Unity中的条件编译 Unity 对 C# 语言的支持包括使用指令&#xff0c;这些指令允许您根据是否定义了某些脚本符号&#xff0c;选择性地包含或排除代码的编译。有关这些指令在 C# 中如何工作的更多信息&#xff0c;请参阅微软关于 C# 预处理器指令 的文档。 &#xff08;一…

苍穹外卖08——(涉及接收日期格式数据、ApachePOI导出报表、sql获取top10菜品数据)

营业额统计 service层 在需要处理空值、与数据库交互或使用集合时&#xff0c;Integer 、Double是更好的选择。 // 导入string工具类 import org.apache.commons.lang.StringUtils; Service // 标记该类为Spring的服务组件 Slf4j // 引入日志功能 public class Repor…

微信小程序订阅消息提醒-云函数

微信小程序消息订阅分2种&#xff1a; 1.一次性订阅&#xff1a;用户订阅一次就可以推送一次&#xff0c;如果需要多次提醒需要多次订阅。 2.长期订阅&#xff1a;只有公共服务领域&#xff0c;如政务、医疗、交通、金融和教育等。‌在用户订阅后&#xff0c;在很长一段时间内…

代码随想录算法训练营第 4 天(链表 2)| 24. 两两交换链表中的节点19.删除链表的倒数第N个节点 -

一、24. 两两交换链表中的节点 题目&#xff1a;24. 两两交换链表中的节点 - 力扣&#xff08;LeetCode&#xff09; 视频&#xff1a;帮你把链表细节学清楚&#xff01; | LeetCode&#xff1a;24. 两两交换链表中的节点_哔哩哔哩_bilibili 讲解&#xff1a;代码随想录 dummy-…

pycharm-pyspark 环境安装

1、环境准备&#xff1a;java、scala、pyspark、python-anaconda、pycharm vi ~/.bash_profile export SCALA_HOME/Users/xunyongsun/Documents/scala-2.13.0 export PATH P A T H : PATH: PATH:SCALA_HOME/bin export SPARK_HOME/Users/xunyongsun/Documents/spark-3.5.4-bin…

【大模型入门指南 07】量化技术浅析

【大模型入门指南】系列文章&#xff1a; 【大模型入门指南 01】深度学习入门【大模型入门指南 02】LLM大模型基础知识【大模型入门指南 03】提示词工程【大模型入门指南 04】Transformer结构【大模型入门指南 05】LLM技术选型【大模型入门指南 06】LLM数据预处理【大模型入门…

3DGabor滤波器实现人脸特征提取

import cv2 import numpy as np# 定义 Gabor 滤波器的参数 kSize 31 # 滤波器核的大小 g_sigma 3.0 # 高斯包络的标准差 g_theta np.pi / 4 # Gabor 函数的方向 g_lambda 10.0 # 正弦波的波长 g_gamma 0.5 # 空间纵横比 g_psi np.pi / 2 # 相位偏移# 生成 Gabor 滤…

【Linux】4.Linux常见指令以及权限理解(2)

文章目录 3. Linux指令3.1 ls指令和rm指令补充3.2 man指令&#xff08;重要&#xff09;3.3cp指令&#xff08;重要&#xff09;输出重定向3.3.1ubuntu20.04如何安装tree 3.4 mv指令&#xff08;重要&#xff09;mv指令更改文件名mv指令更改目录名 如何看待指令指令的重命名3.5…

Vue3初学之Element-plus

用于快速的上手开发&#xff0c;以做项目为导向&#xff0c;所以借用element-plus插件 发现淘宝的镜像有时候也是很慢的&#xff0c;还可以换个 npm config set registry https://registry.npmmirror.com 安装element-plus npm install element-plus --save 查看安装是否成…

实用操作系统学习笔记

第1章 操作系统概述 操作系统基本概念 【基础知识】 操作系统&#xff1a;控制和管理整个计算机系统的硬件和软件资源&#xff0c;合理地组织、调度计算机的工作与资源的分配&#xff0c;进而为用户和其他软件提供方便接口与环境的程序集合。操作系统是计算机系统中最基本的…

k8s部署rocketmq踩坑笔记

给团队部署一个rocketmq4.8.0. k8s上部署的broker&#xff0c;注册到nameserver上是自己的pod ip&#xff0c;导致本机连接到的broker的pod ip&#xff0c;这个ip k8s集群外的机器是无法联通的。 nameserver上注册的是这个pod ipv4 尝试将broker的配置brokerIP1修改为注册到na…

UI自动化测试保姆级教程①

欢迎来到阿妮莫的学习小屋慢也好&#xff0c;步子小也好&#xff0c;在往前走就好 目录 自动化测试 简介 作用 分类 优缺点 优点 缺点(误区) UI自动化测试 自动化测试使用场景 自动化测试实现时间 Selenium框架 特点 Web自动化测试环境部署 Selenium包安装 浏览…

【2024年华为OD机试】 (A卷,100分)- 总最快检测效率(Java JS PythonC/C++)

一、问题描述 题目描述 在系统、网络均正常的情况下组织核酸采样员和志愿者对人群进行核酸检测筛查。 每名采样员的效率不同&#xff0c;采样效率为 N 人/小时。由于外界变化&#xff0c;采样员的效率会以 M 人/小时为粒度发生变化&#xff0c;M 为采样效率浮动粒度&#xf…

离线录制激光雷达数据进行建图

目前有一个2D激光雷达&#xff0c;自己控制小车运行一段时间&#xff0c;离线获取到激光雷达数据后运行如下代码进行离线建图。 roslaunch cartographer_ros demo_revo_lds.launch bag_filename:/home/firefly/AutoCar/data/rplidar_s2/2025-01-08-02-08-33.bag实际效果如下 d…

蓝桥杯嵌入式速通(1)

1.工程准备 创建一文件夹存放自己的代码&#xff0c;并在mdk中include上文件夹地址 把所有自身代码的头文件都放在headfile头文件中&#xff0c;之后只需要在新的文件中引用headfile即可 headfile中先提前可加入 #include "stdio.h" #include "string.h"…

QT跨平台应用程序开发框架(1)—— 环境搭建

目录 一&#xff0c;关于QT 二&#xff0c;关于应用程序框架 三&#xff0c;环境搭建 3.1 预备 3.2 下载Qt SDK 3.3 安装Qt SDK 3.4 配置环境变量 3.5 认识一些重要工具 四&#xff0c;Qt Creator 的基本使用 4.1 创建项目 4.2 代码解释 一&#xff0c;关于QT 互联网…