[RabbitMQ] 7种工作模式详细介绍

🌸个人主页:https://blog.csdn.net/2301_80050796?spm=1000.2115.3001.5343
🏵️热门专栏:
🧊 Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm=1001.2014.3001.5482
🍕 Collection与数据结构 (92平均质量分)https://blog.csdn.net/2301_80050796/category_12621348.html?spm=1001.2014.3001.5482
🧀线程与网络(96平均质量分) https://blog.csdn.net/2301_80050796/category_12643370.html?spm=1001.2014.3001.5482
🍭MySql数据库(93平均质量分)https://blog.csdn.net/2301_80050796/category_12629890.html?spm=1001.2014.3001.5482
🍬算法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12676091.html?spm=1001.2014.3001.5482
🍃 Spring(97平均质量分)https://blog.csdn.net/2301_80050796/category_12724152.html?spm=1001.2014.3001.5482
🎃Redis(97平均质量分)https://blog.csdn.net/2301_80050796/category_12777129.html?spm=1001.2014.3001.5482
🐰RabbitMQ(97平均质量分) https://blog.csdn.net/2301_80050796/category_12792900.html?spm=1001.2014.3001.5482
感谢点赞与关注~~~
在这里插入图片描述

目录

  • 1. 7种工作模式介绍
    • 1.1 简单模式(simple)
    • 1.2 工作队列(Work Queue)
    • 1.3 对交换机和路由键的解释
    • 1.4 广播模式/发布订阅模式(Publish/Subscribe)
    • 1.5 路由模式(Routing)
    • 1.6 通配符模式(Topics)
    • 1.7 RPC通信(RPC)
    • 1.8 发布确认(Publish Confirms)
  • 2. 工作模式的代码实现
    • 2.1 简单模式
    • 2.2 工作队列
    • 2.3 广播模式/发布订阅模式(Publish/Subscribe)
    • 2.4 路由模式(Routing)
    • 2.5 通配符模式
    • 2.6 RPC通信(RPC)
    • 2.7 发布确认(Publisher Confirms/消息可靠性保证)
      • 2.7.1 概述
      • 2.7.2 单独确认
      • 2.7.3 批量确认
      • 2.7.4 异步确认

1. 7种工作模式介绍

在这里插入图片描述

1.1 简单模式(simple)

在这里插入图片描述
P: producer生产者,也就是要发送消息的程序.
C: consumer消费者,消息的接收者.
Queue: 消息队列,其中可以缓存信息,生产者可以向其中投递信息,消费者从其中获取信息.
特点: 一个生产者,一个消费者,一个消息只能被消费一次,也称为点对点模式.

1.2 工作队列(Work Queue)

在这里插入图片描述
一个生产者P,多个消费者C1,C2,在队列中有多个消息的情况下,work Queue会将消息分派给不同的消费者,每个消费者都会接收到不同的消息.
特点: 消息不会重复,分配给不同的消费者.

1.3 对交换机和路由键的解释

在这里插入图片描述

  • X: 代表的是交换机(exchange),作用就是生产者在将消息发送到交换机之后,交换机会按照生产者指定的RoutingKey,也就是路由规则把消息路由到一个或者多个队列中.
  • RabbitMQ中的交换机有四种类型: fanout,direct,topic,headers,不同的类型有不同的路由策略.
    1. fanout: 广播模式,将消息发送给所有绑定交换机的队列.(Publish/Subscribe模式)
    2. direct: 定向模式,将消息交给符合指定的routingKey的队列.(Routing模式)
    3. topic: 通配符模式,将消息交给符合指定的Routing pattern的队列.(Topics模式).
    4. header: headers类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在.
      交换机只是负责转发消息,不具有存储消息的功能,因此,如果没有任何队列和交换机绑定,或者是消息没有任何符合路由规则的队列,那么信息就会丢失.
  • RoutingKey: 路由键,生产者在消息转发给交换机的时候,指定的一个字符串,用来让交换机知道该如何处理这条消息.
  • BindingKey: 绑定键,BindingKey是把交换机和消息队列绑定的字符串,这样交换机就会知道如何根据路由键将对应的消息转发到指定的队列了.
    在这里插入图片描述
    下面我们就举一个具体的例子来对RoutingKey和BindingKey具体说明.

在发送消息的时候,生产者设置了RoutingKey为orange,消息就会路由到Q1.
在这里插入图片描述

BindingKey其实也属于RoutingKey的一种,我们为了把这两个键混淆,我们可以这样理解:
在交换机和队列绑定的时候,需要的路由键是BindingKey.在发送消息的时候,需要的路由键是RoutingKey.

1.4 广播模式/发布订阅模式(Publish/Subscribe)

在这里插入图片描述
一个生产者P,多个消费者C1,C2,X代表交换机,交换机会将消息复制多份,发送给所有和交换机绑定的队列,每个消费者接收相同的消息.生产者发送⼀条消息,经过交换机转发到多个不同的队列,多个不同的队列就有多个不同的消费者.
适用场景: 消息需要被多个消费者同时接收的场景.比如: 实时通知或者广播消息.

