【SpringBoot整合系列】SpringBoot整合RabbitMQ-基本使用

目录

  • SpringtBoot整合RabbitMQ
    • 1.依赖
    • 2.配置
    • RabbitMQ的7种模式
      • 1.简单模式(Hello World)
        • 应用场景
        • 代码示例
      • 2.工作队列模式(Work queues)
        • 应用场景
        • 代码示例
        • 手动 ack
          • 代码示例
      • 3.订阅模式(Publish/Subscribe)
        • 应用场景
        • 代码示例
      • 4.路由模式(Routing)
        • 应用场景
        • 代码示例
      • 5.主题模式(Topics)
        • 应用场景
        • 代码示例
      • 6.远程过程调用(RPC)
        • 应用场景
        • 代码示例
          • 消息生产者开发
          • 消息发送者开发
      • 7.发布者确认(Publisher Confirms)
        • 应用场景
    • RabbitMQ的四种交换机
      • 1.直连交换机(Direct exchange)
        • 代码示例
      • 2.扇形交换机(Fanout exchange)
        • 代码示例
      • 3.主题交换机(Topic exchange)
        • 代码示例
      • 4.首部交换机(Headers exchange)
        • 代码示例

SpringtBoot整合RabbitMQ

1.依赖

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>

2.配置

server:port: 9090
spring:rabbitmq:host: 192.168.29.200port: 5672username: adminpassword: adminvirtual-host: /

RabbitMQ的7种模式

1.简单模式(Hello World)

在这里插入图片描述

  • 做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B
应用场景
  • 将发送的电子邮件放到消息队列,然后邮件服务在队列中获取邮件并发送给收件人
