RabbitMQ从入门到精通之安装、通讯方式详解

文章目录

  • RabbitMQ
    • 一、RabbitMQ介绍
      • 1.1 现存问题
    • 一、RabbitMQ介绍
    • 二、RabbitMQ安装
    • 三、RabbitMQ架构
    • 四、RabbitMQ通信方式
      • 4.1 RabbitMQ提供的通讯方式
      • 4.2 Helloworld 方式
      • 4.2Work queues
      • 4.3 Publish/Subscribe
      • 4.4 Routing
      • 4.5 Topics
      • 4.6 RPC (了解)
    • 五、Springboot 操作RabbitMQ
    • 六、RabbitMQ保证消息可靠性
      • 6.1、 保证消息一定送达到Exchange
      • 6.2、保证消息可以路由到Queue中
      • 6.3、保证队列持久化消息
      • 6.4、保证消息者可以正常消费消息
      • 6.5 SpringBoot实现上述操作
        • 6.5.1 Confirm
      • 6.5.2 Return
      • 6.5.3 消息持久化
    • 七、RabbitMQ死信队列 & 延迟交换机
      • 7.4、准备Exchange&Queue
      • 7.5、实现效果
    • 八、RabbitMQ的集群


RabbitMQ

一、RabbitMQ介绍

1.1 现存问题

    • 服务异步调用: 服务A如何保证异步请求一定能被服务B接收到并处理
      在这里插入图片描述
    • 削峰: 海量请求,如何实现削峰的效果,将请求全部放到一个队列中,慢慢的消费,这个队列怎么实现?

在这里插入图片描述

    • 服务解耦: 如何尽量的降低服务之间的耦合问题,如果在订单与积分和商家服务解构,需要一个队列,而这个队列依然需要实现上述两个情况功能。

在这里插入图片描述

一、RabbitMQ介绍

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

AMQP协议:
在这里插入图片描述

Erlang:

Erlang在1991年由爱立信公司向用户推出了第一个版本,经过不断的改进完善和发展,在1996年爱立信又为所有的Erlang用户提供了一个非常实用且稳定的OTP软件库并在1998年发布了第一个开源版本。Erlang同时支持的操作系统有linux,windows,unix等,可以说适用于主流的操作系统上,尤其是它支持多核的特性非常适合多核CPU,而分布式特性也可以很好融合各种分布式集群。

二、RabbitMQ安装

docker-compose.yml

version: “3.1”
services:
rabbitmq:
image: daocloud.io/library/rabbitmq:3.8.5
container_name: rabbitmq
restart: always
volumes:
- ./data/:/var/lib/rabbitmq/
ports:
- 5672:5672
- 15672:15672

在这里插入图片描述
在这里插入图片描述

docker-compose.yml文件内容:
在这里插入图片描述

镜像拉取完成后,直接在linux 内部执行: curl localhost:5672

在这里插入图片描述
执行后能够显示AMQP 字样的内容就说明执行成功了

执行 docker exec -it rabbitmq bash 命令 进入容器内部
cd opt/rabbitmq/ 目录下
在这里插入图片描述
执行cd plugins/sbin 命令进入目录下, 执行 ./rabbitmq-plugins enable rabbitmq_managent命令 启动rabbitmq 图形化界面
在这里插入图片描述
访问15672 端口,默认的账号密码是guest/guest
在这里插入图片描述

三、RabbitMQ架构

在这里插入图片描述

四、RabbitMQ通信方式

4.1 RabbitMQ提供的通讯方式

  • Hello World :为了入门操作
  • Work queues : 一个队列被多个消费者消费
  • Publish/Subscribe:手动创建Exchange(FANOUT)
  • Routing: 手动创建Exchange(DIRECT)
  • Topics : 手动创建Exchange(TOPIC)
  • RPC: RPC方式
  • Publisher Confirms

4.2 Helloworld 方式

在这里插入图片描述

