Direct消息模型
* 路由模型:
* 一个交换机可以绑定多个队列
* 生产者给交换机发送消息时,需要指定消息的路由键
* 消费者绑定队列到交换机时,需要指定所需要消费的信息的路由键
* 交换机会根据消息的路由键将消息转发到对应的队列* 缺点:
* 当消息很多的时候,需要指定的路由键也会很多,究极复杂。
生产者
package com.example.demo02.mq.direct;import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
public class DirectSender {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, false);String msg1 = "{To DirectReceiver1: orderId:1001}";String msg2 = "{To DirectReceiver2: orderId:1002}";channel.basicPublish("direct.exchange","order.save",null,msg1.getBytes());channel.basicPublish("direct.exchange","order.update",null,msg2.getBytes());channel.close();connection.close();}
}
消费者1
package com.example.demo02.mq.direct;import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;
public class DirectReceiver1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, false);channel.queueDeclare("direct.queue1", false, false, false, null);channel.queueBind("direct.queue1","direct.exchange","order.save");Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("DirectReceiver1接收到的新增订单消息是:" + new String(body));channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume("direct.queue1",false,consumer);}
}
消费者2
package com.example.demo02.mq.direct;import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;
public class DirectReceiver2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, false);channel.queueDeclare("direct.queue2", false, false, false, null);channel.queueBind("direct.queue2","direct.exchange","order.update");Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("DirectReceiver2接收到的修改订单消息:" + new String(body));channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume("direct.queue2",false,consumer);}
}
结果