RabbitMQ保姆级教程

文章目录

  • 前言
  • 一、MQ是什么?
    • 1.1 AMQP
  • 二、在Linux安装RabbitMQ
    • 2.1 安装
    • 2.2 RabbitMQ启动命令
    • 2.3 开启RabbitMQ 后台管理界面
      • 2.3.1 登录rabbitMQ UI界面
    • 2.3 Docker启动RabbitMQ
    • 2.4 常见消息模型
    • 2.5 生产者(Producer) / 消费者(Consumer)
    • 2.6 工作队列模式(Work Queues)
    • 2.7 参数细节
    • 2.8 实现能者多劳
      • 2.8.1 Ack手动应答防止数据丢失和消息拒收后重新发送
      • 2.8.2 预取值
    • 2.9 Publish/Subscribe 发布/订阅
    • 2.10 Routing(路由) - Direct
    • 2.11 Routing(路由)- Topic
  • 三、进阶篇 高级特性
    • 3.1 死信队列
      • 3.1.1 死信队列实战:消息TTL过期
      • 3.1.2 死信队列实战:队列达到最大长度 设置正常队列最大长度
      • 3.1.3 死信队列实战:消息被拒
    • 3.2 基于SpringBoot实现延迟队列
    • 3.3 发布确认 高级特性
      • 3.3.1 可靠性投递confirm模式
      • 3.3.2 可靠性投递return模式
    • 3.4 优先级队列
    • 3.5 消费端限流

前言

提示:RaabitMQ消息队列的学习。


一、MQ是什么?

  • MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统
    之间进行通信。
  • RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包
    裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是
    一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,
    存储和转发消息数据。
    在这里插入图片描述
  • 工作原理
    在这里插入图片描述

1.1 AMQP

  • AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用
    层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,遵
    循此协议,不收客户端和中间件产品和开发语言限制。2006年,AMQP 规范发布。类比HTTP。

二、在Linux安装RabbitMQ

2.1 安装

	1. 我们把erlang环境与rabbitMQ 安装包解压到Linux2. rpm -ivh erlang安装包3. yum install socat -y 安装依赖 / rpm -ivh socat依赖包 --force --nodeps4. rpm -ivh rabbitmq安装包

2.2 RabbitMQ启动命令

	1. 开启服务 /sbin/service rabbitmq-server start  / service rabbitmq-server start 2. 停止服务 service rabbitmq-server stop 3. 重启服务 service rabbitmq-server restart 

2.3 开启RabbitMQ 后台管理界面

	1.  rabbitmq-plugins enable rabbitmq_management
  • 添加一个新的用户
	1. 创建rabbitMQ账号rabbitmqctl add_user 用户名 密码2. 设置用户角色rabbitmqctl set_user_tags 用户名 administrator #设置用户名为超级管理员3. 设置用户权限rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"4. 查看rabbitmq的用户和角色rabbitmqctl list_users5. 登录rabbitMQ 界面: Linux虚拟机ip:15672 即可

2.3.1 登录rabbitMQ UI界面

记得开放15672端口访问 Linux虚拟机ip:15672 即可

输入账户密码后看到这个界面代表成功
在这里插入图片描述

2.3 Docker启动RabbitMQ

Docker安装

	1. docker pull rabbitmq:3-management2. 开启rabbitMQdocker run \-e RABBITMQ_DEFAULT_USER=root \-e RABBITMQ_DEFAULT_PASS=123456 \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management

2.4 常见消息模型

  • channel:操作MQ的工具
  • exchange:路由消息到队列中
  • queue:缓存消息
  • virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
    在这里插入图片描述

2.5 生产者(Producer) / 消费者(Consumer)

  • 所需依赖
	<dependencies><!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version></dependency><!-- https://mvnrepository.com/artifact/commons-io/commons-io --><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.4</version></dependency></dependencies>
1234567891011121314

在这里插入图片描述

  • 生产者代码
	/*** 生产者:发消息*/
public class Producer {//队列名称public static final String QUEUE_NAME="hello";//发消息public static void main(String[] args) throws Exception{//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();//工厂IP连接rabbitMQ队列factory.setHost("ip地址");//设置用户名密码factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");//创建连接Connection connection = factory.newConnection();//通过连接来获取 信道来发消息Channel channel = connection.createChannel();/*** 生成一个队列* 1.队列名称* 2.队列里面的信息是否持久化 默认false 信息存储在内存中* 3.该列队是否只供一个消费者进行消费,是否进行消息共享*   true:可以多个消费者消费*   false:只能一个消费者消费* 4.是否自动删除,最后一个消费者断开连接后,该队列是否自动删除*   true:自动删除*   false:不自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);//发消息String message="hello rabbitMQ";/*** 发送一个消息* 1.发送到哪个交换机* 2.路由的KEY值是哪个? 指的是本次队列的名称* 3.其他参数信息* 4.发送的消息体*/channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送完毕");channel.close();connection.close();}
}
  • 消费者
	/*** 消费者:接收消息*/
public class Consumer {//队列名称,接收此队列的消息public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception{//创建连接工程ConnectionFactory factory = new ConnectionFactory();//工厂IP连接rabbitMQ队列factory.setHost("ip地址");//设置用户名密码factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");//创建连接Connection connection = factory.newConnection();//通过连接来获取 信道来收消息Channel channel = connection.createChannel();//声明 接收消息的回调DeliverCallback deliverCallback=(consumerTag, message)-> {//message:包含消息头和消息体,我们只想拿到消息体//若不进行转换,直接输出message我们拿到的则是地址String data = new String(message.getBody());System.out.println(new String(message.getBody()));};//声明 取消消费的回调CancelCallback cancelCallback=consumerTag->{System.out.println("消费消息被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应*   true:代表自动应答*   false:手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

2.6 工作队列模式(Work Queues)

