1. 引入jar包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置yml
2.1 配置生产者yml
spring:rabbitmq:host: localhostport: 5672virtual-host: / # 虚拟主机username: guestpassword: guestpublisher-returns: true #开启发送失败退回# simple:同步等待confirm结果,直到超时#correlated:异步回调,次你故意ConfirmCallback,MQ返回结果时会回调这个ComfirmCallbackpublisher-confirm-type: correlated
2.2 配置消费者yml
spring:rabbitmq:host: localhostport: 5672virtual-host: / # 虚拟主机username: guestpassword: guestcloud:stream:bindings:delayed-topic-input:destination: delayed-topic-demo #将消费者队列绑定到指定交换机group: group-1 #消费默认分组,消息到达时同一个分组下多个实例情况,只会有一个实例消费这条消息consumer:delayed-exchange: true #开启延时,生产者和消费者端都需要开启这个配置
3.生产者生产消息
3.1 direct 直连
把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中
3.1.1 直连队列消息发送
/***直接交换机 **/public static final String directExchange = "directExchangeOne";public static final String routingKey1 = "directKey1";public static final String routingKey2 = "directKey2";public static final String directQueue1 = "directQueueOne";public static final String directQueue2 = "directQueueTwo";/*** 直接交换机 一个交换机可以绑定一个队列一个消费者,也可以绑定多个队列多个消费者* 通过指定路由键directRouting发送给交换机directExchange* 交互机directExchange通过指定的路由键把消息msg投递到对应的队列上面去* @param map*/public void directToQueue(Map<String, String> map) {map.put("direct-路由key:",RabbitConstants.routingKey1);rabbitTemplate.convertAndSend(RabbitConstants.directExchange, RabbitConstants.routingKey1, map);map.put("direct-路由key:",RabbitConstants.routingKey2);rabbitTemplate.convertAndSend(RabbitConstants.directExchange, RabbitConstants.routingKey2, map);}
3.1.2 直连队列消息绑定
package rabbit.config;import config.RabbitConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 配置类 : 创建我们的直接交换机和队列,以及直接交换机跟队列的绑定关系* direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去** */
@Configuration
public class DirectConfig {/*** Direct Exchange 是 RabbitMQ 默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列* @return*/@Beanpublic DirectExchange directExchangeOne(){return new DirectExchange(RabbitConstants.directExchange);}@Beanpublic Queue directQueueOne(){return new Queue(RabbitConstants.directQueue1);}@Beanpublic Queue directQueueTwo(){return new Queue(RabbitConstants.directQueue2);}/*** 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息* 路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。* direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去* @param directQueueOne* @param directExchangeOne* @return*/@Beanpublic Binding directBindingOne(Queue directQueueOne, DirectExchange directExchangeOne){return BindingBuilder.bind(directQueueOne).to(directExchangeOne).with(RabbitConstants.routingKey1);}/*** 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息* 路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。* direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去* @param directQueueTwo* @param directExchangeOne* @return*/@Beanpublic Binding directBindingTwo(Queue directQueueTwo, DirectExchange directExchangeOne) {return BindingBuilder.bind(directQueueTwo).to(directExchangeOne).with(RabbitConstants.routingKey2);}}
3.1.3 直连队列消息接收
@RabbitListener(queues = RabbitConstants.directQueue1)@RabbitHandler // 指定对消息的处理public void directClientOne(HashMap<String,String> mes){System.out.println("直连队列消息1:" + mes);}/** @RabbitListener(queues = {"directQueue1","directQueue2"}):这样就可以一次消费两条消息 **/@RabbitListener(queues = RabbitConstants.directQueue2)@RabbitHandlerpublic void directClientTwo(HashMap<String,String> mes){System.out.println("直连队列消息2: " + mes);}
3.1.4 结果:
3.2 fanout 扇形
把消息发送到所有与它绑定的Queue中,没有路由概念
3.2.1 扇形消息发送
@Autowiredpublic RabbitMqProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;rabbitTemplate.setConfirmCallback(this);}/**** 扇形交换机* 这个交换机没有路由键概念,就算你绑了路由键也是无视的* 消息会发送到所有绑定的队列上。* @param fanoutMap1*/public void fanoutToQueue(Map<String, String> fanoutMap1) {fanoutMap1.put("fanout-交换机:",RabbitConstants.fanoutExchange1);rabbitTemplate.convertAndSend(RabbitConstants.fanoutExchange1,null,fanoutMap1);}
3.2.2 扇形消息绑定
/*** 扇形交换机* Fanout:转发消息到所有绑定队列,没有路由key* */
@Configuration
public class FanoutConfig {/*** 不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了 routing_key 会被忽略。* @return*/@Beanpublic FanoutExchange fanoutExchange1(){return new FanoutExchange(RabbitConstants.fanoutExchange1);}@Beanpublic Queue fanoutQueue1(){return new Queue(RabbitConstants.fanoutQueue1);}@Beanpublic Queue fanoutQueue2(){return new Queue(RabbitConstants.fanoutQueue2);}/** 扇形交换机没有路由key */@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange1){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange1);}/** 扇形交换机没有路由key */@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange1) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange1);}}
3.2.3 扇形消息接收
/** 扇形交换机 */public static final String fanoutExchange1 = "fanout_exchange1";public static final String fanoutQueue1 = "fanout_queue1";public static final String fanoutQueue2 = "fanout_queue2";@RabbitListener(queues = RabbitConstants.fanoutQueue1)@RabbitHandlerpublic void fanoutQueue1(HashMap<String,String> fanoutMes){System.out.println("扇形队列消息1: " + fanoutMes);}@RabbitListener(queues = RabbitConstants.fanoutQueue2)@RabbitHandlerpublic void fanoutQueue2(HashMap<String,String> fanoutMes){System.out.println("扇形队列消息2: " + fanoutMes);}
3.2.4 扇形--结果
3.3 topic 主题
将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中--多了匹配的概念
3.3.1 主题队列消息发送
@Autowiredpublic RabbitMqProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;rabbitTemplate.setConfirmCallback(this);}/***主题交换机:模糊匹配队列* *:星号表示任意一个字符* #:表示任意一个或者多个字符*/// topic 的 routingKeypublic static final String topicA = "helloTopic.world";public static final String topicB = "helloTopic.#";public static final String topicAll = "#";public static final String topicExchange = "topic_exchange";/** 绑定 topicA = "helloTopic.world"*/public static final String topicQueue1 = "topic_queue1";/** 绑定 topicB="helloTopic.#"*/public static final String topicQueue2 = "topic_queue2";/** 绑定 #,匹配所有 */public static final String topicQueue3 = "topic_queue3";/*** 主题交换机:模糊匹配队列* topic.# 可匹配topic topic.add topic.add.add* topic.* 可匹配topic.add topic.delete* @param map*/public void topicToQueue(Map<String, String> map) {// 第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息map.put("Topic-路由key:",RabbitConstants.topicA);rabbitTemplate.convertAndSend(RabbitConstants.topicExchange, RabbitConstants.topicA, map);map.put("Topic-路由key:",RabbitConstants.topicB);rabbitTemplate.convertAndSend(RabbitConstants.topicExchange, RabbitConstants.topicB, map);map.put("Topic-路由key:",RabbitConstants.topicAll);rabbitTemplate.convertAndSend(RabbitConstants.topicExchange, RabbitConstants.topicAll, map);}
3.3.2 主题队列消息绑定
/**** 按规则转发消息*/
@Configuration
public class TopicConfig {/*** Topic Exchange 转发消息主要是根据通配符* 用来接收生产者发送的消息并将这些消息路由给服务器中的队列中* @return*/@Beanpublic TopicExchange topicExchange1(){return new TopicExchange(RabbitConstants.topicExchange);}@Beanpublic Queue topicQueue1(){return new Queue(RabbitConstants.topicQueue1);}@Beanpublic Queue topicQueue2(){return new Queue(RabbitConstants.topicQueue2);}@Beanpublic Queue topicQueue3(){return new Queue(RabbitConstants.topicQueue3);}/*** 消息并不是直接被投递到 Queue(消息队列) 中的,中间还必须经过 Exchange(交换器) 这一层,* Exchange(交换器) 会把我们的消息分配到对应的 Queue(消息队列) 中* @param topicQueue1* @param topicExchange1* @return*/@Beanpublic Binding topicBinding1(Queue topicQueue1, TopicExchange topicExchange1){return BindingBuilder.bind(topicQueue1).to(topicExchange1).with(RabbitConstants.topicA);}@Beanpublic Binding topicBinding2(Queue topicQueue2, TopicExchange topicExchange1){return BindingBuilder.bind(topicQueue2).to(topicExchange1).with(RabbitConstants.topicB);}@Beanpublic Binding topicBinding3(Queue topicQueue3, TopicExchange topicExchange1){return BindingBuilder.bind(topicQueue3).to(topicExchange1).with(RabbitConstants.topicAll);}}
3.3.3 主题队列消息接收
@RabbitListener(queues = RabbitConstants.topicQueue1)@RabbitHandlerpublic void topicQueue1(HashMap<String,String> topicMes){System.out.println("主题消息队列1: " + topicMes);}@RabbitListener(queues = RabbitConstants.topicQueue2)@RabbitHandlerpublic void topicQueue2(HashMap<String,String> topicMes){System.out.println("主题消息队列2: " + topicMes);}@RabbitListener(queues = RabbitConstants.topicQueue3)@RabbitHandlerpublic void topicQueue3(HashMap<String,String> topicMes){System.out.println("主题消息队列匹配所有: " + topicMes);}
3.3.4 主题--结果
3.4 Delayed 延时(需要延时插件,参考我另一篇插件安装)
3.4.1 延时队列消息发送
/** 延迟队列 */public static final String DELAYED_EXCHANGE_NAME = "myDelayedExchange";public static final String DELAYED_QUEUE_NAME = "delayed.queue";public static final String DELAYED_ROUTING_KEY = "delayed.routing.key";/*** 死信延迟队列* @param message*/public void sendDelayedMessage(String message) {System.out.println("Send time 开始: " + LocalDateTime.now());rabbitTemplate.convertAndSend(RabbitConstants.DELAYED_EXCHANGE_NAME,RabbitConstants.DELAYED_ROUTING_KEY,message,messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDelay(10000); // 设置消息的延长时间延,单位毫秒return messagePostProcessor;});System.out.println("Send time 结束: " + LocalDateTime.now() );}
3.4.2 延时队列消息绑定
public class DelayedConfig {/** 定义一个延迟交换机 **/@Beanpublic CustomExchange delayedExchange() {/*Map<String, Object> args = new HashMap<String, Object>();args.put("x-delayed-type", "direct");*/return new CustomExchange(RabbitConstants.DELAYED_EXCHANGE_NAME,"x-delayed-message", // 消息类型 x-delayed-messagetrue, // 是否持久化false); // 是否自动删除}/** 延时队列 **/@Beanpublic Queue delayedQueue() {return QueueBuilder.durable(RabbitConstants.DELAYED_QUEUE_NAME).withArgument("x-delayed-type", "direct").build();}/** 绑定队列到这个延迟交换机 */@Beanpublic Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(RabbitConstants.DELAYED_ROUTING_KEY).noargs();}
}
3.4.3 延时队列消息接收
@RabbitListener(queues = RabbitConstants.DELAYED_QUEUE_NAME)public void receiveDelayedMessage(String message, Channel channel) {System.out.println("Received delayed message: " + message);log.info("当前时间:{},接收时长信息给延迟队列:{}", LocalTime.now(),message);System.out.println("Received time: " + LocalDateTime.now() + " Received: " + message);// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}