rabbitMq举例

新来个技术总监,把 RabbitMQ 讲的那叫一个透彻,佩服!

生产者

代码举例

public String sendMsg(final String exchangeName,final String routingKey,final String msg) {}

/*** 发送消息* @param exchangeName	exchangeName* @param routingKey	routingKey* @param msg	mq message* @return msgId*/public String sendMsg(final String exchangeName,final String routingKey,final String msg) {final CorrelationData correlationDataInfo = new CorrelationData();final String msgId = String.join(":", "yl:dps", exchangeName, routingKey,String.valueOf(IdUtil.getSnowflake(1, 3).nextId()));correlationDataInfo.setId(msgId);//send over callback lograbbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if(correlationData!=null) {final String id = correlationData.getId();if (ack) {log.info("消息投递成功,消息Id[{}]【{}】", id, JSON.toJSONString(correlationData));} else {log.error("消息投递失败,消息Id[{}] [{}]", id, cause);}}else {log.error("消息投递失败,correlationData为null!");}});rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationDataInfo);

分析代码

这段代码的功能是向 RabbitMQ 发送消息,并且在发送消息后通过回调函数来确认消息的投递状态。下面是对代码的详细分析:

1. 方法签名

java

public String sendMsg(final String exchangeName, final String routingKey, final String msg)

  • 该方法接收三个参数:
    • exchangeName:消息发送的交换机名称。
    • routingKey:消息的路由键,用来确定消息将被路由到哪个队列。
    • msg:待发送的消息内容。

2. CorrelationData 创建与消息 ID 设置

java

final CorrelationData correlationDataInfo = new CorrelationData(); final String msgId = String.join(":", "yl:dps", exchangeName, routingKey, String.valueOf(IdUtil.getSnowflake(1, 3).nextId())); correlationDataInfo.setId(msgId);

  • CorrelationData:RabbitMQ 提供的一个类,用于设置和传递消息的相关信息,主要用来在消息确认回调中标识消息。
  • 生成 msgIdmsgId 是通过拼接一个前缀 "yl:dps"、交换机名 exchangeName、路由键 routingKey,以及通过 IdUtil.getSnowflake(1, 3).nextId() 生成的雪花算法(Snowflake)ID来创建的唯一标识符。这个 ID 用来唯一标识这条消息。
  • setId(msgId):将生成的 msgId 设置到 correlationDataInfo 对象中。

3. 设置回调函数

java

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if(correlationData != null) { final String id = correlationData.getId(); if (ack) { log.info("消息投递成功,消息Id[{}]【{}】", id, JSON.toJSONString(correlationData)); } else { log.error("消息投递失败,消息Id[{}] [{}]", id, cause); } } else { log.error("消息投递失败,correlationData为null!"); } });

  • setConfirmCallback:通过 rabbitTemplate.setConfirmCallback() 方法设置消息确认回调函数。在消息发送之后,RabbitMQ 会调用该回调函数来通知消息是否成功投递到交换机。
  • 参数说明
    • correlationData:携带消息相关信息的对象(这里是 correlationDataInfo),包含了消息的 ID 等。
    • ack:布尔值,表示消息是否成功投递(true 表示成功,false 表示失败)。
    • cause:如果 ack 为 falsecause 会包含失败的原因。
  • 回调内容
    • 如果 correlationData 不为空:
      • 如果 ack 为 true,则表示消息投递成功,记录日志。
      • 如果 ack 为 false,则表示消息投递失败,记录失败日志,并包含失败原因。
    • 如果 correlationData 为 null,记录错误日志。

4. 发送消息

 

java

rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationDataInfo);

  • convertAndSend:该方法用于将消息发送到指定的交换机(exchangeName)、路由键(routingKey)和消息内容(msg)。同时,将 correlationDataInfo(包含 msgId)附带发送,以便在回调中进行确认和记录。
  • rabbitTemplate 是 RabbitTemplate 对象,用来发送和接收消息。它是 Spring AMQP 的核心组件,封装了与 RabbitMQ 的交互。

5. 返回消息 ID

