一.生产者-消费者模式
概述
生产者-消费者模式
是一种经典的设计模式,它将数据的生成(生产者)和处理(消费者)分离到不同的模块或线程中。这种模式的核心在于一个共享的缓冲区,生产者将数据放入缓冲区,而消费者从缓冲区中取出数据进行处理。这种模式有助于提高系统的响应性和吞吐量,因为它允许生产者和消费者并行工作,互不干扰。
二.Channels
概念
Channels
提供了一种通信机制,允许生产者和消费者之间安全、可靠地交换信息,即使它们在不同的执行线程上运行。自.NET Core 3.0
引入以来,System.Threading.Channels
命名空间为我们处理生产者-消费者模式等复杂场景提供了强大的支持。Channels
已经完全集成到.NET的异步模型中,支持async/await
关键字,提供了一种异步的消息传递机制。通道本质上是一个线程安全的队列,支持在生产者和消费者之间安全、可靠地传递数据。通道
有两种类型:有限容量的bound Channel
和无限容量的unbound Channel
。有限容量的通道在达到容量上限时会根据指定的策略处理新消息,而无限容量的通道则没有容量限制。
三.Channels
生产者-消费者模式实现
创建通道
来作为生产者和消费者之间的共享缓冲区
- 无界通道
- 无界容量的通道(即没有明确限制可以存储的项目数量的通道),使用
Channel.CreateUnbounded<T>()
方法,如:
// 创建一个无界通道
var unboundedChannel = Channel.CreateUnbounded<string>();
- 有界通道
- 创建有界通道则需要指定通道的容量上限,对于有限容量的通道,当通道满时,生产者可能需要等待或丢弃新数据。同样,当通道空时,消费者可能需要等待新数据的到来。通道提供了多种策略
BoundedChannelFullMode
枚举处理方式:Wait
:当通道已满时,写操作会等待直到队列中有空间来写入新的数据。这种情况下如果 TryWrite 操作会返回 false。DropOldest
:如果通道已满,会删除最旧的数据(也就是最早进入通道但还未被读取的数据),以便给新的数据腾出空间。DropNewest
:与 DropOldest 相反,会删除最新写入但还未被读取的数据来让新数据容纳进来。DropWrite
:直接删除当前正在尝试写入的数据。
使用Channel.CreateBounded<T>(int capacity)
方法。例如:
// 创建一个 有界通道
var boundedChannel = Channel.CreateBounded<string>(100);
实现生产者
- 生产者负责生成数据并将其写入通道。通常使用循环,在该循环中,生产者生成数据并使用
WriteAsync
方法将其写入通道。
async Task ProducerAsync(ChannelWriter<string> writer)
{for (int i = 0; i < 100; i++){await writer.WriteAsync(i.ToString());await Task.Delay(100); // 模拟数据生成的时间间隔}writer.Complete(); // 标记通道为完成写入,不再接受新数据
}
实现消费者
- 消费者负责从通道中读取数据并进行处理。通常使用循环,在该循环中,消费者使用ReadAsync或ReadAllAsync方法从通道中读取数据,并对其进行处理。
async Task ConsumerAsync(ChannelReader<string> reader)
{while (await reader.WaitToReadAsync()){if (reader.TryRead(out var msgstring)){Console.WriteLine($"Consumed: {msgstring}");// 在这里处理数据}}
}
下面展示一个完整的生产者和消费者
示例
- 启动
Program
类
// See https://aka.ms/new-console-template for more informationusing System.Threading.Channels;
using System.Threading.Tasks;
using TestChannels;Console.WriteLine("选择运行的模式?例如:1");
Console.WriteLine("1. 单生产单消费");
Console.WriteLine("2. 多生产单消费");
Console.WriteLine("3. 单生产多消费");
Console.WriteLine("4. 多生产多消费");
Console.WriteLine("请输入编号:");
var key = Console.ReadKey();switch (key.KeyChar)
{case '1':await SingleProducerSingleConsumer();break;case '2':await MultiProducerSingleConsumer();break;case '3':await SingleProduceMultipleConsumers();break;case '4':await MultiProducerMultipleConsumers();break;default:Console.WriteLine("请先选择运行模式!");break;
}// 单生产单消费
static async Task SingleProducerSingleConsumer()
{var channel = Channel.CreateUnbounded<string>();var producer1 = new Producer(channel.Writer, 1, 2000);var consumer1 = new Consumer(channel.Reader, 1, 1500);Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费Task producerTask1 = producer1.ProducerAsync(); // 开始生产await producerTask1.ContinueWith(_ => channel.Writer.Complete());await consumerTask1;
}// 多生产单消费
static async Task MultiProducerSingleConsumer()
{var channel = Channel.CreateUnbounded<string>();List<Task> producerTasks = new List<Task>();for (int i = 1; i <= 3; i++){producerTasks.Add(Task.Run(async () => {var producer = new Producer(channel.Writer, i, 2000);await producer.ProducerAsync();}));await Task.Delay(500); // 暂停500毫秒,启动另外一个生产}var consumer1 = new Consumer(channel.Reader, 1, 250);Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());await consumerTask1;
}// 单生产多消费
static async Task SingleProduceMultipleConsumers()
{var channel = Channel.CreateUnbounded<string>();var producer1 = new Producer(channel.Writer, 1, 100);List<Task> consumerTasks = new List<Task>();for (int i = 1; i <= 3; i++){consumerTasks.Add(Task.Run(async () => {var consumer = new Consumer(channel.Reader, 1, 1500);await consumer.ConsumerAsync();}));}Task producerTask1 = producer1.ProducerAsync();await producerTask1.ContinueWith(_ => channel.Writer.Complete());await Task.WhenAll(consumerTasks.ToArray());
}// 多生产多消费
static async Task MultiProducerMultipleConsumers()
{var channel = Channel.CreateUnbounded<string>();List<Task> producerTasks = new List<Task>();for (int i = 1; i <=3; i++){Console.WriteLine("线程"+i.ToString());producerTasks.Add(Task.Run(async () => {var producer = new Producer(channel.Writer, i, 100);await producer.ProducerAsync();}));await Task.Delay(500); // 暂停500毫秒,启动另外一个生产}List<Task> consumerTasks = new List<Task>();for (int i = 1; i < 3; i++){consumerTasks.Add(Task.Run(async () => {var consumer = new Consumer(channel.Reader, 1, 1500);await consumer.ConsumerAsync();}));}await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());await Task.WhenAll(consumerTasks.ToArray());
}
- 生产者
Producer
类
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;namespace TestChannels
{internal class Producer{private readonly ChannelWriter<string> _writer;private readonly int _identifier;private readonly int _delay;public Producer(ChannelWriter<string> writer, int identifier, int delay){_writer = writer;_identifier = identifier;_delay = delay;}public async Task ProducerAsync(){Console.WriteLine($"开始 ({_identifier}): 发布消息");for (var i = 0; i < 10; i++){await Task.Delay(_delay); // 停顿一下,方便观察数据var msg = $"P{_identifier} - {DateTime.Now:G}-{i}";Console.WriteLine($"发布 ({_identifier}): 消息成功 {msg}");await _writer.WriteAsync(msg);}Console.WriteLine($"发布 ({_identifier}): 完成");}}
}
- 消费者
Consumer
类
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;namespace TestChannels
{/// <summary>/// 消费/// </summary>internal class Consumer{private readonly ChannelReader<string> _reader;private readonly int _identifier;private readonly int _delay;public Consumer(ChannelReader<string> reader, int identifier, int delay){_reader = reader;_identifier = identifier;_delay = delay;}public async Task ConsumerAsync(){Console.WriteLine($" 开始({_identifier}):消费 ");while (await _reader.WaitToReadAsync()){if (_reader.TryRead(out var timeString)){await Task.Delay(_delay); // 停顿一下,方便观察数据Console.WriteLine($"消费 ({_identifier}): 成功 {timeString}");}}Console.WriteLine($"消费 ({_identifier}): 完成");}}
}
- [ 参考] :
https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels?view=netcore-3.0