比如国家地震局发送地震预警,地震发生的时候,需要把预警消息发送给可能有震感地区的所有电子设备.

1.5 路由模式(Routing)

在这里插入图片描述
路由模式是发布订阅模式的变种,在发布订阅模式的基础上,增加了路由key.
相比广播模式,路由模式是Exchange根据RoutingKey的规则,将数据筛选后发给对应的消费者队列.
适合场景: 需要根据特定的规则分发消息的场景.
比如我们在Spring中学习的日志,日志等级分为error,warning,info,debug,就可以通过这种模式,把不同的日志发送到不同的队列.

1.6 通配符模式(Topics)

在这里插入图片描述
路由模式的升级版,在routingKey的基础上,增加了通配符的功能,使之更加灵活.其中,一个.是一个节,使用*代表的是一个节,使用#代表的是多个节.Topics和Routing的基本原理相同.不同之处是:routingKey的匹配方式不同,Routing模式是相等匹配,topics模式是通配符匹配.
适用场景: 需要灵活匹配和过滤消息的场景.

1.7 RPC通信(RPC)

在RPC模式中没有生产者和消费者,大概就是通过两个队列实现了一个消息回调的过程.有点类似与我们在网络中学习的"请求和响应",这个功能是MQ的额外功能.
在这里插入图片描述

  1. 客户端发送消息到一个指定的队列,并在消息属性中设置replyTo字段,这个字段指定了回调队列,用于接收服务端返回的响应.
  2. 服务端接收到请求后,处理请求并发送响应消息到replyTo指定的回调队列.
  3. ⼀旦收到响应,客户端会检查消息的correlationId属性,以确保它是所期望的响应.因为队列中不仅仅有一条消息,保证发送的请求和收到的响应通过correlationId对应的上.

1.8 发布确认(Publish Confirms)

Publish Confirms模式是RabbitMQ提供的一种确保消息可靠发送到RabbitMQ服务器的机制.在这种模式之下,生产者可以等待RabbitMQ服务器确认,可以确保消息已经被服务器接收并处理.

  1. 生产者将Channel设置为Confirm模式,发布的每一条消息都会获得一个唯一的ID,生产者可以将这些序号与消息关联起来,以便跟踪消息的状态.
  2. 当消息被RabbitMQ服务器接收并处理之后,服务器会一步地向生产者发送一个(ACK)给生产者(其中包含消息的唯一ID),表名消息已经送达.
    通过该模式,生产者可以确保消息被RabbitMQ服务器成功接收,从而避免消息丢失的问题.
    适用场景: 对数据安全性较高的场景,比如金融交易,订单处理.
    在这里插入图片描述

2. 工作模式的代码实现

前面我们对这几种工作模式有了简单的了解,接下来我们学习他们的写法.

2.1 简单模式

就是快速上手中的程序,此处忽略.

2.2 工作队列

就是简单模式的增强版,和简单模式下最大的区别就是,工作队列模式支持多个消费者接收消息,消费者之间是竞争关系,每个消息只能被⼀个消费者接收.
在这里插入图片描述

  • 引入依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version>
</dependency>
  • 编写生产者
    工作队列模式的代码和简单模式的代码没有多大的出入.我们只是发送了10次消息.我们把发送消息的地方改为10次发送.
    有一些配置相关的东西是固定的,所以我们可以把他们单独提出来一个Constant类.
public class Constant {public static String HOST = "39.105.137.64";public static int PORT = 5672;public static String USER_NAME = "jiangruijia";public static String PASSWORD = "qwe123524";public static String QUEUE_NAME = "work";public static String VIRTUAL_HOST = "/";
}
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constant.HOST);connectionFactory.setPort(Constant.PORT);connectionFactory.setUsername(Constant.USER_NAME);connectionFactory.setPassword(Constant.PASSWORD);connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(Constant.QUEUE_NAME,true,false,false,null);for (int i = 0; i < 10 ; i++){//发送10次消息String message = "hello work~" + i;channel.basicPublish("",Constant.QUEUE_NAME,null,message.getBytes());}channel.close();connection.close();}
}
  • 消费者代码
public class Consumer2 {//两个消费者是竞争关系public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constant.HOST);connectionFactory.setPort(Constant.PORT);connectionFactory.setUsername(Constant.USER_NAME);connectionFactory.setPassword(Constant.PASSWORD);connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(Constant.QUEUE_NAME,true,false,false,null);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body);System.out.println("接收到消息" + message);}};channel.basicConsume(Constant.QUEUE_NAME,true,consumer);//先不要关闭资源,因为需要先开启消费者,等待生产者发送消息}
}

