RabbitMQ保姆级教程

文章目录

  • 前言
  • 一、MQ是什么?
    • 1.1 AMQP
  • 二、在Linux安装RabbitMQ
    • 2.1 安装
    • 2.2 RabbitMQ启动命令
    • 2.3 开启RabbitMQ 后台管理界面
      • 2.3.1 登录rabbitMQ UI界面
    • 2.3 Docker启动RabbitMQ
    • 2.4 常见消息模型
    • 2.5 生产者(Producer) / 消费者(Consumer)
    • 2.6 工作队列模式(Work Queues)
    • 2.7 参数细节
    • 2.8 实现能者多劳
      • 2.8.1 Ack手动应答防止数据丢失和消息拒收后重新发送
      • 2.8.2 预取值
    • 2.9 Publish/Subscribe 发布/订阅
    • 2.10 Routing(路由) - Direct
    • 2.11 Routing(路由)- Topic
  • 三、进阶篇 高级特性
    • 3.1 死信队列
      • 3.1.1 死信队列实战:消息TTL过期
      • 3.1.2 死信队列实战:队列达到最大长度 设置正常队列最大长度
      • 3.1.3 死信队列实战:消息被拒
    • 3.2 基于SpringBoot实现延迟队列
    • 3.3 发布确认 高级特性
      • 3.3.1 可靠性投递confirm模式
      • 3.3.2 可靠性投递return模式
    • 3.4 优先级队列
    • 3.5 消费端限流

前言

提示:RaabitMQ消息队列的学习。


一、MQ是什么?

  • MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统
    之间进行通信。
  • RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包
    裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是
    一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,
    存储和转发消息数据。
    在这里插入图片描述
  • 工作原理
    在这里插入图片描述

1.1 AMQP

  • AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用
    层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,遵
    循此协议,不收客户端和中间件产品和开发语言限制。2006年,AMQP 规范发布。类比HTTP。

二、在Linux安装RabbitMQ

2.1 安装

	1. 我们把erlang环境与rabbitMQ 安装包解压到Linux2. rpm -ivh erlang安装包3. yum install socat -y 安装依赖 / rpm -ivh socat依赖包 --force --nodeps4. rpm -ivh rabbitmq安装包

2.2 RabbitMQ启动命令

	1. 开启服务 /sbin/service rabbitmq-server start  / service rabbitmq-server start 2. 停止服务 service rabbitmq-server stop 3. 重启服务 service rabbitmq-server restart 

2.3 开启RabbitMQ 后台管理界面

	1.  rabbitmq-plugins enable rabbitmq_management
  • 添加一个新的用户
	1. 创建rabbitMQ账号rabbitmqctl add_user 用户名 密码2. 设置用户角色rabbitmqctl set_user_tags 用户名 administrator #设置用户名为超级管理员3. 设置用户权限rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"4. 查看rabbitmq的用户和角色rabbitmqctl list_users5. 登录rabbitMQ 界面: Linux虚拟机ip:15672 即可

2.3.1 登录rabbitMQ UI界面

记得开放15672端口访问 Linux虚拟机ip:15672 即可

输入账户密码后看到这个界面代表成功
在这里插入图片描述

2.3 Docker启动RabbitMQ

Docker安装

	1. docker pull rabbitmq:3-management2. 开启rabbitMQdocker run \-e RABBITMQ_DEFAULT_USER=root \-e RABBITMQ_DEFAULT_PASS=123456 \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management

2.4 常见消息模型

  • channel:操作MQ的工具
  • exchange:路由消息到队列中
  • queue:缓存消息
  • virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
    在这里插入图片描述

2.5 生产者(Producer) / 消费者(Consumer)

  • 所需依赖
	<dependencies><!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version></dependency><!-- https://mvnrepository.com/artifact/commons-io/commons-io --><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.4</version></dependency></dependencies>
1234567891011121314

在这里插入图片描述

  • 生产者代码
	/*** 生产者:发消息*/
public class Producer {//队列名称public static final String QUEUE_NAME="hello";//发消息public static void main(String[] args) throws Exception{//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();//工厂IP连接rabbitMQ队列factory.setHost("ip地址");//设置用户名密码factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");//创建连接Connection connection = factory.newConnection();//通过连接来获取 信道来发消息Channel channel = connection.createChannel();/*** 生成一个队列* 1.队列名称* 2.队列里面的信息是否持久化 默认false 信息存储在内存中* 3.该列队是否只供一个消费者进行消费,是否进行消息共享*   true:可以多个消费者消费*   false:只能一个消费者消费* 4.是否自动删除,最后一个消费者断开连接后,该队列是否自动删除*   true:自动删除*   false:不自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);//发消息String message="hello rabbitMQ";/*** 发送一个消息* 1.发送到哪个交换机* 2.路由的KEY值是哪个? 指的是本次队列的名称* 3.其他参数信息* 4.发送的消息体*/channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送完毕");channel.close();connection.close();}
}
  • 消费者
	/*** 消费者:接收消息*/
public class Consumer {//队列名称,接收此队列的消息public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception{//创建连接工程ConnectionFactory factory = new ConnectionFactory();//工厂IP连接rabbitMQ队列factory.setHost("ip地址");//设置用户名密码factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");//创建连接Connection connection = factory.newConnection();//通过连接来获取 信道来收消息Channel channel = connection.createChannel();//声明 接收消息的回调DeliverCallback deliverCallback=(consumerTag, message)-> {//message:包含消息头和消息体,我们只想拿到消息体//若不进行转换,直接输出message我们拿到的则是地址String data = new String(message.getBody());System.out.println(new String(message.getBody()));};//声明 取消消费的回调CancelCallback cancelCallback=consumerTag->{System.out.println("消费消息被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应*   true:代表自动应答*   false:手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

2.6 工作队列模式(Work Queues)