//工具类
public class RabbitMQConnectionUtil {public static final String RABBITMQ_HOST ="172.16.177.133";public static final int RABBITMQ_POST =5672;public static final String RABBITMQ_USERNAME ="guest";public static final String RABBITMQ_PASSWORD ="guest";public static final String RABBITMQ_VIRTUAL_HOST ="/";/*** 构建RabbitMQ的连接对象* @return*/public static Connection getConnection() throws Exception{//1.创建connection    工厂对象ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(RABBITMQ_HOST);connectionFactory.setPort(RABBITMQ_POST);connectionFactory.setUsername(RABBITMQ_USERNAME);connectionFactory.setPassword(RABBITMQ_PASSWORD);connectionFactory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);//2.设置RabbitMQ的连接信息Connection connection = connectionFactory.newConnection();//3. 返回连接对象return connection;}
//生产者@Testpublic void consume() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);//4. 监听消息DefaultConsumer callback =new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消息者获取消息"+new String(body,"UTF-8"));}};channel.basicConsume(Publisher.QUEUE_NAME,true,callback);System.out.println("开始监听");System.in.read();}//消费者@Testpublic void publish() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//4. 发布消息String message="hello word!";channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送成功");System.in.read();}

4.2Work queues

在这里插入图片描述
一个队列中的消息,只会被一个消费者成功的消费,默认情况下,RabbitMQ的队列会将消息以轮询的方式交给不同的消费者消费,消费者拿到消息后,需要给RabbitMQ一个ack,RabbitMQ认为消费者已经拿到消息了

//消费者@Testpublic void consume() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);//3.1 设置消息的控制,一次拿几个消息
//#2        channel.basicQos(1);//4. 监听消息DefaultConsumer callback =new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {//模拟业务执行时间Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消息者获取消息"+new String(body,"UTF-8"));
//#1				channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(Publisher.QUEUE_NAME,true,callback);
//#1		channel.basicConsume(Publisher.QUEUE_NAME,false,callback);System.out.println("开始监听");System.in.read();//消费者2@Testpublic void consume2() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);//3.1 设置消息的控制,一次拿几个消息
//#2        channel.basicQos(1);//4. 监听消息 DefaultConsumer callback =new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {//模拟业务执行时间Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消息者获取消息"+new String(body,"UTF-8"));//basicAck(标识,是否批量操作)
//#1				channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(Publisher.QUEUE_NAME,true,callback);
//#1		channel.basicConsume(Publisher.QUEUE_NAME,false,callback);System.out.println("开始监听");System.in.read();}
//生产者public static final String QUEUE_NAME="work";@Testpublic void publish() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//4. 发布消息for (int i=0;i<10;i++){String message="hello word!"+i;channel.basicPublish("",QUEUE_NAME,null,message.getBytes());}System.out.println("消息发送成功");System.in.read();}

当两台消费者的消费能力不相同的时候,为了提高效率,就不能以轮询的方式进行分发,而是以消费者消费完成后手动传递ack 的方式进行下一个消息的分发,将== #1 #2 的代码 ==打开即可
操作步骤:

  • 操作#1 让消费者关闭自动ack,并且设置消息的流控,最终实现消费者可以尽可能去多消费消息
  • 操作#2 设置每次拿几个消息

4.3 Publish/Subscribe

自行创建路由器,并绑定队列
在这里插入图片描述
如何构建一个自定义交换机,并指定类型是FANOUT,让交换机和多个Queue绑定到一起

//生产者 
public static final String EXCHANGE_NAME="pubsub";public static final String QUEUE_NAME1="pubsub-one";public static final String QUEUE_NAME2="pubsub-two";@Testpublic void pulish() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannalChannel channel = connection.createChannel();//3.构建虚拟机, exchange 交换机名称,type 交换机类型【枚举】channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//4. 构建队列channel.queueDeclare(QUEUE_NAME1, false, false, false, null);channel.queueDeclare(QUEUE_NAME2, false, false, false, null);//5. 绑定 交换机 和 队列,使用的是FANOUT类型的交换机,绑定方式是直接绑定 ,routingkey 参数随便写什么都可以,channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");//6.发消息到交换机channel.basicPublish(EXCHANGE_NAME,"",null,"publish/subscribe!".getBytes());System.out.println("消息成功发送");}

4.4 Routing

DIRECT 类型的交换机,在绑定Exchange和Queue时,需要指定好routingKey同时在发送消息的时候,也指定routingkey,只有在routingkey 一致时,才会把指定的消息路由到指定的队列
在这里插入图片描述

public static final String EXCHANGE_NAME="routing";public static final String QUEUE_NAME1="routing-one";public static final String QUEUE_NAME2="routing-two";@Testpublic void pulish() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannalChannel channel = connection.createChannel();//3.构建虚拟机, exchange 交换机名称,type 交换机类型【枚举】 * 交换机类型 改成 BuiltinExchangeType.DIRECTchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//4. 构建队列channel.queueDeclare(QUEUE_NAME1, false, false, false, null);channel.queueDeclare(QUEUE_NAME2, false, false, false, null);//5. 绑定 交换机 和 队列,使用的是FANOUT类型的交换机,绑定方式是直接绑定 ,routingkey 参数随便写什么都可以,channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"ORANGE");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"BLACK");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"GREEN");//6.发消息到交换机channel.basicPublish(EXCHANGE_NAME,"ORANGE",null,"大橙子!".getBytes());channel.basicPublish(EXCHANGE_NAME,"BLACK",null,"黑布林!".getBytes());channel.basicPublish(EXCHANGE_NAME,"WHITE",null,"小白兔!".getBytes());System.out.println("消息成功发送");}

4.5 Topics

topics 模式支持模糊匹配RoutingKey,就像是sql中的 like子句模糊查询,而路由模式等同于sql中的where子句等值查询

在这里插入图片描述

通过模糊路由到队列。该方式的Routing key必须具有固定格式:以 . 间隔的一串单词,比如:quick.orange.rabbit,Routing key 最多不能超过255byte。

交换机和队列的Binding key用通配符来表示,有两种语法:

  • * 可以替代一个单词;
  • # 可以替代 0 或多个单词;

例如 #.com.#
#可以表示0级或多级。xx.com、com.xx、com、xx.com.xx.xx、xx.xx.com.xx都可以

例如 *.com. *
*表示一级,xx.com.xx 可以 ,com.xx 不可以,前面缺少一级,xx.com.xx.xx不可以,com后面只能有一级xx,最终格式必须是 xx.com.xx

 public static final String EXCHANGE_NAME="topics";public static final String QUEUE_NAME1="topics-one";public static final String QUEUE_NAME2="topics-two";@Testpublic void pulish() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2. 构建ChannalChannel channel = connection.createChannel();//3.构建虚拟机, exchange 交换机名称,type 交换机类型【枚举】 * 交换机类型 改成 BuiltinExchangeType.TOPICchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//4. 构建队列channel.queueDeclare(QUEUE_NAME1, false, false, false, null);channel.queueDeclare(QUEUE_NAME2, false, false, false, null);//5. 绑定 交换机 和 队列,// Topic类型的交换机,在和队列绑定时,需要以aaa.bbb.ccc 方式编写routingKey// 其中有两个特殊字符: *(相当于占位符),# (相当通配符)channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"*.orange.*");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"*.*.rabbit");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"lazy.#");//6.发消息到交换机channel.basicPublish(EXCHANGE_NAME,"big.orange.rabbit",null,"大橙子兔子!".getBytes());//匹配1、2channel.basicPublish(EXCHANGE_NAME,"small.while.rabbit",null,"小兔子!".getBytes());//匹配1、2channel.basicPublish(EXCHANGE_NAME,"lazy.dog.dog.dog.dog",null,"懒狗狗狗狗!".getBytes());//匹配 3System.out.println("消息成功发送");}

4.6 RPC (了解)

因为两个服务在交互时,可以尽量做到Client和server的结偶,通过RabbitMQ进行结藕操作
需要让client 发送消息时,携带两个属性,