return msgId;

  • 返回生成的消息 ID。这个 ID 可以在外部被用于追踪消息投递的状态。

总结:

这段代码的主要作用是:

  • 生成一个唯一的消息 ID,包含了交换机名、路由键和通过 Snowflake 算法生成的 ID。
  • 设置消息确认回调,当消息发送后,RabbitMQ 会通过回调通知消息是否成功投递。
  • 调用 rabbitTemplate.convertAndSend() 方法向指定的交换机发送消息,同时附带 CorrelationData(包含消息 ID)。
  • 返回生成的消息 ID,以便后续追踪或处理。

关键点:

  • 消息确认机制:使用 ConfirmCallback 确保消息是否成功投递,并且通过日志记录投递状态。
  • 消息 ID:通过 Snowflake 算法生成一个唯一的消息 ID,可以用于追踪和日志记录。
  • 回调机制:确保生产者能在消息投递过程中获得详细的确认信息。

消费者

代码举例

@RabbitHandler@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${rabbitmq.yl.x.queue}", durable = "true"),exchange = @Exchange(value = "${rabbitmq.yl.x.exchange}"),key = "${rabbitmq.yl.x.routing.key}"))public void stateChange(@Payload String payload, @Headers Map<String, Object> headers, Channel channel) throws IOException {log.info("MQ-收到【x】状态变更消息:{} ", payload);final String objDeliveryTag = String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG));if (NumberUtil.isNumber(objDeliveryTag)) {final long deliveryTag= Long.parseLong(objDeliveryTag);log.info("MQ-收到【x】MQ消息进行消费【执行ack】[{}]", deliveryTag);channel.basicAck(deliveryTag,true);}if(CharSequenceUtil.isNotBlank(payload)) {}}

在这个 Java 方法 leaseStateChange 中,通常是在使用 Spring AMQP 框架与 RabbitMQ 集成时出现的消费消息的处理方法。下面逐个解析这个方法中的各个部分:

方法签名解析

public void leaseStateChange(@Payload String payload, @Headers Map<String, Object> headers, Channel channel) throws IOException

1. public void leaseStateChange
  • public: 表明这个方法是公开的,可以被外部调用。
  • void: 表明这个方法没有返回值。
  • leaseStateChange: 这是方法的名称。可以推测这个方法是用来处理与“租赁状态变化”相关的消息的。
2. @Payload String payload
  • @Payload: 这是 Spring AMQP 框架中的一个注解,用于将消息体的内容绑定到方法参数中。在这个例子中,payload 是一个 String 类型的参数,代表从 RabbitMQ 队列中接收到的消息内容。通常,这个消息体是通过 JSON 或其他格式的字符串传递的。

    • 注解的作用: @Payload 使得 Spring 能够自动将消息的主体部分注入到方法参数 payload 中。比如,如果消息体是一个 JSON 格式的字符串,Spring 会将其直接赋值给 payload 参数。

    • 示例: 假设接收到的消息体是 "{"state": "active", "leaseId": "12345"}"payload 将会是该字符串。

3. @Headers Map<String, Object> headers
  • @Headers: 这是另一个 Spring AMQP 注解,用来将消息的头部信息注入到方法参数中。RabbitMQ 消息不仅有消息体(payload),还可能包含一些头信息(比如消息的发送时间、路由信息等)。

    • 注解的作用: @Headers 会将消息头部的内容绑定到 headers 参数,这个参数是一个 Map<String, Object> 类型,其中键是头部的名称,值是相应的值。头部信息常常用于传递一些附加信息(例如消息的优先级、发送者标识等)。

    • 示例: 如果消息头包含如下信息:

      {"correlationId": "abc123", "messageType": "leaseUpdate"}

      那么 headers 将会是一个 Map,其内容是:

      {"correlationId": "abc123", "messageType": "leaseUpdate"}