  • 模式说明
    在这里插入图片描述
  • Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消费,采用的是 轮询机制
  • 应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
  • 工作模式:生产者
	public class ProducerWorkQueue {//队列名称public static final String QUEUE_NAME="hello";//发消息public static void main(String[] args) throws Exception{//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();//工厂IP连接rabbitMQ队列factory.setHost("ip地址");//设置用户名密码factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");//创建连接Connection connection = factory.newConnection();//通过连接来获取 信道来发消息Channel channel = connection.createChannel();/*** 生成一个队列* 1.队列名称* 2.队列里面的信息是否持久化 默认false 信息存储在内存中* 3.该列队是否只供一个消费者进行消费,是否进行消息共享*   true:可以多个消费者消费*   false:只能一个消费者消费* 4.是否自动删除,最后一个消费者断开连接后,该队列是否自动删除*   true:自动删除*   false:不自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);for (int i = 1; i <= 10; i++) {//发消息String message=i+"hello rabbitMQ";/*** 发送一个消息* 1.发送到哪个交换机* 2.路由的KEY值是哪个? 指的是本次队列的名称* 3.其他参数信息* 4.发送的消息体*/channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送完毕");}channel.close();connection.close();}
}
  • 工作模式:两个消费者
	/*** 消费者:接收消息*/
public class ConsumerWorkQueues1 {//队列名称,接收此队列的消息public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception{//创建连接工程ConnectionFactory factory = new ConnectionFactory();//工厂IP连接rabbitMQ队列factory.setHost("ip地址");//设置用户名密码factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");//创建连接Connection connection = factory.newConnection();//通过连接来获取 信道来收消息Channel channel = connection.createChannel();//声明 接收消息的回调DeliverCallback deliverCallback=(consumerTag, message)-> {//message:包含消息头和消息体,我们只想拿到消息体//若不进行转换,直接输出message我们拿到的则是地址String data = new String(message.getBody());System.out.println(new String(message.getBody()));};//声明 取消消费的回调CancelCallback cancelCallback=consumerTag->{System.out.println("消费消息被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应*   true:代表自动应答*   false:手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}
	/*** 消费者:接收消息*/
public class ConsumerWorkQueues2 {//队列名称,接收此队列的消息public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception{//创建连接工程ConnectionFactory factory = new ConnectionFactory();//工厂IP连接rabbitMQ队列factory.setHost("ip地址");//设置用户名密码factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");//创建连接Connection connection = factory.newConnection();//通过连接来获取 信道来收消息Channel channel = connection.createChannel();//声明 接收消息的回调DeliverCallback deliverCallback=(consumerTag, message)-> {//message:包含消息头和消息体,我们只想拿到消息体//若不进行转换,直接输出message我们拿到的则是地址String data = new String(message.getBody());System.out.println(new String(message.getBody()));};//声明 取消消费的回调CancelCallback cancelCallback=consumerTag->{System.out.println("消费消息被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应*   true:代表自动应答*   false:手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}
  • 结果:各执行五次,也验证了 我们上面所说的 轮询机制
    在这里插入图片描述
    在这里插入图片描述
  • 小结:
    一个消息只能有一个接收者,但是可以有多个接收者

2.7 参数细节

  • durable:是否进行持久化,当前队列如果进行持久化,我们重启rabbitMQ后当前队列依旧存在
	//消费者生成的队列channel.queueDeclare(QUEUE_NAME,(durable)true/false,false,false,null);
  • props :队列中的信息是否持久化,若消息持久化,我们重启rabbitMQ后当前队列依旧存在
	//MessageProperties.PERSISTENT_TEXT_PLAIN:将消息进行持久化channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());  
  • autoDelete:是否自动删除,最后一个消费者断开连接后,该队列是否自动删除
	channel.queueDeclare(QUEUE_NAME,false,false,(autoDelete的参数位置)false,null);
  • autoAck:自动应答
	若开启了自动应答,rabbitMQ消息队列分配给消费者10个数据,只要消费者拿到消息队列的数据时,就会告诉消息队列,数据处理完毕。若当我们处理到第5个数据时,消费者出现了宕机,死掉了,则会出现数据丢失
	channel.basicConsume(QUEUE_NAME,(autoAck是否自动应答)false,deliverCallback,cancelCallback);

2.8 实现能者多劳

  • 业务场景:

    当我们的两个消费者执行业务时,a消费者执行速度快,b消费者执行速度慢,我们想让执行速度快的多执行,应当如何实现呢?

    1. 开启不公平分发,能者多劳 channel.basicQos(1); 0:轮询机制 1:能者多劳
    2. 开启手动确认
  • 消费者a