另一个消费者和这个消费者相同,直接复制粘贴一份.

  • 运行程序,观察结果
    先启动两个消费者(而且两个消费者不可以关闭资源),再运行生产者,如果先启动生产者,在启动第一个消费者的时候,消息会被瞬间消费完.
    在这里插入图片描述
    在这里插入图片描述

2.3 广播模式/发布订阅模式(Publish/Subscribe)

在这里插入图片描述

  • 编写生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constant.HOST);connectionFactory.setPort(Constant.PORT);connectionFactory.setUsername(Constant.USER_NAME);connectionFactory.setPassword(Constant.PASSWORD);connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//声明交换机.channel.exchangeDeclare(Constant.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT,true,false,false,null);//声明队列channel.queueDeclare(Constant.FANOUT_QUEUE1,true,false,false,null);channel.queueDeclare(Constant.FANOUT_QUEUE2,true,false,false,null);//绑定队列与交换机channel.queueBind(Constant.FANOUT_QUEUE1,Constant.FANOUT_EXCHANGE,"");channel.queueBind(Constant.FANOUT_QUEUE2,Constant.FANOUT_EXCHANGE,"");String message = "发送广播消息";channel.basicPublish(Constant.FANOUT_EXCHANGE,"",null,message.getBytes());channel.close();connection.close();}
}

参数解释:
- exchangeDeclare: 第一个参数是交换机名称,第二个参数是交换机的路由规则(这里指定为FANOUT广播类型),第三个参数是是否持久化,如果设置持久化,那么在重启服务之后,交换机不会被释放,第四个参数是是否自动删除,当没有队列与其绑定的时候,它就会被删除.第五个参数是是否是内部使用的,一般情况下为false,第六个参数是指定相关参数.
- queueDeclare: 声明队列,我们在之前解释过,这里不再赘述.
- queueBind: 绑定队列与交换机,第一个参数是队列名称,第二个参数是交换机名称,第三个参数是交换机和队列之间的路由规则,在这里我们是广播模式,所以我们没有指定路由规则,指定为默认的"".
- basicPublish: 第一个参数是发送消息的交换机,第二个参数是发送消息时的路由关键字,这里为广播模式,所以我们指定为"",第三个参数是一些相关配置,第四个参数是发送的消息.

  • 编写消费者代码
    虽然在消费者中可以不用声明队列,但是为有时候生产者和消费者不会在一个主机上,我们还是加上队列的声明比较符合逻辑,队列存在的时候,不会重复创建队列.
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constant.HOST);connectionFactory.setPort(Constant.PORT);connectionFactory.setUsername(Constant.USER_NAME);connectionFactory.setPassword(Constant.PASSWORD);connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//队列声明可以省略,如果队列已经存在,则不会创建队列channel.queueDeclare(Constant.FANOUT_QUEUE1,true,false,false,null);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body);System.out.println("获取到广播消息:"+message);}};channel.basicConsume(Constant.FANOUT_QUEUE1,true,consumer);}
}

第二个消费者直接复制一份,改掉消费者于队列的绑定和队列的声明即可.

  • 运行程序
    首先启动两个消费者,再运行生产者.
    我们看到生产者生产出两条消息之后,迅速被消费者消费掉.
    在这里插入图片描述
    两个消费者接收到了消息:
    在这里插入图片描述
    在这里插入图片描述

2.4 路由模式(Routing)

相比于发布订阅模式,交换机和队列不可以是任意绑定了==,而是需要指定一个BindingKey(RoutingKey的一种)==.生产者在向交换机发送消息的时候,也需要指定RoutingKey.这时,Exchange也不再把消息交给每⼀个绑定的key,而是根据消息RoutingKey进行判断,只有队列绑定时的BindingKey和发送消息的RoutingKey完全⼀致,才会接收到消息.
在这里插入图片描述

  • 编写生产者
    与发布订阅模式的不同是,交换机类型不同,而且绑定队列的bindingKey不同.
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constant.HOST);connectionFactory.setPort(Constant.PORT);connectionFactory.setUsername(Constant.USER_NAME);connectionFactory.setPassword(Constant.PASSWORD);connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(Constant.ROUTING_EXCHANGE, BuiltinExchangeType.DIRECT,true,false,false,null);channel.queueDeclare(Constant.ROUTING_QUEUE1,true,false,false,null);channel.queueDeclare(Constant.ROUTING_QUEUE2,true,false,false,null);channel.queueBind(Constant.ROUTING_QUEUE1,Constant.ROUTING_EXCHANGE,"a");channel.queueBind(Constant.ROUTING_QUEUE2,Constant.ROUTING_EXCHANGE,"a");channel.queueBind(Constant.ROUTING_QUEUE2,Constant.ROUTING_EXCHANGE,"b");channel.queueBind(Constant.ROUTING_QUEUE2,Constant.ROUTING_EXCHANGE,"c");String message1 = "routingKey_a";String message2 = "routingKey_b";String message3 = "routingKey_c";channel.basicPublish(Constant.ROUTING_EXCHANGE,"a",null,message1.getBytes());channel.basicPublish(Constant.ROUTING_EXCHANGE,"b",null,message2.getBytes());channel.basicPublish(Constant.ROUTING_EXCHANGE,"c",null,message3.getBytes());connection.close();channel.close();}
}