代码示例
  1. 配置类
    @Configuration
    public class HelloWorldConfig {public static final String HELLO_WORLD_QUEUE_NAME = "hello_world_queue";@Beanpublic Queue queue1() {return new Queue(HELLO_WORLD_QUEUE_NAME);}
    }
    
  2. 监听并消费消息
    @Component
    public class HelloWorldConsumer {@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)public void receive(String msg) {System.out.println("msg = " + msg);}
    }
    
  3. 生产消息并发送
    @SpringBootTest
    class MyMqBootApplicationTests {@ResourceRabbitTemplate rabbitTemplate;@Testvoid helloworld() {rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello world!!!");}
    }
    

2.工作队列模式(Work queues)

在这里插入图片描述

  • 在多个消费者之间分配任务(竞争的消费者模式),一个生产者对应多个消费者,一般适用于执行资源密集型任务,单个消费者处理不过来,需要多个消费者进行处理
  • 一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中。消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动 ack,来决定是否要消费某一条消息。
应用场景
  • 一个订单的处理需要10s,有多个订单可以同时放到消息队列,然后让多个消费者同时处理,这样就是并行了,而不是单个消费者的串行情况
代码示例
  1. 监听并消费消息

    @Component
    public class HelloWorldConsumer {@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)public void receive(String msg) {System.out.println("receive = " + msg);}// concurrency 为 10,此时,receive2将会同时存在 10 个子线程去消费消息@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME,concurrency = "10")public void receive2(String msg) {System.out.println("receive2 = " + msg+"------->"+Thread.currentThread().getName());}
    }
    
  2. 生产消息并发送

        @Testvoid work() {for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello");}}
    
  3. 运行结果:运行结果每次不一定一样

    receive2 = hello------->org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-9
    receive2 = hello------->org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-10
    receive2 = hello------->org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1
    receive2 = hello------->org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-3
    receive2 = hello------->org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-7
    receive2 = hello------->org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-5
    receive = hello
    receive2 = hello------->org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-4
    receive2 = hello------->org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-6
    receive2 = hello------->org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-8
    
手动 ack

手动 ack可以自行决定是否消费 RabbitMQ 发来的消息

代码示例
  1. 配置文件:配置手动ack
server:port: 9090
spring:rabbitmq:host: 192.168.29.200port: 5672username: adminpassword: adminvirtual-host: /listener:simple:acknowledge-mode: manual # 配置手动ack
  1. 消费代码:receive2 拒绝了所有消息,第一个消费者消费了所有消息
    @Component
    public class HelloWorldConsumer {@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)public void receive(Message message, Channel channel) throws IOException {System.out.println("receive="+message.getPayload());//手动确认channel.basicAck(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)),true);}@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, concurrency = "10")public void receive2(Message message, Channel channel) throws IOException {System.out.println("receive2 = " + message.getPayload() + "------->" + Thread.currentThread().getName());//手动拒绝channel.basicReject(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)), true);}
    }
    
  2. 测试结果(生产代码不变)
  • 此时receive2 拒绝了所有消息,receive消费了所有消息(如果receive2没有拒绝,receive断然不会消费10次)

3.订阅模式(Publish/Subscribe)

在这里插入图片描述

  • 一次向许多消费者发送消息,一个生产者发送的消息会被多个消费者获取,也就是将消息将广播到所有的消费者中。
  • 一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。
  • 需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange上面,那么该消息将会丢失,这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力,只有队列具备存储消息的能力
应用场景
  • 更新商品库存后需要通知多个缓存和多个数据库,这里的结构应该是:
    • 一个fanout类型交换机扇出两个个消息队列,分别为缓存消息队列、数据库消息队列
    • 一个缓存消息队列对应着多个缓存消费者
    • 一个数据库消息队列对应着多个数据库消费者
代码示例

具体内容看本文单独的目录 RabbitMQ的四种交换机 , 我这里单独拿出来解释了

4.路由模式(Routing)

在这里插入图片描述

  • 有选择地(Routing key)接收消息,发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key,仅消费指定路由key的消息
  • 一个生产者,一个交换机,两个队列,两个消费者,生产者在创建 Exchange 后,根据 RoutingKey 去绑定相应的队列,并且在发送消息时,指定消息的具体 RoutingKey 即可
应用场景
  • 如在商品库存中增加了1台iphone12,iphone12促销活动消费者指定routing key为iphone12,只有此促销活动会接收到消息,其它促销活动不关心也不会消费此routing key的消息
代码示例

参考本文单独的目录 RabbitMQ的四种交换机-1

5.主题模式(Topics)

在这里插入图片描述

  • 根据主题(Topics)来接收消息,将路由key和某模式进行匹配,此时队列需要绑定在一个模式上,#匹配一个词或多个词,*只匹配一个词。
  • 一个生产者,一个交换机,两个队列,两个消费者,生产者创建 Topic 的 Exchange 并且绑定到队列中,这次绑定可以通过 * 和 # 关键字,对指定 RoutingKey 内容,编写时注意格式 xxx.xxx.xxx 去编写
应用场景

同上,iphone促销活动可以接收主题为iphone的消息,如iphone12、iphone13,iphone…等

代码示例

参考本文单独的目录 RabbitMQ的四种交换机-3

6.远程过程调用(RPC)

在这里插入图片描述
如果我们需要在远程计算机上运行功能并等待结果就可以使用RPC,具体流程可以看图。

  • 首先 Client 发送一条消息,和普通的消息相比,这条消息多了两个关键内容:一个是 correlation_id,这个表示这条消息的唯一 id,还有一个内容是 reply_to,这个表示消息回复队列的名字。
  • Server 从消息发送队列获取消息并处理相应的业务逻辑,处理完成后,将处理结果发送到 reply_to 指定的回调队列中。
  • Client 从回调队列中读取消息,就可以知道消息的执行情况是什么样子了。

这种情况其实非常适合处理异步调用。

应用场景
  • 需要等待接口返回数据,如订单支付
代码示例
消息生产者开发
  1. 依赖

            <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
    
  2. 配置

    server:port: 7777
    spring:rabbitmq:host: 192.168.29.200port: 5672username: adminpassword: adminvirtual-host: /publisher-confirm-type: correlated # 配置消息确认方式,我们通过 correlated 来确认,只有开启了这个配置,将来的消息中才会带 correlation_id,只有通过 correlation_id 我们才能将发送的消息和返回值之间关联起来。publisher-returns: true #开启发送失败退回。
    
  3. 配置类

  4. /*** @author: zjl* @datetime: 2024/5/9* @desc: * 这个配置类中我们分别配置了消息发送队列 msgQueue 和消息返回队列 replyQueue,* 然后将这两个队列和消息交换机进行绑定。常规操作。* 在 Spring Boot 中我们负责消息发送的工具是 RabbitTemplate,* 默认情况下,系统自动提供了该工具,但是这里我们需要对该工具重新进行定制,* 主要是添加消息发送的返回队列,最后我们还需要给返回队列设置一个监听器*/
    @Configuration
    public class RabbitConfig {public static final String RPC_QUEUE1 = "queue_1";public static final String RPC_QUEUE2 = "queue_2";public static final String RPC_EXCHANGE = "rpc_exchange";/*** 设置消息发送RPC队列*/@Beanpublic Queue msgQueue() {return new Queue(RPC_QUEUE1);}/*** 设置返回队列*/@Beanpublic Queue replyQueue() {return new Queue(RPC_QUEUE2);}/*** 设置交换机*/@Beanpublic TopicExchange exchange() {return new TopicExchange(RPC_EXCHANGE);}/*** 请求队列和交换器绑定*/@Beanpublic Binding msgBinding() {return BindingBuilder.bind(msgQueue()).to(exchange()).with(RPC_QUEUE1);}/*** 返回队列和交换器绑定*/@Beanpublic Binding replyBinding() {return BindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2);}/*** 使用 RabbitTemplate发送和接收消息* 并设置回调队列地址*/@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setReplyAddress(RPC_QUEUE2);template.setReplyTimeout(6000);return template;}/*** 给返回队列设置监听器*/@Beanpublic SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames(RPC_QUEUE2);container.setMessageListener(rabbitTemplate(connectionFactory));return container;}
    }
    
  5. 消息发送

    /*** @author: zjl* @datetime: 2024/5/9* @desc:*     消息发送调用 sendAndReceive 方法,该方法自带返回值,返回值就是服务端返回的消息。*     服务端返回的消息中,头信息中包含了 spring_returned_message_correlation 字段,*     这个就是消息发送时候的 correlation_id,通过消息发送时候的 correlation_id*     以及返回消息头中的 spring_returned_message_correlation 字段值,*     我们就可以将返回的消息内容和发送的消息绑定到一起,*     确认出这个返回的内容就是针对这个发送的消息的。*/
    @RestController
    @Slf4j
    public class RpcClientController {@Resourceprivate RabbitTemplate rabbitTemplate;@GetMapping("/send")public String send(String message) {// 创建消息对象Message newMessage = MessageBuilder.withBody(message.getBytes()).build();log.info("client send:{}", newMessage);//客户端发送消息Message result = rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE1, newMessage);String response = "";if (result != null) {// 获取已发送的消息的 correlationIdString correlationId = newMessage.getMessageProperties().getCorrelationId();log.info("correlationId:{}", correlationId);// 获取响应头信息HashMap<String, Object> headers = (HashMap<String, Object>) result.getMessageProperties().getHeaders();// 获取 server 返回的消息 idString msgId = (String) headers.get("spring_returned_message_correlation");if (msgId.equals(correlationId)) {response = new String(result.getBody());log.info("client receive:{}", response);}}return response;}
    }
    

这就是整个消息生产者的开发,其实最最核心的就是 sendAndReceive 方法的调用。调用虽然简单,但是准备工作还是要做足够。例如如果我们没有在 application.properties 中配置 correlated,发送的消息中就没有 correlation_id,这样就无法将返回的消息内容和发送的消息内容关联起来

消息发送者开发
  1. 依赖

            <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
    
  2. 配置

    server:port: 8888
    spring:rabbitmq:host: 192.168.29.200port: 5672username: adminpassword: adminvirtual-host: /publisher-confirm-type: correlated # 配置消息确认方式,我们通过 correlated 来确认,只有开启了这个配置,将来的消息中才会带 correlation_id,只有通过 correlation_id 我们才能将发送的消息和返回值之间关联起来。publisher-returns: true #开启发送失败退回。
    
  3. 配置类

    @Configuration
    public class RabbitConfig {public static final String RPC_QUEUE1 = "queue_1";public static final String RPC_QUEUE2 = "queue_2";public static final String RPC_EXCHANGE = "rpc_exchange";/*** 配置消息发送队列*/@BeanQueue msgQueue() {return new Queue(RPC_QUEUE1);}/*** 设置返回队列*/@BeanQueue replyQueue() {return new Queue(RPC_QUEUE2);}/*** 设置交换机*/@BeanTopicExchange exchange() {return new TopicExchange(RPC_EXCHANGE);}/*** 请求队列和交换器绑定*/@BeanBinding msgBinding() {return BindingBuilder.bind(msgQueue()).to(exchange()).with(RPC_QUEUE1);}/*** 返回队列和交换器绑定*/@BeanBinding replyBinding() {return BindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2);}
    }
    
  4. 消息消费

    @RestController
    @Slf4j
    public class RpcConsumerReceiver {/** 服务端首先收到消息并打印出来。* 服务端提取出原消息中的 correlation_id。* 服务端调用 sendAndReceive 方法,将消息发送给 RPC_QUEUE2 队列,同时带上 correlation_id 参数。*/@Resourceprivate RabbitTemplate rabbitTemplate;@RabbitListener(queues = RabbitConfig.RPC_QUEUE1)public void process(Message msg) {log.info("server receive : {}",msg.toString());Message response = MessageBuilder.withBody(("i'm receive:"+new String(msg.getBody())).getBytes()).build();CorrelationData correlationData = new CorrelationData(msg.getMessageProperties().getCorrelationId());rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE2, response, correlationData);}
    }
    

