Topic消费模型
* 通配符模型
* 生产者必须指定完整且准确的路由key
* 消费者可以使用通配符
* `*`:可以替代恰好一级(一个以 `.` 分隔的单词),例如 add.* ==> add.user、add.goods
* `#`:可以替代零级或多级(零个或多个以 `.` 分隔的单词),例如 add.# ==> add.user.name、add.user.name.firstName
生产者
package com. example. demo02. mq. topic ; import com. example. demo02. mq. util. ConnectionUtils ;
import com. rabbitmq. client. BuiltinExchangeType ;
import com. rabbitmq. client. Channel ;
import com. rabbitmq. client. Connection ; import java. io. IOException ;
/**
 * Producer for the topic-exchange demo.
 *
 * <p>Declares the non-durable topic exchange {@code topic.exchange} and publishes ten
 * messages to it, each under a complete routing key such as {@code goods.add} or
 * {@code user.update.name}. Which queues receive a message is decided by the
 * binding patterns of the consumers, not by this class.
 */
public class TopicSender {

    /** Name of the topic exchange every message is published to. */
    private static final String EXCHANGE_NAME = "topic.exchange";

    /** Rows of {routing key, message text}; published in exactly this order. */
    private static final String[][] MESSAGES = {
        {"goods.add", "商品新增了,Topic模型,routing key 为 goods.add"},
        {"goods.update", "商品修改了,Topic模型,routing key 为 goods.update"},
        {"goods.delete", "商品删除了,Topic模型,routing key 为 goods.delete"},
        {"user.add", "用户新增了,Topic模型,routing key 为 user.add"},
        {"user.update", "用户修改了,Topic模型,routing key 为 user.update"},
        {"user.delete", "用户删除了,Topic模型,routing key 为 user.delete"},
        {"user.add.name", "添加了用户名字,Topic模型,routing key 为 user.add.name"},
        {"user.add.age", "添加了用户年龄,Topic模型,routing key 为 user.add.age"},
        {"user.update.name", "修改了用户名字,Topic模型,routing key 为 user.update.name"},
        {"user.update.age", "修改了用户年龄,Topic模型,routing key 为 user.update.age"},
    };

    /**
     * Publishes all demo messages and then releases the channel and connection.
     *
     * @param args unused
     * @throws Exception if the connection cannot be obtained or any broker call fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the channel first, then the connection, even when
        // a declare/publish call throws (the original leaked both on failure).
        // NOTE(review): relies on Channel being AutoCloseable (amqp-client 5.x) - confirm
        // the client version used by this project.
        try (Connection connection = ConnectionUtils.getConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, false);

            for (String[] entry : MESSAGES) {
                String routingKey = entry[0];
                // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform
                // default charset, which can garble the Chinese text across hosts.
                byte[] body = entry[1].getBytes(java.nio.charset.StandardCharsets.UTF_8);
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, body);
            }
        }
    }
}
消费者1
package com. example. demo02. mq. topic ; import com. example. demo02. mq. util. ConnectionUtils ;
import com. rabbitmq. client. * ; import java. io. IOException ;
public class TopicReceiver1 { public static void main ( String [ ] args) throws Exception { Connection connection = ConnectionUtils . getConnection ( ) ; Channel channel = connection. createChannel ( ) ; channel. exchangeDeclare ( "topic.exchange" , BuiltinExchangeType . TOPIC , false ) ; channel. queueDeclare ( "topic.queue1" , false , false , false , null ) ; channel. queueBind ( "topic.queue1" , "topic.exchange" , "goods.*" ) ; Consumer consumer = new DefaultConsumer ( channel) { @Override public void handleDelivery ( String consumerTag, Envelope envelope, AMQP. BasicProperties properties, byte [ ] body) throws IOException { System . out. println ( "商品模块接收到的消息是:" + new String ( body) ) ; channel. basicAck ( envelope. getDeliveryTag ( ) , false ) ; } } ; channel. basicConsume ( "topic.queue1" , false , consumer) ; }
}
消费者2
package com. example. demo02. mq. topic ; import com. example. demo02. mq. util. ConnectionUtils ;
import com. rabbitmq. client. * ; import java. io. IOException ;
public class TopicReceiver2 { public static void main ( String [ ] args) throws Exception { Connection connection = ConnectionUtils . getConnection ( ) ; Channel channel = connection. createChannel ( ) ; channel. exchangeDeclare ( "topic.exchange" , BuiltinExchangeType . TOPIC , false ) ; channel. queueDeclare ( "topic.queue2" , false , false , false , null ) ; channel. queueBind ( "topic.queue2" , "topic.exchange" , "user.#" ) ; Consumer consumer = new DefaultConsumer ( channel) { @Override public void handleDelivery ( String consumerTag, Envelope envelope, AMQP. BasicProperties properties, byte [ ] body) throws IOException { System . out. println ( "用户模块接收到的消息是:" + new String ( body) ) ; channel. basicAck ( envelope. getDeliveryTag ( ) , false ) ; } } ; channel. basicConsume ( "topic.queue2" , false , consumer) ; }
}
结果