根据amqp协议、rabbitmq入门、springboot集成rabbitmq 可知,rabbitmq的广播模式关键是使用fanout类型的exchange,fanout exchange会忽略message中的routing-key、queue中的binding-key,发给绑定exchange的全部queue。
创建fanout类型的exchange
import org.springframework.amqp.core.*;
@Configuration
public class MqConfig {/*** 定义广播交换机* @return*/@Beanpublic FanoutExchange fanoutExchange() {final FanoutExchange fanoutExchange = new FanoutExchange("自定义广播类型的交换机名称");return fanoutExchange;}
}
发送
@Autowiredprivate AmqpTemplate amqpTemplate;//发送到订阅数据的exchange中amqpTemplate.convertAndSend("自定义广播类型的交换机名称",//fanout类型的exchange会忽略routing-key,所以这里的binding key传空字符串"",message);
消费
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
/*** 将数据发送给队列1* @param message*/@RabbitListener(bindings = @QueueBinding(value = @Queue(“自定义队列1”),exchange = @Exchange(value = "自定义广播类型的交换机名称",type = ExchangeTypes.FANOUT),//fanout类型exchange会忽略binding-keykey = ""))public void doSynAddDataToJD(String message) {log.info("广播模式,同步数据给订阅方");}/*** 将数据发送给队列2* @param message*/@RabbitListener(bindings = @QueueBinding(value = @Queue(“自定义队列2”),exchange = @Exchange(value = "自定义广播类型的交换机名称",type = ExchangeTypes.FANOUT),key = ""))public void doSynAddDataToJD(String message) {log.info("广播模式,同步数据给订阅方");}
总结
实现发布订阅(广播模式)的关键在于对exchange类型的理解,可参考amqp协议、rabbitmq入门、springboot集成rabbitmq 、AMQP 0-9-1 Model Explained,源码中的类型有如下几种
package org.springframework.amqp.core;/*** Constants for the standard Exchange type names.** @author Mark Fisher* @author Gary Russell*/
public abstract class ExchangeTypes {/*** Direct exchange.* routing key和binding key完全匹配*/public static final String DIRECT = "direct";/*** Topic exchange.* binding key可使用通配符来匹配routing key*/public static final String TOPIC = "topic";/*** Fanout exchange.* 会忽略routing key、binding key,消息发送到绑定exchange的全部queue*/public static final String FANOUT = "fanout";/*** Headers exchange.* 使用headers中的属性来匹配,有只匹配一项或者全部匹配可选*/public static final String HEADERS = "headers";/*** System exchange.* 这个类型,暂时缺乏相关资料。*/public static final String SYSTEM = "system";}