简介
主题模式允许发送者根据主题发布消息,而订阅者可以订阅特定的主题。
在主题模式中,生产者发送的消息被发送到一个交换机(Exchange),该交换机根据消息的路由键(Routing Key)和绑定(Binding)规则将消息路由到一个或多个队列。消费者随后从队列中接收并消费这些消息。以下是主题模式的一些关键要点:
- 路由键的设计:路由键是由点(.)分隔的字符串,例如 "stock.usd.nyse"。这些字符串通常定义了消息的某些属性或分类。
- 通配符的使用:队列绑定时可以使用通配符 "*" 和 "#"。其中,星号可以替代一个单词,井号可以替代零个或多个单词。这增加了灵活性,允许使用模糊匹配来定义哪个队列应该接收具有特定路由键的消息。
- 消息的路由过程:当消息到达交换机时,交换机会查找所有绑定的队列,检查它们的绑定键,并确定哪些队列的绑定键与消息的路由键相匹配。匹配成功的队列会接收到消息。
- 灵活性和复杂性:与直接模式相比,主题模式提供了更大的灵活性,因为它允许基于多个标准进行路由。然而,这种灵活性也带来了额外的复杂性,因为需要正确设计路由键和绑定键以实现期望的路由行为。
- 消息的丢失风险:如果没有任何队列的绑定键与消息的路由键匹配,那么消息将会丢失。因此,正确配置交换机、队列的绑定以及路由键非常重要。
通配符
在主题模式中的通配符其实就像我们平时写的正则表达式,比如在消费者中使用 "stock.#" 作为绑定键,那么那么绑定键 "stock.#" 的消费者将匹配到所有以 "stock" 开头的路由键,不论后面跟随什么单词。如"stock.usd.nyse"、"stock.eur.frankfurt" 还是 "stock.jpy.tokyo",只要是以 "stock" 开头的消息都会被该队列接收。
- 星号(*):星号可以代替路由键中的一个单词。例如,如果有一个路由键为 "stock.usd.nyse" 的消息,那么绑定键 "stock.*.nyse" 或 "stock.usd.#" 都可以匹配到这个消息。星号可以匹配零个或多个单词,但不会跨越点(.)进行匹配。
- 井号(#):井号可以代替路由键中的零个或多个单词,且可以跨越点进行匹配。这意味着,如果有一个路由键为 "stock.usd.nyse" 的消息,那么绑定键 "stock.#" 将匹配到所有以 "stock" 开头的路由键,不论后面跟随什么单词。
生产者
在之前的模式中,我们使用的都是路由键RoutingKey,而主题模式中的是绑定键 BindingKey。主题模式可以看作是一种特殊的路由模式,它允许更复杂的路由策略,通过使用通配符 "*" 和 "#" 来实现模糊匹配。从而实现处理更加复杂的消息路由需求。
RoutingKey 主要用于生产者发布消息时定义消息的路由路径,而 BindingKey 用于定义交换机与队列之间的绑定关系。
class MyClass
{public static void Main(string[] args){var factory = new ConnectionFactory();factory.HostName = "localhost"; //RabbitMQ服务在本地运行factory.UserName = "guest"; //用户名factory.Password = "guest"; //密码//创建连接using (var connection = factory.CreateConnection()){//创建通道using (var channel = connection.CreateModel()){//声明了一个主题交换机(topic),命名为"hello"channel.ExchangeDeclare("hello", "topic");Console.WriteLine("生产者:请输入绑定key");var bindingKey = Console.ReadLine();string msg;Console.WriteLine("请输入要发送的消息内容:");while (!string.IsNullOrEmpty(msg = Console.ReadLine())){var body = Encoding.UTF8.GetBytes(msg);channel.BasicPublish("hello", bindingKey, null, body); //开始传递Console.WriteLine("已发送: {0}", msg);}}}}
}
消费者
class MyClass
{static void Main(string[] args){//创建连接工厂var factory = new ConnectionFactory();factory.HostName = "localhost";factory.UserName = "guest";factory.Password = "guest";//创建连接using (var connection = factory.CreateConnection()){//创建通道using (var channel = connection.CreateModel()){//声明了一个交换机channel.ExchangeDeclare("hello", "topic");//声明一个新的队列,并将这个队列的名称赋值给变量 queueNamevar queueName = channel.QueueDeclare().QueueName;//从控制台获取一个绑定键Console.WriteLine("消费者:请输入BindingKey");var bindingKey = Console.ReadLine();channel.QueueBind(queueName, "hello", bindingKey);//事件的基本消费者var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Thread.Sleep(1000);Console.WriteLine("已接收: {0}", message);};channel.BasicConsume(queueName, true, consumer);Console.ReadKey();}}}
}
演示
还是老样子,我们将生产者和消费者都发布打包,分别运行三次,然后生产者分别发送一条消息,如下图。
#
用于匹配多个单词。因此com.#三条消息都收到了。
*
用于匹配一个单词,因为有两个生产者的第三个单词是2,所以*.*.2
的消费者收到了两条消息。
而只有一个生产者的第二个单词是B,所以*.B.*
的消费者只收到了一条消息