4. Channel channel
  • Channel: 这是 RabbitMQ 的核心概念之一。Channel 代表一个与 RabbitMQ 服务的连接通道,允许你在该通道上进行消息的消费、确认等操作。

    • 作用: 在 Spring AMQP 中,Channel 通常用来进行消息的确认(acknowledge)操作,或者处理消息处理失败时的重新排队等任务。你可以使用 Channel 来手动确认消息,或者控制消息是否成功消费。

    • 示例: 如果在消息处理过程中出现异常,消费者可能需要通过 channel.basicNack() 方法来拒绝该消息并可能重新入队。

5. throws IOException
  • throws IOException: 表明这个方法可能会抛出 IOException 异常。RabbitMQ 的消息操作可能会遇到 I/O 错误,因此需要在方法签名中声明可能抛出此异常。通常,这类异常会发生在与 RabbitMQ 的连接中断、消息传输过程失败时等。

Spring AMQP 消费者代码示例

假设这是一个处理来自某个队列的消息的方法,下面是该方法的使用场景和完整代码示例:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.MessageListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;import java.io.IOException;
import java.util.Map;@Component
public class LeaseStateChangeListener {// 监听指定队列的消息@RabbitListener(queues = "leaseStateQueue")public void leaseStateChange(@Payload String payload, @Headers Map<String, Object> headers, Channel channel) throws IOException {try {// 处理消息体System.out.println("Received message: " + payload);// 获取消息头部信息String correlationId = (String) headers.get("correlationId");String messageType = (String) headers.get("messageType");System.out.println("CorrelationId: " + correlationId + ", MessageType: " + messageType);// 模拟处理业务逻辑processLeaseStateChange(payload);// 确认消息已成功消费channel.basicAck(headers.hashCode(), false);  // 手动确认消息} catch (Exception e) {// 异常处理,拒绝消息并重新入队System.err.println("Error processing message: " + e.getMessage());channel.basicNack(headers.hashCode(), false, true); // 拒绝并重新入队}}private void processLeaseStateChange(String payload) {// 假设这里是处理租赁状态更新的业务逻辑// 比如将消息解析为对象,进行租赁状态更新System.out.println("Processing lease state change for payload: " + payload);}
}

解析

  • @RabbitListener: 注解的作用是声明这个方法是一个 RabbitMQ 消息的消费者,并且该方法监听 leaseStateQueue 队列。当有消息到达这个队列时,这个方法会被调用。

  • 消息体 (payload): 这个方法会接收到一个消息体,@Payload 注解将该消息的内容(通常是 JSON 格式的字符串)自动绑定到方法参数 payload 上。

  • 消息头 (headers): 使用 @Headers 注解将消息的头部信息绑定到 headers 参数上,Map<String, Object> 类型。你可以从中获取如 correlationIdmessageType 等附加信息。

  • Channel: 这个参数用于消息的确认、拒绝等操作。在成功处理完消息后,调用 channel.basicAck() 来确认消息,表示该消息已经被成功消费。如果处理失败,调用 channel.basicNack() 拒绝该消息,并可以选择是否重新入队。

总结

  • 该方法是一个 RabbitMQ 消费者,用于从指定的队列中消费消息。
  • 通过 @Payload 获取消息体内容,使用 @Headers 获取消息头信息。
  • 使用 Channel 来确认消息的处理状态。
  • 使用 @RabbitListener 注解自动监听队列,并处理消费的消息。

这种方式非常适合处理队列中的业务逻辑,并能够灵活处理消息的确认、拒绝等操作。

消息怎么知道发给哪一个队列

先看队列与交换机怎么绑定的

先创建队列,然后绑定到交换机

RabbitMQ系列-6.如何通过控制台创建交换机、队列、死信队列、延迟队列 - 简书

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/64027.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024年12月13日Github流行趋势

项目名称&#xff1a;nexus-xyz / nexus-zkvm 项目维护者&#xff1a;govereau slumber danielmarinq sjudson yoichi-nexus项目介绍&#xff1a;Nexus zkVM 是一个零知识虚拟机。项目star数&#xff1a;1,948项目fork数&#xff1a;343 项目名称&#xff1a;soxoj / maigret …

(九)机器学习 - 多项式回归