和上面广播模式不同的是,在绑定队列与交换机的时候,需要指定bindingKey.channel.queueBind(Constant.ROUTING_QUEUE1,Constant.ROUTING_EXCHANGE,"a");比如这一行指定了bindingKey为a.
在发送消息的时候,需要指定消息的RoutingKey,比如:channel.basicPublish(Constant.ROUTING_EXCHANGE,"a",null,message1.getBytes());.

  • 编写消费者
    Routing模式的消费者代码和Publish/Subscribe代码⼀样,同样复制出来两份.修改消费的队列名称就可以.
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constant.HOST);connectionFactory.setPort(Constant.PORT);connectionFactory.setUsername(Constant.USER_NAME);connectionFactory.setPassword(Constant.PASSWORD);connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(Constant.ROUTING_QUEUE1,true,false,false,null);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body);System.out.println("接收到消息:"+message);}};channel.basicConsume(Constant.ROUTING_QUEUE1,true,consumer);}
}
  • 运行代码
    在这里插入图片描述
    队列中的消息被消费.
    队列2中的消息:
    在这里插入图片描述
    队列1中的消息:
    在这里插入图片描述

2.5 通配符模式

Topic和Routing模式的区别就是:
1. Topic模式使用的交换机类型为topic(Routing模式使⽤的交换机类型为direct).
2. Topic类型的交换机在匹配规则上进行了扩展,bindingKey支持通配符匹配.
在这里插入图片描述
在Topic类型的交换机在匹配的规则上,有一些要求:
1. RoutingKey是一系列由.分割的单词,比如"a.b.c".
2. BindingKey和RoutingKey一样,也是点.分割的字符串.
3. BindingKey中可以存在两种特殊字符串,用于模糊匹配.其中*表示一个单词,#表示多个单词.
比如:
• BindingKey为"d.a.b"会同时路由到Q1和Q2.
• BindingKey为"d.a.f"会路由到Q1.
• BindingKey为"c.e.f"会路由到Q2.
• BindingKey为"d.b.f"会被丢弃,或者返回给⽣产者(需要设置mandatory参数).
接下来我们就来实现Topic模式:

  • 编写生产者
    和路由模式最大的区别就是: 交换机类型不同,绑定队列的RoutingKey不同.
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constant.HOST);connectionFactory.setPort(Constant.PORT);connectionFactory.setUsername(Constant.USER_NAME);connectionFactory.setPassword(Constant.PASSWORD);connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(Constant.TOPIC_QUEUE1,true,false,false,null);channel.queueDeclare(Constant.TOPIC_QUEUE2,true,false,false,null);channel.exchangeDeclare(Constant.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC,true,false,false,null);//交换机类型不同channel.queueBind(Constant.TOPIC_QUEUE1,Constant.TOPIC_EXCHANGE,"*.a.*");channel.queueBind(Constant.TOPIC_QUEUE2,Constant.TOPIC_EXCHANGE,"*.*.b");channel.queueBind(Constant.TOPIC_QUEUE2,Constant.TOPIC_EXCHANGE,"c.#");//bindingKey不同String message1 = "hello_b.a.c";String message2 = "hello_a.c.b";String message3 = "hello_c.a.b";channel.basicPublish(Constant.TOPIC_EXCHANGE,"b.a.c",null,message1.getBytes());channel.basicPublish(Constant.TOPIC_EXCHANGE,"c.a.b",null,message3.getBytes());channel.basicPublish(Constant.TOPIC_EXCHANGE,"a.c.b",null,message2.getBytes());channel.basicPublish(Constant.TOPIC_EXCHANGE,"c.a.b",null,message3.getBytes());channel.close();connection.close();}
}
  • 编写消费者
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constant.HOST);connectionFactory.setPort(Constant.PORT);connectionFactory.setUsername(Constant.USER_NAME);connectionFactory.setPassword(Constant.PASSWORD);connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(Constant.TOPIC_QUEUE1,true,false,false,null);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("收到消息: "+s);}};channel.basicConsume(Constant.TOPIC_QUEUE1,true,consumer);}
}

第二个消费者直接修改一下队列名称就可以.

  • 运行结果
    队列产生消息,并消费
    在这里插入图片描述
    消费者根据RoutingKey收到了关键字:
    消费者2:
    在这里插入图片描述
    消费者1:
    在这里插入图片描述

2.6 RPC通信(RPC)