7.发布者确认(Publisher Confirms)

  • 与发布者进行可靠的发布确认,发布者确认是RabbitMQ扩展,可以实现可靠的发布。
  • 在通道上启用发布者确认后,RabbitMQ将异步确认发送者发布的消息,这意味着它们已在服务器端处理
应用场景
  • 对于消息可靠性要求较高,比如钱包扣款

RabbitMQ的四种交换机

1.直连交换机(Direct exchange)

  • 具有路由功能的交换机,绑定到此交换机的时候需要指定一个routing_key,交换机发送消息的时候需要routing_key,会将消息发送道对应的队列
  • DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上
  • 例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。
代码示例
  1. 配置类
    @Configuration
    public class RabbitDirectConfig {//首先提供一个消息队列Queue,然后创建一个DirectExchange对象,三个参数分别是名字,重启后是否依然有效以及长期未用时是否删除。//创建一个Binding对象将Exchange和Queue绑定在一起。//DirectExchange和Binding两个Bean的配置可以省略掉,即如果使用DirectExchange,可以只配置一个Queue的实例即可。public final static String DIRECTNAME = "mq-direct";@Beanpublic Queue queue() {return new Queue("hello-queue");}@Beanpublic DirectExchange directExchange() {return new DirectExchange(DIRECTNAME, true, false);}@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(directExchange()).with("direct");}
    }
    
  2. 消费者
    通过 @RabbitListener 注解指定一个方法是一个消息消费方法,方法参数就是所接收到的消息
    @Component
    public class DirectReceiver {@RabbitListener(queues = "hello-queue")public void handler1(String msg) {System.out.println("DirectReceiver:" + msg);}
    }
    
  3. 生产者发送消息
    @RestController
    public class SendController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMapping("/send")public String send(){rabbitTemplate.convertAndSend("hello-queue", "hello direct!");return "success";}
    }
    