  • replyto告知server将相应信息放到哪个队列
  • correlationId告知server 发送相应消息时,需要携带位置标识来告知client响应的消息

在这里插入图片描述

public class Publisher {public static final String QUEUE_PUBLISHER = "rpc_publisher";public static final String QUEUE_CONSUMER = "rpc_consumer";@Testpublic void publish() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);//4. 发布消息String message="hello rpc!";String uuid = UUID.randomUUID().toString();AMQP.BasicProperties prop =new AMQP.BasicProperties().builder().replyTo(QUEUE_CONSUMER).correlationId(uuid).build();channel.basicPublish("",QUEUE_PUBLISHER,prop,message.getBytes());channel.basicConsume(QUEUE_CONSUMER,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String id = properties.getCorrelationId();if(id!=null && id.equals(uuid)){System.out.println("接收到服务端的响应:"+new String(body));}channel.basicAck(envelope.getDeliveryTag(),false);}});System.out.println("消息发送成功");System.in.read();}
}public class Consumer {public static final String QUEUE_PUBLISHER = "rpc_publisher";public static final String QUEUE_CONSUMER = "rpc_consumer";@Testpublic void consume() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);//4. 监听消息DefaultConsumer callback =new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消息者获取消息"+new String(body,"UTF-8"));String resp = "获取到client发出的请求,这里是响应的信息";String respQueueName = properties.getReplyTo();String uuid = properties.getCorrelationId();AMQP.BasicProperties prop =new AMQP.BasicProperties().builder().correlationId(uuid).build();channel.basicPublish("",respQueueName,prop,resp.getBytes());channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(QUEUE_PUBLISHER,false,callback);System.out.println("开始监听");System.in.read();}
}

五、Springboot 操作RabbitMQ


  • 创建项目
  • 导入依赖
	<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
  • 配置rabbitmq信息
spring:rabbitmq:host: 172.16.177.133password: guestusername: guestport: 5672virtual-host: /

配置类声明队列

@Configuration
public class RabbitMQConfig {public static final String EXCHANGE="boot-exchange";public static final String QUEUE="boot-queue";public static final String ROUTING_KEY="*.black.*";@Beanpublic Exchange bootExchange(){return ExchangeBuilder.topicExchange(EXCHANGE).build();}@Beanpublic Queue bootQueue(){return QueueBuilder.durable(QUEUE).build();}@Beanpublic Binding bootBinding(Exchange bootExchange,Queue bootQueue){return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();}
}

生产者配置

@SpringBootTest
class SpringbootRabbitmqApplicationTests {@AutowiredRabbitTemplate rabbitTemplate;@Testvoid contextLoads() {}@Testpublic void publisher(){rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");System.out.println("消息发送成功");}@Testpublic void publiWithProps(){rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setCorrelationId("123");return message;}});System.out.println("消息发送成功2");}
}

消费者配置

@Component
public class ConsumeListener {/**** @param msg* @param channel 前提是配置好spring.rabbitmq.listener.simple.acknowledge-mode: manual #开启手动ack* @param message* @throws IOException*/@RabbitListener(queues = RabbitMQConfig.QUEUE)public void consume(String msg, Channel channel, Message message) throws IOException {System.out.println("队列消息为:"+msg);String correlationId = message.getMessageProperties().getCorrelationId();System.out.println("标识为:"+correlationId);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}
  • 声明交换机&队列

六、RabbitMQ保证消息可靠性

confirm机制
可以通过confirm效果保证消息一定送达到Exchange,官方提供了三种,选择了对于效率影响最低的异步回调的效果

6.1、 保证消息一定送达到Exchange

使用confirm机制