RPC通信,是远程过程调用,它是一种发送请求,得到响应的模式,有点类似与我们之前学习网络时候的http协议.
RabbitMQ实现RPC通信的过程,大概是通过两个队列实现⼀个可回调的过程.
在这里插入图片描述
在这个模式中,没有明确的生产者和消费者,在发送请求的时候,客户端是生产者,服务端是消费者,在接收响应的时候,服务端是生产者,客户端是消费者.
大概的流程如下:

  1. 客户端发送消息到⼀个指定的队列,并在消息属性中设置replyTo字段,这个字段指定了⼀个回调队列,在消息中还会设置correlate_id,这个字段用于保证接收到的响应是与请求对应的响应,服务端处理后,会把响应结果发送到这个队列.
  2. 服务端接收到消息之后,处理请求并发送响应消息到replyTo指定的回调队列.
  3. 客户端在回调队列上等待响应消息.⼀旦收到响应,客户端会检查消息的correlationId属性,以确保它是所期望的响应.

接下来我们来实现PCR模式:

  • 客户端代码
    客户端代码的主要流程如下:
    首先声明两个队列,包含回调队列replyQueueName,和发送请求的队列.并声明本次请求的唯一标志corrld.之后将replyQueueName和corrld配置到要发送的消息队列中.之后使用阻塞队列来阻塞当前的进程,监听回调队列中的消息,把请求放到阻塞队列中.阻塞队列中有消息之后,主线程被唤醒,打印返回的内容.
public class Client {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constant.HOST);connectionFactory.setPort(Constant.PORT);connectionFactory.setUsername(Constant.USER_NAME);connectionFactory.setPassword(Constant.PASSWORD);connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(Constant.SRC_REQUEST_QUEUE,true,false,false,null);channel.queueDeclare(Constant.SRC_RESPONSE_QUEUE,true,false,false,null);//发送请求String corrId = UUID.randomUUID().toString();AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().correlationId(corrId).replyTo(Constant.SRC_RESPONSE_QUEUE)//指定相关属性.build();String request = "发送请求";//如果没有交换机的时候,RoutingKey就是队列的名称channel.basicPublish("",Constant.SRC_REQUEST_QUEUE,properties,request.getBytes());//接收请求BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {if(corrId.equals(properties.getCorrelationId())){//判断corrId是否相等queue.offer(new String(body));}}};channel.basicConsume(Constant.SRC_RESPONSE_QUEUE,true,consumer);String ret = queue.take();System.out.println("收到请求:" + ret);channel.close();connection.close();}
}
  • 服务端代码
    服务端需要做的就是:接收消息,根据消息内容进行响应处理,把应答结果返回到回调队列中.
public class Server {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constant.HOST);connectionFactory.setPort(Constant.PORT);connectionFactory.setUsername(Constant.USER_NAME);connectionFactory.setPassword(Constant.PASSWORD);connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(Constant.SRC_REQUEST_QUEUE,true,false,false,null);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("成功接收到请求:"+new String(body));AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();String message = "返回响应";channel.basicPublish("",Constant.SRC_RESPONSE_QUEUE,basicProperties,message.getBytes());channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(Constant.SRC_REQUEST_QUEUE,false,consumer);//设置收到消息之后不自动应答,在发送响应之后手动应答}
}

RabbitMQ消息确定的机制:
在RabbitMQ中,basicConsumer方法的autoAck参数用于指定消费者是否应该自动向消息队列确认消息.
自动确认(autoAck=true):消息队列在将消息发送给消费者后,会立即从内存中删除该消息.这意味着,如果消费者处理消息失败,消息将丢失,因为消息队列认为消息已经被成功消费
手动确认(autoAck=false):消息队列在将消息发送给消费者后,需要消费者显式地调用basicAck方法来确认消息.手动确认提供了更高的可靠性,确保消息不会被意外丢失,适用于消息处理重要且需要确保每个消息都被正确处理的场景.

  • 运行程序
    在这里插入图片描述
    在这里插入图片描述

2.7 发布确认(Publisher Confirms/消息可靠性保证)

2.7.1 概述

作为消息中间件,都会面临丢失的问题.
消息丢失大概分为三种类型:

  1. 生产者问题.因为应用程序故障,网络都用等原因,生产者没有成功向Broker发送消息.
  2. 消息中间件自身问题,生产者成功发送给了Broker,但是Broker没有吧消息保存好,导致了信息丢失.
  3. 消费者问题,Broker发送消息到消费者,消费者在消费消息的时候,没有处理好,导致Broker将消费失败的消息从队列中删除了.
    在这里插入图片描述
    上面的这几个问题都有对应的解决方式,问题2可以通过持久化的机制来解决,问题3可以通过消息应答机制来解决,问题1,可以采用发布确认的机制来实现.
    发布确认属于RabbitMQ的七大工作模式之一.
    生产者会将Channel设置为Confirm模式,一旦信道进入Confirm模式,所有在该信道上面的消息都会被指派为一个唯一的Id.一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经确认到达目的队列了.Broker回传给生产者的确认消息中包含了确认消息的序号,此外Broker也可以设置channel.basicAck方法中的multiple参数,true表示在deliveryTag序号之前的消息都已经收到了,如果为false,那么则有消息没有收到,消息确认出了一些问题.
    在这里插入图片描述
    使用发送机制的时候,必须要将信道设置为Confirm模式.发布确认有3种策略,单独确认,批量确认,异步确认,接下来我们就来学习着三种策略.
    下面是开启信道确认模式的方法.