	/*** 消费者:接收消息*/
public class ConsumerWorkQueues1 {//队列名称,接收此队列的消息public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//开启不公平分发,能者多劳channel.basicQos(1);DeliverCallback deliverCallback=(consumerTag, message)-> {String data = new String(message.getBody());System.out.println(new String(message.getBody()));//参数1:确认队列中那个具体的消息:// 可以获取消息的id // 消息routingkey// 交换机 exchange// 消息和重传标志//参数2:是否开启多个消息同时确认channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback=consumerTag->{System.out.println("消费消息被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应*   true:代表自动应答*   false:手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*///关闭自动应答 falsechannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}
}
  • 消费者b:消费消息时然消费者b休眠100毫秒
public class ConsumerWorkQueues2 {//队列名称,接收此队列的消息public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//开启不公平分发,能者多劳channel.basicQos(1);//声明 接收消息的回调DeliverCallback deliverCallback=(consumerTag, message)-> {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(new String(message.getBody()));//手动确认消息://参数1:确认队列中那个具体的消息 参数2:是否开启多个消息同时确认channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//声明 取消消费的回调CancelCallback cancelCallback=consumerTag->{System.out.println("消费消息被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应*   true:代表自动应答*   false:手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*///关闭自动应答 falsechannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}
  • 执行结果:
    消费者a执行
    在这里插入图片描述
    消费者b执行
    在这里插入图片描述

2.8.1 Ack手动应答防止数据丢失和消息拒收后重新发送

  • 应用场景:两个消费者每次都从队列中来获取消息,若消费者a正常执行,消费者b在执行过程中出现了宕机,挂掉了那么我们未被消费的消息会被重新放回到队列中,防止消息丢失。
  1. 生产者
public class ProducerWorkQueue {//队列名称public static final String QUEUE_NAME="hello";//发消息public static void main(String[] args) throws Exception{//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);Scanner scanner = new Scanner(System.in);while (true){String msg = scanner.nextLine();channel.basicPublish("",QUEUE_NAME, null,msg.getBytes());System.out.println("消息发送完毕");}}
}
  1. 消费者a
public class ConsumerWorkQueues1 {//队列名称,接收此队列的消息public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception{//创建连接工程ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");//创建连接Connection connection = factory.newConnection();//通过连接来获取 信道来收消息Channel channel = connection.createChannel();//声明 接收消息的回调DeliverCallback deliverCallback=(consumerTag, message)-> {//message:包含消息头和消息体,我们只想拿到消息体//若不进行转换,直接输出message我们拿到的则是地址String data = new String(message.getBody());System.out.println("消费者1===>"+new String(message.getBody()));try {int i=3/0;//模拟业务发生异常channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}catch (Exception e){System.out.println("拒收消息发生了异常");//拒收消息//参数一:表示投递的消息标签//参数二:是否开启多个消息同时确认//参数三:是否重新给队列发送channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);}};//声明 取消消费的回调CancelCallback cancelCallback=consumerTag->{System.out.println("消费消息被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应*   true:代表自动应答*   false:手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*///关闭自动应答 falsechannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}
}
  1. 消费者b
public class ConsumerWorkQueues2 {//队列名称,接收此队列的消息public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception{//创建连接工程ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");//创建连接Connection connection = factory.newConnection();//通过连接来获取 信道来收消息Channel channel = connection.createChannel();//声明 接收消息的回调DeliverCallback deliverCallback=(consumerTag, message)-> {//message:包含消息头和消息体,我们只想拿到消息体//若不进行转换,直接输出message我们拿到的则是地址System.out.println("睡10秒");try {Thread.sleep(1000*10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(new String(message.getBody()));//手动确认消息://参数1:确认队列中那个具体的消息 参数2:是否开启多个消息同时确认channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//声明 取消消费的回调CancelCallback cancelCallback=consumerTag->{System.out.println("消费消息被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应*   true:代表自动应答*   false:手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*///关闭自动应答 falsechannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}
}
  • 当消费者b在消费消息时,我们让消费者b睡眠10秒模拟业务流程,在这10秒内我们手动关掉消费者b
  1. 发送 aa 消费者a接收
    在这里插入图片描述

在这里插入图片描述

  1. 发送bb消费者b接收,在消费者b睡眠过程中我们停止消费者b,来看看手动应答的结果
    在这里插入图片描述
    此时我们查看消费者a,出现了本应该是消费者b消费的消息bb
    在这里插入图片描述

2.8.2 预取值

	channel.basicQos(1);  0:轮询机制  1:能者多劳 若值>1代表当前队列的预取值,代表当前队列大概会拿到多少值

2.9 Publish/Subscribe 发布/订阅

在这里插入图片描述

  • 也可以叫 广播模式,当我们的P消费者发送了消息,交给了X(交换机),所有绑定了这个X(交换机)的队列都可以接收到P消费者发送的消息
  • 代码实现生产者
	public class Provider {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//将通道声明指定交换机,   参数一:交换机名称  参数二:交换机类型 fanout广播类型	//参数2:交换机类型也可使用  BuiltinExchangeType. 的方式来查看选择channel.exchangeDeclare("order", "fanout");channel.basicPublish("order","",null,"fanout type message".getBytes());channel.close();connection.close();}
}
  • 代码实现消费者
	public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//通道绑定交换机channel.exchangeDeclare("order","fanout");//获取临时队列名称String queueName = channel.queueDeclare().getQueue();//绑定交换机和队列channel.queueBind(queueName,"order","");channel.basicConsume(queueName,true,(consumerTag,message)->{System.out.println("消费者1===>"+new String(message.getBody()));},consumerTag -> System.out.println("取消消费消息"));}
}

2.10 Routing(路由) - Direct

在这里插入图片描述

routing值订阅模型-Direct(直连)

  • 在上面广播模式中,一条消息,会被所有订阅的队列都消费。但是在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange

    在Direct模型下:

    1. 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
    2. 消息的发送方在Exchange发送消息时,也必须指定消息的RoutingKey
    3. Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的RoutingKey与消息的Routing Key完全一致,才会接受到消息
  • 生产者

public class Provider {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//通过信道声明交换机, 参数一:交换机名称  参数二:direct 路由模式channel.exchangeDeclare("logsExchange","direct");//发送消息 参数一:发送信息到的交换机名称//       参数二:绑定路由 发送给队列的那个路由key,//只有当队列的路由key与交换机的路由key相对应时,队列才会接受到消息channel.basicPublish("logsExchange","msgRouting",null,"routing logs direct info 发送了消息".getBytes());channel.close();connection.close();}
}
  • 消费者
public class Consumer1 {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("logs","direct");//获取临时队列名String queueName = channel.queueDeclare().getQueue();//绑定队列:参数一:临时队列名称 参数二:绑定的交换机名称 参数三:路由key,若消费者的路由key与生产者的路由key相同则可以收到消息channel.queueBind(queueName,"logsExchange","infoRouting");channel.queueBind(queueName,"logsExchange","msgRouting");channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));}
}
  • 消费者2
public class Consumer2 {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("logs","direct");//获取临时队列名String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName,"logs","error");channel.queueBind(queueName,"logs","msg");channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));}
}

2.11 Routing(路由)- Topic

在这里插入图片描述

  • Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。
  • 只不过Topic类型Exchange可以让队列在绑定RoutingKey的时候使用通配符!
	#通配符* (star) can substitute for exactly one word :匹配一个词# (hash) can substitute for zero or more words :匹配一个或多个词
  • 生产者
public class Provider {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//通过信道声明交换机, 参数一:交换机名称  参数二:topic 动态路由channel.exchangeDeclare("order","topic");String routingKey="user.order";//发送消息 参数一:发送信息到的交换机名称  参数二:绑定路由 发送给队列的那个路由keychannel.basicPublish("order",routingKey,null,("routing logs topic发送了消息"+routingKey).getBytes());channel.close();connection.close();}
}
  • 消费者1
public class Consumer1 {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("order","topic");//获取临时队列名String queueName = channel.queueDeclare().getQueue();//绑定队列:参数一:临时队列名称 参数二:绑定的交换机名称 参数三:动态通配符路由keychannel.queueBind(queueName,"order","user.*");channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));}
}
  • 消费者2
public class Consumer2 {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("order","topic");//获取临时队列名String queueName = channel.queueDeclare().getQueue();//绑定队列:参数一:临时队列名称 参数二:绑定的交换机名称 参数三:动态通配符路由keychannel.queueBind(queueName,"order","user.#");channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));}
}

三、进阶篇 高级特性

3.1 死信队列

	死信,顾名思义就是无法被消费的信息,字面意思可以这样理解,一般来说,producer将消息投递到queue里,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,自然就有了死信队列
  • 应用场景
	为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
  • 生产者:给正产的消息队列发送消息,并且设置消息过期时间为10S,超过10S消息未被消费,则消息进入死信队列
public class TTLProvider {//普通交换机名称public static final String NORMAL_EXCHANGE="normal_exchange";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip");factory.setUsername("账户");factory.setPassword("密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//发送死信 设置TTL过期时间AMQP.BasicProperties properties=new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 1; i <= 10; i++) {String msg=""+i;channel.basicPublish(NORMAL_EXCHANGE,"normal",properties,msg.getBytes());}System.out.println("结束发送");}
}
  • 正常队列消费者
public class TTLConsumer1 {//普通交换机名称public static final String NORMAL_EXCHANGE="normal_exchange";//死信交换机名称public static final String DEAD_EXCHANGE="dead_exchange";//普通队列名称public static final String NORMAL_QUEUE="normal_queue";//死信队列名称public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip");factory.setUsername("账户");factory.setPassword("密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明普通交换机和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");channel.exchangeDeclare(DEAD_EXCHANGE,"direct");//声明普通队列HashMap<String, Object> map = new HashMap<>();//当消息被拒绝接受/未被消费 会将消息转发到死信队列//正常队列设置死信交换机map.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信队列的routingKeymap.put("x-dead-letter-routing-key","dead");channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);//声明死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定普通交换机与普通队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");//绑定死信交换机与死信队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");DeliverCallback deliverCallback=( consumerTag, message)->{System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);}
}
  • 死信队列消费者
public class TTLConsumer2 {//死信队列名称public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip");factory.setUsername("账户");factory.setPassword("密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();DeliverCallback deliverCallback=( consumerTag, message)->{System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}
}
  • 结果:当设置了死信队列,和TTL过期时间,若超过了过期时间消息未被消费,则消息会转发到死信队列
    死信队列产生三大原因
  • 消息被拒接
  • 消息TTL过期
  • 队列达到最大长度

3.1.1 死信队列实战:消息TTL过期

在这里插入图片描述