  • 模式说明
    在这里插入图片描述
  • Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消费,采用的是 轮询机制
  • 应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
  • 工作模式:生产者
	public class ProducerWorkQueue {//队列名称public static final String QUEUE_NAME="hello";//发消息public static void main(String[] args) throws Exception{//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();//工厂IP连接rabbitMQ队列factory.setHost("ip地址");//设置用户名密码factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");//创建连接Connection connection = factory.newConnection();//通过连接来获取 信道来发消息Channel channel = connection.createChannel();/*** 生成一个队列* 1.队列名称* 2.队列里面的信息是否持久化 默认false 信息存储在内存中* 3.该列队是否只供一个消费者进行消费,是否进行消息共享*   true:可以多个消费者消费*   false:只能一个消费者消费* 4.是否自动删除,最后一个消费者断开连接后,该队列是否自动删除*   true:自动删除*   false:不自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);for (int i = 1; i <= 10; i++) {//发消息String message=i+"hello rabbitMQ";/*** 发送一个消息* 1.发送到哪个交换机* 2.路由的KEY值是哪个? 指的是本次队列的名称* 3.其他参数信息* 4.发送的消息体*/channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送完毕");}channel.close();connection.close();}
}
  • 工作模式:两个消费者
	/*** 消费者:接收消息*/
public class ConsumerWorkQueues1 {//队列名称,接收此队列的消息public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception{//创建连接工程ConnectionFactory factory = new ConnectionFactory();//工厂IP连接rabbitMQ队列factory.setHost("ip地址");//设置用户名密码factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");//创建连接Connection connection = factory.newConnection();//通过连接来获取 信道来收消息Channel channel = connection.createChannel();//声明 接收消息的回调DeliverCallback deliverCallback=(consumerTag, message)-> {//message:包含消息头和消息体,我们只想拿到消息体//若不进行转换,直接输出message我们拿到的则是地址String data = new String(message.getBody());System.out.println(new String(message.getBody()));};//声明 取消消费的回调CancelCallback cancelCallback=consumerTag->{System.out.println("消费消息被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应*   true:代表自动应答*   false:手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}
	/*** 消费者:接收消息*/
public class ConsumerWorkQueues2 {//队列名称,接收此队列的消息public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception{//创建连接工程ConnectionFactory factory = new ConnectionFactory();//工厂IP连接rabbitMQ队列factory.setHost("ip地址");//设置用户名密码factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");//创建连接Connection connection = factory.newConnection();//通过连接来获取 信道来收消息Channel channel = connection.createChannel();//声明 接收消息的回调DeliverCallback deliverCallback=(consumerTag, message)-> {//message:包含消息头和消息体,我们只想拿到消息体//若不进行转换,直接输出message我们拿到的则是地址String data = new String(message.getBody());System.out.println(new String(message.getBody()));};//声明 取消消费的回调CancelCallback cancelCallback=consumerTag->{System.out.println("消费消息被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应*   true:代表自动应答*   false:手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}
  • 结果:各执行五次,也验证了 我们上面所说的 轮询机制
    在这里插入图片描述
    在这里插入图片描述
  • 小结:
    一个消息只能有一个接收者,但是可以有多个接收者

2.7 参数细节

  • durable:是否进行持久化,当前队列如果进行持久化,我们重启rabbitMQ后当前队列依旧存在
	//消费者生成的队列channel.queueDeclare(QUEUE_NAME,(durable)true/false,false,false,null);
  • props :队列中的信息是否持久化,若消息持久化,我们重启rabbitMQ后当前队列依旧存在
	//MessageProperties.PERSISTENT_TEXT_PLAIN:将消息进行持久化channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());  
  • autoDelete:是否自动删除,最后一个消费者断开连接后,该队列是否自动删除
	channel.queueDeclare(QUEUE_NAME,false,false,(autoDelete的参数位置)false,null);
  • autoAck:自动应答
	若开启了自动应答,rabbitMQ消息队列分配给消费者10个数据,只要消费者拿到消息队列的数据时,就会告诉消息队列,数据处理完毕。若当我们处理到第5个数据时,消费者出现了宕机,死掉了,则会出现数据丢失
	channel.basicConsume(QUEUE_NAME,(autoAck是否自动应答)false,deliverCallback,cancelCallback);

2.8 实现能者多劳

  • 业务场景:

    当我们的两个消费者执行业务时,a消费者执行速度快,b消费者执行速度慢,我们想让执行速度快的多执行,应当如何实现呢?

    1. 开启不公平分发,能者多劳 channel.basicQos(1); 0:轮询机制 1:能者多劳
    2. 开启手动确认
  • 消费者a