Channel channel = connection.createChannel();
channel.confirmSelect();//开启信道确认模式

2.7.2 单独确认

首先我们需要建立连接,建立连接需要放入try语句中,所以我们可以把建立连接单独提出一个静态方法.

private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constant.HOST);connectionFactory.setPort(Constant.PORT);connectionFactory.setUsername(Constant.USER_NAME);connectionFactory.setPassword(Constant.PASSWORD);connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);return connectionFactory.newConnection();
}

之后在主方法中调用三种策略,

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {publishingMessagesIndividually();publishMessagesInBatch();handlePublishConfirmsAsynchronously();
}

之后我们来编写单条确认模式:

    /*** 单条确认*/
private static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {try (Connection connection = createConnection()){//首先建立连接Channel channel = connection.createChannel();channel.confirmSelect();//开启信道确认模式channel.queueDeclare(Constant.CONFIRM_QUEUE1,true,false,false,null);//创建队列String message = "发送信息";for (int i = 0; i < 200; i++){channel.basicPublish("",Constant.CONFIRM_QUEUE1,null,(message+i).getBytes());channel.waitForConfirms(5000);//等待消息确认,如果超过规定的等待时间还没有确认,则抛出异常}}
}

在编写代码之前,我们首先要使用try语句与服务器建立连接.
之后在创建Channel之后,需要开启信道的确认模式.channel.confirmSelect().
这里我们在发送消息之后,需要做的最重要的一件事就是等待消息的确认channel.waitForConfirms(5000),在这个方法中可以指定阻塞时间,如果在指定的时间内消息被确认,这个方法就会立即返回,如果在指定时间之内没有确认消息,则会抛出异常.

2.7.3 批量确认

/*** 批量确认*/
private static void publishMessagesInBatch() throws IOException, TimeoutException, InterruptedException {try (Connection connection = createConnection()){Channel channel = connection.createChannel();channel.confirmSelect();//设置为确认模式channel.queueDeclare(Constant.CONFIRM_QUEUE2,true,false,false,null);String message = "批量确认发送消息";int batchSize = 100;//每次批量发送的消息条数int outstandingMessageCount = 0;//记录已经发送的条数for (int i = 0 ; i < 200 ; i++){channel.basicPublish("",Constant.CONFIRM_QUEUE2,null,(message+i).getBytes());outstandingMessageCount++;//每发送一条消息,参数就进行++if(outstandingMessageCount == batchSize){//达到了一次性批量发送的指定数量,等待确认,确认完成之后,将参数清零channel.waitForConfirms(5000);outstandingMessageCount = 0;}}//如果发送的消息不是100的倍数,就还有消息没有确认if (outstandingMessageCount > 0){channel.waitForConfirms(5000);}}
}

这里需要注意的几点就是,在发送的消息达到一次性最大的批量数量的时候,就要确认,如果确认成功之后,需要把记录的发送数量清零.
之后,就是在出循环之后,如果发送的消息条数不是batchSize的整数倍的时候,这时候不满足循环之内的if条件,还是有一些消息没有确认完成,就需要在循环之外再次进行确认.

2.7.4 异步确认

异步确认就是,生产者在发送消息的同时,还可以确认消息是否收到.
Channel接口中为我们提供了一个方法,addConfirmListener.这个方法可以添加ConfirmListener回调接口.
ConfirmListener中包含两个方法: handleAck(long deliveryTag, boolean multiple)handleNack(long deliveryTag, boolean multiple),分别对应处理的是MQ发送给生产者的ack和nack.其中ack代表的是消息确认成功,即消息都收到的情况下,nack指的是消息在确认的时候出现了一些问题.deliveryTag表示的是发送消息的序号,multiple表示是否批量确认.
这里我们还需要一个有序集合来存储为确认的消息.

/*** 异步确认*/
private static void handlePublishConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException {try (Connection connection = createConnection()){Channel channel = connection.createChannel();channel.confirmSelect();channel.queueDeclare(Constant.CONFIRM_QUEUE3,true,false,false,null);SortedSet<Long> set = Collections.synchronizedSortedSet(new TreeSet<>());//设置一个有序集合,用来存储未确认消息的序号channel.addConfirmListener(new ConfirmListener() {//为信道添加监听器,监听消息的确认情况@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {//表示消息被成功确认if (multiple){//判断消息的处理是否批量set.headSet(deliveryTag+1).clear();//将小于deliveryTag的消息全部清除,证明这批消息已经被ack了}else {set.remove(set.last());//如果不是批量,清除最后一个即可}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if (multiple){//判断消息的处理是否批量set.headSet(deliveryTag+1).clear();//将小于deliveryTag的消息全部清除,证明这批消息已经被ack了}else {set.remove(set.last());//如果不是批量,清除最后一个即可}//这里在消息确认不成功的时候,需要重传,这里省略}});String message = "异步确认发送消息";for (int i = 0 ;i < 200 ;i++){long nextPublishSeqNo = channel.getNextPublishSeqNo();//获取到消息发送的序号channel.basicPublish("",Constant.CONFIRM_QUEUE3,null,(message+i).getBytes());set.add(nextPublishSeqNo);//把这些消息都添加到集合中}while (!set.isEmpty()){//等待集合中的消息都被确认完成Thread.sleep(1000);}}
}

这里我们在消息确认成功之后,即handleAck方法被调用的时候,需要把这些消息都从集合中清除掉.一种是批量的情况,直接清除掉deliveryTag之前所有的消息,另一种是没有批量的情况,直接清除掉最后一个元素即可.当然在没有确认成功的情况下,我们需要根据具体的业务逻辑进行消息的重发.在给队列中发送消息的时候,我们需要从Channel中获取到下次发送消息开始的序号,之后我们把开始的序号放入set中,代表这些消息还没有被处理过.最后我们需要等待消息确认完成,只要存放未确认消息的set中不为空,就证明还有消息没有被确认,我们就进行阻塞等待.
上面三种方式中,假如发送的消息较多,这三种策略的执行时间:单个确认>批量确认>异步确认.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/880733.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android Studio 新版本 Logcat 的使用详解

点击进入官方Logcat介绍 一个好的Android程序员要会使用AndroidStudio自带的Logcat查看日志&#xff0c;会Log定位也是查找程序bug的第一关键。同时Logcat是一个查看和处理日志消息的工具&#xff0c;它可以更快的帮助开发者调试应用程序。 步入正题&#xff0c;看图说话。 点…

特征工程——一门提高机器学习性能的艺术

当前围绕人工智能(AI)和机器学习(ML)展开的许多讨论以模型为中心&#xff0c;聚焦于 ML和深度学习(DL)的最新进展。这种模型优先的方法往往对用于训练这些模型的数据关注不足&#xff0c;甚至完全忽视。类似MLOps的领域正迅速发展&#xff0c;通过系统性地训练和利用ML模型&…

Hive SQL业务场景:连续5天涨幅超过5%股票

一、需求描述 现有一张股票价格表 dwd_stock_trade_dtl 有3个字段分别是&#xff1a; 股票代码(stock_code), 日期(trade_date)&#xff0c; 收盘价格(closing_price) 。 请找出满足连续5天以上&#xff08;含&#xff09;每天上涨超过5%的股票&#xff0c;并给出连续满足…

C++入门基础知识93(实例)——实例18【猴子吃桃问题】

成长路上不孤单&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a; 【14后&#x1f60a;///C爱好者&#x1f60a;///持续分享所学&#x1f60a;///如有需要欢迎收藏转发///&#x1f60a;】 今日分享关于猴子吃桃问题的相关内容&#xff01; 关…

IP协议讲解

IP协议 IP协议的本质&#xff1a;提供一种能力&#xff0c;将数据跨网络从A主机传输到B主机 4位版本号(version): 指定IP协议的版本, 对于IPv4来说, 就是4. 4位头部长度(header length): IP头部的长度是多少个32bit, 也就是 length * 4 的字节数. 4bit表示最大 的数字是15, 因…

天坑!Spark+Hive+Paimon+Dolphinscheduler

背景: 数据中台项目使用Spark+Hive+Paimon做湖仓底层,调度任务使用的是基于Dolphinscheduler进行二开。在做离线脚本任务开发时,在Paimon库下执行非查询类SQL报错。 INSERT报错 DELETE报错 现状: 原始逻辑为数据中台中选择的Paimon数据源,实际上在Dolphinscheduler中是…

视频集成与融合项目中需要视频编码,但是分辨率不兼容怎么办?

在众多视频整合项目中&#xff0c;一个显著的趋势是融合多元化的视频资源&#xff0c;以实现统一监管与灵活调度。这一需求促使项目团队不断探索新的集成方案&#xff0c;确保不同来源的视频流能够无缝对接&#xff0c;共同服务于统一的调看与管理平台&#xff0c;进而提升整体…

TI DSP TMS320F280025 Note13:CPUtimer定时器原理分析与使用

TMS320F280025 CPUtimer定时器原理分析与使用 ` 文章目录 TMS320F280025 CPUtimer定时器原理分析与使用框图分析定时器中断定时器使用CPUtimers.cCPUtimers.h框图分析 定时器框图如图所示 定时器有一个预分频模块和一个定时/计数模块, 其中预分频模块包括一个 16 位的定时器分…

【机器学习基础】Transformer学习

Transformer学习 梯度消失FeedForward层激活函数的主要作用是在网络中加入非线性变换 梯度消失 梯度爆炸 FeedForward层 Transformer结构: Transformer结构主要分为两大部分: 一是Encoder层结构:Encoder 的输入由 Input Embedding 和 Positional Embedding 求和输入Multi…

生产环境升级mysql流程及配置主从服务

之前写到过mysql升级8.4的文章, 因此不再介绍mysql的安装过程 避免服务器安装多个mysql引起冲突的安装方法_安装两个mysql会冲突吗-CSDN博客 生产环境升级mysql8.4.x流程 安装mysql 参考之前文章: 避免服务器安装多个mysql引起冲突的安装方法_安装两个mysql会冲突吗-CSDN博客…

Nmap网络扫描器基础功能介绍

怎么快速知道网络中存在哪些设备呢&#xff1f;我们可以借用扫描工具Nmap来实现这个功能。 下载 Windows系统可以前往Nmap官网下载安装包。 Linux使用对应的包管理器可以直接安装&#xff0c;命令如下 # Debian/Ubuntu apt install nmap# RedHat/Fedora yum install nmap …

Squaretest单元测试辅助工具使用

1、idea安装插件 Squaretest 然后关掉idea 2、安装字节码软件&#xff08;jclasslib&#xff09; 3、找到idea里面的Squaretest安装目录 找到包含TestStarter的jar包 4、打开 com.squaretest.c.f 打开后选择常量池 5、找到第16个修改 Long value值&#xff0c;修改的数字即为使…

Percona Monitoring and Management

Percona Monitoring and Management (PMM)是一款开源的专用于管理和监控MySQL、MongoDB、PostgreSQL

828华为云征文|华为云Flexus云服务器X实例部署——盲盒抽奖商城系统以及编译发布小程序

盲盒抽奖商城系统使用 thinkphp6.0 uniapp 开发&#xff0c;做到了全移动端兼容。一个系统不仅可以打包 小程序 还可以 打包APP &#xff0c;H5 华为云Flexus云服务器X实例在安装搭建盲盒商城小程序方面具有显著优势&#xff0c;这些优势主要体现在以下几个方面&#xff1a; …

安卓13删除下拉栏中的关机按钮版本2 android13删除下拉栏关机按钮

总纲 android13 rom 开发总纲说明 文章目录 1.前言2.问题分析3.代码分析4.代码修改5.编译6.彩蛋1.前言 顶部导航栏下拉可以看到,底部这里有个设置按钮,点击可以进入设备的设置页面,这里我们将更改为删除,不同用户通过这个地方进入设置。我们之前写过一个文章也是一样的删除…

数据结构之链表(2),双向链表

目录 前言 一、链表的分类详细 二、双向链表 三、双向链表的实现 四、List.c文件的完整代码 五、使用演示 总结 前言 接着上一篇单链表来详细说说链表中什么是带头和不带头&#xff0c;“哨兵位”是什么&#xff0c;什么是单向什么是双向&#xff0c;什么是循环和不循环。然后实…

数据结构-栈(理解版)

一、栈的定义 相信大家对于栈或多或少有一些了解&#xff0c;可能大多数人会告诉你栈是一种先进后出的数据结构。这其实说了跟没说一样(❁◡❁)&#xff01;当然&#xff08;last in&#xff0c;first out&#xff09;是栈最有特色的性质。 这里可以给大家一些比较好理解的例…

大模型增量训练--基于transformer制作一个大模型聊天机器人

针对夸夸闲聊数据集&#xff0c;利用UniLM模型进行模型训练及测试&#xff0c;更深入地了解预训练语言模型的使用方法&#xff0c;完成一个生成式闲聊机器人任务。 项目主要结构如下&#xff1a; data 存放数据的文件夹 dirty_word.txt 敏感词数据douban_kuakua_qa.txt 原始语…

sqlserver迁移数据库文件存储位置

业务背景&#xff1a;由于C盘爆满&#xff0c;需要将数据库文件迁移到别处比如D盘 下面以某一个数据库转移为示例&#xff1a;&#xff08;可以用SSMS工具&#xff0c;新建查询配合使用&#xff09; 1.查询数据库文件存储路径 sql语句&#xff1a; -- 查询路径 USE QiangTes…

HarmonyOS/OpenHarmony 离线加载web资源,并实现web资源更新

关键词&#xff1a;h5离线包加载、h5离线包更新、沙箱 在上一篇文章中&#xff0c;我们已经介绍了如何将 rawfile 资源文件中的文件数据拷贝到沙箱下&#xff0c;那么该篇文章将介绍如何加载该沙箱目录下的文件资源&#xff08;此处以打包后的web资源为例&#xff09;&#xf…