2.扇形交换机(Fanout exchange)

  • 广播消息到所有队列,没有任何处理,速度最快
  • FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用
代码示例
  1. 配置类

    @Configuration
    public class RabbitFanoutConfig {//在这里首先创建 FanoutExchange,参数含义与创建 DirectExchange 参数含义一致,// 然后创建两个 Queue,再将这两个 Queue 都绑定到 FanoutExchange 上public final static String FANOUTNAME = "mq-fanout";@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(FANOUTNAME, true, false);}@Beanpublic Queue queueOne() {return new Queue("queue-one");}@Beanpublic Queue queueTwo() {return new Queue("queue-two");}@Beanpublic Binding bindingOne() {return BindingBuilder.bind(queueOne()).to(fanoutExchange());}@Beanpublic Binding bindingTwo() {return BindingBuilder.bind(queueTwo()).to(fanoutExchange());}
    }
    
  2. 消费者

    @Component
    public class FanoutReceiver {@RabbitListener(queues = "queue-one")public void receiver1(String message) {System.out.println("FanoutReceiver:receiver1:" + message);}@RabbitListener(queues = "queue-two")public void receiver2(String message) {System.out.println("FanoutReceiver:receiver2:" + message);}
    }
    
  3. 生产者发送消息

    @RestController
    public class SendController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMapping("/send")public String send(){//注意这里发送消息时不需要 routingkey,指定 exchange 即可,routingkey 可以直接传一个 nullrabbitTemplate.convertAndSend(RabbitFanoutConfig.FANOUTNAME,null, "hello fanout!");return "success";}
    }
    

