微信公众号:趣编程ACE
关注可了解更多的.NET日常实战开发技巧,如需源码 请公众号后台留言 源码;
[如果觉得本公众号对您有帮助,欢迎关注]
.Net中RabbitMQ中交换机模式的使用
前文回顾
【微服务专题之】.Net6下集成消息队列上-RabbitMQ【微服务专题之】.Net6下集成消息队列2-RabbitMQ
【微服务专题之】.Net6中集成消息队列-RabbitMQ中直接路由模式
TopicExchange 交换机模式
如果我们需要将一条信息发送到多个队列上,假若利用直连模式,那么就会有很多的路由,而TopicExchange只需要配置定义好的路由规则,即可省略多余的路由指定。
PS:路由规则有一定的约束,比如需要采取* .#. * 的格式,用.号 分隔
其中
1.*表示一个单词
2. #表示任意数量(零个或多个)单词。
消费者程序演示
public static class TopicExchangeReceive{public static void Receive(IModel channel){channel.ExchangeDeclare("hello-topic-exchange", ExchangeType.Topic);channel.QueueDeclare(queue: "hello-topic-queue",durable: true,exclusive: false,autoDelete: false,arguments: null);channel.QueueBind("hello-topic-queue", "hello-topic-exchange", "route.*");//channel.QueueBind("hello", "hello-direct-exchange", "route2");// 创建一个消费者基本事件var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] Received {0}", message);};channel.BasicConsume(queue: "hello-topic-queue",// 自动确认autoAck: true,consumer: consumer);//channel.BasicConsume(queue: "hello",// // 自动确认// autoAck: true,// consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}}
生产者程序演示
public static class TopicExchangeSend{public static void Send(IModel channel){channel.ExchangeDeclare("hello-topic-exchange", ExchangeType.Topic);var count = 0;while (true){Thread.Sleep(1000);// 发送的消息string message = $"Hello World {count}";var body = Encoding.UTF8.GetBytes(message);//var body2 = Encoding.UTF8.GetBytes(message + "body2");// 基本发布 不指定交换channel.BasicPublish(exchange: "hello-topic-exchange",// 路由键 就是队列名称routingKey: "route.2",// 基础属性basicProperties: null,// 传递的消息体body: body);//channel.BasicPublish(exchange: "hello-direct-exchange",// // 路由键 就是队列名称// routingKey: "route2",// // 基础属性// basicProperties: null,// // 传递的消息体// body: body2);count++;Console.WriteLine(" [x] sent {0}", message);}}}
效果展示
Headers Exchange 模式
其实这种模式类似与Http请求里面的headers,我们定义一个字典然后通过交换机携带,作为交换机与路由器交换媒介,相比于Topic Exchange交换模式,header定义的格式类型更加丰富。
消费者程序演示
public static class HeadersExchangeReceive{public static void Receive(IModel channel){channel.ExchangeDeclare("hello-headers-exchange", ExchangeType.Headers);channel.QueueDeclare(queue: "hello-headers-queue",durable: true,exclusive: false,autoDelete: false,arguments: null);var headers = new Dictionary<string, object>(){{"test","A" }};channel.QueueBind("hello-headers-queue", "hello-headers-exchange", String.Empty, headers);//channel.QueueBind("hello", "hello-direct-exchange", "route2");// 创建一个消费者基本事件var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] Received {0}", message);};channel.BasicConsume(queue: "hello-headers-queue",// 自动确认autoAck: true,consumer: consumer);//channel.BasicConsume(queue: "hello",// // 自动确认// autoAck: true,// consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}}
生产者程序演示
public static void Send(IModel channel){channel.ExchangeDeclare("hello-headers-exchange", ExchangeType.Headers);var count = 0;while (true){Thread.Sleep(1000);// 发送的消息string message = $"Hello World {count}";var body = Encoding.UTF8.GetBytes(message);//var body2 = Encoding.UTF8.GetBytes(message + "body2");var basicProperties = channel.CreateBasicProperties();basicProperties.Headers = new Dictionary<string, object>(){{"test" ,"A"}};// 基本发布 不指定交换channel.BasicPublish(exchange: "hello-headers-exchange",// 路由键 就是队列名称routingKey: String.Empty,// 基础属性basicProperties,// 传递的消息体body: body);//channel.BasicPublish(exchange: "hello-direct-exchange",// // 路由键 就是队列名称// routingKey: "route2",// // 基础属性// basicProperties: null,// // 传递的消息体// body: body2);count++;Console.WriteLine(" [x] sent {0}", message);}}
效果演示
FanoutExchange 模式
扇形交换机模式是最傻瓜的一种交换模式,总的而言只要将队列绑定到交换机上,就能做到消息互通了。当然了这种机制处理事件的速度也是所有交换机类型里面最快的。
消费者程序演示
public static class FanoutExchangeReceive{public static void Receive(IModel channel){channel.ExchangeDeclare("hello-fanout-exchange", ExchangeType.Fanout);channel.QueueDeclare(queue: "hello-fanout-queue",durable: true,exclusive: false,autoDelete: false,arguments: null);//var headers = new Dictionary<string, object>()//{// {"test","A" }//};channel.QueueBind("hello-fanout-queue", "hello-fanout-exchange", String.Empty);//channel.QueueBind("hello", "hello-direct-exchange", "route2");// 创建一个消费者基本事件var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] Received {0}", message);};channel.BasicConsume(queue: "hello-fanout-queue",// 自动确认autoAck: true,consumer: consumer);//channel.BasicConsume(queue: "hello",// // 自动确认// autoAck: true,// consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}
生产者程序演示
public static class FanoutExchangeSend{public static void Send(IModel channel){channel.ExchangeDeclare("hello-fanout-exchange", ExchangeType.Fanout);var count = 0;while (true){Thread.Sleep(1000);// 发送的消息string message = $"Hello World {count}";var body = Encoding.UTF8.GetBytes(message);//var body2 = Encoding.UTF8.GetBytes(message + "body2");//var basicProperties = channel.CreateBasicProperties();//basicProperties.Headers = new Dictionary<string, object>()//{// {"test" ,"A"}//};// 基本发布 不指定交换channel.BasicPublish(exchange: "hello-fanout-exchange",// 路由键 就是队列名称routingKey: String.Empty,// 基础属性null,// 传递的消息体body: body);//channel.BasicPublish(exchange: "hello-direct-exchange",// // 路由键 就是队列名称// routingKey: "route2",// // 基础属性// basicProperties: null,// // 传递的消息体// body: body2);count++;Console.WriteLine(" [x] sent {0}", message);}}}
效果演示
PS:具体的代码效果演示看视频哦~