001 rabbitmq减库存demo direct

文章目录

    • Producer
    • Consumer
    • RabbitMQDirectConfig.java
    • application.yaml
    • ServletInitializer.java
    • RabbitmqApplication.java
    • pom.xml
    • “该消息已经准备好再次被消费”和“队列已经准备好重新分发该消息”

Producer

这段代码定义了一个名为Producer的RESTful Web服务Controller,它提供了一个HTTP GET接口/direct/sendMsg,用于发送消息到RabbitMQ的交换机。当该接口被调用时,它会发送一个固定的消息字符串到名为myDirectExchangeAAA的交换机,并使用keyAAA作为RoutingKey。发送成功后,它会返回一个表示成功的字符串。


// 声明一个包名,用于组织和管理Java类。  
package com.example.direct;  // 导入Spring框架中RabbitMQ相关的RabbitTemplate类,它提供了发送和接收消息的方法。  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  // 导入Spring框架的自动装配注解,用于自动注入依赖。  
import org.springframework.beans.factory.annotation.Autowired;  // 导入Spring Web模块的注解,用于映射HTTP GET请求到特定的处理方法。  
import org.springframework.web.bind.annotation.GetMapping;  // 导入Spring Web模块的注解,用于定义Controller类的请求映射路径。  
import org.springframework.web.bind.annotation.RequestMapping;  // 导入Spring Web模块的注解,用于标识一个类为RESTful Web服务的Controller。  
import org.springframework.web.bind.annotation.RestController;  // 使用@RestController注解标识该类为RESTful Web服务的Controller,  
// 意味着此类将处理HTTP请求并返回数据。  
@RestController  // 使用@RequestMapping注解定义该Controller的基础路径为"direct"。  
@RequestMapping("direct")  // 声明一个名为Producer的公共类。  
public class Producer {  // 使用@Autowired注解自动注入RabbitTemplate的实例,  // 以便在类中使用RabbitMQ的功能。  @Autowired  private RabbitTemplate rabbitTemplate;  // 使用@GetMapping注解映射HTTP GET请求到sendMsg方法,  // 当访问"/direct/sendMsg"路径时,将调用此方法。  @GetMapping("sendMsg")  // 声明一个公共的sendMsg方法,该方法不接收任何参数,并返回一个字符串。  public String sendMsg(){  // 定义一个要发送的消息字符串。  String msg = "已经生成了订单,需要减去库存1个";  // 使用RabbitTemplate的convertAndSend方法发送消息到RabbitMQ交换机。  // 第一个参数是交换机的名称("myDirectExchangeAAA"),  // 第二个参数是RoutingKey("keyAAA"),用于确定消息应该路由到哪个队列,  // 第三个参数是要发送的消息内容(msg)。  rabbitTemplate.convertAndSend("myDirectExchangeAAA","keyAAA",msg);  // 返回一个表示消息发送成功的字符串。  return "send msg ok";  }  
}

Consumer


package com.example.direct; // 声明包名为com.example.direct。  import com.rabbitmq.client.Channel; // 导入RabbitMQ的Channel类,它代表了一个通信信道。  import org.springframework.amqp.core.Message; // 导入Spring AMQP的Message类,表示一条消息。  import org.springframework.amqp.rabbit.annotation.RabbitHandler; // 导入RabbitHandler注解,标识处理RabbitMQ消息的方法。  import org.springframework.amqp.rabbit.annotation.RabbitListener; // 导入RabbitListener注解,用于监听RabbitMQ队列。  import org.springframework.stereotype.Component; // 导入Spring的Component注解,标识该类为Spring的一个组件。  import java.io.IOException; // 导入Java的IOException类,处理可能的输入输出异常。  // 使用@Component注解将该类声明为Spring的一个组件,这样Spring会自动扫描并管理它。  
@Component   
public class Consumer { // 声明一个公共类Consumer。  // 使用@RabbitHandler注解标识该方法为处理RabbitMQ消息的方法。  // 使用@RabbitListener注解监听名为"queueAAA"的队列。  @RabbitHandler   @RabbitListener(queues = "queueAAA")   public void getMSg1(Message message, Channel channel){ // 定义一个公共方法getMSg1,接收一个Message和一个Channel作为参数。  try {  System.out.println("模拟库存业务处理减库存:" + message); // 打印接收到的消息。  Integer stock = 3;  int number = 10;   number -= stock; // 消息确认:立马删除 消息。这是RabbitMQ的消息确认机制,确保消息被正确处理后可以安全删除。  channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);   System.out.println("减库存业务执行结束,队列消息已删除"); // 打印消息表示业务处理完毕且消息已被删除。  } catch (Exception e) { // 捕获所有异常。  try {  System.out.println("减库存业务有异常,消息重入队列"); // 打印异常信息。  // 当处理消息时发生异常,可以选择将消息重新放回队列以供后续处理。  // 这里basicReject的第二个参数为true,表示消息将重新入队。  channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);   } catch (IOException ioException) { // 捕获可能的IO异常。  ioException.printStackTrace(); // 打印IO异常的堆栈信息。  }  e.printStackTrace(); // 打印原始异常的堆栈信息。  }  }  
}

