RabbitMQ基础教程

1.什么是消息队列

消息队列(Message Queue),我们一般简称为MQ。消息队列中间件是分布式系统中重要的组件,具有异步性、松耦合、分布式、可靠性等特点。用于实现高性能、高可用、可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。目前主流的消息队列有RocketMQ、Kafka、RabbitMQ、ZeroMQ、MetaMQ等。消息队列在很多业务场景中都会使用到,例如:异步处理、应用解耦、流量消锋、数据同步、日志处理等等。下面是一个消息队列最简单的架构模型。

img

名词解释:

  • Producer:消息的生产者,负责将消息发送到Broker
  • Broker:消息处理中心(内部通常包含多个队列,称之为queue),负责消息的存储等操作
  • Consumer:消息消费者,负责从Broker中获取消息并进行相应处理

2.RabbitMQ

2.1 简介

RabbitMQ是流行的开源消息队列其中的一种,用erlang语言开发。它基于AMQP协议(AMQP是应用层协议的一个开放标准,称为高级消息队列协议,专门为面向消息的中间件设计)的标准实现。RabbitMQ支持多种语言客户端(如:Java、C#、Python、Ruby、C、PHP)等。在易用性、扩展性、高可用性等方面表现都不错。

2.2 安装

由于RabbitMQ是基于erlang语言编写,在安装前先必须安装erlang环境。

官网地址:https://www.erlang.org/downloads。

img

最新版本为22.2,Windows用户可直接下载OPT 22.2 Windows 64-bit Binary直接安装即可。

img

接着去RabbitMQ官网下载最新版本的安装包进行安装。

官网地址:https://www.rabbitmq.com/install-windows.html#chocolatey

img

点击下载rabbitmq-server-3.8.2.exe的Bintray安装包,下载后直接打开安装。

img

如果是macOS用户,可以通过Homebrew直接安装,并且Homebrew在安装RabbitMQ时会自动下载并安装erlang环境。

img

2.3 配置环境变量

将RabbitMQ安装目录下的sbin子目录加入到环境变量的Path中。

img

img

2.4 启动/停止服务

启动或停止服务有应用方式启动和服务启动两种方式。

应用方式启动:

命令说明
rabbitmq-server直接启动,关闭窗口后应用就会停止
rabbitmq-server -detached后台启动,后台独立进程方式运行,关闭窗口后应用不会关闭
rabbitmqctl stop停止应用

示例:

img

服务方式启动:

当安装完后可以在服务列表中查看到RabbitMQ这个服务,可以在这里直接启用或停止。

img

也可以在命令行使用相关命令启动或关闭服务(注意:控制台要以管理员方式运行)

命令说明
rabbitmq-service start启动服务
rabbitmq-service stop停止服务
rabbitmq-service disable禁用服务
rabbitmq-service enable启用服务

示例:

img

2.5 可视化管理插件

RabbitMQ默认提供了一个rabbitmq_management可视化管理插件,方便我们通过web访问的方式来管理和查看RabbitMQ。此插件默认是禁用的,因此需要手动启用它。在命令行使用rabbitmq-plugins来启用插件。如下:

rabbitmq-plugins enable rabbitmq_management

img

启用后可以在浏览器中输入http://localhost:15672来访问登录页面,默认登陆账号和密码都为guest

img

登陆成功后进入功能管理首页。

img

在后续的示例中会讲解这里面的具体内容。

2.6 用户管理

RabbitMQ默认提供了一个guest用户,我们也可以创建新用户并给用户分配相应的权限。创建用户有两种方式,一种是使用rabbitmqctl工具,另一种是使用可视化的方式操作。

使用可视化操作:

在web管理登陆页面登陆后,点击Admin选项,这里会列出所有的用户信息,默认只有一个guest用户,如下:

img

点击下面的Add a user,在展开的页面中填写新用户的姓名、密码以及身份标签,确认无误后点击Add User按钮保存。如下:

img

此时用户列表就会多出一个新建的用户,如下:

img

但这个用户还不能正常使用,因为还未分配访问的虚拟主机(虚拟主机的概念会在下个章节说明)以及权限,所以点击列表中的用户名(也就是wangl)跳转到如下页面:

img

说明:

  • Virtual Host:设置虚拟主机的路径,默认为“/”,因为没有新创建别的虚拟主机,所以只有一个默认的。
  • Configure regexp:设置用户的配置权限,支持正则表达式(.*表示所有)。
  • Write regexp:设置用户的写权限,支持正则表达式(.*表示所有)。
  • Read regexp:设置用户的读权限,支持正则表达式(.*表示所有)。

最后点击Set permission按钮保存,然后回到用户列表,这时新建的用户就能正常使用了

img

登出后使用新用户登陆来访问。

使用rabbitmqctl工具:

在命令行可以使用rabbitmqctl,它是RabbitMQ中间件的一个命令行管理工具。

1.创建用户:

命令:rabbitmqctl add_user username password

示例:rabbitmqctl add_user user1 123

2.删除用户:

命令:rabbitmqctl delete_user username

示例:rabbitmqctl delete_user user1

3.修改密码:

命令:rabbitmqctl change_password username newpassword

示例:rabbitmqctl change_password user1 321

4.列出所有用户:

命令:rabbitmqctl list_users

5.设置用户权限:

命令:rabbitmqctl set_permissions [-p vhostpath] username

示例:rabbitmqctl set_permissions -p / user1 .* .* .*

6.删除用户权限:

命令:rabbitmqctl clear_permissions [-p vhostpath] username

示例:rabbitmqctl clear_permissions -p / user1

2.7 配置文件

不同的操作系统默认存放的配置文件目录是不一样的(也可以通过环境变量指定配置文件的目录),下面列出在不同系统中默认配置文件的存放位置。

img

以Windows为例,我们在C:\Users%USERNAME%\AppData\Roaming\RabbitMQ目录下创建一个名为rabbitmq.conf的配置文件。

img

使用记事本打开添加如下配置信息可以修改默认的配置。

listeners.tcp.default = 5673
management.listener.port = 15673
num_acceptors.tcp = 10

说明:

属性描述默认值
listeners.tcp.defaultAMQP连接的默认监听端口,也就是访问RabbitMQ的默认端口号5672
management.listener.port访问web管理插件的默认端口15672
num_acceptors.tcp接受tcp连接的erlang进程数10

这里我们修改了默认的tcp连接端口以及web管理插件的默认端口,配置完成之后记得要重启RabbitMQ服务,接着重新打开web管理页面,使用修改后的端口进行访问。

img

参考:https://www.linuxidc.com/Linux/2019-03/157354.htm

2.8 AMQP通信模型

img

名词解释:

  • Broker:消息处理中心,也就是RabbitMQ Server。
  • Virutal Host:虚拟主机相当于一个命名空间。用于隔离不同的Exchange和Queue。每个Virutal Host内部有自己的Exchange和Queue,他们之间互不影响。我们可以为不同用户指定不同的Virutal Host,这样不同用户只能访问当前设置的Virutal Host下的Exchange和Queue,而不能访问其他的Virutal Host。在RabbitMQ有一个默认的Virutal Host就是“/”。我们也可以通过可视化插件或者使用rabbitmqctl工具来创建新的Virutal Host。
  • Exchange:Exchange也称之为交换机,核心作用就是将消息生产者(Producer)发送过来的message依据指定的路由规则发送到特定的Queue中。
  • Queue:存放message的队列,消息最终会被消息消费者(Consumer)取出消费。
  • Producer:消息的生产者,负责将消息发送到交换机(Exchange)中。
  • Consumer:消息消费者,负责从Queue中获取消息并进行相应处理。
  • Binding:Binding就是将一个或者多个消息队列(Queue)绑定到交换机(Exchange)上。绑定时会设置一个路由的key(一种路由规则表达式)。这样当Exchange接收到Producer发送的消息时,会根据路由规则将消息发送到具体的Queue中。

3. 基础应用

RabbitMQ支持多种语言的客户端,在这个章节中将使用Java客户端来操作RabbitMQ。新建Maven项目并添加依赖。

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.0</version>
</dependency>

3.1 Queue

直接使用Queue是实现消息发布订阅最简单的一种方式,内部会通过一个默认的Exchange(交换机)来将消息路由到Queue中。

img

Producer示例:

public class Producer {/*** 消息队列名称*/private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) {//创建连接工厂并设置RabbitMQ主机地址,默认端口为5672ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");//创建连接对象,并使用连接对象构建一个消息通信的通道try (Connection conn = connectionFactory.newConnection();//使用连接对象构建一个消息通信的通道Channel channel = conn.createChannel()) {/*** 创建队列* 参数一:队列名称* 参数二:队列是否持久化(true为持久化)* 参数三:是否排他(true为排他),排他性指的是当exclusive为true时,*        队列只对首次创建的connection是可见的,false则表示被所有创建的connection都可见* 参数四:如果设置为true,表示连接断开时会自动删除此队列* 参数五:队列的其他属性设置,一个map集合*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "hello world";/*** 发布消息* 参数一:设置为"",表示未指定交换机的名称,此时会通过一个默认的交换机来路由消息* 参数二:队列名称* 参数三:消息路由头的其他属性,这里未添添加任何属性,设置为null* 参数四:消息体,将其转换为字节数组*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());} catch (Exception e) {throw new RuntimeException(e);}}
}

运行Producer,打开web管理界面,在Queues的选项里可以查看到新创建一个名为"test_queue"的队列,并且存有一条发布的消息,如下:

img

注意:队列会在第一次使用时创建,如果之前已经创建则不会再创建。

Cosumer示例:

public class Consumer {/*** 消息队列名称*/private final static String QUEUE_NAME = "test_queue";public static void main(String[] argv) throws Exception {//创建连接工厂并设置RabbitMQ主机地址,默认端口为5672ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");//创建连接对象,并使用连接对象构建一个消息通信的通道Connection connection = connectionFactory.newConnection();//创建通信通道Channel channel = connection.createChannel();//创建队列(如果存在则不再创建)channel.queueDeclare(QUEUE_NAME, false, false, false, null);//接收消息时所需的回调接口DeliverCallback callback = (consumerTag, delivery) -> {//获取消息体String message = new String(delivery.getBody(), "UTF-8");System.out.println("ConsumerTag:" + consumerTag);};//接收消息/*** 接收消息* 参数一:队列名称* 参数二:是否自动签收(true为自动签收),自动签收就是*        消息处理完后会自动给rabbitmq回馈一条消息,表示这条消息已经处理完毕* 参数三:消息的回调接口,也就是上面声明的DeliverCallback,用于接收消息体* 参数四:消费者取消订阅时的回调接口,会传入一个consumerTag签收标签*/channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});}
}

注意:Consumer在创建Connection时不要放在try-with-resources语句块中,避免Connection自动关闭导致程序结束。因为Consumer运行后会产生阻塞,需要一直监听队列是否有新的消息,如果有则从队列取出并消费。

运行Consumer,在控制台查看接收的消息。

img

再次查看web管理控制台,此时队列的消息已经被消费掉。

img

大家可以反复运行Producer进行测试。

3.2 Exchange

前面的例子主要是讲解Queue的用法,并通过一个默认的Exchange(交换机)来路由消息。在这个章节中我们主要来了解其他几种Exchange的用法,Exchange的概念在前面的AMQP的通信模型中已经介绍过,它主要是根据路由key将转发消息到绑定的队列(Queue)上。

img

Exchange的类型有Topic、Direct、Fanout、Headers这四种。而Headers类型的交换机使用场景较少,我们主要学习Topic、Direct、Fanout这几种交换机的用法。

3.2.1 Topic

作用:将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行比较,如果匹配(可以通过通配符进行模糊匹配),则发送到该Binding对应的Queue中。

CusumerA示例:

public class ConsumerA {/*** 定义Exchange名称*/private final static String EXCHANGE_NAME = "logs.topic";/*** 定义一个Queue名称,这里指定为info.queue*/private final static String QUEUE_NAME = "info.queue";public static void main(String[] args) throws Exception {//初始化连接工厂,并指定rabbitmq的主机地址, 默认端口为5672ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//创建连接对象,并使用连接对象构建一个消息通信的通道Connection conn = factory.newConnection();Channel channel = conn.createChannel();//声明Exchange,类型指定为为topic, //第三个参数是否持久化,true为持久化,默认值为falsechannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, false);//声明queuechannel.queueDeclare(QUEUE_NAME, false, false, true, null);//为queue和exchange绑定路由key(使用"*"进行模糊绑定),表示任意以".info"结尾的key//的消息都会发送到这个queue中channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.info");//消息回调接口DeliverCallback callback = (consumerTag, delivery) -> {//获取路由keySystem.out.println("Routing key: " + delivery.getEnvelope().getRoutingKey());//获取消息体String message = new String(delivery.getBody(), "UTF-8");System.out.println("ConsumerA receive message: " + message);};//接收消息channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});}

CusomerB示例:

public class ConsumerB {/*** 定义Exchange名称*/private final static String EXCHANGE_NAME = "logs.topic";/*** 定义一个Queue名称,这里指定为error.queue*/private final static String QUEUE_NAME = "error.queue";public static void main(String[] args) throws Exception {//初始化连接工厂,并指定rabbitmq的主机地址, 默认端口为5672ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//创建连接对象,并使用连接对象构建一个消息通信的通道Connection conn = factory.newConnection();Channel channel = conn.createChannel();//声明交换机,类型为topicchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//声明queuechannel.queueDeclare(QUEUE_NAME, false, false, true, null);//为queue和exchange绑定路由key(使用"*"进行模糊绑定),表示任意以".error"结尾的key//的消息都会发送到这个queue中channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.error");//接收消息DeliverCallback callback = (consumerTag, delivery) -> {//获取路由keySystem.out.println("Routing key: " + delivery.getEnvelope().getRoutingKey());//获取消息体String message = new String(delivery.getBody(), "UTF-8");System.out.println("ConsumerB receive message: " + message);};channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});}
}

分别运行ConsumerA和ConsumerB,打开web管理页面,在Exchanges的页面中我们可以看到创建了一个名为logs.exchange,类型为topic的Exchange。

img

在Queues的页面中可以看到创建了error.queue和info.queue两个queue。

img

在Exchanges页面的列表中点击logs.topic我们创建的这个exchange,可以查看Exchange和queue的绑定信息,以及路由的key。

img

同样在Queues页面的的列表中点击error.queue或者info.queue,也可以查看相互绑定的信息。

img

Producer示例:

public class Producer {/*** Exchange名称*/private final static String EXCHANGE_NAME = "logs.topic";public static void main(String[] args) throws Exception {//初始化连接工厂,并指定rabbitmq的主机地址,默认端口为5672ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//创建连接对象,并使用连接对象构建一个消息通信的通道try(Connection conn = factory.newConnection();Channel channel = conn.createChannel()) {//创建交换机,类型为topic//第三个参数是否持久化,true为持久化,默认值为falsechannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, false);//定义一个info的messageString infoMessage = "info message...";//定义一个error的messageString errorMessage = "error message...";//将消息发送到交换机,并指定不同路由key//第三个参数是否持久化消息,如果需要持久化则设置为MessageProperties.PERSISTENT_TEXT_PLAIN。//如果不需要持久化,则设置为nullchannel.basicPublish(EXCHANGE_NAME, "log.error", null, errorMessage.getBytes());channel.basicPublish(EXCHANGE_NAME, "log.info", null, errorMessage.getBytes());} catch (Exception e) {e.printStackTrace();}}
}

运行Producer,将两条消息发送到Exchange,此时Exchange会根据消息中指定的路由key将消息不同的消息发送到不同的Queue中。

结果:

img

img

3.2.2 Direct

作用:将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行比较,如果完全匹配(注意:是完全匹配),则发送到该Binding对应的Queue中。

ConsumerA示例:

public class ConsumerA {/*** Exchange名称*/private final static String EXCHANGE_NAME = "logs.direct";/*** Queue名称*/private final static String QUEUE_NAME = "info.queue";public static void main(String[] args) throws Exception {//初始化连接工厂,并指定rabbitmq的主机地址, 默认端口为5672ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//创建连接对象,并使用连接对象构建一个消息通信的通道Connection conn = factory.newConnection();Channel channel = conn.createChannel();//声明Exchange,类型为directchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//声明queuechannel.queueDeclare(QUEUE_NAME, false, false, true, null);//为queue和exchange绑定路由key,这里不能使用模糊匹配,direct类型要求路由的key必须完全匹配channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.info");//接收消息DeliverCallback callback = (consumerTag, delivery) -> {System.out.println("Routing key: " + delivery.getEnvelope().getRoutingKey());String message = new String(delivery.getBody(), "UTF-8");System.out.println("ConsumerA receive message: " + message);};channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});}
}

ConsumerB示例:

public class ConsumerB {/*** 定义Exchange名称*/private final static String EXCHANGE_NAME = "logs.direct";/*** 定义一个Queue名称,这里指定为error.queue*/private final static String QUEUE_NAME = "error.queue";public static void main(String[] args) throws Exception {//初始化连接工厂,并指定rabbitmq的主机地址, 默认端口为5672ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//创建连接对象,并使用连接对象构建一个消息通信的通道Connection conn = factory.newConnection();Channel channel = conn.createChannel();//创建交换机,类型为directchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//创建queuechannel.queueDeclare(QUEUE_NAME, false, false, true, null);//为queue和exchange绑定路由key,这里不能使用模糊匹配,direct类型要求路由的key必须完全匹配channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.error");//接收消息DeliverCallback callback = (consumerTag, delivery) -> {System.out.println("Routing key: " + delivery.getEnvelope().getRoutingKey());String message = new String(delivery.getBody(), "UTF-8");System.out.println("ConsumerB receive message: " + message);};channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});}
}

Producer示例:

public class Producer {/*** Exchange名称*/private final static String EXCHANGE_NAME = "logs.direct";public static void main(String[] args) throws Exception {//初始化连接工厂,并指定rabbitmq的主机地址,默认端口为5672ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//创建连接对象,并使用连接对象构建一个消息通信的通道try(Connection conn = factory.newConnection();Channel channel = conn.createChannel()) {//创建交换机,类型为directchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//定义一个info的messageString infoMessage = "info message...";//定义一个error的messageString errorMessage = "error message...";//将消息发送到交换机,并指定不同路由keychannel.basicPublish(EXCHANGE_NAME, "log.info", null, infoMessage.getBytes());channel.basicPublish(EXCHANGE_NAME, "log.error", null, errorMessage.getBytes());} catch (Exception e) {e.printStackTrace();}}
}

运行ConsumerA,ConsumerB以及Producer

结果:

img

img

3.2.3 Fanout

说明:直接将消息转发到所有binding的对应queue中,这种exchange在路由转发的时候,忽略Routing key,直接将消息发送到所有绑定的queue中,因此所有队列都会接收到相同的消息,相当于广播。

ConsumerA示例:

public class ConsumerA {/*** Exchange名称*/private final static String EXCHANGE_NAME = "logs.fanout";/*** Queue名称*/private final static String QUEUE_NAME = "info.queue";public static void main(String[] args) throws Exception {//初始化连接工厂,并指定rabbitmq的主机地址, 默认端口为5672ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//创建连接对象,并使用连接对象构建一个消息通信的通道Connection conn = factory.newConnection();Channel channel = conn.createChannel();//声明Exchange,类型为fanoutchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//声明queuechannel.queueDeclare(QUEUE_NAME, false, false, true, null);//为queue和exchange绑定路由key,这里将路由key可设置为任意字符,通常设置为""channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "aa");//接收消息DeliverCallback callback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("ConsumerA receive message: " + message);};channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});}
}

ConsumerB示例:

public class ConsumerB {/*** 定义Exchange名称*/private final static String EXCHANGE_NAME = "logs.fanout";/*** 定义一个Queue名称,这里指定为error.queue*/private final static String QUEUE_NAME = "error.queue";public static void main(String[] args) throws Exception {//初始化连接工厂,并指定rabbitmq的主机地址, 默认端口为5672ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//创建连接对象,并使用连接对象构建一个消息通信的通道Connection conn = factory.newConnection();Channel channel = conn.createChannel();//声明Exchange,类型为fanoutchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//声明queuechannel.queueDeclare(QUEUE_NAME, false, false, true, null);//为queue和exchange绑定路由key,这里将路由key可设置为任意字符,通常设置为""channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "bb");//接收消息DeliverCallback callback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("ConsumerB receive message: " + message);};channel.basicConsume(QUEUE_NAME, true, callback, consumerTag -> {});}
}

Producer示例:

public class Producer {/*** Exchange名称*/private final static String EXCHANGE_NAME = "logs.fanout";public static void main(String[] args) throws Exception {//初始化连接工厂,并指定rabbitmq的主机地址,默认端口为5672ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//创建连接对象,并使用连接对象构建一个消息通信的通道try(Connection conn = factory.newConnection();Channel channel = conn.createChannel()) {//创建交换机,类型为fanoutchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//定义一个info的messageString infoMessage = "info message...";//定义一个error的messageString errorMessage = "error message...";//将消息发送到交换机,路由key可任意设置,通常设置为""channel.basicPublish(EXCHANGE_NAME, "", null, infoMessage.getBytes());channel.basicPublish(EXCHANGE_NAME, "", null, errorMessage.getBytes());} catch (Exception e) {e.printStackTrace();}}
}

运行ConsumerA,ConsumerB以及Producer

结果:

img

img

ConsumerA和ConsumerB同时都收到info和error的消息。

4. 整合Spring Boot

4.1 示例

添加依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

yml配置:

spring:rabbitmq:addresses: 127.0.0.1# 连接端口,默认5672port: 5672# 设置登陆认证的账号密码,默认为guestusername: guestpassword: guest# 虚拟主机地址,默认为"/"virtual-host: /# 设置连接诶超时时间connection-timeout: 5000# 配置消费者监听设置listener:simple:# 最小消息消费线程数,这里表示每个Listener容器将开启2个线程去处理消息# 在2.0版本后可以在@RabbitListener注解中配置该参数concurrency: 2# 最大消费线程数max-concurrency: 5# 每个消费线程能从队列获取的消息数量# 每个customer会从消息队列中预取一些消息放入自己的LinkedBlockingQueue中进行消费,# 注意,每个customer线程都有自己对应的BlockingQueueprefetch: 1# 消息签收模式# none:表示没有任何的应答会被发送# manual:表示监听者必须通过调用Channel.basicAck()来告知所有的消息# auto:表示自动应答,除非坚挺着抛出异常,这是默认配置方式acknowledge-mode: auto# 当消费者监听器产生异常时是否将消息重新放回队列,默认值为truedefault-requeue-rejected: true

配置类

在配置类中主要声明Exchange、Queue等Bean的装配

@Configuration
public class RabbitConfig {public static final String EXCHANGE_NAME = "order.exchange";public static final String QUEUE_NAME = "order.queue";public static final String ROUTER_KEY = "order.*";/*** 装配Topic类型的Exchange* 也可以装配其他类型如:DirectExchange、FanoutExchange* TopicExchange构造方法第一个参数指定交换机名称,第二个参数是否持久化交换机,* 第三个参数是否自动删除交换机*/@Beanpublic TopicExchange exchange(){//return new TopicExchange(EXCHANGE_NAME);return new TopicExchange(EXCHANGE_NAME, false, true);}/*** 装配消息队列* Queue构造方法第一个参数指定Queue的名称,第二个参数表示是否持久化queue* @return*/@Beanpublic Queue queue() {return new Queue(QUEUE_NAME, false);}/*** 将queue绑定到exchange*/@Beanpublic Binding binding(){return BindingBuilder.bind(queue()).to(exchange()).with(ROUTER_KEY);}
}

Consumer示例:

@Service
public class ConsumerService {/*** 使用@RabbitListener注解进行监听,通过queues属性指定要从哪个queue中消费消息* @Payload注解标注的参数为转换后的消息对象* @Headers注解标注的参数为消息头* @param message 消息体内容* @param headers 消息头* @param channel 消息通道*/@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void receiveMessage(@Payload String message,@Headers Map<String, Object> headers,Channel channel) throws IOException {System.out.println("接收消息:" + message);}
}

上面的消费者使用的是自动签收模式,如果设为手动签收,也就是在yml中设置了acknowledge-mode: manual,那么在签收时需要调用Channel的basicAck()方法来确认签收的消息。

//当手动确认签收时,需要自行给rabbitmq回馈一条消息,这条消息已经处理完毕
//从headers获取一个签收标签
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
//确认签收,basicAck方法参入一个签收标签,第二个参数表示是否支持批量签收,false表示单个签收
channel.basicAck(deliveryTag, false);

Producer示例:

@Service
public class ProducerService {/*** 注入RabbitTemplate*/@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送文本消息* @param message*/public void sendMessage(String message){//创建消息的唯一IDCorrelationData correlationData = new CorrelationData();//这里使用订单ID作为消息的IDcorrelationData.setId(UUID.randomUUID().toString());//发送消息rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "order.message", message, correlationData);}
}

测试:

编写单元测试,注入ProducerService来发送消息。

@SpringBootTest
class Ch04ApplicationTests {@Autowiredprivate ProducerService service;@Testpublic void testSendMessage() {service.sendMessage("Hello world");}}

先运行SpringBoot启动类,然后执行单元测试,查看ConsumerService的接收结果。

img

4.2 @RabbitListener注解

@RabbitListener可以标注在方法上或者类上,Spring会根据不同的@RabbitListener注解创建并启动不同的监听容器(MessageListenerContainer),并通过queues属性指定需要监听的队列。每个监听容器都有自己的唯一标识,可以通过id属性来标识,如果不指定id属性则会自动创建一个默认的唯一标识。

/*** @param message 消息内容* @param headers 消息头,需要@Headers或者@Header注解标注(可选参数)* @param channel 消息通道(可选参数)
*/
@RabbitListener(id="001", queues = "queue.a")
public void consumerA(String message,@Headers Map<String, Object> headers,Channel channel) {...
}@RabbitListener(id="002", queues = "queue.b")
public void consumerB(String message,@Headers Map<String, Object> headers,Channel channel) {...
}

除了可以通过配置类来声明交换机、队列与绑定,也可以使用@RabbitListener提供的bindings属性来进行声明绑定。例如:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "order.queue", durable = "true"),exchange = @Exchange(name = "order.exchange", type = ExchangeTypes.TOPIC),key = "order.*")
public void receive(Long id) {...
}

4.3 @RabbitHandler注解

当消费端需要接收不同的消息类型时,可以结合@RabbitHandler搭配使用。将@RabbitListener注解标注在类上,在不同方法上使用@RabbitHandler标注,这样Listener监听容器会根据消息转换后的类型来调用相应的方法来处理。

@RabbitListener(queues = {"queue.a","queue.b"})
public class ConsumerService {@RabbitHandlerpublic void receiveA(String message) {...}@RabbitHandlerpublic void receiveB(User message) {...}@RabbitHandlerpublic void receiveC(Student message) {...}}  

4.4 自定义消息转换器

Spring默认使用的消息转换器是SimpleMessageConverter,只能处理基于文本的内容,序列化的Java对象和字节数组。

img

当然也可以自定义MessageConverter,例如将发送的一个实体把它序列化成Json,接收时又将Json自动转换为一个实体,那么可以使用Jackson2JsonMessageConverter。

添加依赖:

转换Json时需要用到Jackson

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-json</artifactId>
</dependency>

配置类:

只需在配置类中添加Jackson2JsonMessageConverter的装配

@Configuration
public class RabbitConfig {	...@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}

Order示例:

用于Producer将一个Order序列化为Json后发送到MQ,Consumer从MQ接收Json后将其反序列化为一个Order对象。

public class Order {/*** 订单ID*/private String orderId;/*** 订单消息*/private String message;public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public String getMessage() {return message;}public void setMessage(String message) {this.message = message;}
}

Producer示例:

@Service
public class ProducerService {/*** 注入RabbitTemplate*/@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送对象,使用自定义消息转换器转换为json* @param order*/public void sendObject(Order order) {//创建消息的唯一IDCorrelationData correlationData = new CorrelationData();//这里使用订单ID作为消息的IDcorrelationData.setId(order.getOrderId());//发送消息rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "order.message", order, 		correlationData);}
}

Consumer示例:

@Service
public class ConsumerService {/*** 使用自定义消息转换器* 使用@RabbitListener注解进行监听,通过queues属性指定要从哪个queue中消费消息* @Payload注解标注的参数为转换后的消息对象* @Headers注解标注的参数为消息头* @param order 转换后的消息对象* @param headers 消息头* @param channel 消息通道*/@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void receiveObject(@Payload Order order,@Headers Map<String, Object> headers,Channel channel) throws IOException {System.out.println("接收消息:");System.out.println("订单编号:" + order.getOrderId());System.out.println("订单明细:" + order.getMessage());}
}

测试:

编写单元测试方法

@Test
public void testSendObject() {Order orderDTO = new Order();orderDTO.setOrderId("10001");orderDTO.setMessage("test order...");service.sendObject(orderDTO);
}

先运行SpringBoot启动类,执行单元测试并查看Consumer接收结果:

img

5. ACK机制

ACK (Acknowledge character)是一种应答确认符号。用于在网络通信中,数据接收方成功接收到消息后会给发送方返回一个确认信息。

5.1 发送确认

5.1.1 ConfirmCallback

当消息的发送端发送一条消息到Broker时,为了确保这条消息成功发送到Exchange,因此Broker可以返回一个确认信息给发送端,也就是Producer的Confirm模式。

yml配置:

设置publisher-confirm-type为correlated

spring:rabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /connection-timeout: 5000# 启用ConfirmCallback模式publisher-confirm-type: correlated

Producer示例:

public void sendMessage(String message){//使用uuid作为消息的唯一IDCorrelationData correlationData = new CorrelationData();correlationData.setId(UUID.randomUUID().toString());//发送消息rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "order.message", message, correlationData);//通过setConfirmCallback设置一个回调来确认消息是否成功发布到Exchange中//如果发布成功ack则为true,失败为falserabbitTemplate.setConfirmCallback((cdata, ack, cause) -> {//获取CorrelationData中的IDString eventId = cdata.getId();if (ack) {System.out.println("投递成功:"+eventId);} else {System.out.println("投递失败:"+eventId );}});}
5.1.2 ReturnsCallback

上面的confrim模式只能确认消息是否正确到达Exchange中,但不能保证消息正确投递到目标 queue里。如果一定要确保消息投递到queue中,就需要使用ReturnCallback。

yml配置:

将publisher-returns和template.mandatory设置为true

spring:rabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /connection-timeout: 5000# 启用ReturnCallback模式publisher-returns: true# 当mandatory标志位设置为true时,如果exchange根据自身类型和routingKey无法找到一个合适的queue,# 那么broker会调用basic.return方法将消息返还给生产者。设置为false时,出现上述情况broker会直接将消       息丢弃template:mandatory: true

Producer示例:

public void sendMessage(String message){//使用uuid作为消息的唯一IDCorrelationData correlationData = new CorrelationData();correlationData.setId(UUID.randomUUID().toString());//发送消息rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "order.message", message, correlationData);//通过setReturnsCallback设置回调来确认消息是否成功发布到queue中//注意,只有消息未正确到达queue时才会执行此回调此方法rabbitTemplate.setReturnsCallback(returnedMessage -> {System.out.println("ReturnsCallback=====>");System.out.println(returnedMessage.getMessage());System.out.println(returnedMessage.getReplyCode());System.out.println(returnedMessage.getReplyText());System.out.println(returnedMessage.getRoutingKey());});}

当消息未正确到达queue时,就会执行ReturnCallback。

5.2 消费确认

当消费端在消费一条消息时,Broker会等待消费端返回一条ACK来确认消息是否已成功消费,如果消费成功,那么Broker就会从队列中移除此消息。在Springboot中配置ack有none、auto、manual三种模式。

5.2.1 NONE

none表示不做任何的签收确认(相当于无ack),不管消费者是否正常消费消息,broker都认为消息已经被正常消费,并从broker中移除此消息。这样会导致消费端在处理消息的过程中如果产生异常,那么消息就会丢失。

yml配置:

spring:rabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /connection-timeout: 5000listener:simple:concurrency: 2max-concurrency: 5prefetch: 1# ack确认机制# none:表示不做任何确认签收(相当于无ack)acknowledge-mode: none
5.2.2 AUTO

auto表示自动确认,自动确认会根据消费端在处理消息的过程是否抛出异常来决定返回ack或者nack给broker。

yml设置:

spring:rabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /connection-timeout: 5000listener:simple:concurrency: 2max-concurrency: 5prefetch: 1# ack确认机制# auto:表示自动确认(默认配置)acknowledge-mode: auto# 当消费者产生异常时是否将消息重新放回队列,默认值为truedefault-requeue-rejected: true

需要注意的是,在自动确认模式下,default-requeue-rejected设置为true并不能完全决定是否重新放回队列,另外一个决定因素是具体装配了哪一个MessageRecoverer(消息回收器)的Bean,它的作用是在消费消息失败后要做什么样的处理。默认使用是RejectAndDontRequeueRecoverer。下面分别说明几种有常见的MessageRecoverer实现。

RejectAndDontRequeueRecoverer:

这是默认使用MessageRecoverer,只要在消费端抛出除AmqpRejectAndDontRequeueException以外的其他异常并且default-requeue-rejected设置为true的情况下,消息都会自动重新投递到队列中,否则就会丢弃。

ImmediateRequeueMessageRecoverer:

这个会在抛出除AmqpRejectAndDontRequeueException以外的其他异常会自动返回nack,会忽略default-requeue-rejected的设置,并立即将消息放回当前队列。

@Configuration
public class RabbitConfig {/*** 装配ImmediateRequeueMessageRecoverer* @return*/@Beanpublic MessageRecoverer messageRecoverer() {return new ImmediateRequeueMessageRecoverer();}
}  

RepublishMessageRecoverer:

这个会在消费失败后将消息投递到自己指定的一个队列中,由其他订阅的消费者来处理。

@Configuration
public class RabbitConfig {public static final String EXCHANGE_NAME = "error.exchange";public static final String QUEUE_NAME = "error.queue";public static final String ROUTER_KEY = "error.key";@Autowiredprivate RabbitTemplate rabbitTemplate;@Beanpublic DirectExchange exchange(){return new DirectExchange(EXCHANGE_NAME, false, true);}@Beanpublic Queue queue() {return new Queue(QUEUE_NAME, false, true);}@Beanpublic Binding binding(){return BindingBuilder.bind(queue()).to(exchange()).with(ROUTER_KEY);}/*** 装配RepublishMessageRecoverer* @return*/@Beanpublic MessageRecoverer messageRecoverer() {return new RepublishMessageRecoverer(rabbitTemplate, EXCHANGE_NAME, ROUTER_KEY);}
}
5.2.3 MANUAL

manual表示手动确认,也就是在消费端的代码中手动调用basicAck方法确认签收。如果产生异常,可以通过basicNack或者basicReject拒绝签收。需要注意的是,当ack模式为manual时,default-requeue-rejected设置是无效的,必须在basicNack或者basicReject拒绝签收时指定是否重新放回队列。

yml配置:

spring:rabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /connection-timeout: 5000listener:simple:concurrency: 2max-concurrency: 5prefetch: 1# ack确认机制# manual:表示手动确认acknowledge-mode: manual

手动确认签收:

在消费端通过调用basicAck方法来确认签收

@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void receiveMessage(@Payload String message,@Headers Map<String, Object> headers,Channel channel) throws IOException {System.out.println("接收消息:" + message);//从headers中获取一个唯一标识Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);//确认签收//参数1:消息投递的唯一标识//参数2:是否支持批量签收(true表示批量确认,false表示单个确认)channel.basicAck(deliveryTag, false);
}

在手动确认时,方法参数多了headers和channel两个参数。header表示消息的头信息,channel表示当前的消息通道。在投递一个消息时,消息头中会包含一个delivery tag,这个值表示本次投递的唯一标识,在同一个Channel中,这个值是唯一的。delivery tag长度为64为,值从1开始,每发送一次消息该值会递增1。消费者端在确认消息时带上此参数,用于告诉RabbitMQ某次投递已经正确应答。通过调用channel的basicAck方法来确认应答。

拒绝签收:

消费端在处理消息时可以依据业务规则来决定是否确认签收或拒绝签收。如果需要拒绝签收,可以调用channel的basicNack或者basicReject方法

//参数1:消息投递的标签
//参数2:是否支持批量拒绝
//参数3:是否重新放回队列(true表示放回)
channel.basicNack(deliveryTag, false, true);
//参数1:消息投递的标签
//参数2:是否重新放回队列(true表示放回)
channel.basicReject(deliveryTag, true);

两个方法区别在于basicReject一次只能拒绝单条消息,basicNack可以拒绝多条。并且这两个方法在拒绝签收时可以设置是否将消息重新放回消息队列。

6. 重试机制

在消息投递或者消费的过程因为网络或异常导致消息不能正常投递和消费时,可以采用重试机制。需要注意的是,这里的重试和RabbitMQ无关,RabbitMQ本身是不提供重试的功能,而是由Spring的retry框架实现,具体可以参考spring-retry模块的使用。

6.1 发送端重试

发送端重试是针对RabbitTemplate,在消息的投递过程中由于网络原因连接失败或者其他的错误导致消息没有正常投递到Broker,那么可以启用template的retry功能。

yml:

spring:rabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /connection-timeout: 5000# 发送端重试template:retry:# 启用重试机制enabled: true# 重试次数max-attempts: 3# 重试间隔时间(单位:毫秒)initial-interval: 2000ms

6.2 消费端重试

消费端重试主要针对的是消费者的Listener。当消费者在处理一条消息时,在这个过程如果Listener抛出异常或其他原因导致消息没有正常被消费,那么可以启用listener的rety功能。需要注意的是,当acknowledge-mode设置为auto并且default-requeue-rejected设置为true时,同时使用的是默认的MessageRecoverer(消息回收器),这样当消费端抛出除AmqpRejectAndDontRequeueException以外的其他异常时会将消息重新放回队列中,此时消费者又会从队列中取出消息进行消费,那么就会导致无限循环消费,这是不合理的。正确的做法是需要指定重试的次数,并且到达该次数后让RabbitMQ将此消息放到死信队列中(死信队列在下个章节讲解)做相应处理或由人工解决。如果未配置死信队列,那么达到次数后该消息将被丢弃。当然也可以配置RepublishMessageRecoverer,到达重试次数后将消息投递到自己指定的交换机和队列来处理,效果是一样的。

yml配置:

spring:rabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /connection-timeout: 5000listener:simple:concurrency: 2max-concurrency: 5prefetch: 1# 消费端重试retry:# 启用消费端重试enabled: true# 重试次数max-attempts: 3# 重试间隔时间(单位:毫秒)initial-interval: 2000ms

7. 死信队列

7.1 概念

在消费端重试时,当到达重试次数后,此时被拒绝的消息就会变为死信(通常一个消息变为死信有几种情况,例如被拒绝的消息、消息达到TLL过期时间、以及队列达到了最大长度等),如果没有相应的处理,那么broker将丢弃此消息。所以当这些重试之后都无法消费的消息,我们就将其放入死信队列中做进一步的处理。而这个死信队列本身也是一个普通的Queue。这个Queue也需要绑定一个Exchange,这个Exchange就称之为死信交换机(DLX)。同样这个Exchange可以是任意类型如Direct、Topic、Fanout的Exchange,与普通的Exchange没有什么差异。因此当我们将一个消息发送到死信队列时,通过这个死信交换机将消息发送到指定的Queue。下面给出一个具体的示例:

7.2 自动确认处理

可以结合Spring的retry进行重试,当到大重试次数后指定将消息投递到死信交换机。

yml配置:

spring:rabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /connection-timeout: 5000listener:simple:concurrency: 2max-concurrency: 5prefetch: 1# 自动确认acknowledge-mode: auto# 重试设置(如果使用手动确认建议使用redis来实现重试次数)retry:# 启用消费端重试监听enabled: true# 重试次数max-attempts: 3# 重试间隔时间(单位:毫秒)initial-interval: 2000ms

配置类:

@Configuration
public class RabbitConfig {public static final String EXCHANGE_NAME = "order.exchange";public static final String QUEUE_NAME = "order.queue";public static final String ROUTER_KEY = "order.*";//声明死信交换机名称public static final String DEAD_EXCHANGE_NAME = "dead.exchange";//声明死信队列名称public static final String DEAD_QUEUE_NAME = "dead.queue";//死信队列路由keypublic static final String DEAD_ROUTER_KEY = "dead.key";/*** 配置普通业务的Exchange*/@Beanpublic TopicExchange exchange() {return new TopicExchange(EXCHANGE_NAME, false, true);}/*** 装配死信Exchange(DLX),可以是direct类型也可以是其他类型** @return*/@Beanpublic DirectExchange deadExchange() {return new DirectExchange(DEAD_EXCHANGE_NAME, false, true);}/*** 配置普通业务的消息队列并关联死信交换机,当这个队列中的消息被拒绝或达到重试次数后,* 通过死信路由的key将其发送到对应的死信交换机* @return*/@Beanpublic Queue queue() {//使用QueueBuilder.nonDurable(QUEUE_NAME)创建不持久化的queue,//如果需要创建持久化的queue使用durable(QUEUE_NAME)方法return QueueBuilder.nonDurable(QUEUE_NAME)//自动删除//.autoDelete()//设置死信交换机的名称.withArgument("x-dead-letter-exchange", DEAD_EXCHANGE_NAME)//设置死信队列路由的key.withArgument("x-dead-letter-routing-key", DEAD_ROUTER_KEY)//消息超过这个时间还未被消费则路由到死信交换机//.withArgument("x-message-ttl", 5000).build();}/*** 配置死信队列*/@Beanpublic Queue deadQueue() {return new Queue(DEAD_QUEUE_NAME, false);}/*** 将queue绑定到exchange*/@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(exchange()).with(ROUTER_KEY);}/*** 将死信队列绑定到死信交换机上** @return*/@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(DEAD_ROUTER_KEY);}/*** 装配Jackson2JsonMessageConverter* @return*/@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}}

订单实体

public class Order {/*** 订单ID*/private String id;public String getId() {return id;}public void setId(String id) {this.id = id;}
}

消费端:

@Service
public class ConsumerService {/*** 在消费者执行中引发一个异常,此时Spring会自动执行retry功能,* 当达到retry次数时,该消息会自动路由到DLX中*/@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void receiveObject(@Payload Order order,@Headers Map<String, Object> headers,Channel channel) throws Exception {System.out.println("订单编号:" + order.getId());//产生异常System.out.println(10 / 0);}
}

死信队列消费端:

@Service
public class DeadLetterService {/*** 监听死信队列,如果有消息进入死信队列,将执行此方法做进一步的处理* @param message*/@RabbitListener(queues = RabbitConfig.DEAD_QUEUE_NAME)public void receiveDeadLetter(@Payload Order order,@Headers Map<String, Object> headers,Channel channel) throws IOException {System.out.println("接收到死信消息,订单ID:" + order.getId());}
}

发送端:

@Service
public class ProducerService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendObject(Order order) {//创建CorrelationDataCorrelationData correlationData = new CorrelationData();//这里使用订单ID作为消息的IDcorrelationData.setId(order.getId());//发送消息rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "order.message", order, correlationData);}
}

单元测试:

@SpringBootTest
class RabbitApplicationTests {@Autowiredprivate ProducerService service;@Testpublic void testSendObject() {Order orderDTO = new Order();orderDTO.setId("10001");service.sendObject(orderDTO);}}

7.2 手动确认处理

手动处理不需要retry的支持,可以结合Redis来存储重试的次数,当达到重试次数后执行nack并将消息投递到死信交换机中,重点在消费者中的代码实现。

yml配置:

spring:# redis配置redis:host: 127.0.0.1port: 6379database: 0password: wanglconnect-timeout: 2s# rabbitmq设置rabbitmq:# rabbitmq服务器地址addresses: 127.0.0.1# 连接端口,默认是5672port: 5672# 账号密码username: guestpassword: guest# 虚拟主机地址,默认为"/"virtual-host: /# 连接的超时时间connection-timeout: 5000# 启用ConfirmCallback模式(发送确认),当消息到达交换机后会返回一条ack给发送端publisher-confirm-type: correlated# 设置发送端重试template:retry:# 启用重试机制enabled: true# 重试次数max-attempts: 3# 重试间隔时间(单位:毫秒)initial-interval: 2000ms# 消费者监听设置listener:simple:# 最小的消费线程数量concurrency: 2# 最大的消费线程数量max-concurrency: 5# 限流,每个线程能从队列获取的消息数量prefetch: 1# 手动确认acknowledge-mode: manual

消费端:

@Service
public class ConsumerService {/*** 重置次数的key前缀*/private static final String ATTEMPTS_PREFIX = "attempts:";/*** 最大重试次数*/private static final Integer MAX_RETRY = 3;@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void receiveObject(@Payload Order order,@Headers Map<String, Object> headers,Channel channel) throws Exception {//获取一个消息的标签Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);try {log.info("订单ID: " + order.getOrderId());//产生一个异常System.out.println(10 / 0);//正常执行则手动签收消息channel.basicAck(tag, false);} catch (Exception e) {//如果产生异常则拒绝签收并将消息放回队列进行重试操作//从redis中获取重试次数,increment会在Redis中执行自增并返回自增的值,这一步是原子操作的Long retryTotal = redisTemplate.opsForValue().increment(ATTEMPTS_PREFIX + order.getOrderId());//如果大于最大重试次数则放入死信if(retryTotal > MAX_RETRY) {//拒绝签收,第三个参数设置为false表示不重新放回队列,//如果配置了死信队列则直接丢到死信队列中channel.basicNack(tag, false, false);//删除keyredisTemplate.delete(ATTEMPTS_PREFIX + order.getOrderId());} else {//拒绝签收并重新放回队列继续执行重试channel.basicNack(tag, false, true);}}}
}

死信队列消费端:

@Service
public class DeadLetterConsumer {/*** 监听死信队列* @param order* @param headers* @param channel*/@RabbitListener(queues = RabbitConfig.DEAD_QUEUE_NAME)public void receiveDeadLetter(Order order,@Headers Map<String, Object> headers,Channel channel) throws IOException {log.info("接收到异常订单,编号:" + order.getOrderId());//手动确认签收Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);channel.basicAck(tag, false);}
}

8. 延迟队列

所谓延迟队列就是根据我们的业务要求将消息延迟进行处理。

  • 在电商中,用户下单后并没有立即支付,如果在指定的时间内未支付,则取消该订单

  • 在系统发布一个通告,在某时刻之后通知到指定的人

8.1 实现方式

Rabbitmq实现延迟消费通常有两种形式:

  1. 利用自身Time To Live(TTL)以及Dead Letter Exchanges(DLX)的特性实现

    (也就是如果达到TTL时间未消费则投递到死信队列)

  2. 利用Rabbitmq插件rabbitmq_delayed_message_exchange(延迟投递)

rabbitmq_delayed_message_exchange插件的实现方式简单点说就是当发布消息后不会立即进入队列,而是存储在mnesia(一个分布式数据系统)表中,当达到延迟的时间后就立刻将消息投递至目标队列中。需要注意的是,插件能支持的最大延迟时间为(2^32)-1毫秒, 大约49天。

官方说明:

For each message that crosses an "x-delayed-message" exchange, the plugin will try to determine if the message has to be expired by making sure the delay is within range, ie: Delay > 0, Delay =< ?ERL_MAX_T (In Erlang a timer can be set up to (2^32)-1 milliseconds in the future).

8.2 安装插件

在官网https://www.rabbitmq.com/community-plugins.html下载延迟消息插件。

img

注意对应rabbitmq版本,下载后将插件拷贝到rabbitmq的plugins目录,拷贝后在终端使用以下命令可以看插件列表

rabbitmq-plugins list

img

启用插件:

在终端使用以下命令启用延迟插件。

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

img

启用插件后重启RabbitMQ服务。

8.3 示例

这里以用户下单后未支付的场景为例,如果在指定的时间内未支付,则取消该订单。

创建订单表:

create table order_info(order_id varchar(50) primary key,order_status tinyint(1) not null, -- 0:取消订单 1:未支付 2:已支付order_message varchar(100)
);

添加依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-json</artifactId>
</dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.0.0</version>
</dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId>
</dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions>
</dependency>

yml配置:

spring:# 数据源配置datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/order?serverTimezone=GMT&useUnicode=true&characterEncoding=utf-8username: rootpassword: root# hikari连接池配置hikari:minimum-idle: 5maximum-pool-size: 20idle-timeout: 900000connection-timeout: 15000connection-test-query: select 1# 配置RabbitMQrabbitmq:addresses: 127.0.0.1# 连接端口,默认5672port: 5672# 设置登陆认证的账号密码,默认为guestusername: guestpassword: guest# 虚拟主机地址,默认为"/"virtual-host: /# 设置连接诶超时时间connection-timeout: 5000# 配置消费者监听设置listener:simple:# 最小消息消费线程数concurrency: 2# 最大消息消费线程数max-concurrency: 5# 限流,每个消费线程能从队列获取的消息数量prefetch: 1# 自动应答acknowledge-mode: auto
# mybatis配置
mybatis:type-aliases-package: edu.nf.ch05.entitymapper-locations: classpath:/mappers/*.xml

配置类:

@Configuration
public class RabbitConfig {public static final String EXCHANGE_NAME = "delay.exchange";public static final String QUEUE_NAME = "delay.queue";public static final String ROUTER_KEY = "order.message";/*** 自定义Exchange,设置延迟交换机类型为direct,也可以设置为topic等其他类型*/@Beanpublic CustomExchange delayExchange() {Map<String, Object> params = new HashMap<>();params.put("x-delayed-type", "direct");return new CustomExchange(EXCHANGE_NAME, "x-delayed-message", false, true, params);}/*** 装配消息队列* Queue构造方法第二个参数表示是否持久化消息* @return*/@Beanpublic Queue queue(){return new Queue(QUEUE_NAME, false);}/*** 将queue绑定到exchange*/@Beanpublic Binding binding(){return BindingBuilder.bind(queue()).to(delayExchange()).with(ROUTER_KEY).noargs();}/*** 自定义消息转换器* @return*/@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}

Order示例:

@Data
public class Order {private String orderId;private Integer status;private String message;}

OrderDao示例:

public interface OrderDao {/*** 根据ID查询订单信息* @param orderId* @return*/Order getOrderById(String orderId);/*** 保存订单信息* @param order*/void saveOrder(Order order);/*** 修改订单* @param order*/void updateOrder(Order order);
}

Mapper映射配置:

<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="edu.nf.ch05.dao.OrderDao"><resultMap id="orderMap" type="order"><id property="orderId" column="order_id"/><result property="status" column="order_status"/><result property="message" column="order_message"/></resultMap><select id="getOrderById" parameterType="string" resultMap="orderMap">select order_id, order_status, order_message from order_info where order_id = #{orderId}</select><insert id="saveOrder" parameterType="order">insert into order_info(order_id, order_status, order_message) values(#{orderId}, #{status}, #{message})</insert><update id="updateOrder" parameterType="order">update order_info set order_status = #{status} where order_id = #{orderId}</update>
</mapper>

ProducerService示例:

@Service
public class ProducerService {/*** 注入RabbitTemplate*/@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 注入OrderDao*/@Autowiredprivate OrderDao orderDao;/*** 发送消息* @param order 订单对象* @param delayTime 延迟消费时长*/public void send(Order order, int delayTime) {//创建消息的唯一IDCorrelationData correlationData = new CorrelationData();correlationData.setId(order.getOrderId());//将订单信息入库,此时订单状态1,表示未支付orderDao.saveOrder(order);//发送消息rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTER_KEY, order, messagePostProcessor -> {//通过消息的后置处理器设置延迟放入的时间messagePostProcessor.getMessageProperties().setDelay(delayTime);return messagePostProcessor;}, correlationData);}
}

ConsumerService示例:

@Service
@Slf4j
public class ConsumerService {/*** 注入OrderDao*/@Autowiredprivate OrderDao orderDao;/*** 接收消息* 这里会延迟接收,也就是在发送端指定的延迟时间后才才进行接收*/@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void receiveMessage(Order order) {log.info("接收消息,订单编号:" + order.getOrderId());//依据订单编号查询数据库,如果订单状态为1则将其更新为0,表示取消订单order = orderDao.getOrderById(order.getOrderId());if(order.getStatus() == 1){order.setStatus(0);orderDao.updateOrder(order);log.info("订单已取消");}}
}

测试:

运行SpringBoot启动程序:

@SpringBootApplication
@MapperScan("edu.nf.ch05.dao")
public class Ch05Application {public static void main(String[] args) {SpringApplication.run(Ch05Application.class, args);}}

执行单元测试:

@SpringBootTest
public class ProducerServiceTests {@Autowiredprivate ProducerService producerService;@Testvoid testSend() {Order order = new Order();order.setOrderId("100001");order.setMessage("test order...");order.setStatus(1);producerService.send(order, 10000);}}

查看数据库,测试会录入一条订单信息,其状态为1。

img

如果在指定的过期时间内未其他服务处理该订单,那么消费者会从队列中取出这条订单信息,根据ID去数据库查询该订单的状态,如果为1(未支付)则自动取消订单,将其状态更新为0。

img

再次查看这条订单记录,此时的状态已更新为0。

img

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/165681.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

BUUCTF [WUSTCTF2020]find_me 1

BUUCTF:https://buuoj.cn/challenges 题目描述&#xff1a; 得到的 flag 请包上 flag{} 提交。 感谢 Iven Huang 师傅供题。 比赛平台&#xff1a;https://ctfgame.w-ais.cn/ 密文&#xff1a; 下载附件&#xff0c;得到一个.jpg图片。 解题思路&#xff1a; 1、得到一张图…

Matlab三角剖分插值问题分析

目录 前言 一、问题引入 二、一个例子 1.生成散点图 2.对数据进行剖分 3.点法式分析 三、最后结果 前言 上一篇文章感觉对三角剖分问题没有说清楚&#xff0c;这次专门对三角剖分问题再仔细说说。 一、问题引入 实际上这个问题是用来解决二维曲面插值问题的。 二维插值问题&…

外部中断为什么会误触发?

今天在写外部中断的程序的时候&#xff0c;发现中断特别容易受到干扰&#xff0c;我把手放在对应的中断引脚上&#xff0c;中断就一直触发&#xff0c;没有停过。经过一天的学习&#xff0c;找到了几个解决方法&#xff0c;所以写了这篇笔记。如果你的中断也时不时会误触发&…

通过Spring整合MyBatis实现持久层操作

文章目录 为什么要整合Spring和MyBatis&#xff1f;步骤一&#xff1a;添加依赖步骤二&#xff1a;配置数据源步骤三&#xff1a;配置MyBatis步骤四&#xff1a;创建Mapper接口和XML文件步骤五&#xff1a;使用Mapper接口拓展&#xff1a;事务管理 &#x1f389;通过Spring整合…

Leetcode173. 二叉搜索树迭代器

Every day a Leetcode 题目来源&#xff1a;173. 二叉搜索树迭代器 解法1&#xff1a;中序遍历 我们可以直接对二叉搜索树做一次完全的递归遍历&#xff0c;获取中序遍历的全部结果并保存在数组中。随后&#xff0c;我们利用得到的数组本身来实现迭代器。 代码&#xff1a…

竞赛 : 题目:基于深度学习的水果识别 设计 开题 技术

1 前言 Hi&#xff0c;大家好&#xff0c;这里是丹成学长&#xff0c;今天做一个 基于深度学习的水果识别demo 这是一个较为新颖的竞赛课题方向&#xff0c;学长非常推荐&#xff01; &#x1f9ff; 更多资料, 项目分享&#xff1a; https://gitee.com/dancheng-senior/pos…

Spark-06:共享变量

目录 1.广播变量&#xff08;broadcast variables&#xff09; 2.累加器&#xff08;accumulators&#xff09; 在分布式计算中&#xff0c;当在集群的多个节点上并行运行函数时&#xff0c;默认情况下&#xff0c;每个任务都会获得函数中使用到的变量的一个副本。如果变量很…

开启数据库审计(db,extended级别或os级别),并将审计文件存放到/home/oracle/audit下

文章目录 开启数据库审计&#xff08;db,extended级别或os级别&#xff09;&#xff0c;并将审计文件存放到/home/oracle/audit下一. 简介二. 配置2.1. 审计是否安装2.2. 审计表空间迁移2.3. 审计参数2.4. 审计级别2.5. 其他审计选项2.6. 审计相关视图 三. 使用3.1. 开启/关闭审…

成为独立开发者有多难

首先自我介绍&#xff1a;我是一名前端开发工程师&#xff0c;7年的前端开发经验。CSDN 九段刀客_js,vue,ReactNative-CSDN博客,80多万的访问量&#xff0c;1万多的粉丝。 相信80%的程序员的终极梦想都是成为一名独立开发者&#xff0c;不用找工作有自己的产品可以有睡后收入。…

深度学习模型训练计算量的估算

深度学习模型训练计算量的估算 方法1&#xff1a;基于网络架构和批处理数量计算算术运算次数前向传递计算和常见层的参数数量全连接层&#xff08;Fully connected layer&#xff09;参数浮点数计算量 CNN参数浮点数计算量 转置CNN参数浮点数计算量 RNN参数浮点数计算量 GRU参数…

刷题学习记录(含2023ISCTFweb题的部分知识点)

[SWPUCTF 2021 新生赛]sql 进入环境 查看源码&#xff0c;发现是get传参且参数为wllm fuzz测试&#xff0c;发现空格&#xff0c;&#xff0c;and被过滤了 同样的也可以用python脚本进行fuzz测试 import requests fuzz{length ,,handler,like,select,sleep,database,delete,h…

java学习part09类的构造器

1. 2.默认构造器 如果没有显式定义任何构造器&#xff0c;系统会默认加一个默认构造器。 如果定义了&#xff0c;则不会有默认构造器。 默认构造器的权限和类的权限一样&#xff0c;类是public构造器就是public&#xff0c;类是缺省默认构造器就是缺省 反编译之后添加的构造…

解决DaemonSet没法调度到master节点的问题

最近在kubernetes部署一个springcloud微服务项目&#xff0c;到了最后一步部署边缘路由&#xff1a;使用nginx-ingress和traefik都可以&#xff0c;必须使用DaemonSet部署&#xff0c;但是发现三个节点&#xff0c;却总共只有两个pod。 换句话说&#xff0c; DaemonSet没法调度…

UML建模图文详解教程05——包图

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl本文参考资料&#xff1a;《UML面向对象分析、建模与设计&#xff08;第2版&#xff09;》吕云翔&#xff0c;赵天宇 著 包图概述 包图(package diagram)是用来描述模型中的…

一个最简单的工业通讯数据分析例子

1.背景 对工业设备的通讯协议进行分析可以帮助我们更好地理解其工作原理和相关技术&#xff0c;并且有助于以下几个方面&#xff1a; 1. 优化工业设备的通讯效率&#xff1a;了解通讯协议的细节可以帮助我们找到通讯效率低下的原因并进行优化&#xff0c;提高设备的通讯效率和…

MySQL 8 配置文件详解与最佳实践

MySQL 8 是一款强大的关系型数据库管理系统&#xff0c;通过适当的配置文件设置&#xff0c;可以充分发挥其性能潜力。在这篇博客中&#xff0c;我们将深入探究 MySQL 8 常用的配置文件&#xff0c;并提供一些建议&#xff0c;帮助您优化数据库性能。 配置文件概览 在 MySQL …

【数据结构】二叉树概念 | 满二叉树 | 完全二叉树

二叉树的概念 二叉树在实践中用的很多。 一棵二叉树是结点的一个有限集合&#xff0c;该集合&#xff1a; 或者为空&#xff1b;由一个根结点加上两棵别称为左子树和右子树的二叉树组成。二叉树最多两个孩子。 这里注意&#xff1a;二叉树并不是度为2的树。 二叉树的度最大值是…

Go lumberjack 日志轮换和管理

在开发应用程序时&#xff0c;记录日志是一项关键的任务&#xff0c;以便在应用程序运行时追踪问题、监视性能和保留审计记录。Go 语言提供了灵活且强大的日志记录功能&#xff0c;可以通过多种方式配置和使用。其中一个常用的日志记录库是 github.com/natefinch/lumberjack&am…

【JAVA】我们该如何规避代码中可能出现的错误?(二)

个人主页&#xff1a;【&#x1f60a;个人主页】 系列专栏&#xff1a;【❤️初识JAVA】 文章目录 前言异常方法&#xff08;Throwable类&#xff09;Throwable类的方法 捕获异常多重捕获块 前言 异常是程序中的一些错误&#xff0c;但并不是所有的错误都是异常&#xff0c;并…

git-3

1.如何让工作区的文件恢复为和暂存区一样&#xff1f; 工作区所作的变更还不及暂存区的变更好&#xff0c;想从暂存区拷贝到工作区&#xff0c;变更工作区(恢复成和暂存区一样的状态)&#xff0c;想到用git checkout -- 文件名 2.怎样取消暂存区部分文件的更改&#xff1f; 如…