Fanout消息模型
广播模型:
* 一个交换机绑定多个队列
* 每个队列都有一个消费者
* 每个消费者消费自己队列中的消息,每个队列收到的消息都是一样的
生产者
package com.example.demo02.mq.fanout;import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
/**
 * Producer for the fanout demo: declares the {@code fanout.exchange} exchange and
 * publishes one text message to it.
 */
public class FanoutSender {

    /** Name of the fanout exchange the message is published to. */
    private static final String EXCHANGE_NAME = "fanout.exchange";

    /**
     * Opens a connection and channel, declares the fanout exchange, publishes a single
     * message and releases both resources.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the exchange, publishing or closing fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the channel first and then the connection, even when
        // exchangeDeclare or basicPublish throws, so a failed run does not leak the connection.
        try (Connection connection = ConnectionUtils.getConnection();
             Channel channel = connection.createChannel()) {
            // Third argument is the "durable" flag of the exchange declaration.
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false);

            String msg = "fanout message";
            // Published with an empty routing key and no message properties.
            // The charset is explicit so the bytes do not depend on the platform default.
            channel.basicPublish(
                    EXCHANGE_NAME,
                    "",
                    null,
                    msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}
消费者1
package com.example.demo02.mq.fanout;import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;
public class FanoutReceiver1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("fanout.exchange", BuiltinExchangeType.FANOUT,false);channel.queueDeclare("fanout.queue1", false, false, false, null);channel.queueBind("fanout.queue1", "fanout.exchange", "");Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Fanout1接收到的消息是:" + new String(body));channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume("fanout.queue1",false,consumer);}
}
消费者2
package com.example.demo02.mq.fanout;import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;
public class FanoutReceiver2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("fanout.exchange", BuiltinExchangeType.FANOUT,false);channel.queueDeclare("fanout.queue2", false, false, false, null);channel.queueBind("fanout.queue2", "fanout.exchange", "");Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Fanout2接收到的消息是:" + new String(body));channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume("fanout.queue2",false,consumer);}
}
结果