  • 配置类
@Configuration
public class RabbitMQConfiguration {//普通交换机public static final String X_EXCHANGE="X";//死信交换机public static final String Y_DEAD_LETTER_EXCHANGE="Y";//普通队列public static final String QUEUE_A="QA";public static final String QUEUE_B="QB";//死信队列public static final String DEAD_QUEUE_D="QD";//声明普通x交换机@Beanpublic DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明死信交换机@Beanpublic DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明普通队列A TTL:10S@Beanpublic Queue queueA(){Map<String,Object> arg=new HashMap<>();//设置死信交换机arg.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信routingKeyarg.put("x-dead-letter-routing-key","YD");//设置TTL过期时间arg.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(arg).build();}//声明普通队列B TTL:40S@Beanpublic Queue queueB(){Map<String,Object> arg=new HashMap<>();//设置死信交换机arg.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信routingKeyarg.put("x-dead-letter-routing-key","YD");//设置TTL过期时间arg.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(arg).build();}//死信队列@Beanpublic Queue queueD(){return QueueBuilder.durable(DEAD_QUEUE_D).build();}@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");}@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}
  • TTL生产者
@RestController
@RequestMapping("/ttl")
@Slf4j
public class TTLProvider {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/{msg}")public void sendMsg(@PathVariable("msg") String msg){log.info("当前发送时间:{}发送了一条消息",new Date().toString());rabbitTemplate.convertAndSend("X","XA","TTL消息延迟为10S,消息为===>"+msg);rabbitTemplate.convertAndSend("X","XB","TTL消息延迟为40S,消息为===>"+msg);}
}
  • 死信队列消费者
@Component
@Slf4j
public class DeadLetterConsumer {@RabbitListener(queues = "QD")public void t1(Message message, Channel channel)throws Exception{log.info("收到死信队列的消息{},时间为{}",new String(message.getBody(),"UTF-8"),new Date().toString());}
}
  • 死信队列-TTL过期时间测试结果
    在这里插入图片描述

3.1.2 死信队列实战:队列达到最大长度 设置正常队列最大长度

  1. 生产者
public class Producer {//普通交换机名称public static final String NORMAL_EXCHANGE="normal_exchange";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();for (int i = 1; i <= 10; i++) {String msg=""+i;channel.basicPublish(NORMAL_EXCHANGE,"normal",null,msg.getBytes());}}
}
  1. 消费者a
    //设置当前正常队列的长度限制超过长度,后面的消息会进入到死信队列
    map.put(“x-max-length”,6);
public class Consumer01 {//普通交换机名称public static final String NORMAL_EXCHANGE="normal_exchange";//死信交换机名称public static final String DEAD_EXCHANGE="dead_exchange";//普通队列名称public static final String NORMAL_QUEUE="normal_queue";//死信队列名称public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明普通交换机和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");channel.exchangeDeclare(DEAD_EXCHANGE,"direct");//声明普通队列HashMap<String, Object> map = new HashMap<>();//正常队列设置死信交换机map.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信队列的routingKeymap.put("x-dead-letter-routing-key","dead");//设置当前正常队列的长度限制超过长度,后面的消息会进入到死信队列map.put("x-max-length",6);channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);//声明死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定普通交换机与普通队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");//绑定死信交换机与死信队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");DeliverCallback deliverCallback=( consumerTag, message)->{System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);}
}
  1. 消费者b
public class Consumer02 {//死信队列名称public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();DeliverCallback deliverCallback=( consumerTag, message)->{System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}
}

3.1.3 死信队列实战:消息被拒

  1. 生产者
public class Producer {//普通交换机名称public static final String NORMAL_EXCHANGE="normal_exchange";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();for (int i = 1; i <= 10; i++) {String msg="info"+i;channel.basicPublish(NORMAL_EXCHANGE,"normal",null,msg.getBytes());}}
}
  1. 消费者a
  • 此消息被拒接,是否重新放回正常队列, false:不放回 则会放到死信队列
  • 1.channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
  • 2.并且开启手动应答
public class Consumer01 {//普通交换机名称public static final String NORMAL_EXCHANGE="normal_exchange";//死信交换机名称public static final String DEAD_EXCHANGE="dead_exchange";//普通队列名称public static final String NORMAL_QUEUE="normal_queue";//死信队列名称public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip");factory.setUsername("登录账户");factory.setPassword("登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明普通交换机和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");channel.exchangeDeclare(DEAD_EXCHANGE,"direct");//声明普通队列HashMap<String, Object> map = new HashMap<>();//正常队列设置死信交换机map.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信队列的routingKeymap.put("x-dead-letter-routing-key","dead");channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);//声明死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定普通交换机与普通队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");//绑定死信交换机与死信队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");DeliverCallback deliverCallback=( consumerTag, message)->{String msg=new String(message.getBody());if("info5".equals(msg)){System.out.println("Consumer1接收消息===>"+msg+"此消息被拒绝");//此消息被拒接,是否重新放回正常队列, false:不放回 则会放到死信队列channel.basicReject(message.getEnvelope().getDeliveryTag(),false);}else {System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}};CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);//开启手动应答channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,cancelCallback);}
}
  1. 消费者b
public class Consumer02 {//死信队列名称public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();DeliverCallback deliverCallback=( consumerTag, message)->{System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}
}

3.2 基于SpringBoot实现延迟队列

在这里插入图片描述