3.主题交换机(Topic exchange)

  • 在直连交换机基础上增加模式匹配,也就是对routing_key进行模式匹配,*代表一个单词,#代表多个单词
  • TopicExchange 是比较复杂但是也比较灵活的一种路由策略,在 TopicExchange 中,Queue 通过 routingkey 绑定到 TopicExchange 上,
  • 当消息到达 TopicExchange 后,TopicExchange 根据消息的 routingkey 将消息路由到一个或者多个 Queue 上
代码示例
  1. 配置类

    @Configuration
    public class RabbitTopicConfig {/*** 首先创建 TopicExchange,参数和前面的一致。* 然后创建三个 Queue,第一个 Queue 用来存储和 “xiaomi” 有关的消息,* 第二个 Queue 用来存储和 “huawei” 有关的消息,* 第三个 Queue 用来存储和 “phone” 有关的消息。* * 将三个 Queue 分别绑定到 TopicExchange 上,* 第一个 Binding 中的 “xiaomi.#” 表示消息的 routingkey 凡是以 “xiaomi” 开头的,都将被路由到名称为 “xiaomi” 的 Queue 上,* 第二个 Binding 中的 “huawei.#” 表示消息的 routingkey 凡是以 “huawei” 开头的,都将被路由到名称为 “huawei” 的 Queue 上,* 第三个 Binding 中的 “#.phone.#” 则表示消息的 routingkey 中凡是包含 “phone” 的,都将被路由到名称为 “phone” 的 Queue 上。*/public final static String TOPICNAME = "mq-topic";@Beanpublic TopicExchange topicExchange() {return new TopicExchange(TOPICNAME, true, false);}@Beanpublic Queue xiaomi() {return new Queue("xiaomi");}@Beanpublic Queue huawei() {return new Queue("huawei");}@Beanpublic Queue phone() {return new Queue("phone");}@Beanpublic Binding xiaomiBinding() {return BindingBuilder.bind(xiaomi()).to(topicExchange()).with("xiaomi.#");}@Beanpublic Binding huaweiBinding() {return BindingBuilder.bind(huawei()).to(topicExchange()).with("huawei.#");}@Beanpublic Binding phoneBinding() {return BindingBuilder.bind(phone()).to(topicExchange()).with("#.phone.#");}
    }
    
  2. 消费者

    @Component
    public class TopicReceiver {@RabbitListener(queues = "phone")public void receiver1(String message) {System.out.println("PhoneReceiver:" + message);}@RabbitListener(queues = "xiaomi")public void receiver2(String message) {System.out.println("XiaoMiReceiver:"+message);}@RabbitListener(queues = "huawei")public void receiver3(String message) {System.out.println("HuaWeiReceiver:"+message);}
    }
    
  3. 生产者发送消息