RabbitMQDirectConfig.java

package com.example.direct;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQDirectConfig {//    1. 创建交换机@Beanpublic DirectExchange newDirectExchange(){return new DirectExchange("myDirectExchangeAAA",true,false);}//2. 创建队列@Beanpublic Queue newQueueA(){return new Queue("queueAAA",true);}//3. 绑定队列到交换机中@Beanpublic Binding bindingA(){return BindingBuilder.bind(newQueueA()).to(newDirectExchange()).with("keyAAA");}}

application.yaml


server:servlet:context-path: /app
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestpublisher-confirm-type: correlated  # 确认交换机已经接收到生产者的消息了publisher-returns: true   #  消息已经到了队列(交换机与队列绑定成功的)listener:simple:acknowledge-mode: manual # 手动消息确认

ServletInitializer.java


package com.example;import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;public class ServletInitializer extends SpringBootServletInitializer {@Overrideprotected SpringApplicationBuilder configure(SpringApplicationBuilder application) {return application.sources(RabbitmqApplication.class);}}

RabbitmqApplication.java


package com.example;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class RabbitmqApplication {public static void main(String[] args) {SpringApplication.run(RabbitmqApplication.class, args);}}

pom.xml


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.6</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>rabbitmq</artifactId><version>0.0.1-SNAPSHOT</version><packaging>war</packaging><name>rabbitmq</name><description>rabbitmq</description><properties><java.version>1.8</java.version></properties><dependencies><!-- AMQP客户端 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.4.1</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-tomcat</artifactId><scope>provided</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

扣库存业务处理成功,消息确认 ACK
当扣库存业务处理成功后,消费者(即处理扣库存业务的系统或服务)会向消息队列发送一个确认消息(ACK)。
这个ACK消息是告知消息队列,该条消息已经被成功处理,可以从队列中删除。
RabbitMQ等消息队列在收到ACK后,会将该条消息从队列中移除,确保它不会被再次分发。

扣库存业务处理成功,没有消息确认 uAC
如果扣库存业务处理成功,但没有发送ACK消息进行确认,这种情况通常是不正常的
在RabbitMQ的默认设置中,如果消费者没有显式地发送ACK,消息队列会认为该消息未被成功处理。
这可能导致消息被重新放入队列(requeue),以供其他消费者尝试处理,或者在某些配置下,可能会导致消息被丢弃或转入死信队列。
则意味着系统需要处理这种未确认的情况,可能通过重试、日志记录或警报来确保消息不会丢失。

扣库存失败,消息拒绝,重入队列 ready 1
当扣库存业务处理失败时,消费者可以选择拒绝该消息,并指示消息队列重新将该消息放入队列以供后续处理。
在RabbitMQ中,这通常通过调用basicReject方法实现,并设置requeue参数为true。
“ready 1”表示队列已经准备好重新分发该消息。具体含义可能依赖于您的系统实现和上下文。
重新入队列的消息将有机会被其他消费者或同一消费者(在后续轮询中)再次尝试处理。

“该消息已经准备好再次被消费”和“队列已经准备好重新分发该消息”

在RabbitMQ中,当说到“该消息已经准备好再次被消费”和“队列已经准备好重新分发该消息”时,虽然两者在某种程度上是相关的,但它们指的是不同的状态或情况。以下是对这两个表述的详细解释:

  1. 该消息已经准备好再次被消费
    含义:这通常意味着某个特定的消息在被消费者拒绝后,已经被重新放回到队列中,并处于可消费状态。换句话说,这个消息现在可以被任何连接到该队列的消费者再次获取和处理。
    背景:在RabbitMQ中,当消费者处理消息失败时,它可以选择拒绝该消息,并通过设置requeue参数来决定是否将消息重新放回队列。如果requeue设置为true,则消息会被重新放回队列的尾部,等待下一次的消费。
    影响:这个消息会再次出现在队列中,供其他消费者或同一消费者在未来的某个时间点再次消费。这有助于实现消息的可靠传递,确保即使处理失败,消息也不会丢失。
  2. 队列已经准备好重新分发该消息
    含义:这通常指的是队列本身的状态。当队列中有消息等待被消费时,我们可以说这个队列已经准备好分发或重新分发消息。这并不一定意味着某个特定的消息已经被重新放回队列,而是指队列整体处于可以分发消息的状态。
    背景:在RabbitMQ中,队列是存储和分发消息的中间件。当生产者发送消息到队列时,这些消息会被存储在队列中,等待消费者来消费。如果消费者处理消息失败并将其重新放回队列,队列会再次将这个消息添加到其分发列表中。
    影响:当队列处于这种状态时,任何连接到该队列的消费者都有可能接收到这个消息(或其他等待中的消息)。队列会按照其配置(如轮询、优先级等)来决定下一个接收消息的消费者。
    总结
    “该消息已经准备好再次被消费”关注的是某个特定消息的状态,即该消息已被重新放回队列并可供再次消费。
    “队列已经准备好重新分发该消息”则关注的是队列整体的状态,即队列中有消息等待被分发,这些消息可能包括之前被拒绝并重新放回的消息,也可能包括新进入队列的消息。
    在RabbitMQ的实际应用中,理解和区分这两种状态对于确保消息的可靠传递和系统的稳定运行至关重要。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/4180.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

iOS(Object C) 插入排序

插入排序的思想: 可以想象你在打牌,手里有一张牌2, 第一次摸到一张牌5; 5 比1 大,所以摸到的牌5放在1的右边; (此时手里的牌为 2->5) 第二次摸到一张牌3; 3比5小,所以3和5互换位置,再拿3和2比,3比2大,3不动(此时手里的牌为 2-> 3 -> 5) 第三次摸到一张牌1,1比5小,…

Windows10系统安装IIS的步骤

在Windows 10上安装IIS的步骤如下&#xff1a;12 打开控制面板&#xff0c;选择“程序”或“程序和功能”。点击“启用或关闭Windows功能”。在列表中找到“Internet Information Services”&#xff0c;勾选该选项。根据需要勾选IIS的具体组件&#xff0c;如万维网服务、IIS可…

创新指南 | 2024年企业如何十步打造最佳的数字化营销策略组合

营销是一个动态且不断变化的领域。顶级的数字营销策略随着消费者和技术趋势的变化而变化。这就是为什么每个公司都需要一个经过良好规划并具有明确里程碑和目标的营销策略。一旦你有了正确的计划&#xff0c;你实现为业务设定的目标的可能性就会大大增加。这意味着&#xff0c;…

面试经典150题——求根节点到叶节点数字之和

​ 1. 题目描述 2. 题目分析与解析 2.1 思路一——DFS 理解问题&#xff1a; 首先要理解题目的要求&#xff0c;即对于给定的二叉树&#xff0c;我们需要找出从根节点到所有叶子节点的所有路径&#xff0c;然后将每一条路径上的数字组成一个整数&#xff0c;最后求出这些整数…

k8s部署prometheus

部署pvc 修改nfs-deployment.yaml文件中的信息&#xff0c;然后应用YAML文件 cat > /opt/k8s/prometheus/prometheus-pvc.yaml <<EOF apiVersion: v1 kind: PersistentVolumeClaim metadata:name: prometheus-data-pvc spec:accessModes:- ReadWriteManystorageClas…

京东天幕,宙斯,云鼎什么关系

京东云鼎是京东云推出的一站式零售应用云计算平台&#xff0c;为商家、品牌商及合作伙伴与京东无界赋能的商业赋能、业务赋能、品牌赋能等服务集成&#xff0c;提供弹性的云计算资源、可靠的安全服务、高效的数据推送服务、标准化的网关服务&#xff0c;并优先享受京东的云端生…

手把手教数据结构与算法:栈的应用(平衡符号和简单计算器)

栈 基本概念 栈的定义 栈&#xff08;Stack&#xff09;&#xff1a;是只允许在一端进行插入或删除的线性表。首先栈是一种线性表&#xff0c;但限定这种线性表只能在某一端进行插入和删除操作。 栈顶&#xff08;Top&#xff09;&#xff1a;线性表允许进行插入删除的那一端…

@Value

Value 注解是 Spring 框架中的一个注解&#xff0c;用于从属性文件、环境变量、Java 系统属性等地方读取值&#xff0c;并将这些值注入到 Spring 管理的 Bean 中。 Component public class MyBean {Value("${my.property}")private String myProperty;// Getter and…

Docker常用命令(镜像、容器)

一、镜像 1.1 存出镜像 1.2 载入镜像 1.3 上传镜像 二、容器 2.1 容器创建 2.2 查看容器的运行状态 ​2.3 启动容器 2.4 创建并启动容器 2.5 在后台持续运行 docker run 创建的容器 2.6 终止容器运行 2.7 容器的进入 ​2.8把宿主机的文件传入到容器内部 2.9 从容器…

debian gnome-desktop GUI(图形用户界面)系统

目录 &#x1f31e;更新 &#x1f3a8;安装 &#x1f34e;分配 &#x1f6cb;️重启 &#x1f511;通过VNC连接 debian gnome-desktop &#x1f31e;更新 sudo apt update sudo apt -y upgrade &#x1f3a8;安装 sudo apt -y install task-gnome-desktop 这个过程比…

前端发版缓存问题

前端发版后浏览器缓存问题 浏览器缓存机制是为了提高网页加载速度和减少带宽消耗而设计的。当浏览器访问一个资源时&#xff0c;它会首先检查该资源是否已经在缓存中。如果资源存在且未过期&#xff0c;浏览器会直接从缓存中加载资源&#xff0c;而不会向服务器发送请求。这种…

pytest-asyncio:协程异步测试案例

简介&#xff1a;pytest-asyncio是一个pytest插件。它便于测试使用异步库的代码。具体来说&#xff0c;pytest-asyncio提供了对作为测试函数的协同程序的支持。这允许用户在测试中等待代码。 历史攻略&#xff1a; asyncio并发访问websocket Python&#xff1a;协程 - 快速创…

ROS1快速入门学习笔记 - 06订阅者Subscriber的实现

一、话题模型&#xff08;发布/订阅&#xff09; 二、 实现步骤 与发布者步骤类似&#xff0c;我们将发布者的对应代码写入功能包的src文件中。 1. C程序代码 /*********************************************************************** Copyright 2020 GuYueHome (www.guyu…

SpringBoot学习之SpringBoot3集成OpenApi(三十八)

Springboot升级到Springboot3以后,就彻底放弃了对之前swagger的支持,转而重新支持最新的OpenApi,今天我们通过一个实例初步看看OpenApi和Swagger之间的区别. 一、POM依赖 我的POM文件如下,仅作参考: <?xml version="1.0" encoding="UTF-8"?>…

Openharmony - 设备异常关机Power Down问题分析

By: fulinux E-mail: fulinux@sina.com Blog: https://blog.csdn.net/fulinus 喜欢的盆友欢迎点赞和订阅! 你的喜欢就是我写作的动力! 目录 1.问题描述1.1出现power down的原因1.1.1硬件故障或信号1.1.2软件错误或系统崩溃2.抓日志信息2.1.抓日志方法2.2.问题初步分析3.问题排…

商城数据库(49-52)

49——订单ID表&#xff08;wang_orderids&#xff09; CREATE TABLE wang_orderids (id bigint(11) NOT NULL AUTO_INCREMENT COMMENT 自增ID,rnd float(16,2) NOT NULL COMMENT 毫秒数,PRIMARY KEY (id) ) ENGINEInnoDB DEFAULT CHARSETutf8 COMMENT订单ID表; 50——订单表…

第三方登录以及微信小程序一键登录的实现方案

文章目录 场景解决 场景 第三方登录以及微信小程序一键登录的实现方案, 作下记录 解决 第三方登录 JustAuth微信小程序一件登录 wxjava

【数据结构与算法】:手搓顺序表(Python篇)

文章目录 一、顺序表的概念二、顺序表的实现1. 顺序表的创建1.1 扩容1.2 整体建立顺序表 2. 顺序表的基本运算算法2.1 顺序表的添加&#xff08;尾插&#xff09;2.2 指定位置插入2.3 指定位置删除2.4 顺序表的查找2.5 顺序表元素的索引访问2.6 顺序表元素的修改2.7 顺序表长度…

《Kafka 3.x.x 入门到精通》

Kafka 3.x.x 入门到精通 Kafka是一个由Scala和Java语言开发的&#xff0c;经典高吞吐量的分布式消息发布和订阅系统&#xff0c;也是大数据技术领域中用作数据交换的核心组件之一。以高吞吐&#xff0c;低延迟&#xff0c;高伸缩&#xff0c;高可靠性&#xff0c;高并发&#x…

两大成果发布!“大规模量子云算力集群”和高性能芯片展示中国科技潜力

在当前的科技领域&#xff0c;量子计算的进步正日益引起全球的关注。中国在这一领域的进展尤为显著&#xff0c;今天&#xff0c;北京量子信息科学研究院&#xff08;以下简称北京量子院&#xff09;和中国科学院量子信息与量子科技创新研究院&#xff08;以下简称量子创新院&a…