  1. 配置队列交换机
@Configuration
public class QueueConfig {@Bean("exchange")public DirectExchange exchange(){return new DirectExchange("msg");}@Bean("simpleQue")public Queue simpleQue(){HashMap<String, Object> map = new HashMap<>();//设置死信交换机map.put("x-dead-letter-exchange","dead");//设置死信路由map.put("x-dead-letter-routing-key","deadKey");//消息失效时间map.put("x-message-ttl",10000);return new Queue("simple",false,false,false,map);}@Beanpublic Binding simpleQueueBandingExchange(@Qualifier("simpleQue") Queue simple,@Qualifier("exchange") DirectExchange msg)throws Exception{return BindingBuilder.bind(simple).to(msg).with("info");}@Bean("deadExchange")public DirectExchange exchange1(){return new DirectExchange("dead");}@Bean("deadQueue")public Queue deadQ(){return new Queue("deadQue",false,false,false,null);}@Beanpublic Binding deadKeyBindingDeadExchange(@Qualifier("deadQueue")Queue queue,@Qualifier("deadExchange")DirectExchange dead){//绑定死信队列到死信交换机通过路由return BindingBuilder.bind(queue).to(dead).with("deadKey");}
}
  1. 生产者
@RestController
public class Provider {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ttl/{message}")public void t1(@PathVariable String message){String queueName="simple";Date date = new Date();System.out.println(date);rabbitTemplate.convertAndSend("msg","info",message);}
}
  1. 消费者
@Component
public class Consumer {@RabbitListener(queues = "deadQue")public void hello(Message msg, Channel channel)throws Exception{System.out.println("接收到消息"+new String(msg.getBody()));Date date1 = new Date();System.out.println(date1);}
}
  • 我们看到消息每隔十秒更新一次
    在这里插入图片描述

3.3 发布确认 高级特性

3.3.1 可靠性投递confirm模式

  • 场景:在生产环境中由于一些不明原因,导致rabbitmq重启,在rabbitmq重启期间的生产者消息投递失败,导致消息丢失,需要手动处理和恢复。-可靠性投递confirm模式
  • 需要在application核心配置文件中设置发布确认类型
  • spring-rabbitmq-publisher-confirm-type: correlated
  • 类型1:none:禁用发布确认模式,是默认值
  • 类型2:correlated:发布消息成功到交换机后出发回调方法
  • 类型3:simple:和correlated效果一样,但是如果回调返回的是false,会关闭信道,接下来无法发送消息
  1. 配置类
@Component
public class confirmConfig {public static final String CONFIRM_EXCHANGE_NAME="confirm.exchange";public static final String CONFIRM_QUEUE="confirm.queue";public static final String CONFIRM_ROUTING_KEY="confirm";@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);}@Bean("confirmQueue")public Queue confirmQueue(){return new Queue(CONFIRM_QUEUE);}@Beanpublic Binding confirmQueueBindingConfirmExchange(@Qualifier("confirmExchange")DirectExchange confirmExchange,@Qualifier("confirmQueue")Queue confirmQueue){return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);}
}
  • 当生产者发送给交换机消息时,交换机的名字错了,或者交换机挂掉了,会导致消息的丢失,那么我们需要实现回调接口,当交换机收到消息后会给生产者发送回调消息
  1. 实现回调接口:实现 RabbitTemplate.ConfirmCallback接口的confirm方法并且将其注入到rabbit模板的内部类中
@Component
@Slf4j
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;//注入@PostConstruct //当所有注解执行完后,再执行这个注解public void init(){rabbitTemplate.setConfirmCallback(this);}/*** 交换机确认回调方法*  发消息,交换机接收到了,回调*  参数*      1. correlationData:保存消息的ID及相关信息,这个消息是我们生产者手动传入的*      2. 交换机收到消息 true*      3. null*//*** 交换机确认回调方法*  发消息,交换机接收失败,回调*  参数*      1. correlationData:保存消息的ID及相关信息*      2. 交换机收到消息 false*      3. cause:失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {String id=correlationData!=null?correlationData.getId():"";if(b){log.info("交换机已经收到了ID为{}的消息",id);}else {log.info("交换机为收到了ID为{}的消息,原因是:{}",id,s);}}
}
  1. 生产者
@RestController
public class ConfirmProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{msg}")public void t1(@PathVariable String msg){CorrelationData correlationData = new CorrelationData();correlationData.setId("1");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY,"嘿嘿嘿".getBytes(),correlationData);}
}
  1. 消费者
@Component
public class ConfirmConsumer {@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE)public void consumer(Message message){System.out.println("高级特性确认发布消费者收到了消息===>"+new String(message.getBody()));}
}
  • 测试:当我们正常发送消息
    在这里插入图片描述
  • 测试:当我们把交换机名字换掉
    在这里插入图片描述

3.3.2 可靠性投递return模式

  • 场景:若交换机收到消息,队列没有收到消息,应该如何解决?
  • 需要在application核心配置文件中设置是否回退消息,当消息路由不到消费者
  • spring-rabbitmq-publisher-returns=true 开启回退消息
@Component
@Slf4j
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;//注入@PostConstruct //当所有注解执行完后,再执行这个注解public void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}/*** 交换机确认回调方法*  发消息,交换机接收到了,回调*  参数*      1. correlationData:保存消息的ID及相关信息*      2. 交换机收到消息 true*      3. null*//*** 交换机确认回调方法*  发消息,交换机接收失败,回调*  参数*      1. correlationData:保存消息的ID及相关信息*      2. 交换机收到消息 false*      3. cause:失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {String id=correlationData!=null?correlationData.getId():"";if(b){log.info("交换机已经收到了ID为{}的消息",id);}else {log.info("交换机未收到了ID为{}的消息,原因是:{}",id,s);}}/*** 消息传递过程中 不可达 消费者的队列时将消息返回给生产者* 只有当消息 不可达 目的地的时候 才进行回调* 参数1:消息体* 参数2:回复代码* 参数3:回复原因* 参数4:交换机* 参数5:路由key*/@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {log.info("消息{},被交换机{}退回,原因是{},路由是{}",new String(message.getBody()),s1,s,s2);}}