 public static final String QUEUE_NAME="confirms ";@Testpublic void publish() throws Exception {//1.获取连接对象Connection connection = RabbitMQConnectionUtil.getConnection();//2.创建Channel 通道Channel channel = connection.createChannel();//3.构建队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//3.1 开启confirms 的异步回调channel.confirmSelect();String message="hello word!";//3.2 设置confirms的异步回调channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息成功发送到Exchange");}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息没有发送到Exchange,尝试重试,或者保存到数据库做其他补偿操作");}});//3.3 设置return回调,确认消息是否路由到了队列channel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消息没有到指定队列时,做其他的补偿措施!!");}});//3.4、设置消息持久化AMQP.BasicProperties prop =new AMQP.BasicProperties().builder().deliveryMode(2)//消息持久化.build();//4. 发布消息channel.basicPublish("",QUEUE_NAME,true,prop,message.getBytes());System.out.println("消息发送成功");System.in.read();}

6.2、保证消息可以路由到Queue中

使用return 机制
为了保证Exchange上的消息一定可以送达到Queue

//6.2设置return 回调,确认消息是否路由到了Queue
channel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消息没有到指定队列时,做其他的补偿措施!!");}});
//7.在发送消息时,将basicPublish方法参数中的mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return	 回调

6.3、保证队列持久化消息

DeliveryMode设置消息持久化

//6.3、设置消息持久化AMQP.BasicProperties prop =new AMQP.BasicProperties().builder().deliveryMode(2)//消息持久化.build();//4. 发布消息channel.basicPublish("",QUEUE_NAME,true,prop,message.getBytes());

6.4、保证消息者可以正常消费消息

详情看WorkQueue模式

6.5 SpringBoot实现上述操作

6.5.1 Confirm

  • 编写配置文件开启Confirm机制

spring:
rabbitmq:
publisher-confirm-type: correlated # 新版本 开启confirm机制
publisher-confirms: true # 老版本

  • 在发送消息时,配置RabbitTemplate
@Testpublic void publishWithConfirms() throws IOException {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {if(b){System.out.println("消息已经送达到交换机");}else{System.out.println("消息没有送到到Exchange,需要做一些补偿操作!");}}});rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");System.out.println("消息发送成功");System.in.read();}

6.5.2 Return

  • 编写配置文件开启Return机制

spring:
rabbitmq:
publisher-returns: true # 开启return机制

  • 在发送消息时,配置RabbitTemplate
@Testpublic void publishWithReturn() throws IOException {rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {String msg = new String(message.getBody());System.out.println("消息失败:"+msg+"路由队列失败!!做补救操作");}});rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");System.out.println("消息发送成功");System.in.read();}

6.5.3 消息持久化

//3.4、设置消息持久化
AMQP.BasicProperties prop =new AMQP.BasicProperties().builder().deliveryMode(2)//消息持久化 #1.build();//4. 发布消息,  将参数mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return回调channel.basicPublish("",QUEUE_NAME,true,prop,message.getBytes());

七、RabbitMQ死信队列 & 延迟交换机

###7.1、 消息被消费者拒绝,requeue设置为false
###7.2.1、发送消息时设置消息的生存时间,如果生存时间到了,还没有被消费。
###7.2.2、 也可以指定某个队列中所有消息的生存时间,如果生存时间到了,还没有被消费
###7.3、队列已经达到消息的最长长度后,再路由过来的消息直接变成死信
在这里插入图片描述

7.4、准备Exchange&Queue

@Configuration
public class DeadLetterConfig {public static final String NORMAL_EXCHANGE="normal-exchange";public static final String NORMAL_QUEUE="normal-queue";public static final String NORMAL_ROUTING_KEY="normal.#";public static final String DEAD_EXCHANGE="dead-exchange";public static final String DEAD_QUEUE="dead-queue";public static final String DEAD_ROUTING_KEY="dead.#";/**普通交换机*/@Beanpublic Exchange normalExchange(){return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();}/**普通队列*/@Beanpublic Queue normalQueue(){return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE)//绑定死信队列.build();}/**普通队列绑定路由*/@Beanpublic Binding normalBingding(Queue normalQueue,Exchange normalExchange){return BindingBuilder.bind(normalQueue).to(normalExchange).with(DEAD_ROUTING_KEY).noargs();}/**死信交换机*/@Beanpublic Exchange deadExchange(){return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();}/**死信队列*/@Beanpublic Queue deadQueue(){return QueueBuilder.durable(DEAD_QUEUE ).build();}/**绑定死信队列和交换机*/@Beanpublic Binding deadBinding(Queue deadQueue,Exchange deadExchange){return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();}
}

7.5、实现效果