	/*** 消费者:接收消息*/
public class ConsumerWorkQueues1 {//队列名称,接收此队列的消息public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//开启不公平分发,能者多劳channel.basicQos(1);DeliverCallback deliverCallback=(consumerTag, message)-> {String data = new String(message.getBody());System.out.println(new String(message.getBody()));//参数1:确认队列中那个具体的消息:// 可以获取消息的id // 消息routingkey// 交换机 exchange// 消息和重传标志//参数2:是否开启多个消息同时确认channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback=consumerTag->{System.out.println("消费消息被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应*   true:代表自动应答*   false:手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*///关闭自动应答 falsechannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}
}
  • 消费者b:消费消息时然消费者b休眠100毫秒
public class ConsumerWorkQueues2 {//队列名称,接收此队列的消息public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//开启不公平分发,能者多劳channel.basicQos(1);//声明 接收消息的回调DeliverCallback deliverCallback=(consumerTag, message)-> {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(new String(message.getBody()));//手动确认消息://参数1:确认队列中那个具体的消息 参数2:是否开启多个消息同时确认channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//声明 取消消费的回调CancelCallback cancelCallback=consumerTag->{System.out.println("消费消息被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应*   true:代表自动应答*   false:手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*///关闭自动应答 falsechannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}
  • 执行结果:
    消费者a执行
    在这里插入图片描述
    消费者b执行
    在这里插入图片描述

2.8.1 Ack手动应答防止数据丢失和消息拒收后重新发送

  • 应用场景:两个消费者每次都从队列中来获取消息,若消费者a正常执行,消费者b在执行过程中出现了宕机,挂掉了那么我们未被消费的消息会被重新放回到队列中,防止消息丢失。
  1. 生产者
public class ProducerWorkQueue {//队列名称public static final String QUEUE_NAME="hello";//发消息public static void main(String[] args) throws Exception{//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);Scanner scanner = new Scanner(System.in);while (true){String msg = scanner.nextLine();channel.basicPublish("",QUEUE_NAME, null,msg.getBytes());System.out.println("消息发送完毕");}}
}
  1. 消费者a
public class ConsumerWorkQueues1 {//队列名称,接收此队列的消息public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception{//创建连接工程ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");//创建连接Connection connection = factory.newConnection();//通过连接来获取 信道来收消息Channel channel = connection.createChannel();//声明 接收消息的回调DeliverCallback deliverCallback=(consumerTag, message)-> {//message:包含消息头和消息体,我们只想拿到消息体//若不进行转换,直接输出message我们拿到的则是地址String data = new String(message.getBody());System.out.println("消费者1===>"+new String(message.getBody()));try {int i=3/0;//模拟业务发生异常channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}catch (Exception e){System.out.println("拒收消息发生了异常");//拒收消息//参数一:表示投递的消息标签//参数二:是否开启多个消息同时确认//参数三:是否重新给队列发送channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);}};//声明 取消消费的回调CancelCallback cancelCallback=consumerTag->{System.out.println("消费消息被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应*   true:代表自动应答*   false:手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*///关闭自动应答 falsechannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}
}
  1. 消费者b
public class ConsumerWorkQueues2 {//队列名称,接收此队列的消息public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception{//创建连接工程ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");//创建连接Connection connection = factory.newConnection();//通过连接来获取 信道来收消息Channel channel = connection.createChannel();//声明 接收消息的回调DeliverCallback deliverCallback=(consumerTag, message)-> {//message:包含消息头和消息体,我们只想拿到消息体//若不进行转换,直接输出message我们拿到的则是地址System.out.println("睡10秒");try {Thread.sleep(1000*10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(new String(message.getBody()));//手动确认消息://参数1:确认队列中那个具体的消息 参数2:是否开启多个消息同时确认channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//声明 取消消费的回调CancelCallback cancelCallback=consumerTag->{System.out.println("消费消息被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应*   true:代表自动应答*   false:手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*///关闭自动应答 falsechannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}
}
  • 当消费者b在消费消息时,我们让消费者b睡眠10秒模拟业务流程,在这10秒内我们手动关掉消费者b
  1. 发送 aa 消费者a接收
    在这里插入图片描述

在这里插入图片描述

  1. 发送bb消费者b接收,在消费者b睡眠过程中我们停止消费者b,来看看手动应答的结果
    在这里插入图片描述
    此时我们查看消费者a,出现了本应该是消费者b消费的消息bb
    在这里插入图片描述

2.8.2 预取值

	channel.basicQos(1);  0:轮询机制  1:能者多劳 若值>1代表当前队列的预取值,代表当前队列大概会拿到多少值

2.9 Publish/Subscribe 发布/订阅

在这里插入图片描述

  • 也可以叫 广播模式,当我们的P消费者发送了消息,交给了X(交换机),所有绑定了这个X(交换机)的队列都可以接收到P消费者发送的消息
  • 代码实现生产者
	public class Provider {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//将通道声明指定交换机,   参数一:交换机名称  参数二:交换机类型 fanout广播类型	//参数2:交换机类型也可使用  BuiltinExchangeType. 的方式来查看选择channel.exchangeDeclare("order", "fanout");channel.basicPublish("order","",null,"fanout type message".getBytes());channel.close();connection.close();}
}
  • 代码实现消费者
	public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//通道绑定交换机channel.exchangeDeclare("order","fanout");//获取临时队列名称String queueName = channel.queueDeclare().getQueue();//绑定交换机和队列channel.queueBind(queueName,"order","");channel.basicConsume(queueName,true,(consumerTag,message)->{System.out.println("消费者1===>"+new String(message.getBody()));},consumerTag -> System.out.println("取消消费消息"));}
}

2.10 Routing(路由) - Direct

在这里插入图片描述

routing值订阅模型-Direct(直连)

  • 在上面广播模式中,一条消息,会被所有订阅的队列都消费。但是在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange

    在Direct模型下:

    1. 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
    2. 消息的发送方在Exchange发送消息时,也必须指定消息的RoutingKey
    3. Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的RoutingKey与消息的Routing Key完全一致,才会接受到消息
  • 生产者

public class Provider {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//通过信道声明交换机, 参数一:交换机名称  参数二:direct 路由模式channel.exchangeDeclare("logsExchange","direct");//发送消息 参数一:发送信息到的交换机名称//       参数二:绑定路由 发送给队列的那个路由key,//只有当队列的路由key与交换机的路由key相对应时,队列才会接受到消息channel.basicPublish("logsExchange","msgRouting",null,"routing logs direct info 发送了消息".getBytes());channel.close();connection.close();}
}
  • 消费者
public class Consumer1 {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("logs","direct");//获取临时队列名String queueName = channel.queueDeclare().getQueue();//绑定队列:参数一:临时队列名称 参数二:绑定的交换机名称 参数三:路由key,若消费者的路由key与生产者的路由key相同则可以收到消息channel.queueBind(queueName,"logsExchange","infoRouting");channel.queueBind(queueName,"logsExchange","msgRouting");channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));}
}
  • 消费者2
public class Consumer2 {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("logs","direct");//获取临时队列名String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName,"logs","error");channel.queueBind(queueName,"logs","msg");channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));}
}

2.11 Routing(路由)- Topic

在这里插入图片描述

  • Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。
  • 只不过Topic类型Exchange可以让队列在绑定RoutingKey的时候使用通配符!
	#通配符* (star) can substitute for exactly one word :匹配一个词# (hash) can substitute for zero or more words :匹配一个或多个词
  • 生产者
public class Provider {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//通过信道声明交换机, 参数一:交换机名称  参数二:topic 动态路由channel.exchangeDeclare("order","topic");String routingKey="user.order";//发送消息 参数一:发送信息到的交换机名称  参数二:绑定路由 发送给队列的那个路由keychannel.basicPublish("order",routingKey,null,("routing logs topic发送了消息"+routingKey).getBytes());channel.close();connection.close();}
}
  • 消费者1
public class Consumer1 {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("order","topic");//获取临时队列名String queueName = channel.queueDeclare().getQueue();//绑定队列:参数一:临时队列名称 参数二:绑定的交换机名称 参数三:动态通配符路由keychannel.queueBind(queueName,"order","user.*");channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));}
}
  • 消费者2
public class Consumer2 {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("order","topic");//获取临时队列名String queueName = channel.queueDeclare().getQueue();//绑定队列:参数一:临时队列名称 参数二:绑定的交换机名称 参数三:动态通配符路由keychannel.queueBind(queueName,"order","user.#");channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));}
}

三、进阶篇 高级特性

3.1 死信队列

	死信,顾名思义就是无法被消费的信息,字面意思可以这样理解,一般来说,producer将消息投递到queue里,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,自然就有了死信队列
  • 应用场景
	为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
  • 生产者:给正产的消息队列发送消息,并且设置消息过期时间为10S,超过10S消息未被消费,则消息进入死信队列
public class TTLProvider {//普通交换机名称public static final String NORMAL_EXCHANGE="normal_exchange";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip");factory.setUsername("账户");factory.setPassword("密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//发送死信 设置TTL过期时间AMQP.BasicProperties properties=new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 1; i <= 10; i++) {String msg=""+i;channel.basicPublish(NORMAL_EXCHANGE,"normal",properties,msg.getBytes());}System.out.println("结束发送");}
}
  • 正常队列消费者
public class TTLConsumer1 {//普通交换机名称public static final String NORMAL_EXCHANGE="normal_exchange";//死信交换机名称public static final String DEAD_EXCHANGE="dead_exchange";//普通队列名称public static final String NORMAL_QUEUE="normal_queue";//死信队列名称public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip");factory.setUsername("账户");factory.setPassword("密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明普通交换机和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");channel.exchangeDeclare(DEAD_EXCHANGE,"direct");//声明普通队列HashMap<String, Object> map = new HashMap<>();//当消息被拒绝接受/未被消费 会将消息转发到死信队列//正常队列设置死信交换机map.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信队列的routingKeymap.put("x-dead-letter-routing-key","dead");channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);//声明死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定普通交换机与普通队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");//绑定死信交换机与死信队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");DeliverCallback deliverCallback=( consumerTag, message)->{System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);}
}
  • 死信队列消费者
public class TTLConsumer2 {//死信队列名称public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip");factory.setUsername("账户");factory.setPassword("密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();DeliverCallback deliverCallback=( consumerTag, message)->{System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}
}
  • 结果:当设置了死信队列,和TTL过期时间,若超过了过期时间消息未被消费,则消息会转发到死信队列
    死信队列产生三大原因
  • 消息被拒接
  • 消息TTL过期
  • 队列达到最大长度

3.1.1 死信队列实战:消息TTL过期

在这里插入图片描述