3.4 优先级队列

  • 优先级越高,消息先被消费者消费
  • 官方设置最大优先级 0-255 超出优先级则报错 自己使用时数字不必设置很大,会浪费CPU效率
  1. 生产者
public class PriorityProducer {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//设置优先级参数AMQP.BasicProperties build = new AMQP.BasicProperties.Builder().priority(10).build();for (int i = 1; i <= 10; i++) {String msg="info"+i;if(i==5){channel.basicPublish("","hi",build,msg.getBytes());}else {channel.basicPublish("","hi",null,msg.getBytes());}}}
}
  1. 消费者
public class PriorityConsumer {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();HashMap<String, Object> map = new HashMap<>();//设置当前队列为优先级队列map.put("x-max-priority",10);channel.queueDeclare("hi",false,false,false,map);channel.basicConsume("hi",true,(consumerTag,message)->{System.out.println("优先级队列接收消息顺序===>"+new String(message.getBody()));},(consumerTag) -> System.out.println("取消回调"));}
}
  • 测试结果:我们定义的是消息5优先级最高,其他消息为默认优先级
    在这里插入图片描述

3.5 消费端限流

  • 参数一:prefetchSize:预先载入的大小 0表示不限制大小
  • 参数二:prefetchCount:预先载入的消息条数
  • 参数三:global:false
  • 注意:autoAck手动应答一定要为false
	//设置每次确定一个消息channel.basicQos(0,1,false);
12
  • 生产者
public class AckProvider {//队列名称public static final String QUEUE_NAME="hello_Ack";//发消息public static void main(String[] args) throws Exception{//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip");factory.setUsername("用户");factory.setPassword("密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);Scanner scanner = new Scanner(System.in);while (true){String msg = scanner.nextLine();channel.basicPublish("",QUEUE_NAME, null,msg.getBytes());System.out.println("消息发送完毕");}}
}
  • 消费者