  • 基于消费者进行reject 或者 nack 实现死信效果
@Component
public class DeadListener {/**** @param msg* @param channel 需要手动启动ACK 才能有效 spring.rabbitmq.listener.simple.acknowledge-mode: manual* @param message 需要手动启动ACK 才能有效 spring.rabbitmq.listener.simple.acknowledge-mode: manual*/@RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)public void comsume(String msg, Channel channel, Message message) throws IOException {System.out.println("接收到normal队列的消息:"+msg);
//        channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);}
}
  • 消息的生存时间

八、RabbitMQ的集群

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/70332.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【MyBatisⅡ】动态 SQL

目录 &#x1f392;1 if 标签 &#x1fad6;2 trim 标签 &#x1f460;3 where 标签 &#x1f9ba;4 set 标签 &#x1f3a8;5 foreach 标签 动态 sql 是Mybatis的强⼤特性之⼀&#xff0c;能够完成不同条件下不同的 sql 拼接。 在 xml 里面写判断条件。 动态SQL 在数据库里…

LeetCode 202 快乐数

题目链接 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 法一&#xff1a;哈希 使用哈希表循环判断每次经过平方和的数&#xff0c;如果为1则直接返回true&#xff0c;若之前存在过但不为1则直接返回false 代码 class Solution { public:// 计算…

pytorch中squeeze函数用法

squeeze的中文意思是“挤压”&#xff0c;顾名思义&#xff0c;该函数的作用是压缩维度 squeeze(input, dimNone) -> Tensor input一个高维张量&#xff0c;如果各个维度中存在大小为1的维度&#xff0c;squeeze才起作用&#xff0c;下面举例说明 x torch.arange(6).res…

Http Content-type 对照表

文件扩展名Content-Type(Mime-Type)文件扩展名Content-Type(Mime-Type).*&#xff08; 二进制流&#xff0c;不知道下载文件类型&#xff09;application/octet-stream.tifimage/tiff.001application/x-001.301application/x-301.323text/h323.906application/x-906.907drawing…

MySql 游标 触发器

游标 1.什么是游标 MySQL游标是一种数据库对象&#xff0c;它用于在数据库查询过程中迭代访问结果集中的每一行。游标可以被看作是一个指向查询结果集的指针&#xff0c;通过移动游标&#xff0c;可以按行读取和处理结果集的数据。在MySQL中&#xff0c;游标可以用于在存储过程…

【MySQL基础|第一篇】——谈谈SQL中的DDL语句

个人主页&#xff1a;兜里有颗棉花糖 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 兜里有颗棉花糖 原创 收录于专栏【MySQL学习专栏】&#x1f388; 本专栏旨在分享学习MySQL的一点学习心得&#xff0c;欢迎大家在评论区讨论&#x1f48c; 前言&#xff…

力扣(LeetCode)算法_C++——有效的数独

请你判断一个 9 x 9 的数独是否有效。只需要 根据以下规则 &#xff0c;验证已经填入的数字是否有效即可。 数字 1-9 在每一行只能出现一次。 数字 1-9 在每一列只能出现一次。 数字 1-9 在每一个以粗实线分隔的 3x3 宫内只能出现一次。&#xff08;请参考示例图&#xff09; …

ChatGPT AIGC 完成多维分析雷达图

我们先让ChatGPT来帮我们总结一下多维分析雷达图的功能与作用。 同样ChatGPT AIGC完成的动态雷达图效果如下; 这样的一个多维分析动态雷达图是用HTML,JS,Echarts 来完成的。 将完整代码复制如下: <!DOCTYPE html> <html style="height: 100%"><h…

等保测评 —— 安全控制点

等保测评 —— 安全控制点 技术 第1章 安全物理环境 物理位置的选择 物理访问控制 防盗窃和防破坏 防雷击 防火 防水和防潮 防静电 温湿度控制 电力供应 电磁防护 第2章 安全通信网络 网络架构 通信传输 可信验证 第3章 安全区域边界 访问控制 入侵防范 恶…

Golang综合项目实战(一)

Golang综合项目实战&#xff08;一&#xff09; 01-项目简介02-项目架构、术语、运行结果03-创建并初始化项目04-创建用户模型和错误处理05-创建密码加密工具类06-保存密码之前的hooks07-创建用户名密码验证工具类08-用户数据库操作逻辑09-操作用户service10-创建商品分类模型…

PHP8内置函数中的变量函数-PHP8知识详解

在php8中&#xff0c;与变量相关的内置函数比较多&#xff0c;本文说一些比较重要的、常见的内置函数。今日着重讲解了5个&#xff0c;分别是&#xff1a;检测变量是否为空的函数empty()、判断变量是否定义过的函数isset()、销毁指定的变量的函数unset()、获取变量的类型的函数…

通过Siri打造智能爬虫助手:捕获与解析结构化数据

在信息时代&#xff0c;我们经常需要从互联网上获取大量的结构化数据。然而&#xff0c;传统的网络爬虫往往需要编写复杂代码和规则来实现数据采集和解析。如今&#xff0c;在苹果公司提供的语音助手Siri中有一个强大功能可以帮助我们轻松完成这项任务——通过使用自定义指令、…

ChatGPT在机器人护理和老年人支持中的潜在角色如何?

机器人在护理和老年人支持领域有着巨大的潜力&#xff0c;可以提供多种服务和支持&#xff0c;改善老年人的生活质量&#xff0c;并减轻护理工作者和家庭成员的负担。在这篇文章中&#xff0c;我将探讨机器人在这一领域的潜在角色&#xff0c;包括其应用、优势和挑战。 ## 1. …

MySQL表的CURD

CRUD : Create(创建), Retrieve(读取)&#xff0c;Update(更新)&#xff0c;Delete&#xff08;删除&#xff09; Create 语法 INSERT [INTO] table_name [(column [, column] ...)] VALUES (value_list) [, (value_list)] ... value_list: value, [, value] ... 示例&#x…

鸿鹄工程项目管理系统 Spring Cloud+Spring Boot+前后端分离构建工程项目管理系统

工程项目管理软件&#xff08;工程项目管理系统&#xff09;对建设工程项目管理组织建设、项目策划决策、规划设计、施工建设到竣工交付、总结评估、运维运营&#xff0c;全过程、全方位的对项目进行综合管理 工程项目各模块及其功能点清单 一、系统管理 1、数据字典&am…

React18 新特性

React18 新特性 自动批量更新State 定义 import { useState } from reactconst [x, setX] useState(0)渲染赋值 setX(5)并发CM模式 同步不可中断更新机制 -> 异步可中断并行 状态更新 机制 React18 默认开启并发模式 详见代码 ReactDOM 的引入 import ReactDOM fr…

基于Hata模型的BPSK调制信号小区覆盖模拟matlab完整程序分享

基于Hata信道模型的BPSK调制信号小区覆盖模拟matlab仿真&#xff0c;对比VoIP, Live Video,FTP/Email 完整程序&#xff1a; clc; clear; close all; warning off; addpath(genpath(pwd)); % Random bits are generated here. bits randi([0, 1], [50,1]); M 2; t 1:1:50; …

Lesson5-2:OpenCV视频操作---视频追踪

学习目标 理解meanshift的原理知道camshift算法能够使用meanshift和Camshift进行目标追踪 1.meanshift 1.1原理 m e a n s h i f t meanshift meanshift算法的原理很简单。假设你有一堆点集&#xff0c;还有一个小的窗口&#xff0c;这个窗口可能是圆形的&#xff0c;现在你可…

【算法】堆排序 详解

堆排序 详解 堆排序代码实现 排序&#xff1a; 排序&#xff0c;就是使一串记录&#xff0c;按照其中的某个或某些关键字的大小&#xff0c;递增或递减的排列起来的操作。 稳定性&#xff1a; 假定在待排序的记录序列中&#xff0c;存在多个具有相同的关键字的记录&#xff0c…

leetcode刷题日常

最长递增子序列 最长递增子序列 class Solution {public int lengthOfLIS(int[] nums) {int nnums.length;if(n0) return 0;int[] dp new int[n]; // 记录上一个比当前是第几的元素。dp[0] 1;int ans 1;for(int i1;i<n;i){dp[i] 1;for(int j0;j<i;j){if (nums[j]<…