多项式回归&#xff08;Polynomial Regression&#xff09;是一种回归分析方法&#xff0c;它将自变量 xx 和因变量 yy 之间的关系建模为 nn 次多项式。多项式回归的目的是找到一个 nn 次多项式函数&#xff0c;使得这个函数能够最好地拟合给定的数据点。 多项式回归的数学表达…

Linux USB开发整理和随笔

目录 1 概述 2 硬件原理基础 2.1 USB发展 2.2 USB的拓扑 2.3 硬件接口 2.4 USB总线协议 2.4.1 通信过程 2.4.2 概念关系 2.4.3 管道PIPE 2.4.4 传输 2.4.5 事务 2.4.6 包结构与类型 2.4.6.1 令牌包 2.4.6.2 数据包 2.4.6.3 握手包 2.5 描述符 2.5.1 设备描述符…

️️️ 避坑指南:如何修复国密gmssl 库填充问题并提炼优秀加密实践20241212

&#x1f6e1;️ 避坑指南&#xff1a;如何修复国密gmssl 库填充问题并提炼优秀加密实践 ✨ 引言 在当下的数据安全环境中&#xff0c;SM4作为中国国家密码算法的代表性选择&#xff0c;被广泛应用于金融、通信和政府领域。然而&#xff0c;在实际开发中&#xff0c;即便是开…

沈阳理工大学《2024年811自动控制原理真题》 (完整版)

本文内容&#xff0c;全部选自自动化考研联盟的&#xff1a;《沈阳理工大学811自控考研资料》的真题篇。后续会持续更新更多学校&#xff0c;更多年份的真题&#xff0c;记得关注哦~ 目录 2024年真题 Part1&#xff1a;2024年完整版真题 2024年真题

2023 年 408 大题解析

讲解视频推荐&#xff1a; 【BOK408真题讲解-2023】 一、数据结构 1. 算法题&#xff08;图的邻接矩阵&#xff09;13’ 已知有向图 G 采用邻接矩阵存储&#xff0c;类型定义如下&#xff1a; typedef struct{ //图的类型定义int num Vertices, numEdges; //图的顶点数和有…

【R语言】基础知识

一、对象与变量 R语言中的所有事物都是对象&#xff0c;如向量、列表、函数&#xff0c;变量、甚至环境等。它的所有代码都是基于对象object的操作&#xff0c;变量只是调用对象的手段。 1、对象 在R语言中&#xff0c;对计算机内存的访问是通过对象实现的。 # 字符型向量 …

【SpringBug】lombok插件中@Data不能生成get和set方法

一&#xff1a;问题引入 可以看到我们在类UserInfo上写了Data注解&#xff0c;但是在测试文件中生成的反编译target文件Us二Info中没有get和set方法 二&#xff1a;解决方法 1&#xff1a;Spring升级问题&#xff08;解决了我的问题&#xff09; 原因是Spring官方进行了升级…

Java版-图论-最短路-Floyd算法

实现描述 网络延迟时间示例 根据上面提示&#xff0c;可以计算出&#xff0c;最大有100个点&#xff0c;最大耗时为100*wi,即最大的耗时为10000&#xff0c;任何耗时计算出来超过这个值可以理解为不可达了&#xff1b;从而得出实现代码里面的&#xff1a; int maxTime 10005…

STM32 CubeMx HAL库 独立看门狗IWDG配置使用

看门狗这里我就不多介绍了&#xff0c;能搜到这篇文章说明你了解 总之就是一个单片机重启程序&#xff0c;设定好超时时间&#xff0c;在超时时间内没有喂狗&#xff0c;单片机就会复位 主要应用在单片机异常重启方面&#xff0c;比如程序跑飞&#xff08;注意程序跑飞时你就…

uni-app实现小程序、H5图片轮播预览、双指缩放、双击放大、单击还原、滑动切换功能

前言 这次的标题有点长&#xff0c;主要是想要表述的功能点有点多&#xff1b; 简单做一下需求描述 产品要求在商品详情页的头部轮播图部分&#xff0c;可以单击预览大图&#xff0c;同时在预览界面可以双指放大缩小图片并且可以移动查看图片&#xff0c;双击放大&#xff0…

