尚硅谷rabbitmq 2024 消息可靠性答疑二 第22节

returnedMessage()只有失败才调用,confirm()成功失败了都会调用,为什么?

在RabbitMQ中,消息的确认和返回机制是为了确保消息的可靠传递和处理。`confirm`和`returnedMessage`方法的调用时机和目的不同,因此它们的行为也有所区别。

### `ConfirmCallback`和`confirm`方法

**目的**:`ConfirmCallback`的主要目的是确认消息是否成功发布到交换机(Exchange)。

- **成功(ack=true)**:当消息成功到达交换机时,会调用`confirm`方法,并且`ack`参数为`true`。
- **失败(ack=false)**:当消息未能到达交换机时,会调用`confirm`方法,并且`ack`参数为`false`。失败的原因可能是交换机不存在或其他路由问题。

```java
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    if (ack) {
        log.info("Message confirmed successfully, correlationData: {}", correlationData);
    } else {
        log.warn("Message confirmation failed, correlationData: {}, cause: {}", correlationData, cause);
        // 这里可以添加重试逻辑或其他处理
    }
}
```

### `ReturnsCallback`和`returnedMessage`方法

**目的**:`ReturnsCallback`的主要目的是处理那些成功到达交换机但无法路由到队列的消息。

- **调用时机**:只有当消息无法路由到任何队列时,才会调用`returnedMessage`方法。这种情况通常发生在消息的路由键(Routing Key)与任何绑定的队列不匹配时。

```java
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
    log.warn("Message returned: exchange: {}, routingKey: {}, replyCode: {}, replyText: {}, message: {}",
            returnedMessage.getExchange(), returnedMessage.getRoutingKey(),
            returnedMessage.getReplyCode(), returnedMessage.getReplyText(),
            returnedMessage.getMessage());
    // 这里可以添加处理逻辑,例如重发消息或记录日志
}
```

### 为什么`confirm`总是被调用,而`returnedMessage`只有在失败时调用?

1. **消息发布确认(confirm)**:
   - **成功与失败**:`confirm`方法用于确认消息是否成功到达交换机。无论成功还是失败,都会调用这个方法,以便你能够记录日志、重试或进行其他处理。这提供了一个全局的确认机制,确保你知道消息的发布状态。
   - **可靠性**:这是消息发布的第一步确认,确保消息至少到达交换机。

2. **消息返回(returnedMessage)**:
   - **仅失败时调用**:`returnedMessage`方法仅在消息无法路由到任何队列时调用。这是因为只有在这种情况下,消息才会被返回给发送者。成功路由的消息不需要调用这个方法,因为它们已经被正确处理。
   - **路由失败处理**:这是消息发布的第二步确认,确保消息不仅到达交换机,还能正确路由到队列。如果路由失败,你可以在这个方法中处理返回的消息,例如记录日志、重试发送或通知其他系统。

### 总结

- **`confirm`方法**:总是被调用,用于确认消息是否成功到达交换机。无论成功还是失败,都会触发这个回调,以便你能够处理消息发布的结果。
- **`returnedMessage`方法**:只有在消息无法路由到任何队列时才会调用,用于处理那些成功到达交换机但无法路由的消息。

这种设计确保了消息发布的可靠性和可追溯性,提供了两个层次的确认机制,分别处理消息发布和消息路由的不同阶段。
==========
rabbitTemplate.convertAndSend(EXCHANGE DIRECT, ROUTING KEy, obiect: "Message Test confirm~~~").
----convertAndSend  ,convert?是转换了什么呢


在Spring AMQP中,`RabbitTemplate`的`convertAndSend`方法用于将消息转换为适合传输的格式,并发送到指定的交换机和路由键。`convert`部分指的是消息的转换过程,`send`部分指的是消息的发送过程。

### `convertAndSend` 方法的作用

1. **消息转换(Convert)**:
   - **对象到消息的转换**:`convertAndSend`方法会将你传入的对象(例如字符串、Java对象等)转换为`Message`对象。这个转换过程通常涉及将对象序列化为字节数组,并设置消息的头信息(如内容类型、编码等)。
   - **消息转换器(Message Converter)**:Spring AMQP提供了多种消息转换器(如`SimpleMessageConverter`、`Jackson2JsonMessageConverter`、`Jaxb2Marshaller`等),你可以根据需要选择或自定义消息转换器。