  • 配置类
@Configuration
public class RabbitMQConfiguration {//普通交换机public static final String X_EXCHANGE="X";//死信交换机public static final String Y_DEAD_LETTER_EXCHANGE="Y";//普通队列public static final String QUEUE_A="QA";public static final String QUEUE_B="QB";//死信队列public static final String DEAD_QUEUE_D="QD";//声明普通x交换机@Beanpublic DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明死信交换机@Beanpublic DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明普通队列A TTL:10S@Beanpublic Queue queueA(){Map<String,Object> arg=new HashMap<>();//设置死信交换机arg.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信routingKeyarg.put("x-dead-letter-routing-key","YD");//设置TTL过期时间arg.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(arg).build();}//声明普通队列B TTL:40S@Beanpublic Queue queueB(){Map<String,Object> arg=new HashMap<>();//设置死信交换机arg.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信routingKeyarg.put("x-dead-letter-routing-key","YD");//设置TTL过期时间arg.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(arg).build();}//死信队列@Beanpublic Queue queueD(){return QueueBuilder.durable(DEAD_QUEUE_D).build();}@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");}@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}
  • TTL生产者
@RestController
@RequestMapping("/ttl")
@Slf4j
public class TTLProvider {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/{msg}")public void sendMsg(@PathVariable("msg") String msg){log.info("当前发送时间:{}发送了一条消息",new Date().toString());rabbitTemplate.convertAndSend("X","XA","TTL消息延迟为10S,消息为===>"+msg);rabbitTemplate.convertAndSend("X","XB","TTL消息延迟为40S,消息为===>"+msg);}
}
  • 死信队列消费者
@Component
@Slf4j
public class DeadLetterConsumer {@RabbitListener(queues = "QD")public void t1(Message message, Channel channel)throws Exception{log.info("收到死信队列的消息{},时间为{}",new String(message.getBody(),"UTF-8"),new Date().toString());}
}
  • 死信队列-TTL过期时间测试结果
    在这里插入图片描述

3.1.2 死信队列实战:队列达到最大长度 设置正常队列最大长度

  1. 生产者
public class Producer {//普通交换机名称public static final String NORMAL_EXCHANGE="normal_exchange";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();for (int i = 1; i <= 10; i++) {String msg=""+i;channel.basicPublish(NORMAL_EXCHANGE,"normal",null,msg.getBytes());}}
}
  1. 消费者a
    //设置当前正常队列的长度限制超过长度,后面的消息会进入到死信队列
    map.put(“x-max-length”,6);
public class Consumer01 {//普通交换机名称public static final String NORMAL_EXCHANGE="normal_exchange";//死信交换机名称public static final String DEAD_EXCHANGE="dead_exchange";//普通队列名称public static final String NORMAL_QUEUE="normal_queue";//死信队列名称public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明普通交换机和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");channel.exchangeDeclare(DEAD_EXCHANGE,"direct");//声明普通队列HashMap<String, Object> map = new HashMap<>();//正常队列设置死信交换机map.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信队列的routingKeymap.put("x-dead-letter-routing-key","dead");//设置当前正常队列的长度限制超过长度,后面的消息会进入到死信队列map.put("x-max-length",6);channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);//声明死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定普通交换机与普通队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");//绑定死信交换机与死信队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");DeliverCallback deliverCallback=( consumerTag, message)->{System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);}
}
  1. 消费者b
public class Consumer02 {//死信队列名称public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();DeliverCallback deliverCallback=( consumerTag, message)->{System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}
}

3.1.3 死信队列实战:消息被拒

  1. 生产者
public class Producer {//普通交换机名称public static final String NORMAL_EXCHANGE="normal_exchange";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();for (int i = 1; i <= 10; i++) {String msg="info"+i;channel.basicPublish(NORMAL_EXCHANGE,"normal",null,msg.getBytes());}}
}
  1. 消费者a
  • 此消息被拒接,是否重新放回正常队列, false:不放回 则会放到死信队列
  • 1.channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
  • 2.并且开启手动应答
public class Consumer01 {//普通交换机名称public static final String NORMAL_EXCHANGE="normal_exchange";//死信交换机名称public static final String DEAD_EXCHANGE="dead_exchange";//普通队列名称public static final String NORMAL_QUEUE="normal_queue";//死信队列名称public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip");factory.setUsername("登录账户");factory.setPassword("登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明普通交换机和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");channel.exchangeDeclare(DEAD_EXCHANGE,"direct");//声明普通队列HashMap<String, Object> map = new HashMap<>();//正常队列设置死信交换机map.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信队列的routingKeymap.put("x-dead-letter-routing-key","dead");channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);//声明死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定普通交换机与普通队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");//绑定死信交换机与死信队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");DeliverCallback deliverCallback=( consumerTag, message)->{String msg=new String(message.getBody());if("info5".equals(msg)){System.out.println("Consumer1接收消息===>"+msg+"此消息被拒绝");//此消息被拒接,是否重新放回正常队列, false:不放回 则会放到死信队列channel.basicReject(message.getEnvelope().getDeliveryTag(),false);}else {System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}};CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);//开启手动应答channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,cancelCallback);}
}
  1. 消费者b
public class Consumer02 {//死信队列名称public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();DeliverCallback deliverCallback=( consumerTag, message)->{System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}
}

3.2 基于SpringBoot实现延迟队列

在这里插入图片描述