3D 生成重建035-DiffRF直接生成nerf

3D 生成重建035-DiffRF直接生成nerf 文章目录 0 论文工作1 论文方法2 实验结果 0 论文工作 本文提出了一种基于渲染引导的三维辐射场扩散新方法DiffRF&#xff0c;用于高质量的三维辐射场合成。现有的方法通常难以生成具有细致纹理和几何细节的三维模型&#xff0c;并且容易出…

中国计算机学会计算机视觉专委会携手合合信息举办企业交流活动,为AI安全治理打开“新思路”

近期&#xff0c;《咬文嚼字》杂志发布了2024年度十大流行语&#xff0c;“智能向善”位列其中&#xff0c;过去一年时间里&#xff0c;深度伪造、AI诈骗等话题屡次登上热搜&#xff0c;AI技术“野蛮生长”引发公众担忧。今年9月&#xff0c;全国网络安全标准化技术委员会发布了…

详解多租户架构下的资源隔离模式

文章目录 0.简介1.多租户概念1.1 基本概念1.2 单租户 vs 多租户 2.实现方案2.1 独立数据库方案2.1.1 优点2.1.2 缺点2.1.3 应用场景 2.2 共享数据库&#xff0c;独立 Schema2.2.1 优点2.2.2 缺点2.2.3 应用场景 2.3 共享数据库、共享Schema、共享表2.3.1 优点2.3.2 缺点2.3.3 应…

SMMU软件指南SMMU编程之寄存器

安全之安全(security)博客目录导读 本博客介绍了SMMUv3的编程接口&#xff1a; • SMMU寄存器 • 流表&#xff08;Stream table&#xff09; • CD&#xff08;Context Descriptor&#xff09; • 事件队列&#xff08;Event queue&#xff09; • 命令队列&#xff08;…

分布式 窗口算法 总结

前言 相关系列 《分布式 & 目录》《分布式 & 窗口算法 & 总结》《分布式 & 窗口算法 & 问题》 参考文献 《【算法】令牌桶算法》 固定窗口算法 简介 固定窗口算法是最简单的流量控制算法。固定窗口算法的核心原理是将系统的生命周期划分为一个个…

SEC_ASA 第二天作业

拓扑 按照拓扑图配置 NTP&#xff0c;Server端为 Outside路由器&#xff0c;Client端为 ASA&#xff0c;两个设备的 NTP传输使用MD5做校验。&#xff08;安全 V4 LAB考点&#xff09; 提示&#xff1a;Outside路由器作为 Server端要配置好正确的时间和时区&#xff0c;ASA防…

【电力负荷预测实例】采用新英格兰2024年最新电力负荷数据的XGBoost电力负荷预测模型

与小编上篇文章介绍的基于BPNN神经网络的电力负荷预测相比较&#xff0c;两种模型的负荷预测方法各有优势&#xff0c;神经网络能够自动提取特征并处理非线性关系&#xff0c;而XGBoost则具有预测精度高、运行速率快和可解释性强的特点。在实际应用中&#xff0c;可以根据具体需…

数据库数据恢复—ORACLE常见故障有哪些?如何恢复数据?

Oracle数据库常见故障表现&#xff1a; 1、ORACLE数据库无法启动或无法正常工作。 2、ORACLE ASM存储破坏。 3、ORACLE数据文件丢失。 4、ORACLE数据文件部分损坏。 5、ORACLE DUMP文件损坏。 Oracle数据库数据恢复方案&#xff1a; 1、检测存放数据库的服务器/存储设备是否存…

ArcGIS MultiPatch数据转换Obj数据

文章目录 ArcGIS MultiPatch数据转换Obj数据1 效果2 技术路线2.1 Multipatch To Collada2.2 Collada To Obj3 代码实现4 附录4.1 环境4.2 一些坑ArcGIS MultiPatch数据转换Obj数据 1 效果 2 技术路线 MultiPatch --MultipatchToCollada–> Collada --Assimp–> Obj 2.…