2. **消息发送(Send)**:
   - **发送到交换机**:转换后的消息会被发送到指定的交换机(Exchange)。
   - **路由到队列**:交换机会根据指定的路由键(Routing Key)将消息路由到合适的队列。

### 示例代码

```java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MyService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage() {
        String exchange = "my.exchange";
        String routingKey = "my.routingKey";
        String message = "Message Test confirm~~~";

        // 使用convertAndSend方法发送消息
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
}
```

### 消息转换器(Message Converter)

Spring AMQP提供了多种内置的消息转换器,你也可以自定义消息转换器。以下是一些常用的消息转换器:

1. **SimpleMessageConverter**:
   - 默认的消息转换器,可以处理字符串、字节数组、序列化对象等。
   - 如果传入的是字符串,会将其转换为字节数组。

2. **Jackson2JsonMessageConverter**:
   - 将Java对象转换为JSON格式的字节数组,适用于JSON序列化和反序列化。
   - 需要引入Jackson库。

3. **Jaxb2Marshaller**:
   - 使用JAXB将Java对象转换为XML格式的字节数组,适用于XML序列化和反序列化。

### 自定义消息转换器

如果内置的消息转换器不能满足你的需求,你可以自定义消息转换器。以下是一个自定义消息转换器的示例:

```java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.stereotype.Component;

@Component
public class MyCustomMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        // 自定义转换逻辑,例如将对象转换为字节数组
        byte[] bytes = serializeObject(object);
        return new Message(bytes, messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        // 自定义转换逻辑,例如将字节数组转换为对象
        return deserializeObject(message.getBody());
    }

    private byte[] serializeObject(Object object) {
        // 实现序列化逻辑
        return object.toString().getBytes();
    }

    private Object deserializeObject(byte[] bytes) {
        // 实现反序列化逻辑
        return new String(bytes);
    }
}
```

然后在配置类中注册自定义消息转换器:

```java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(RabbitTemplate rabbitTemplate, MyCustomMessageConverter myCustomMessageConverter) {
        rabbitTemplate.setMessageConverter(myCustomMessageConverter);
        return rabbitTemplate;
    }
}
```

### 总结

- **`convertAndSend`方法**:将传入的对象转换为`Message`对象,并发送到指定的交换机和路由键。
- **消息转换器**:负责将对象序列化为字节数组,Spring AMQP提供了多种内置的消息转换器,你也可以自定义消息转换器。
- **自定义消息转换器**:可以根据需要实现`MessageConverter`接口,提供自定义的序列化和反序列化逻辑。

通过理解消息转换和发送的过程,你可以更好地控制消息


================
1. **消息发布确认(confirm)**:
   - **成功与失败**:`confirm`方法用于确认消息是否成功到达交换机。无论成功还是失败,都会调用这个方法,以便你能够记录日志、重试或进行其他处理。这提供了一个全局的确认机制,确保你知道消息的发布状态。
   - **可靠性**:这是消息发布的第一步确认,确保消息至少到达交换机。

2. **消息返回(returnedMessage)**:
   - **仅失败时调用**:`returnedMessage`方法仅在消息无法路由到任何队列时调用。这是因为只有在这种情况下,消息才会被返回给发送者。成功路由的消息不需要调用这个方法,因为它们已经被正确处理。
   - **路由失败处理**:这是消息发布的第二步确认,确保消息不仅到达交换机,还能正确路由到队列。如果路由失败,你可以在这个方法中处理返回的消息,例如记录日志、重试发送或通知其他系统。
-----------这是rabbitmq里的,那kafka里面怎么做的呢?


在Kafka中,消息的确认和处理机制与RabbitMQ有所不同。以下是Kafka中对应的机制:

1. **消息发布确认(Acks)**:
   - **成功与失败**:Kafka的生产者配置中有一个`acks`参数用于控制消息发布确认。常见的值有:
     - `acks=0`:生产者不会等待任何确认。消息发送后立即认为成功。
     - `acks=1`:领导者分区接收到消息后即确认,不等待副本确认。
     - `acks=all`(或`acks=-1`):所有副本都确认接收到消息后才认为成功。这是最可靠的设置,确保消息不会丢失。
   - **可靠性**:`acks=all`提供了最高的可靠性,确保消息被所有副本接收。

2. **消息返回(Callback)**:
   - **成功与失败**:Kafka提供了一个回调机制来处理消息发送的结果。你可以在发送消息时指定一个`Callback`,它会在消息发送成功或失败时被调用。
   - **处理回调**:通过实现`Callback`接口的`onCompletion`方法,你可以处理成功和失败的情况。例如,记录日志、重试发送或通知其他系统。