  1. 配置队列交换机
@Configuration
public class QueueConfig {@Bean("exchange")public DirectExchange exchange(){return new DirectExchange("msg");}@Bean("simpleQue")public Queue simpleQue(){HashMap<String, Object> map = new HashMap<>();//设置死信交换机map.put("x-dead-letter-exchange","dead");//设置死信路由map.put("x-dead-letter-routing-key","deadKey");//消息失效时间map.put("x-message-ttl",10000);return new Queue("simple",false,false,false,map);}@Beanpublic Binding simpleQueueBandingExchange(@Qualifier("simpleQue") Queue simple,@Qualifier("exchange") DirectExchange msg)throws Exception{return BindingBuilder.bind(simple).to(msg).with("info");}@Bean("deadExchange")public DirectExchange exchange1(){return new DirectExchange("dead");}@Bean("deadQueue")public Queue deadQ(){return new Queue("deadQue",false,false,false,null);}@Beanpublic Binding deadKeyBindingDeadExchange(@Qualifier("deadQueue")Queue queue,@Qualifier("deadExchange")DirectExchange dead){//绑定死信队列到死信交换机通过路由return BindingBuilder.bind(queue).to(dead).with("deadKey");}
}
  1. 生产者
@RestController
public class Provider {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ttl/{message}")public void t1(@PathVariable String message){String queueName="simple";Date date = new Date();System.out.println(date);rabbitTemplate.convertAndSend("msg","info",message);}
}
  1. 消费者
@Component
public class Consumer {@RabbitListener(queues = "deadQue")public void hello(Message msg, Channel channel)throws Exception{System.out.println("接收到消息"+new String(msg.getBody()));Date date1 = new Date();System.out.println(date1);}
}
  • 我们看到消息每隔十秒更新一次
    在这里插入图片描述

3.3 发布确认 高级特性

3.3.1 可靠性投递confirm模式

  • 场景:在生产环境中由于一些不明原因,导致rabbitmq重启,在rabbitmq重启期间的生产者消息投递失败,导致消息丢失,需要手动处理和恢复。-可靠性投递confirm模式
  • 需要在application核心配置文件中设置发布确认类型
  • spring-rabbitmq-publisher-confirm-type: correlated
  • 类型1:none:禁用发布确认模式,是默认值
  • 类型2:correlated:发布消息成功到交换机后出发回调方法
  • 类型3:simple:和correlated效果一样,但是如果回调返回的是false,会关闭信道,接下来无法发送消息
  1. 配置类
@Component
public class confirmConfig {public static final String CONFIRM_EXCHANGE_NAME="confirm.exchange";public static final String CONFIRM_QUEUE="confirm.queue";public static final String CONFIRM_ROUTING_KEY="confirm";@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);}@Bean("confirmQueue")public Queue confirmQueue(){return new Queue(CONFIRM_QUEUE);}@Beanpublic Binding confirmQueueBindingConfirmExchange(@Qualifier("confirmExchange")DirectExchange confirmExchange,@Qualifier("confirmQueue")Queue confirmQueue){return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);}
}
  • 当生产者发送给交换机消息时,交换机的名字错了,或者交换机挂掉了,会导致消息的丢失,那么我们需要实现回调接口,当交换机收到消息后会给生产者发送回调消息
  1. 实现回调接口:实现 RabbitTemplate.ConfirmCallback接口的confirm方法并且将其注入到rabbit模板的内部类中
@Component
@Slf4j
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;//注入@PostConstruct //当所有注解执行完后,再执行这个注解public void init(){rabbitTemplate.setConfirmCallback(this);}/*** 交换机确认回调方法*  发消息,交换机接收到了,回调*  参数*      1. correlationData:保存消息的ID及相关信息,这个消息是我们生产者手动传入的*      2. 交换机收到消息 true*      3. null*//*** 交换机确认回调方法*  发消息,交换机接收失败,回调*  参数*      1. correlationData:保存消息的ID及相关信息*      2. 交换机收到消息 false*      3. cause:失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {String id=correlationData!=null?correlationData.getId():"";if(b){log.info("交换机已经收到了ID为{}的消息",id);}else {log.info("交换机为收到了ID为{}的消息,原因是:{}",id,s);}}
}
  1. 生产者
@RestController
public class ConfirmProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{msg}")public void t1(@PathVariable String msg){CorrelationData correlationData = new CorrelationData();correlationData.setId("1");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY,"嘿嘿嘿".getBytes(),correlationData);}
}
  1. 消费者
@Component
public class ConfirmConsumer {@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE)public void consumer(Message message){System.out.println("高级特性确认发布消费者收到了消息===>"+new String(message.getBody()));}
}
  • 测试:当我们正常发送消息
    在这里插入图片描述
  • 测试:当我们把交换机名字换掉
    在这里插入图片描述

3.3.2 可靠性投递return模式

  • 场景:若交换机收到消息,队列没有收到消息,应该如何解决?
  • 需要在application核心配置文件中设置是否回退消息,当消息路由不到消费者
  • spring-rabbitmq-publisher-returns=true 开启回退消息
@Component
@Slf4j
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;//注入@PostConstruct //当所有注解执行完后,再执行这个注解public void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}/*** 交换机确认回调方法*  发消息,交换机接收到了,回调*  参数*      1. correlationData:保存消息的ID及相关信息*      2. 交换机收到消息 true*      3. null*//*** 交换机确认回调方法*  发消息,交换机接收失败,回调*  参数*      1. correlationData:保存消息的ID及相关信息*      2. 交换机收到消息 false*      3. cause:失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {String id=correlationData!=null?correlationData.getId():"";if(b){log.info("交换机已经收到了ID为{}的消息",id);}else {log.info("交换机未收到了ID为{}的消息,原因是:{}",id,s);}}/*** 消息传递过程中 不可达 消费者的队列时将消息返回给生产者* 只有当消息 不可达 目的地的时候 才进行回调* 参数1:消息体* 参数2:回复代码* 参数3:回复原因* 参数4:交换机* 参数5:路由key*/@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {log.info("消息{},被交换机{}退回,原因是{},路由是{}",new String(message.getBody()),s1,s,s2);}}

3.4 优先级队列

  • 优先级越高,消息先被消费者消费
  • 官方设置最大优先级 0-255 超出优先级则报错 自己使用时数字不必设置很大,会浪费CPU效率
  1. 生产者
