RabbitMQ工作模式(详解 工作模式:简单队列、工作队列、公平分发以及消息应答和消息持久化)

文章目录

  • 十.RabbitMQ
    • 10.1 简单队列实现
    • 10.2 Work 模式(工作队列)
    • 10.3 公平分发
    • 10.4 RabbitMQ 消息应答与消息持久化
      • 消息应答
        • 概念
        • 配置
      • 消息持久化
        • 概念
        • 配置
    • 10.5 订阅模式
      • 广播模式
      • 路由模式
      • 主题模式(通配符模式)
    • 10.6 消息确认机制
    • 10.7 SpringBoot集成MQ
      • 简单队列
      • 工作队列
      • 公平分发

十.RabbitMQ

10.1 简单队列实现

简单队列通常指的是一个基本的消息队列,它可以用于在生产者(生产消息的一方)和消费者(消费消息的一方)之间传递消息。

在这里插入图片描述

新创建Springboot项目

引入依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.1</version></dependency>

连接工具类

public class ConnectionUtils
{public static Connection getConnection(){try{Connection connection = null;//定义一个连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务端地址(域名地址/ip)factory.setHost("127.0.0.1");//设置服务器端口号factory.setPort(5672);//设置虚拟主机(相当于数据库中的库)factory.setVirtualHost("/");//设置用户名factory.setUsername("guest");//设置密码factory.setPassword("guest");connection = factory.newConnection();return connection;}catch (Exception e){return null;}}
}

创建生产者

public class Provider01 {public static void main(String[] args) {try {System.out.println("--------生产者-------");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();// 创建队列// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", false, false, false, null);// 定义发送信息String msg = "hello rabbitmq-kwh";// 发送数据channel.basicPublish("", "test4072", null, msg.getBytes());System.out.println("发送成功....");// 关闭资源channel.close();conn.close();} catch (Exception e) {e.printStackTrace();}}
}

创建消费者

public class Consumer01 {public static void main(String[] args) {try {System.out.println("======消费者======");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();// 创建队列(有就直接连接。没有则创建)// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", false, false, false, null);// 消费者消费消息DefaultConsumer consumer = new DefaultConsumer(channel){@Override //一旦有消息进入 将触发public void handleDelivery(String consumerTag,Envelope envelope,                        AMQP.BasicProperties properties,byte[] body) throws IOException {String str = new String(body,"utf-8");System.out.println("msg==接收=="+str);}};// 监听队列channel.basicConsume("test4072",true,consumer);}catch (Exception e) {e.printStackTrace();}}
}

10.2 Work 模式(工作队列)

工作队列的概念

  • 工作队列模式:生产者将任务发送到队列中,多个消费者从队列中取出任务并并行处理。这意味着,多个消费者可以共同工作来处理同一个队列中的任务。
  • 负载均衡:每个消费者只处理一个任务(消息),通过增加消费者数量,任务的处理可以并行化,提高整体处理能力。

工作队列的特点:

  1. 任务分配:RabbitMQ 将队列中的任务(消息)分配给可用的消费者,通常是按照“轮询”或“平衡”方式分配,即消费者可以公平地处理任务。
  2. 任务处理并行化:多个消费者可以并行地从同一个队列中消费消息,从而实现任务的并行处理,减轻单一消费者的负担。
  3. 消息丢失的风险低:通过合理配置队列和消息持久化机制,即使 RabbitMQ 重启,也能确保任务消息不丢失。

在这里插入图片描述

生产者

(只是在简单队列中的生产者中循环发送了信息。)

/*** Work 模式(工作队列)*/
public class Provider01 {public static void main(String[] args) {try {System.out.println("--------生产者-------");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();// 创建队列// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", false, false, false, null);for (int i = 0; i < 50; i++) {// 定义发送信息String msg = "hello rabbitmq-kwh"+i;// 发送数据channel.basicPublish("", "test4072", null, msg.getBytes());Thread.sleep(1000);}System.out.println("发送成功....");// 关闭资源channel.close();conn.close();} catch (Exception e) {e.printStackTrace();}}
}

消费者01

public class Consumer01 {public static void main(String[] args) {try {System.out.println("======消费者01======");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();// 创建队列(有就直接连接。没有则创建)// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", false, false, false, null);// 消费者消费消息DefaultConsumer consumer = new DefaultConsumer(channel){@Override //一旦有消息进入 将触发public void handleDelivery(String consumerTag,                     Envelope envelope,                        AMQP.BasicProperties properties,           byte[] body) throws IOException {String str = new String(body,"utf-8");System.out.println("msg==接收=="+str);}};// 监听队列channel.basicConsume("test4072",true,consumer);}catch (Exception e) {e.printStackTrace();}}
}

消费者02

public class Consumer02 {public static void main(String[] args) {try {System.out.println("======消费者02======");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();// 创建队列(有就直接连接。没有则创建)// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", false, false, false, null);// 消费者消费消息DefaultConsumer consumer = new DefaultConsumer(channel){@Override //一旦有消息进入 将触发public void handleDelivery(String consumerTag,                    Envelope envelope,                       AMQP.BasicProperties properties,byte[] body) throws IOException {String str = new String(body,"utf-8");System.out.println("msg==接收=="+str);}};// 监听队列channel.basicConsume("test4072",true,consumer);}catch (Exception e) {e.printStackTrace();}}
}

在这里插入图片描述

. 消费者 1 与消费者 2 处理的数据条数一样。

. 消费者 1 偶数 ;消费者 2 奇数

这种方式叫轮询分发(Round-robin)。

10.3 公平分发

指消息被均匀地分配给多个消费者,以便各个消费者的负载大致相等。通过这种方式,RabbitMQ 旨在避免某些消费者过载而其他消费者空闲的情况。

在这里插入图片描述

在10.2 中,现在有 2 个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而 RabbitMQ 则是不了解这些的。这是因为当消息进入队列,RabbitMQ 就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。

改造生产者

/*
同一时刻服务器只会发一条消息给消费者
1 限制发送给消费者不得超过一条消息
*/
channel.basicQos(1);
/*** 公平分发*/
public class Provider01 {public static void main(String[] args) {try {System.out.println("--------生产者-------");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();//同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 创建队列// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", false, false, false, null);for (int i = 0; i < 50; i++) {// 定义发送信息String msg = "hello rabbitmq-kwh"+i;// 发送数据channel.basicPublish("", "test4072", null, msg.getBytes());Thread.sleep(1000);}System.out.println("发送成功....");// 关闭资源channel.close();conn.close();} catch (Exception e) {e.printStackTrace();}}}

消费者01

(在10.2 中消费者的基础上,只添加 channel.basicQos(1);,限制每次只消费一个消息)

public class Consumer01 {public static void main(String[] args) {try {System.out.println("======消费者01======");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();//限制每次只消费一个消息channel.basicQos(1);// 创建队列(有就直接连接。没有则创建)// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", false, false, false, null);// 消费者消费消息DefaultConsumer consumer = new DefaultConsumer(channel){@Override //一旦有消息进入 将触发public void handleDelivery(String consumerTag,                      Envelope envelope,                     AMQP.BasicProperties properties,byte[] body) throws IOException {String str = new String(body,"utf-8");System.out.println(envelope.getDeliveryTag()+"msg==接收=="+str);// 休眠一秒钟try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}finally {// 手动确认消息// 第一个参数:消息的序号,// 第二个参数:是否批量,false 单条消息应答 当为 true 时批量应答channel.basicAck(envelope.getDeliveryTag(),false);}}};// 监听队列// 自动应答设为 falsechannel.basicConsume("test4072",false,consumer);}catch (Exception e) {e.printStackTrace();}}
}

消费者02

(同消费者01)

在这里插入图片描述

消费者 1 休眠 1 秒,消费者 2 休眠 2 秒。

分别设置接收消息数,手动反馈,关闭自动应答

10.4 RabbitMQ 消息应答与消息持久化

消息应答

概念

**消息应答(ack)**是 RabbitMQ 中一个重要的机制,用于保证消息在被消费者处理后得以正确确认,确保消息不会丢失。如果消费者成功处理了消息,应该发送一个确认应答;如果消费者遇到问题或失败,则可以选择拒绝该消息,甚至重新放回队列供其他消费者处理。

应答类型:

  • **自动应答(auto-ack):**自动应答是默认设置,消费者从队列中获取消息后,RabbitMQ 会立即认为该消息已经被成功处理,即使消费者并未真正处理完成。在这种模式下,消息会在被消费后立即从队列中删除,而无需消费者确认。这种模式的缺点是,如果消费者在处理消息时崩溃,消息会丢失。
  • **手动应答(manual ack):**消费者处理完消息后,需要显式地发送确认应答,通知 RabbitMQ 该消息已经处理完成。这样,如果消费者没有发送确认应答,RabbitMQ 会重新将消息发送给其他消费者。
配置
// 监听队列
// 参数2:自动应答设为 false; true:开启自动应答
channel.basicConsume("test4072",false,consumer);

参数2为true时:自动确认

只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都认为是消息已经成功消费。一旦rabbitmq 将消息分发给消费者,就会从内存中删除。(会丢失数据消息)

参数2为false时:手动确认

消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。如果有一个消费者挂掉,就会交付给其他消费者。手动告诉 rabbitmq 消息处理完成后,rabbitmq 删除内存中的消息。

反馈:

//手动回馈
channel.basicAck(envelope.getDeliveryTag(),false);

使用 Nack 让消息回到队列中

// 处理条数; 是否批量处理 ;是否放回队列 false 丢弃
channel.basicNack(envelope.getDeliveryTag(),false,true);

生产者

/*** 消息应答*/
public class Provider01 {public static void main(String[] args) {try {System.out.println("--------生产者-------");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();//同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 创建队列// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", false, false, false, null);for (int i = 0; i < 50; i++) {// 定义发送信息String msg = "hello rabbitmq-kwh"+i;// 发送数据channel.basicPublish("", "test4072", null, msg.getBytes());Thread.sleep(1000);}System.out.println("发送成功....");// 关闭资源channel.close();conn.close();} catch (Exception e) {e.printStackTrace();}}}

消费者01

public class Consumer01 {public static void main(String[] args) {try {System.out.println("======消费者01======");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();//限制每次只消费一个消息,防止通道中消息阻塞channel.basicQos(1);// 创建队列(有就直接连接。没有则创建)// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", false, false, false, null);// 消费者消费消息DefaultConsumer consumer = new DefaultConsumer(channel){@Override //一旦有消息进入 将触发public void handleDelivery(String consumerTag,                       Envelope envelope,                      AMQP.BasicProperties properties,byte[] body) throws IOException {String str = "";try {str = new String(body,"utf-8");if(envelope.getDeliveryTag()==3){int i=1/0;}System.out.println(envelope.getDeliveryTag()+"消费者01msg==接收=="+str);//手动应答 处理完了// 手动确认消息,即手动反馈// 第一个参数:消息的序号,// 第二个参数:是否批量,false 单条消息应答 ;当为 true 时批量应答channel.basicAck(envelope.getDeliveryTag(),false);}catch(Exception e){// e.printStackTrace();System.out.println("消费者01处理第"+envelope.getDeliveryTag()+"条,时报错,消息内容为"+str);//手动应答 报错了// 第一个参数:消息的序号,// 第二个参数:是否批量,false 单条消息应答 当为 true 时批量应答// 第三个参数:是否放回队列 ;false丢弃 ,true 放回队列channel.basicNack(envelope.getDeliveryTag(),false,true);}// 休眠一秒钟try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}}};// 监听队列// 参数2:自动应答设为 false; true:开启自动应答channel.basicConsume("test4072",false,consumer);}catch (Exception e) {e.printStackTrace();}}
}

消费者02(同消费者01)

在这里插入图片描述

消息持久化

概念

RabbitMQ 的持久化机制是确保消息和队列在系统崩溃、重启或其他故障情况下不会丢失的关键功能。确保消息不会丢失需要做两件事:将队列和消息都标记为持久化。

配置

持久化队列

// 创建队列,
// 队列名称,是否持久化(队列),是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", true, false, false, null);

消息持久化

// 发送数据
// MessageProperties.PERSISTENT_TEXT_PLAIN:持久化消息
//设置生成者发送消息为持久化信息(要求保存到硬盘上)保存在内存中
//MessageProperties.PERSISTENT_TEXT_PLAIN,指令完成持久化
channel.basicPublish("", "test4072", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

如果改动队列参数配置,需要删除原有的队列,重新建,因为在 rabbitmq 是不允许重新定义一个已存在的队列。

在这里插入图片描述

生产者

/*** 消息持久化*/
public class Provider01 {public static void main(String[] args) {try {System.out.println("--------生产者-------");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();//同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 创建队列// 队列名称,是否持久化(队列),是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", true, false, false, null);for (int i = 0; i < 50; i++) {// 定义发送信息String msg = "hello rabbitmq-kwh"+i;// 发送数据// MessageProperties.PERSISTENT_TEXT_PLAIN:持久化消息channel.basicPublish("", "test4072", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());Thread.sleep(1000);}System.out.println("发送成功....");// 关闭资源channel.close();conn.close();} catch (Exception e) {e.printStackTrace();}}
}

10.5 订阅模式

发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种:

  • Fanout Exchange:广播
  • Direct Exchange:路由
  • Topic Exchange:主题

广播模式

在这里插入图片描述

将消息交给所有绑定到交换机的队列,生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。交换机把消息发送给绑定过的所有队列.队列的消费者都能拿到消息。实现一条消息被多个消费者消费.

生产者

/*** 订阅模式---广播*/public class Provider01 {public static void main(String[] args) {try {System.out.println("--------生产者-------");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();// 创建交换机channel.exchangeDeclare("test4072_x01","fanout");// 创建队列// 参数1;队列名称,// 参数2:是否持久化,// 参数3:是否排他:是否允许其他的connection创建的channel下的channel连接// 参数3:是否自动删除,// 参数4:是否空闲时自动删除// channel.queueDeclare("test4072", false, false, false, null);// 定义发送信息String msg = "hello rabbitmq-kwh";// 发送数据//参数1:交换机名称//参数2://参数3:channel.basicPublish("test4072_x01", "", null, msg.getBytes());System.out.println("发送成功....");// 关闭资源channel.close();conn.close();} catch (Exception e) {e.printStackTrace();}}}

消费者01

public class Consumer01 {public static void main(String[] args) {try {System.out.println("======消费者01======");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();// 创建队列(有就直接连接。没有则创建)// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072_01", false, false, false, null);//绑定队列到交换机//参数1: 队列名称//参数2:交换机名称// 参数3:channel.queueBind("test4072_01", "test4072_x01", "");// 消费者消费消息DefaultConsumer consumer = new DefaultConsumer(channel){@Override //一旦有消息进入 将触发public void handleDelivery(String consumerTag,                       Envelope envelope,                  AMQP.BasicProperties properties,byte[] body) throws  IOException {String str = new String(body,"utf-8");System.out.println("msg==接收=="+str);channel.basicAck(envelope.getDeliveryTag(),false);}};// 监听队列(订阅消息)//参数2:true:自动应答channel.basicConsume("test4072_01",false,consumer);}catch (Exception e) {e.printStackTrace();}}
}

消费者02

public class Consumer02 {public static void main(String[] args) {try {System.out.println("======消费者02======");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();// 创建队列(有就直接连接。没有则创建)// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072_02", false, false, false, null);//绑定队列到交换机//参数1: 队列名称//参数2:交换机名称// 参数3:channel.queueBind("test4072_02", "test4072_x01", "");// 消费者消费消息DefaultConsumer consumer = new DefaultConsumer(channel){@Override //一旦有消息进入 将触发public void handleDelivery(String consumerTag,                       Envelope envelope,                   AMQP.BasicProperties properties,byte[] body) throws IOException {String str = new String(body,"utf-8");System.out.println("msg==接收=="+str);channel.basicAck(envelope.getDeliveryTag(),false);}};// 监听队列(订阅消息)//参数2:true:自动应答channel.basicConsume("test4072_02",false,consumer);}catch (Exception e) {e.printStackTrace();}}
}

在这里插入图片描述

路由模式

在这里插入图片描述

在这里插入图片描述

1.在广播模式中,生产者发布消息,所有消费者都可以获取所有消息。

2.在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key).消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。

3.P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。

4.X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列

/*** 发布订阅-----路由模式*/public class Publisher01 {public static void main(String[] args) {try {Connection conn = ConnectionUtils.getConnection();Channel channel = conn.createChannel();//处理路由键 directchannel.exchangeDeclare("test4072_x02","direct");// 消息内容String message = "Hello direct!11";channel.basicPublish("test4072_x02","error",null,message.getBytes());System.out.println("发送成功....");channel.close();conn.close();} catch (Exception e) {e.printStackTrace();}}
}

消费者01

public class Consumer01 {public static void main(String[] args) {try {System.out.println("======消费者01======");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();// 创建队列(有就直接连接。没有则创建)// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072_01", false, false, false, null);channel.basicQos(1);//绑定交换机与队列 放入 keychannel.queueBind("test4072_01","test4072_x02","error");channel.queueBind("test4072_01","test4072_x02","info");// 消费者消费消息DefaultConsumer consumer = new DefaultConsumer(channel){@Override //一旦有消息进入 将触发public void handleDelivery(String consumerTag,                       Envelope envelope,                   AMQP.BasicProperties properties,byte[] body) throws IOException {String str = new String(body,"utf-8");System.out.println("msg==接收=="+str);channel.basicAck(envelope.getDeliveryTag(),false);}};// 监听队列(订阅消息)//参数2:true:自动应答channel.basicConsume("test4072_01",false,consumer);}catch (Exception e) {e.printStackTrace();}}
}

消费者02

public class Consumer02 {public static void main(String[] args) {try {System.out.println("======消费者02======");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();// 创建队列(有就直接连接。没有则创建)// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072_02", false, false, false, null);channel.basicQos(1);//绑定交换机与队列 放入 keychannel.queueBind("test4072_02","test4072_x02","info");// 消费者消费消息DefaultConsumer consumer = new DefaultConsumer(channel){@Override //一旦有消息进入 将触发public void handleDelivery(String consumerTag,              Envelope envelope,                      AMQP.BasicProperties properties,byte[] body) throws IOException {String str = new String(body,"utf-8");System.out.println("msg==接收=="+str);channel.basicAck(envelope.getDeliveryTag(),false);}};// 监听队列(订阅消息)//参数2:true:自动应答channel.basicConsume("test4072_02",false,consumer);}catch (Exception e) {e.printStackTrace();}}
}

在这里插入图片描述

主题模式(通配符模式)

在路由模式的基础上多了一个正则

1.Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

2.Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

3.通配符规则:

#:匹配一个或多个词

*:匹配恰好1个词

# 匹配一个或多个 (user.msg.1)

* 匹配一个 (user.goods) user.* user.login.1002

在这里插入图片描述

在这里插入图片描述

生产者

/*** 发布订阅-----主题模式*/public class Publisher01 {public static void main(String[] args) {try {Connection conn = ConnectionUtils.getConnection();Channel channel = conn.createChannel();//处理路由键 directchannel.exchangeDeclare("test4072_x02","topic");// 消息内容String message = "Hello direct!11";channel.basicPublish("test4072_x02","error.1001",null,message.getBytes());System.out.println("发送成功....");channel.close();conn.close();} catch (Exception e) {e.printStackTrace();}}
}

消费者01

public class Consumer01 {public static void main(String[] args) {try {System.out.println("======消费者01======");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();// 创建队列(有就直接连接。没有则创建)// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072_01", false, false, false, null);channel.basicQos(1);//绑定交换机与队列 放入 keychannel.queueBind("test4072_01","test4072_x02","error.*");channel.queueBind("test4072_01","test4072_x02","info");// 消费者消费消息DefaultConsumer consumer = new DefaultConsumer(channel){@Override //一旦有消息进入 将触发public void handleDelivery(String consumerTag,                      Envelope envelope,                   AMQP.BasicProperties properties,byte[] body)throws  IOException {String str = new String(body,"utf-8");System.out.println("msg==接收=="+str);channel.basicAck(envelope.getDeliveryTag(),false);}};// 监听队列(订阅消息)//参数2:true:自动应答channel.basicConsume("test4072_01",false,consumer);}catch (Exception e) {e.printStackTrace();}}
}

消费者02

public class Consumer02 {public static void main(String[] args) {try {System.out.println("======消费者02======");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();// 创建队列(有就直接连接。没有则创建)// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072_02", false, false, false, null);channel.basicQos(1);//绑定交换机与队列 放入 keychannel.queueBind("test4072_02","test4072_x02","info");channel.queueBind("test4072_01","test4072_x02","error");// 消费者消费消息DefaultConsumer consumer = new DefaultConsumer(channel){@Override //一旦有消息进入 将触发public void handleDelivery(String consumerTag,                     Envelope envelope,                        AMQP.BasicProperties properties,byte[] body) throws IOException {String str = new String(body,"utf-8");System.out.println("msg==接收=="+str);channel.basicAck(envelope.getDeliveryTag(),false);}};// 监听队列(订阅消息)//参数2:true:自动应答channel.basicConsume("test4072_02",false,consumer);}catch (Exception e) {e.printStackTrace();}}
}

在这里插入图片描述

10.6 消息确认机制

10.7 SpringBoot集成MQ

简单队列

导入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置MQ

spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest

生产者

@RestController
@RequestMapping("/index")
public class IndexController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/send")public String send() {rabbitTemplate.convertAndSend("test_4072","hellokwh");return "发送成功...";}}

监听队列

//监听队列
@Component
public class BootRabbitMQListener {@RabbitListener(queuesToDeclare = @Queue("test_4072"))public void onMessage01(String message) {System.out.println("消费者01: " + message);}}

在这里插入图片描述

工作队列

@RestController
@RequestMapping("/index")
public class IndexController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/send")public String send() {rabbitTemplate.convertAndSend("test_4072","hellokwh");return "发送成功...";}}
//监听队列
@Component
public class BootRabbitMQListener {@RabbitListener(queuesToDeclare = @Queue("test_4072"))public void onMessage01(String message) {System.out.println("消费者01: " + message);}@RabbitListener(queuesToDeclare = @Queue("test_4072"))public void onMessage02(String message) {System.out.println("消费者02: " + message);}
}

在这里插入图片描述

公平分发

需要先设置成手动反馈

spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestlistener:simple:acknowledge-mode: manual #手动反馈
@RestController
@RequestMapping("/index")
public class IndexController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/send")public String send() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend("test_4072","hellokwh"+i);}return "发送成功...";}}
//监听队列
@Component
public class BootRabbitMQListener {@RabbitListener(queuesToDeclare = @Queue("test_4072"))public void onMessage01(Message message, Channel channel) {String msg = "";try {msg = new String(message.getBody(), "utf-8");System.out.println("消费者01: " + msg);// 手动反馈channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {e.printStackTrace();try {// 放回队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);} catch (Exception e1) {e1.printStackTrace();}}finally {try {// 休眠一秒钟Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}}}@RabbitListener(queuesToDeclare = @Queue("test_4072"))public void onMessage02(Message message, Channel channel) {String msg = "";try {msg = new String(message.getBody(), "utf-8");System.out.println("消费者02: " + msg);// 手动反馈channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {e.printStackTrace();try {// 放回队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);} catch (Exception e1) {e1.printStackTrace();}}finally {try {// 休眠一秒钟Thread.sleep(3000);} catch (Exception e) {e.printStackTrace();}}}}

在这里插入图片描述

传对象

//对象实体类
public class UserGoods implements Serializable {private Long  goodsId;private String goodsName;private Long  userId;
}
@RestController
@RequestMapping("/index")
public class IndexController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/send")public String send() {UserGoods userGoods = new UserGoods(1L, "猕猴桃", 1L);rabbitTemplate.convertAndSend("test_4072",userGoods);return "发送成功...";}}
@Component
public class BootRabbitMQListener {@RabbitListener(queuesToDeclare = @Queue("test_4072"))public void onMessage01(Message message, Channel channel) {String msg = "";try {//获取传过来的对象UserGoods userGoods = (UserGoods) SerializationUtils.deserialize(message.getBody());//调用数据库获取商品ID// 手动反馈channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);System.out.println("消费者01"+userGoods.getUserId()+"购买了商品"+userGoods.getGoodsId()+"--"+userGoods.getGoodsName());} catch (Exception e) {e.printStackTrace();try {// 放回队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);} catch (Exception e1) {e1.printStackTrace();}}finally {try {// 休眠一秒钟Thread.sleep(3000);} catch (Exception e) {e.printStackTrace();}}}@RabbitListener(queuesToDeclare = @Queue("test_4072"))public void onMessage02(Message message, Channel channel) {String msg = "";try {//获取传过来的对象UserGoods userGoods = (UserGoods) SerializationUtils.deserialize(message.getBody());//调用数据库获取商品ID// 手动反馈channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);System.out.println("消费者02"+userGoods.getUserId()+"购买了商品"+userGoods.getGoodsId()+"--"+userGoods.getGoodsName());} catch (Exception e) {e.printStackTrace();try {// 放回队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);} catch (Exception e1) {e1.printStackTrace();}}finally {try {// 休眠一秒钟Thread.sleep(3000);} catch (Exception e) {e.printStackTrace();}}}}
@Configuration
public class RabbitMQConfig {// 创建队列@Beanpublic Queue queue() {return new Queue("test_4072");}// @Bean// public Queue myQueue() {//     Queue  queue =  QueueBuilder.nonDurable("myNonDurableQueue").autoDelete().build();//     return queue;//     // return new QueueBuilder.nonDurable("myNonDurableQueue").build();// }}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/65530.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Excel for Finance 07 `FV PV` 函数

Excel 的 FV 函数用于计算一笔投资在未来的价值&#xff0c;基于固定的利率和定期付款。这是一个金融函数&#xff0c;常用来分析储蓄计划、贷款、或投资的增长。 语法&#xff1a; FV(rate, nper, pmt, [pv], [type])参数说明&#xff1a; rate&#xff08;必需&#xff09;&…

React(二)——注册页/登录页/Reducer/

文章目录 项目地址一、使用Yarn安装所有环境二、文件结构以及路由配置三、登录和注册3.1 注册页面3.1.1 静态页面3.1.2 表单提交useSate3.2 登录页面3.3 admin 的登录页面四、关于auth登录和注册的Reducer4.1 authReducer创建4.2 根rootReducer的创建4.3 创建和配置Redux的stor…

每天五分钟深度学习框架pytorch:越来越深的卷积神经网络模型VGG

本文重点 前面我们使用pytorch搭建了卷积神经网络LeNet-5,AlexNet,本文我们学习卷积神经网络VGG,VGG相比于前面的两个神经网络而言比较深,我们知道网络模型越深那么就难以训练,但是VGG效果比较好。 Vgg使用了更小的滤波器,同时使用了更深的网络结构,AlexNet只有8层网络结…

小程序配置文件 —— 12 全局配置 - pages配置

全局配置 - pages配置 在根目录下的 app.json 文件中有一个 pages 字段&#xff0c;这里我们介绍一下 pages 字段的具体用法&#xff1b; pages 字段&#xff1a;用来指定小程序由哪些页面组成&#xff0c;用来让小程序知道由哪些页面组成以及页面定义在哪个目录&#xff0c;…

从0到100:基于Java的大学选修课选课小程序开发笔记(上)

背景 为学生提供便捷的课程选择方式&#xff0c;并帮助学校进行课程管理和资源调配&#xff1b;主要功能包括&#xff1a;课程展示&#xff0c;自主选课&#xff0c;取消选课&#xff0c;后台录入课程&#xff0c;统计每门课程报名情况&#xff0c;导出数据&#xff0c;用户管…

Dify服务器部署教程

Dify的github地址: https://github.com/langgenius/dify 服务器要求&#xff1a;2c4g 1、克隆仓库 可以通过命令或者下载zip解压后上传服务器都行 git clone https://github.com/langgenius/dify.git 2、docker启动 cd dify/dockercp .env.example .envdocker compose up -d…

Mac 12.1安装tiger-vnc问题-routines:CRYPTO_internal:bad key length

背景&#xff1a;因为某些原因需要从本地mac连接远程linxu桌面查看一些内容&#xff0c;必须使用桌面查看&#xff0c;所以ssh无法满足&#xff0c;所以决定安装vnc客户端。 问题&#xff1a; 在mac上通过 brew install tiger-vnc命令安装, 但是报错如下&#xff1a; > D…

大模型WebUI:Gradio全解系列9——Additional Features:附加功能(上)

大模型WebUI&#xff1a;Gradio全解系列9——Additional Features&#xff1a;附加功能&#xff08;上&#xff09; 前言本篇摘要9. Additional Features&#xff1a;附加功能9.1 队列9.1.1 使用方法9.1.2 配置队列演示 9.2 输入输出流9.2.1 输出流1. 生成器yield2. 流媒体 9.2…

Java - 日志体系_Apache Commons Logging(JCL)日志接口库_桥接Logback 及 源码分析

文章目录 PreApache CommonsApache Commons ProperLogging &#xff08;Apache Commons Logging &#xff09; JCL 集成logbackPOM依赖配置文件 logback.xml使用 源码分析jcl-over-slf4j 的工作原理1. LogFactory 的实现2. SLF4JLogFactory 和 Log 的实例化过程3. SLF4JLog 和 …

文档大师:打造一站式 Word 报告解决方案1

前言 在政府、医院、银行、财务以及销售等领域&#xff0c;常常需要创建各种报告文件来展开工作汇报&#xff0c;譬如季度销售报告、年度总结报告、体检报告和保险合同等。在没有报表工具支持之前&#xff0c;这类报告主要通过 Word 制作&#xff0c;费时费力且难以维护&#…

阿尔萨斯(JVisualVM)JVM监控工具

文章目录 前言阿尔萨斯(JVisualVM)JVM监控工具1. 阿尔萨斯的功能2. JVisualVM启动3. 使用 前言 如果您觉得有用的话&#xff0c;记得给博主点个赞&#xff0c;评论&#xff0c;收藏一键三连啊&#xff0c;写作不易啊^ _ ^。   而且听说点赞的人每天的运气都不会太差&#xff…

<数据集>芝麻作物和杂草识别数据集<目标检测>

数据集下载链接 &#xff1c;数据集&#xff1e;芝麻作物和杂草识别数据集&#xff1c;目标检测&#xff1e;https://download.csdn.net/download/qq_53332949/90181548数据集格式&#xff1a;VOCYOLO格式 图片数量&#xff1a;1300张 标注数量(xml文件个数)&#xff1a;130…

STM32-笔记18-呼吸灯

1、实验目的 使用定时器 4 通道 3 生成 PWM 波控制 LED1 &#xff0c;实现呼吸灯效果。 频率&#xff1a;2kHz&#xff0c;PSC71&#xff0c;ARR499 利用定时器溢出公式 周期等于频率的倒数。故Tout 1/2KHZ&#xff1b;Ft 72MHZ PSC71&#xff08;喜欢设置成Ft的倍数&…

JVM实战—4.JVM垃圾回收器的原理和调优

大纲 1.JVM的新生代垃圾回收器ParNew如何工作 2.JVM老年代垃圾回收器CMS是如何工作的 3.线上部署系统时如何设置垃圾回收相关参数 4.新生代垃圾回收参数如何优化 5.老年代的垃圾回收参数如何优化 6.问题汇总 1.JVM的新生代垃圾回收器ParNew如何工作 (1)JVM的核心运行原理…

E-commerce .net+React(一)——项目初始化

文章目录 项目地址一、创建.Net环境1.1环境配置1.1.1 使用vscode创建webapi1.1.2 Clean architecture结构创建1.1.3 将创建好结构的项目添加到git里1.1.4 EF Core配置1. 在infrastructure里安装EF所需环境2. 创建Product数据模型3. 创建EF Core的DbContext 数据库上下文4. 创建…

elasticsearch-java客户端jar包中各模块的应用梳理

最近使用elasticsearch-java客户端实现对elasticsearch服务的Api请求&#xff0c;现对elasticsearch-java客户端jar包中各模块的应用做个梳理。主要是对co.elastic.clients.elasticsearch路径下的各子包的简单说明。使用的版本为&#xff1a;co.elastic.clients:elasticsearch-…

vscode实用插件(持续更新)

目录 Git History Diff Git Graph Error Lens Git History Diff 用于将当前分支的某个文件夹与远程分支的相同文件夹做对比&#xff0c;方便代码评审&#xff01;解决了为了一个问题而多次commit&#xff0c;导致代码不好评审&#xff0c;即不晓得和远程分支相比&#xff0…

full-stack-fastapi-template postgres 管理系统安装指南

full-stack-fastapi-template postgres 管理系统安装指南 本项目基于 full-stack-fastapi-template 模板开发 1. 环境准备 请确保您的系统已安装以下软件&#xff1a; Python 3.9 Node.js 16 PostgreSQL 13 Git Docker (可选&#xff0c;用于容器化部署) 2. 获取代码 # 2.…

Prompt提示工程上手指南(七)Prompt编写实战-基于智能客服问答系统下的Prompt编写

前言 本系列文章从最初的基础原理与入门实践切入&#xff0c;一直延伸到主流策略、引导策略、RAG&#xff08;检索增强生成&#xff09;、思维树&#xff08;ToT&#xff09;与避免幻觉&#xff08;Hallucination&#xff09;的策略这种渐进的结构方便了对初学者和进阶者的双向…

mac系统vsCode中使用Better Comments在.vue文件里失效

问题&#xff1a;关于Better Comments默认在html、TS、JS中有效&#xff0c;在vue中无效,需要单独进行配置 windows系统可以参考友链Better Comments&#xff08;注释高亮&#xff09;在vue文件里失效的问题 关于Better Comments电脑的配置路径&#xff1a; Windows系统&…