@RestController
public class SendController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMapping("/send")public String send(){//根据 RabbitTopicConfig 中的配置,//第一条消息将被路由到名称为 “xiaomi” 的 Queue 上,//第二条消息将被路由到名为 “huawei” 的 Queue 上,//第三条消息将被路由到名为 “xiaomi” 以及名为 “phone” 的 Queue 上,//第四条消息将被路由到名为 “huawei” 以及名为 “phone” 的 Queue 上,//最后一条消息则将被路由到名为 “phone” 的 Queue 上rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.news","小米新闻..");rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.news","华为新闻..");rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.phone","小米手机..");rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.phone","华为手机..");rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"phone.news","手机新闻..");return "success";}
}

4.首部交换机(Headers exchange)

  • 忽略routing_key,使用Headers信息(一个Hash的数据结构)进行匹配,优势在于可以有更多更灵活的匹配规则
  • HeadersExchange 是一种使用较少的路由策略,HeadersExchange 会根据消息的 Header 将消息路由到不同的 Queue 上,这种策略也和 routingkey无关
代码示例
  1. 配置类

    @Configuration
    public class RabbitHeaderConfig {/*** 这里的配置大部分和前面介绍的一样,差别主要体现的 Binding 的配置上,* 第一个 bindingName 方法中,* whereAny 表示消息的 Header 中只要有一个 Header 匹配上 map 中的 key/value,* 就把该消息路由到名为 “name-queue” 的 Queue 上,* 这里也可以使用 whereAll 方法,* 表示消息的所有 Header 都要匹配。* whereAny 和 whereAll 实际上对应了一个名为 x-match 的属性。* bindingAge 中的配置则表示只要消息的 Header 中包含 age,不管 age 的值是多少,* 都将消息路由到名为 “age-queue” 的 Queue 上*/public final static String HEADERNAME = "mq-header";@Beanpublic HeadersExchange headersExchange() {return new HeadersExchange(HEADERNAME, true, false);}@Beanpublic Queue queueName() {return new Queue("name-queue");}@Beanpublic Queue queueAge() {return new Queue("age-queue");}@Beanpublic Binding bindingName() {Map<String, Object> map = new HashMap<>();map.put("name", "mq");return BindingBuilder.bind(queueName()).to(headersExchange()).whereAny(map).match();}@Beanpublic Binding bindingAge() {return BindingBuilder.bind(queueAge()).to(headersExchange()).where("age").exists();}
    }
    
  2. 消费者

    @Component
    public class HeaderReceiver {//注意这里的参数用 byte 数组接收@RabbitListener(queues = "name-queue")public void receiver1(byte[] msg) {System.out.println("HeaderReceiver:name:" + new String(msg, 0, msg.length));}@RabbitListener(queues = "age-queue")public void receiver2(byte[] msg) {System.out.println("HeaderReceiver:age:" + new String(msg, 0, msg.length));}
    }
    
  3. 生产者发送消息