以下是一个简单的示例代码,展示了如何在Kafka中使用`acks`和`Callback`:

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");  // 设置消息确认机制

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

ProducerRecord<String, String> record = new ProducerRecord<>("topic-name", "key", "value");

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            // 消息发送成功
            System.out.println("Message sent successfully to topic " + metadata.topic() + " partition " + metadata.partition() + " offset " + metadata.offset());
        } else {
            // 消息发送失败
            exception.printStackTrace();
            // 处理失败情况,例如重试发送、记录日志等
        }
    }
});

producer.close();
```

通过这种方式,Kafka提供了灵活的消息确认和处理机制,确保消息的可靠传递和处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/56178.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java微信支付接入(8) - API V3 Native 用户取消订单API

官方文档&#xff1a;https://pay.weixin.qq.com/wiki/doc/apiv3/apis/chapter3_4_3.shtml 实现用户主动取消订单的功能 定义取消订单接口 /*** 用户取消订单* param orderNo* return* throws Exception*/ ApiOperation("用户取消订单") PostMapping("/cance…

swoole框架有哪些呢

基于 Swoole 的 PHP 框架有很多&#xff0c;以下是一些比较流行和常用的框架&#xff1a; Hyperf&#xff1a;高性能企业级协程框架&#xff0c;基于 Swoole 4.4 实现。提供丰富的组件&#xff0c;如协程版的 MySQL 客户端、Redis 客户端、WebSocket 服务端及客户端等1。 Swof…

【微信小程序_11_全局配置】

摘要:本文介绍了微信小程序全局配置文件 app.json 中的常用配置项,重点阐述了 window 节点的各项配置,包括导航栏标题文字、背景色、标题颜色,窗口背景色、下拉刷新样式以及上拉触底距离等。通过这些配置可实现小程序窗口外观的个性化设置,提升用户体验。 微信小程序_11_全…

C语言 | Leetcode C语言题解之第462题最小操作次数使数组元素相等II

题目&#xff1a; 题解&#xff1a; static inline void swap(int *a, int *b) {int c *a;*a *b;*b c; }static inline int partition(int *nums, int left, int right) {int x nums[right], i left - 1;for (int j left; j < right; j) {if (nums[j] < x) {swap(…

树莓派应用--AI项目实战篇来啦-5.OpenCV绘画函数的使用

1. 介绍 OpenCV作为一款功能强大的计算机视觉库&#xff0c;被广泛地应用于图像处理和计算机视觉领域。 除了在机器视觉和人工智能领域有者广泛的应用&#xff0c;OpenCV 还能够媲美艺术家的创造力&#xff0c;通过其强大的绘图函数&#xff0c;绘制出令人叹为观止的艺术画作。…

flask项目框架搭建

目录结构 blueprints python包&#xff0c;蓝图文件&#xff0c;相当于路由组的概念,方便模块化开发 例如auth.py文件 from flask import Blueprint, render_templatebp Blueprint("auth", __name__, url_prefix"/auth")bp.route("/login") d…

Python数据可视化常用工具,值得收藏!!!

我们了解了如何使用 Pandas 进行简单的绘图,使用 Pandas 自带的绘图功能能够快速地生成一些基本的图表,例如折线图、柱状图等.但为了实现更复杂或专业的可视化效果,我们通常还需要借助更为强大的绘图库——Matplotlib. 本篇文章将详细介绍如何结合 Matplotlib 和 Pandas 实现数…

Redis-缓存一致性

缓存双写一致性 更新策略探讨 面试题 缓存设计要求 缓存分类&#xff1a; 只读缓存&#xff1a;&#xff08;脚本批量写入&#xff0c;canal 等&#xff09;读写缓存 同步直写&#xff1a;vip数据等即时数据异步缓写&#xff1a;允许延时&#xff08;仓库&#xff0c;物流&a…

C++: AVL树的实现

一.AVL树的旋转 AVL树是平衡搜索二叉树的一种。 平衡因子&#xff1a;节点右树的高度减左树的高度&#xff0c;AVL树规定平衡因子的绝对值小于2。若不在这个范围内&#xff0c;说明该树不平衡。 AVL树节点&#xff1a; struct AVLTreeNode {AVLTreeNode(const T& data …

数据结构--堆的深度解析

目录 引言 一、基本概念 1.1堆的概念 1.2堆的存储结构 1.3堆的特点 二、 堆的基本操作 2.1初始化 2.2创建堆 2.3插入元素 2.4删除元素 2.5堆化操作 2.6堆的判空 2.7获取堆顶元素 三、堆的常见应用 1. 优先队列 2. 堆排序 3. Top-k 问题 4. 图论中的应用 四…

rom定制系列------小米5x_miui12安卓11定制固件界面预览 小米5x第三方固件

&#x1f49d;&#x1f49d;&#x1f49d;此固件来源于客户卡刷固件定制。客户需要修改为线刷。并且修改账号锁功能。 可以让客户使用官方平台批量进行刷写。方便操作。 定制机型以及功能预览&#x1f49d;&#x1f49d;&#x1f49d; 小米5x版本miui12.5.8安卓11固件。此机型…

中国网络隐私保护:机遇与挑战并存的未来

随着数字经济的蓬勃发展&#xff0c;中国已进入大数据和互联网高速发展的时代。伴随而来的&#xff0c;是公众对网络隐私保护的强烈需求。从电子支付到社交平台&#xff0c;从智能家居到人脸识别&#xff0c;网络数据正在全面渗透到人们生活的方方面面。然而&#xff0c;数据隐…

MySQL 连接

使用MySQL二进制方式连接 使用MySQL二进制方式进入到MySQL命令提示符下来连接MySQL数据库。 实例 以下是从命令行中连接MySQL服务器的简单实例&#xff1a; [roothost]# mysql -u root -p Enter password:******在登录成功后会出现 mysql> 命令提示窗口&#xff0c;你可以在…

建造者模式(C++)

定义&#xff1a;建造者模式&#xff08;Builder Pattern&#xff09;是一种创建型设计模式&#xff0c;它主要用于构建一个复杂对象&#xff0c;并将其构建过程与表示分离&#xff0c;使得同样的构建过程可以创建不同的表示。该模式通过将复杂对象的构建过程分解为多个简单的步…

安装雷池社区版,保护网站安全

环境依赖 安装雷池前请确保你的系统环境符合以下要求 操作系统&#xff1a;LinuxCPU 指令架构&#xff1a;x86_64CPU 指令架构&#xff1a;支持 ssse3 指令集软件依赖&#xff1a;Docker 20.10.14 版本以上软件依赖&#xff1a;Docker Compose 2.0.0 版本以上最低资源需求&am…

Linux NFS 服务器 搭建

1、安装 NFS 确保Linux联网 sudo apt-get install nfs-kernel-server 2、创建一个目录&#xff0c;并在该文件下创建一个文件&#xff0c;用于测试nfs。 sudo mkdir /nfssudo mkdir /nfs/rootfscd /nfs/rootfs/sudo vim test.txt在里面随便加点内容 esc :wq 保存退出就可以了 …

Java 文件拷贝

1.小文件拷贝 实例代码&#xff1a; 上面程序运行的图示&#xff1a; 弊端&#xff1a;一次读一个字节&#xff0c;效率太慢。所以需要一次读取多个字节。 2.大文件拷贝 结果&#xff1a;

云原生开发 - 监控(简约版)

要在程序中暴露指标&#xff0c;并符合 Prometheus 和 Kubernetes 的规范&#xff0c;可以按照以下步骤进行&#xff1a; 1. 选择合适的库 根据你的编程语言选择适合的 Prometheus 客户端库。例如&#xff1a; Go: github.com/prometheus/client_golangJava: io.prometheus:…

UE5运行时动态加载场景角色动画任意搭配-全流程代码(四)

UE5运行时动态加载场景、角色、角色动画、相机动画任意搭配,Android、iOS也可以跑,横竖屏兼容,手机竖屏: 1、场景切换UWorld处理 在通过OpenLevel进行场景切换的时候,UWorld会发生变化,需要我们获取正确的UWorld。 1、在GameInstance监听Level加载 void UMyGameInsta…

数据结构——复杂度

目录 数据结构前言 数据结构 算法 算法效率 时间复杂度 大O的渐进表示法 示例1 示例2 示例3 示例4 示例5 示例6 示例7 空间复杂度 示例1 示例2 示例3 示例4 常见复杂度对比 旋转数组 优化1 优化2 这一篇文章我们就开始数据结构知识的学习&#xff01; 数据…