public class PriorityProducer {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//设置优先级参数AMQP.BasicProperties build = new AMQP.BasicProperties.Builder().priority(10).build();for (int i = 1; i <= 10; i++) {String msg="info"+i;if(i==5){channel.basicPublish("","hi",build,msg.getBytes());}else {channel.basicPublish("","hi",null,msg.getBytes());}}}
}
  1. 消费者
public class PriorityConsumer {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登录用户名");factory.setPassword("RabbitMQ登录密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();HashMap<String, Object> map = new HashMap<>();//设置当前队列为优先级队列map.put("x-max-priority",10);channel.queueDeclare("hi",false,false,false,map);channel.basicConsume("hi",true,(consumerTag,message)->{System.out.println("优先级队列接收消息顺序===>"+new String(message.getBody()));},(consumerTag) -> System.out.println("取消回调"));}
}
  • 测试结果:我们定义的是消息5优先级最高,其他消息为默认优先级
    在这里插入图片描述

3.5 消费端限流

  • 参数一:prefetchSize:预先载入的大小 0表示不限制大小
  • 参数二:prefetchCount:预先载入的消息条数
  • 参数三:global:false
  • 注意:autoAck手动应答一定要为false
	//设置每次确定一个消息channel.basicQos(0,1,false);
12
  • 生产者
public class AckProvider {//队列名称public static final String QUEUE_NAME="hello_Ack";//发消息public static void main(String[] args) throws Exception{//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip");factory.setUsername("用户");factory.setPassword("密码");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);Scanner scanner = new Scanner(System.in);while (true){String msg = scanner.nextLine();channel.basicPublish("",QUEUE_NAME, null,msg.getBytes());System.out.println("消息发送完毕");}}
}
  • 消费者