package cn.smbms.controller;import cn.smbms.config.RabbitFanoutConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;/*** @author: zjl* @datetime: 2024/5/9* @desc: */
@RestController
public class SendController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMapping("/send")public String send(){//这里创建两条消息,两条消息具有不同的 header,不同 header 的消息将被发到不同的 Queue 中去Message nameMsg = MessageBuilder.withBody("hello header! name-queue".getBytes()).setHeader("name", "sang").build();Message ageMsg = MessageBuilder.withBody("hello header! age-queue".getBytes()).setHeader("age", "99").build();rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg);rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg);return "success";}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/9346.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

远程开机与远程唤醒BIOS设置

远程开机与远程唤醒BIOS设置 在现代计算机应用中&#xff0c;远程管理和控制已成为许多企业和个人的基本需求。其中&#xff0c;远程开机和远程唤醒是两项非常实用的功能。要实现这些功能&#xff0c;通常需要在计算机的BIOS中进行一些特定的设置。以下是对远程开机和远程唤醒…

VS2019下使用MFC完成科技项目管理系统

背景&#xff1a; &#xff08;一&#xff09;实验目的 通过该实验&#xff0c;使学生掌握windows程序设计的基本方法。了解科技项目组织管理的主要内容和管理方面的基本常识&#xff0c;熟练应用数据库知识&#xff0c;通过处理过程对计算机软件系统工作原理的进一步理解&…

Python批量备份华为设备配置到FTP服务器

Excel表格存放交换机信息&#xff1a; 备份文件夹效果图&#xff1a; Windows系统配置计划任务定时执行python脚本&#xff1a; Program/script&#xff1a;C:\Python\python.exe Add arguments (optional)&#xff1a; D:\Python_PycharmProjects\JunLan_pythonProje…

verilog中输入序列不连续的序列检测

编写一个序列检测模块&#xff0c;输入信号端口为data&#xff0c;表示数据有效的指示信号端口为data_valid。当data_valid信号为高时&#xff0c;表示此刻的输入信号data有效&#xff0c;参与序列检测&#xff1b;当data_valid为低时&#xff0c;data无效&#xff0c;抛弃该时…

如何通过wifi网络将串口数据发送到多个设备

摘要&#xff1a;当lora电台的速率无法满足高速传输时&#xff0c;可以考虑用“串口服务器”。本文介绍一下如何使用TP-LINK的TL-CPE300D实现一对多的数据发送。 当前也有使用lora电台的&#xff0c;但是lora电台支持的速率有限&#xff0c;可能最大支持到9600&#xff0c;甚至…

TC3xx MTU概述(1)

目录 1.MTU基本功能 2.MBIST 3.小结 1.MTU基本功能 在TC3xx中&#xff0c;MTU(Memory Unit Test)被用来管理控制芯片内部各种RAM的测试、初始化和数据完整性检查。 既然MTU主要是管理和控制&#xff0c;那干活的想必另有他人。所以在该平台中&#xff0c;我们可以看到SRAM…

Electron-Vue 脚手架避坑实录,兼容Win11,升级electron22,清理控制台错误

去年的还是有用的&#xff0c;大家继续看&#xff0c;今年再补充一些Electron-Vue 异常处理方案 M1 和 Window10_electron异常处理-CSDN博客 代码gitee.com地址 electron-demo: electron 22 初始代码开发和讲解 升级electron为22版本&#xff08;这个版本承上启下&#xff0c…

怎么用git在暂存区(stage)中移除不需要提交(commit)的文件?

2024年5月9日&#xff0c;周四上午 非常简单&#xff0c;用下面这条命令就可以了 git rm --cached <file>注&#xff1a;这条命令不会把文件从文件夹中删除&#xff0c;只会把文件从暂存区中移除出去 实战

《Python编程从入门到实践》day23

# 昨日知识点回顾 操控飞船移动发射子弹&#xff0c;删除屏幕之外的子弹 #今日知识点学习 第13章 外星人 13.1 项目回顾 项目添加新功能前审核既有代码&#xff0c;对混乱或低效的代码进行清理 13.2 创建第一个外星人 13.2.1 创建Alien类 # alien.py imp…

影响视频视觉质量的因素——各类视觉伪影

模糊效应&#xff08;Blurring Artifact&#xff09; 图像模糊&#xff08;blurring&#xff09;&#xff1a;平滑图像的细节和边缘产生的现象&#xff0c;模糊对于图像来说&#xff0c;是一个低通滤波器&#xff08;low-pass filter&#xff09;。一般而言&#xff0c;用户更…

商品上新业务状态机接入实践

一、商品上新业务介绍 商品上新即为在得物平台上架一个新的商品&#xff0c;一个完整的商品上新流程从各种不同的来源渠道提交新品申请开始&#xff0c;需要历经多轮不同角色的审核&#xff0c;主要包括&#xff1a; 选品审核&#xff1a;根据新品申请提交的资料信息判定是否符…

Docker 怎么将映射出的路径设置为非root用户权限

在Docker中&#xff0c;容器的根文件系统默认是由root用户拥有的。如果想要在映射到宿主机的路径时设置为非root用户权限&#xff0c;可以通过以下几种方式来实现&#xff1a; 1. 使用具有特定UID和GID的非root用户运行容器&#xff1a; 在运行容器时&#xff0c;你可以使用-u…

17 空闲空间管理

目录 假设 底层机制 分割与合并 追踪已分配空间的大小 嵌入空闲列表 让堆增长 基本策略 最优匹配 首次匹配 下次匹配 其他方式 分离空闲列表 伙伴系统 小结 分页是将内存成大小相等的内存块&#xff0c;这样的机制下面&#xff0c;很容易去管理这些内存&#xff0c…

Word表格标题间距大修改环绕为无仍无法解决

1.选中表格&#xff0c;右键选择【表格属性】 2.选择【环绕】&#xff0c;此时【定位】可以被启用&#xff08;如下&#xff09;&#xff0c;点击进入窗口 3.修改参数和下面一模一样 注意&#xff1a;【垂直】那里的修改方式是先选段落&#xff0c;后在位置输入0

python:鸭子类型使用场景

python&#xff1a;鸭子类型使用场景 1 前言 “一只鸟走起来像鸭子、游泳起来像鸭子、叫起来也像鸭子&#xff0c;那么这只鸟可以被称为鸭子。“----鸭子模型 鸭子模型是Python中的一种编程哲学&#xff0c;也被称为“鸭子类型”。它来源于一句话&#xff1a;“如果它走起路…

qt 5.15.x 安装android过程记录

1.经过好几天的qt for android 安装&#xff0c;发现存在很多坑 参考其他文章可以编译出APK文件。但是我发现(我的机器上)无法调试apk程序&#xff0c;不能调试那怎么行呢&#xff0c;看了很多文章都是运行出结果了就结束了。没有展示怎么调试程序。 很多文章都是建议安装JDK8…

CTF数据安全大赛crypto题目解题过程

CTF-Crypto加密题目内容 下面是一个Base64加密的密文 bXNobnszODdoajM3MzM1NzExMzQxMmo4MGg0bDVoMDYzNDQzNH0原文链接&#xff1a; 数据安全大赛CTF-Crypto题目 - 红客网-网络安全与渗透技术 我们用Python写一个解密脚本&#xff1a; import base64 import time #base64加密…

韩顺平0基础学Java——第7天

p110-p154 控制结构&#xff08;第四章&#xff09; 多分支 if-elseif-else import java.util.Scanner; public class day7{public static void main(String[] args) {Scanner myscanner new Scanner(System.in);System.out.println("input your score?");int s…

什么是Jetpack

Jetpack Jetpack 是一套组件库、工具&#xff0c;可帮助开发人员遵循最佳做法&#xff0c;减少样板代码并编写可在 Android 版本和设备上一致工作的代码&#xff0c;以便开发人员可以专注于他们关心的代码 组成 主要包含四部分&#xff1a;架构&#xff08;Architecture&…

Linux:进程通信(三)信号的捕捉

目录 一、信号捕捉函数 1、signal函数 2、sigaction函数 二、用户态与内核态 1、用户态 2、内核态 用户态与内核态转换 三、volatile关键字 四、SIGCHLD信号 一、信号捕捉函数 1、signal函数 signal函数是C语言标准库中的一个函数&#xff0c;用于处理Unix/Linux系…