public class AckConsumer2 {//队列名称,接收此队列的消息public static final String QUEUE_NAME="hello_Ack";public static void main(String[] args) throws Exception{//创建连接工程ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip");factory.setUsername("用户");factory.setPassword("密码");//创建连接Connection connection = factory.newConnection();//通过连接来获取 信道来收消息Channel channel = connection.createChannel();//声明 接收消息的回调DeliverCallback deliverCallback=(consumerTag, message)-> {//message:包含消息头和消息体,我们只想拿到消息体//若不进行转换,直接输出message我们拿到的则是地址System.out.println(new String(message.getBody()));try {Thread.sleep(1000*5);} catch (InterruptedException e) {e.printStackTrace();}//手动确认消息://参数1:确认队列中那个具体的消息 参数2:是否开启多个消息同时确认channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//声明 取消消费的回调CancelCallback cancelCallback=consumerTag->{System.out.println("消费消息被中断");};//每次只消费一个channel.basicQos(0,1,false);/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应*   true:代表自动应答*   false:手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*///关闭自动应答 falsechannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/556879.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

spring mysql 连接池配置_SpringBoot数据库连接池常用配置

关注公众号&#xff1a;程序猿王国 持续更新&#xff0c;每日分享在配置文件中添加配置如下(我使用的是多数据源)&#xff1a;spring.datasource.primary.urljdbc\:mysql\://localhost\:3306/test?useUnicode\true&characterEncoding\utf-8spring.datasource.prim…

交换机到底是啥?

1、交换机 交换机个人资料&#xff1a; 1.证件照 注&#xff1a;copy 百度百科 我的博客/交换机到底是啥&#xff1f; 这东西乍一看有点像月光宝盒&#xff0c;是不是。 2.个人简介 交换机&#xff08;Switch&#xff09;意为“开关”是一种用于电&#xff08;光&#xff0…

unity3d 动态合批设置_Unity3D SkinnedMeshRenderer合批优化

最近做了性能优化相关的工作&#xff0c;其中一些是关于战斗模块的渲染的。主要是对场景中使用的基于SkinnedMeshRenderer的网格进行了一些合批优化(降DC)&#xff0c;记录如下。项目使用的Unity版本为5.6.4p1。游戏中的战斗模块是这样的&#xff1a;战斗逻辑由服务器承担&…

java类的加载顺序_java类加载先后顺序

这里讲的不是类加载机制,是类的加载先后顺序。话不多说了&#xff0c;先设定以下场景:package com.jingdong;public class A {public static void main(String[] args){System.out.println(Ib.b);B bnew B();b.ibTest();}}public class B implements Ib{private D d;private C …

MyBatis:模糊查询的4种实现方式

1、根据姓名模糊查询员工信息 1.1、方式一 步骤一&#xff1a;编写配置文件 步骤二&#xff1a;测试 步骤三&#xff1a;分析 此种方式需要在调用处手动的去添加“%”通配符。 1.2、方式二 说明&#xff1a; 使用方式一可以实现模糊查询&#xff0c;但是有一点不方便的地…

java 阻塞 socket_java socket非阻塞I/O

1 非阻塞(Nonblocking)体系结构在这一部分&#xff0c;我将从理论的角度来解释非阻塞体系的结构及其工作原理。这部“喜剧”(当然&#xff0c;如果你喜欢的话也可以称做戏剧)的“人物”如下&#xff1a;●服务器端&#xff1a;接收请求的应用程序。●客户端&#xff1a;向…

java mod函数的使用方法_java 数学计算的具体使用

java.lang.Math 库提供了常用的数学计算工具常量final double E 2.7182818284590452354; // 自然对数底数final double PI 3.14159265358979323846; // 圆周率final double DEGREES_TO_RADIANS 0.017453292519943295; // 角度转弧度final double RADIANS_TO_DEGREES 57.295…

java的debug模式_java第六章:debug模式介绍及大量实例练习

1.Debug模式1.1什么是Debug模式【理解】是供程序员使用的程序调试工具&#xff0c;它可以用于查看程序的执行流程&#xff0c;也可以用于追踪程序执行过程来调试程序。1.2Debug模式操作流程【应用】如何加断点选择要设置断点的代码行&#xff0c;在行号的区域后面单击鼠标左键即…

注解RequestMapping中的URI路径最前面到底需不需要加斜线?

注解RequestMapping中的URI路径最前面到底需不需要加斜线? 您有没有这样的困惑&#xff1a;在协同开发过程中&#xff0c;使用RequestMapping&#xff0c;或者是GetMapping&#xff0c;或者是PostMapping这类注解时&#xff0c;有的程序员加了斜线&#xff0c;有的程序员没有…

java ajax jquery分页插件_jquery ajax分页插件的简单实现

说到基于jQuery的ajax分页插件&#xff0c;那我们就先看看主要的代码结构&#xff1a;1、首先定义一个pager对象&#xff1a;var sjPager window.sjPager {opts: {//默认属性pageSize: 10,preText: "pre",nextText: "next",firstText: "First"…

java thrift连接池_由浅入深了解Thrift之客户端连接池化

一、问题描述在上一篇《由浅入深了解Thrift之服务模型和序列化机制》文章中&#xff0c;我们已经了解了thrift的基本架构和网络服务模型的优缺点。如今的互联网圈中&#xff0c;RPC服务化的思想如火如荼。我们又该如何将thrift服务化应用到我们的项目中哪&#xff1f;实现thrif…

Vue 进阶组件实战应用 -- 父子组件传值的应用实例(子父组件传值的两种触发方式)

基础的子组件和父组件通信已经搞定了&#xff0c;可以看此博客 父子组件传值基础应用 需求 现在需求是在一个父页面引用子组件&#xff0c;不只是要实现基本的父子组件传值。并且子组件给父组件传值的触发条件要在父页面触发。 目前小编采用的方式是使用ref 属性this.emit 方法…

学习Spring Boot:(一)入门

微服务 微服务其实是服务化思路的一种最佳实践方向&#xff0c;遵循SOA&#xff08;面向服务的架构&#xff09;的思路&#xff0c;各个企业在服务化治理上面的道路已经走得很远了&#xff0c;整个软件交付链上各个环节的基础设施逐渐成熟了&#xff0c;微服务就诞生了。 微服务…

学习Spring Boot:(二)启动原理

前言 主要了解前面的程序入口 SpringBootApplication 这个注解的结构。 正文 参考《SpringBoot揭秘 快速构建微服务体系》第三章的学习&#xff0c;总结下。 SpringBootApplication背后的秘密 Target(ElementType.TYPE) Retention(RetentionPolicy.RUNTIME) Documented In…

学习Spring Boot:(四)应用日志

前言 应用日志是一个系统非常重要的一部分&#xff0c;后来不管是开发还是线上&#xff0c;日志都起到至关重要的作用。这次使用的是 Logback 日志框架。 正文 Spring Boot在所有内部日志中使用Commons Logging&#xff0c;但是默认配置也提供了对常用日志的支持&#xff0c…

学习Spring Boot:(五)使用 devtools热部署

前言 spring-boot-devtools 是一个为开发者服务的一个模块&#xff0c;其中最重要的功能就是自动应用代码更改到最新的App上面去。原理是在发现代码有更改之后&#xff0c;重新启动应用&#xff0c;但是比速度比手动停止后再启动还要更快&#xff0c;更快指的不是节省出来的手工…

学习Spring Boot:(六) 集成Swagger2

前言 Swagger是用来描述和文档化RESTful API的一个项目。Swagger Spec是一套规范&#xff0c;定义了该如何去描述一个RESTful API。类似的项目还有RAML、API Blueprint。 根据Swagger Spec来描述RESTful API的文件称之为Swagger specification file&#xff0c;它使用JSON来表…

java的队列实现方法_Java实现队列的三种方法集合

数组实现队列//数组实现队列class queue{int[] a new int[5];int i 0;//入队操作public void in(int m) {a[i] m;}// 出队列操作 取出最前面的值 通过循环遍历把所有的数据向前一位public int out() {int index 0;int temp a[0];for(int j 0;j < i;j) {a[j] a[j 1];…

php 简转繁体,php如何实现简体繁体转换

思路&#xff1a;根据中文简体、繁体对应的数据表&#xff0c;将其整理成一个以简体字为键&#xff0c;繁体字为值的一个一维数组&#xff0c;类似下面这样的一个数组结构&#xff1a;$dataarray(侧>側,厂>廠);在线学习视频分享&#xff1a;php视频教程好了&#xff0c;根…

学习Spring Boot:(八)Mybatis使用分页插件PageHelper

首先Mybqtis可以通过SQL 的方式实现分页很简单&#xff0c;只要在查询SQL 后面加上limit #{currIndex} , #{pageSize}就可以了。 本文主要介绍使用拦截器的方式实现分页。 实现原理 拦截器实现了拦截所有查询需要分页的方法&#xff0c;并且利用获取到的分页相关参数统一在s…