RabbitMQ的基本使用
引入程序集:RabbitMQ.Client
生产者
/// <summary>
/// ProducerWrites 写入消息 ConsumerConsumption 消费消息
/// </summary>
public class ProducerWrites
{public static void Send(){string path = AppDomain.CurrentDomain.BaseDirectory;string tag = path.Split('/', '\\').Last(s => !string.IsNullOrEmpty(s));Console.WriteLine($"这里是 {tag} 启动了。。");//创建链接:通过一个connection工厂来创建链接ConnectionFactory factory = new ConnectionFactory();factory.HostName = "localhost";//RabbitMQ服务在本地运行factory.UserName = "guest";//用户名factory.Password = "guest";//密码 //factory.VirtualHost = "/Richard";//创建链接using (IConnection connection = factory.CreateConnection()){//创建一个信道;using (IModel channel = connection.CreateModel()){ //删除队列channel.QueueDelete("ProducerWrites"); //删除交换机channel.ExchangeDelete("ProducerWritesExChange"); //创建队列channel.QueueDeclare(queue: "ProducerWrites", durable: true, exclusive: false, autoDelete: false, arguments: null);//创建交换机channel.ExchangeDeclare(exchange: "ProducerWritesExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);//交换机和队列绑定channel.QueueBind(queue: "ProducerWrites", exchange: "ProducerWritesExChange", routingKey: "advanced", arguments: null);Console.ForegroundColor = ConsoleColor.Red;Console.WriteLine($"生产者{tag}已准备就绪~~~");{for (int i = 0; i < 20; i++){IBasicProperties basicProperties = channel.CreateBasicProperties();basicProperties.Persistent = true;//basicProperties.DeliveryMode = 2;string message = $"{tag}:大家伙欢迎大家来到.NET高级班的VIP课程_{i+1}";byte[] body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "ProducerWritesExChange",routingKey: "advanced",basicProperties: basicProperties,body: body);Console.WriteLine($"{message} 已发送~");Thread.Sleep(500);}while (true){IBasicProperties basicProperties = channel.CreateBasicProperties();basicProperties.Persistent = true;//basicProperties.DeliveryMode = 2;Console.WriteLine("请输入消息内容:");string message =Console.ReadLine();byte[] body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "ProducerWritesExChange",routingKey: "advanced",basicProperties: basicProperties,body: body);Console.WriteLine($"{message} 已发送~"); Thread.Sleep(500);}}}}}
}
消费者
public class ConsumerConsumption
{/// <summary>/// ProducerWrites 写入消息 ConsumerConsumption 消费消息/// </summary>public static void Consumption(){string path = AppDomain.CurrentDomain.BaseDirectory;string tag = path.Split('/', '\\').Last(s => !string.IsNullOrEmpty(s));Console.WriteLine($"这里是 {tag} 启动了。。");var factory = new ConnectionFactory();factory.HostName = "localhost";//RabbitMQ服务在本地运行factory.UserName = "guest";//用户名factory.Password = "guest";//密码 using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){Console.ForegroundColor = ConsoleColor.Green;try{channel.QueueDeclare(queue: "ProducerWrites", durable: true, exclusive: false, autoDelete: false, arguments: null);channel.ExchangeDeclare(exchange: "ProducerWritesExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);channel.QueueBind(queue: "ProducerWrites", exchange: "ProducerWritesExChange", routingKey: "advanced", arguments: null);//rabbitMq消费消息是通过事件驱动的:var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) => //如果有消息进入到Rabbitmq,就会触发这个事件来完成消息的消费;{var body = ea.Body;var message = Encoding.UTF8.GetString(body.ToArray());Console.WriteLine($"{tag}接受消息: {message}");};channel.BasicConsume(queue: "ProducerWrites",autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}catch (Exception ex){Console.WriteLine(ex.Message);}}}}
}