public class AckConsumer2 {//队列名称,接收此队列的消息public static final String QUEUE_NAME="hello_Ack";public static void main(String[] args) throws Exception{//创建连接工程ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip");factory.setUsername("用户");factory.setPassword("密码");//创建连接Connection connection = factory.newConnection();//通过连接来获取 信道来收消息Channel channel = connection.createChannel();//声明 接收消息的回调DeliverCallback deliverCallback=(consumerTag, message)-> {//message:包含消息头和消息体,我们只想拿到消息体//若不进行转换,直接输出message我们拿到的则是地址System.out.println(new String(message.getBody()));try {Thread.sleep(1000*5);} catch (InterruptedException e) {e.printStackTrace();}//手动确认消息://参数1:确认队列中那个具体的消息 参数2:是否开启多个消息同时确认channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//声明 取消消费的回调CancelCallback cancelCallback=consumerTag->{System.out.println("消费消息被中断");};//每次只消费一个channel.basicQos(0,1,false);/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动答应*   true:代表自动应答*   false:手动应答* 3.消费成者成功消费的回调* 4.消费者取消消费的回调*///关闭自动应答 falsechannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/556879.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

spring mysql 连接池配置_SpringBoot数据库连接池常用配置

关注公众号&#xff1a;程序猿王国 持续更新&#xff0c;每日分享在配置文件中添加配置如下(我使用的是多数据源)&#xff1a;spring.datasource.primary.urljdbc\:mysql\://localhost\:3306/test?useUnicode\true&characterEncoding\utf-8spring.datasource.prim…

交换机到底是啥?

1、交换机 交换机个人资料&#xff1a; 1.证件照 注&#xff1a;copy 百度百科 我的博客/交换机到底是啥&#xff1f; 这东西乍一看有点像月光宝盒&#xff0c;是不是。 2.个人简介 交换机&#xff08;Switch&#xff09;意为“开关”是一种用于电&#xff08;光&#xff0…

unity3d 动态合批设置_Unity3D SkinnedMeshRenderer合批优化

最近做了性能优化相关的工作&#xff0c;其中一些是关于战斗模块的渲染的。主要是对场景中使用的基于SkinnedMeshRenderer的网格进行了一些合批优化(降DC)&#xff0c;记录如下。项目使用的Unity版本为5.6.4p1。游戏中的战斗模块是这样的&#xff1a;战斗逻辑由服务器承担&…

java——Final修饰成员变量的注意事项

一&#xff0e;Final 修饰成员变量的注意事项 final修饰成员变量&#xff0c;该成员变量必须在创建对象之前进行赋值&#xff0c;否则编译失败final修饰成员变量&#xff0c;固定的不是成员变量拥有的默认值&#xff0c;如果固定的是默认值&#xff0c;那么将导致被final修饰的…

php 编写mysql_php编写数据写入mysql问题

我刚写好的&#xff1a;前台是htm界面填写数据留言板...我刚写好的&#xff1a;前台是 htm界面 填写数据留言板你的姓名&#xff1a;你的性别&#xff1a;男 女你的email&#xff1a;你的留言内容&#xff1a;后台是&#xff1a;if(isset($_POST)) // 只有 $_POST 变量存在&…

java 代码 点到线段的最短距离

// 点到直线的最短距离的判断 点&#xff08;x0,y0&#xff09; 到由两点组成的线段&#xff08;x1,y1&#xff09; ,( x2,y2 )private double pointToLine(int x1, int y1, int x2, int y2, int x0,int y0) {double space 0;double a, b, c;a lineSpace(x1, y1, x2, y2);// …

isNotBlank()和isNotEmpty()的区别——java中,StringUtils类中的一些常用方法

java中&#xff0c;StringUtils类中的一些常用方法 boolean isBlank(String str) //判断某字符串是否为空或长度为0或由空白符(whitespace)构成 StringUtils.isBlank(null) true StringUtils.isBlank("") true StringUtils.isBlank(" ") true StringUt…

java类的加载顺序_java类加载先后顺序

这里讲的不是类加载机制,是类的加载先后顺序。话不多说了&#xff0c;先设定以下场景:package com.jingdong;public class A {public static void main(String[] args){System.out.println(Ib.b);B bnew B();b.ibTest();}}public class B implements Ib{private D d;private C …

MyBatis:模糊查询的4种实现方式

1、根据姓名模糊查询员工信息 1.1、方式一 步骤一&#xff1a;编写配置文件 步骤二&#xff1a;测试 步骤三&#xff1a;分析 此种方式需要在调用处手动的去添加“%”通配符。 1.2、方式二 说明&#xff1a; 使用方式一可以实现模糊查询&#xff0c;但是有一点不方便的地…

java 阻塞 socket_java socket非阻塞I/O

1 非阻塞(Nonblocking)体系结构在这一部分&#xff0c;我将从理论的角度来解释非阻塞体系的结构及其工作原理。这部“喜剧”(当然&#xff0c;如果你喜欢的话也可以称做戏剧)的“人物”如下&#xff1a;●服务器端&#xff1a;接收请求的应用程序。●客户端&#xff1a;向…

java panel frame_Java 版 (精华区)--Frame和Panel的区别【转载】

初学Java的时候一直弄不清Frame和Panel的区别&#xff0c;都是在上面装组件&#xff0c;也没看出什么所以然&#xff0c;一直稀里糊涂地用。最近看来一下Java类库中几个主要类的源文件&#xff0c;才明白了一点所以然。写下了此文&#xff0c;希望能给大家一点帮助。让我们先看…

js解析java对象数组_js接收并转化Java中的数组对象的方法

在做项目时&#xff0c;要向ocx控件下发命令&#xff0c;就要在js中得到java中的对象&#xff0c;然后拼成一种格式&#xff0c;下发下去。。。当对象是一个时比较简单&#xff0c;但如果对象是一个数组时&#xff0c;就略显麻烦了。开始我以为有简单的方式&#xff0c;可以直接…

预处理prepareStatement是怎么防止sql注入漏洞的?

序&#xff0c;目前在对数据库进行操作之前&#xff0c;使用prepareStatement预编译&#xff0c;然后再根据通配符进行数据填值&#xff0c;是比较常见的做法&#xff0c;好处是提高执行效率&#xff0c;而且保证排除SQL注入漏洞。 一、prepareStatement的预编译和防止SQL注入…

java mod函数的使用方法_java 数学计算的具体使用

java.lang.Math 库提供了常用的数学计算工具常量final double E 2.7182818284590452354; // 自然对数底数final double PI 3.14159265358979323846; // 圆周率final double DEGREES_TO_RADIANS 0.017453292519943295; // 角度转弧度final double RADIANS_TO_DEGREES 57.295…

mybatis中的#{}和${}区别,和使用场景

mybatis中的#{}和${}区别 1.#将传入的数据都当成一个字符串&#xff0c;会对自动传入的数据加一个双引号。如&#xff1a;order by #user_id#&#xff0c;如果传入的值是111,那么解析成sql时的值为order by “111”, 如果传入的值是id&#xff0c;则解析成的sql为order by “i…

java 图片深度_将深度图像与RGB图像对齐

我正在尝试使用Kinect使用Python和libfreenect捕获的图像生成点 Cloud &#xff0c;但我无法将深度数据与Kinect拍摄的RGB数据对齐 .cx_d 3.3930780975300314e02cy_d 2.4273913761751615e02fx_d 5.9421434211923247e02fy_d 5.9104053696870778e02fx_rgb 5.292150809829329…

sql模糊查询

1&#xff0c;% 表示任意0个或多个字符。可匹配任意类型和长度的字符&#xff0c;有些情况下若是中文&#xff0c;请使用两个百分号&#xff08;%%&#xff09;表示。 比如 SELECT * FROM [user] WHERE u_name LIKE ‘%三%’ 将会把u_name为“张三”&#xff0c;“张猫三”、…

MyBatis 解决模糊查询包含特殊字符

第一块:MyBatis 实现模糊查询方式 1.1 sql中字符串拼接 SELECT * FROM 表名 WHERE 字段名 LIKE CONCAT(CONCAT(%, #{参数}), %);1 2. 使用 ${…} 代替 #{…} SELECT * FROM 表名 WHERE 字段名 LIKE %${参数}%; 注意:&#xff08;$不能防止sql注入, #{}—> 可以防止sql注入…

java的debug模式_java第六章:debug模式介绍及大量实例练习

1.Debug模式1.1什么是Debug模式【理解】是供程序员使用的程序调试工具&#xff0c;它可以用于查看程序的执行流程&#xff0c;也可以用于追踪程序执行过程来调试程序。1.2Debug模式操作流程【应用】如何加断点选择要设置断点的代码行&#xff0c;在行号的区域后面单击鼠标左键即…

注解RequestMapping中的URI路径最前面到底需不需要加斜线?

注解RequestMapping中的URI路径最前面到底需不需要加斜线? 您有没有这样的困惑&#xff1a;在协同开发过程中&#xff0c;使用RequestMapping&#xff0c;或者是GetMapping&#xff0c;或者是PostMapping这类注解时&#xff0c;有的程序员加了斜线&#xff0c;有的程序员没有…