微服务技术栈之rabbitMQ高级(二)

  • 我们该如何确保MQ消息的可靠性

  • 如果真的发送失败,有没有其它的兜底方案?

这些问题,在这一次的学习中都会找到答案。

生产者的可靠性

首先,我们一起分析一下消息丢失的可能性有哪些。

消息从发送者发送消息,到消费者处理消息,需要经过的流程是这样的:

消息从生产者到消费这的每一步都有可能导致消息丢失:

  • 发送消息时丢失:
    • 生产者发送消息时连接MQ失败

    • 生产者发送消息到达MQ后未找到Exchange

    • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue

    • 消息到达MQ后,处理消息的进程发生异常

  • MQ导致消息丢失:
    • 消息到达MQ,保存到队列后,尚未消费就突然宕机

  • 消费者处理消息时:
    • 消息接收后尚未处理突然宕机

    • 消息接收后处理过程中抛出异常

所以,我们要解决消息丢失问题,保证MQ的可靠性,就必须从三个方面入手

  • 确保生产者一定把消息发送到MQ

  • 确保MQ不会将消息弄丢

  • 确保消费者一定要处理消息

生产者重试机制

生产者发送消息时,出现了网络故障,导致与MQ的连接中断。

为了解决这一问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。

修改publisher工程的yml配置文件

配置文件: 
  rabbitmq:host: 192.168.200.131port: 5672virtual-host: /hmallusername: hmallpassword: 123connection-timeout: 1S # 设置连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms #失败后的初始等待时间multiplier: 1 #失败后下次的等待时长倍数,下次等待时长 = 失败后的初始等待时间(initial-interval) * multipliermax-attempts: 3 # 最大重试次数
停止mq服务:

docker stop mq

命令: 

[root@localhost ~]# docker stop mq
mq
[root@localhost ~]#
启动测试:

然后我们测试TestQueue方法。测试发送一条消息。

代码:

    @Test@DisplayName("简单的队列测试")public void testSimpleQueue(){//1,队列名称String queueName = "simple.queue";//2,消息String msg = "hello world";//3,发送消息rabbitTemplate.convertAndSend(queueName,msg);}

看结果

 发现会每隔1秒(连接超时1秒,隔1秒;所以输出的时候显示每隔2秒)重试1次,总共重试了3次。消息发送的超时重试机制配置成功了!

注意事项: 

当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。

如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

生产者确认机制

一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。

但是,不排除在下面的情况也会出现消息发送到MQ之后丢失的现象,例如:

  • MQ内部处理消息的进程发生了异常

  • 生产者发送消息到达MQ后未找到Exchange

  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由

针对上述情况,RabbitMQ提供了生产者消息确认机制,包括publisher Confirm和publisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执

示意图:

总结:
  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功

  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功

  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功

  • 其它情况都会返回NACK,告知投递失败

其中acknack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。

默认两种机制都是关闭状态,需要通过配置文件来开启。

实现生产者确认

开启生产者确认:

在publisher生产者工程的yml添加下面的配置

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

代码

  rabbitmq:host: 192.168.200.131port: 5672virtual-host: /hmallusername: hmallpassword: 123publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

publisher-confirm-type有三种模式可选:

  • none:关闭confirm

  • simple:同步阻塞等待MQ的回执

  • correlated:MQ异步回调返回回执

一般推荐使用correlated,回调机制

定义ReturnCallback:

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在一个配置类中,统一进行配置。我在publisher模板定义一个配置类:

Mqconfig配置类

代码:

@Slf4j
@Configuration
@RequiredArgsConstructor
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.debug("触发了return callback");System.out.println("交换机exchange:"+returnedMessage.getExchange());System.out.println("routingKey路由key:"+returnedMessage.getRoutingKey());System.out.println("消息体message:"+returnedMessage.getMessage());System.out.println("应答码:replayCode:"+returnedMessage.getReplyCode());System.out.println("应答信息:replayText:"+returnedMessage.getReplyText());}});}}
 定义ConfirmCallback:

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:

这里的CorrelationData包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆

  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

新增一个测试方法,向交换机发送消息,并且routingkey是不存在的,来进行测试,并且添加ConfirmCallback:

我在一个测试类下,新增一个testConfirmCallback方法:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/750741.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

StarRocks实战——云览科技存算分离实践

目录 背景 一、平台现状&痛点 1.1 使用组件多,维护成本高 1.2 链路冗长,数据时效性难以保证 1.3 服务稳定性不足 二、StarRocks 存算分离调研 2.1 性能对比 2.2 易用性 2.3 存储成本 三、StarRocks 存算分离实践 3.1 查询优化 3.1.1 物化…

Linux网络编程: 以太网帧Frame/ARP/RARP详解

一、TCP/IP五层模型 物理层(Physical Layer):物理层是最底层,负责传输比特流(bitstream)以及物理介质的传输方式。它定义了如何在物理媒介上传输原始的比特流,例如通过电缆、光纤或无线传输等。…

【论文阅读】Diffused Heads: Diffusion Models Beat GANs on Talking-Face Generation

Diffused Heads: 扩散模型在说话人脸生成方面击败GANs paper:[2301.03396] Diffused Heads: Diffusion Models Beat GANs on Talking-Face Generation (arxiv.org) code:MStypulkowski/diffused-heads: Official repository for Diffused Heads: Diffu…

R:简易的Circos图

library(grid) library(circlize) library(RColorBrewer) library(ComplexHeatmap) setwd("C:/Users/fordata/Downloads/Circos") # 创建颜色调色板 coul <- colorRampPalette(brewer.pal(9, "Set3"))(12) # 读取基因组数据 genome <- read.table(ci…

贪心算法(两个实例)

例一&#xff1a;调度问题 问题&#xff1a;由n项任务&#xff0c;每项任务的加工时间已知&#xff0c;从零时刻开始陆续加入一台机器上去加工&#xff0c;每个任务完成的时间是从0时刻到任务加工截至的时间。 求总完成时间&#xff08;所有任务完成时间最短计划方案&#xf…

PostMan测试文件上传

后端代码 package com.example.backend.controller;import cn.hutool.core.io.FileUtil; import cn.hutool.core.util.StrUtil; import com.example.backend.common.Result; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.*; import org…

Python Web开发记录 Day9:Django part3 用户管理

名人说&#xff1a;莫道桑榆晚&#xff0c;为霞尚满天。——刘禹锡&#xff08;刘梦得&#xff0c;诗豪&#xff09; 创作者&#xff1a;Code_流苏(CSDN)&#xff08;一个喜欢古诗词和编程的Coder&#x1f60a;&#xff09; 目录 1、数据库准备2、用户列表3、新建用户4、编辑用…

科研学习|论文解读——美国政治经济中的权力:网络分析(JASIST, 2019)

论文原题目 Power in the U.S. political economy: A network analysis 摘要 美国政治经济的许多特征产生于大型政治和经济机构之间的互动&#xff0c;然而我们对它们的互动性质和这些机构之间的权力分配知之甚少。在本文中&#xff0c;对总部设在美国的组织的网络进行了详细的…

学习shell脚本

文章目录 什么是shell脚本为什么要学习shell脚本第一个脚本编写与执行 简单的shell脚本练习简单案例脚本的执行方式差异(source、sh script、./script) 如何使用shell脚本的判断式利用test命令的测试功能利用判断符号[ ]shell脚本的默认变量($0、$1...) shell脚本的条件判断式利…

2.3 物理层设备

2.3 物理层设备 &#xff08;一&#xff09;中继器 产生原因 由于存在损耗&#xff0c;在线路上传输的信号功率会逐渐衰减&#xff0c;衰减到一定程度时将造成信号失真&#xff0c;因此会导致接收错误。 中继器的功能 对信号进行再生和还原&#xff0c;对衰减的信号进行放大…

VMware Worksation 问题

几个晚上在虚拟机装了好多东西&#xff0c;配置mysql&#xff0c;配置docker、Git工具等等&#xff0c;可能废寝忘食导致太困强制关了虚拟机&#xff0c;结果第二天晚上回来发现打不开&#xff0c;心态直接崩了。 问题&#xff1a; 疯狂百度告知要删除后缀为.lck的文件夹及文件…

网络爬虫丨基于scrapy+mysql爬取博客信息

文章目录 写在前面实验描述实验框架实验需求 实验内容1.安装依赖库2.创建Scrapy项目3.配置系统设置4.配置管道文件5.连接数据库6.分析要爬取的内容7.编写爬虫文件 运行结果写在后面 写在前面 本期内容&#xff1a;基于scrapymysql爬取博客信息并保存到数据库中 实验需求 ana…

如何快速搭建物联网工业云平台

随着物联网技术的快速发展&#xff0c;物联网工业云平台已经成为推动工业领域数字化转型的重要引擎。合沃作为专业的物联网云服务提供商&#xff0c;致力于为企业提供高效、可靠的物联网工业云平台解决方案。本文将深入探讨物联网工业云平台的功能、解决行业痛点的能力以及如何…

每日OJ题_简单多问题dp⑦_力扣123. 买卖股票的最佳时机 III

目录 力扣123. 买卖股票的最佳时机 III 状态机分析 解析代码 力扣123. 买卖股票的最佳时机 III 123. 买卖股票的最佳时机 III 难度 困难 给定一个数组&#xff0c;它的第 i 个元素是一支给定的股票在第 i 天的价格。 设计一个算法来计算你所能获取的最大利润。你最多可以…

D 咖智能饮品机入驻万达,引领时尚饮品新潮流!

近日&#xff0c;D 咖智能饮品机正式入驻万达广场&#xff0c;为广大消费者带来全新的时尚饮品体验。作为国内领先的智能饮品设备品牌&#xff0c;D 咖智能饮品机以其多样化的口味选择、便捷的操作方式和个性化的定制服务&#xff0c;受到了众多消费者的喜爱。 D 咖智能饮品机提…

基于Verilog的简易CPU设计

前言 本篇文章将简单讲解CPU之间各部分的功能及接线&#xff0c;并提供Verilog模拟CPU的各个组成部分。该CPU可以完成一些操作&#xff0c;如&#xff1a;加减法&#xff0c;与或&#xff0c;指令跳转等&#xff0c;最后提供testbench用于测试该CPU的工作情况是否符合预期。 C…

浏览器如何进行静态资源缓存?—— 强缓存 协商缓存

在平时使用浏览器排查问题的过程中&#xff0c;我们有时会看到浏览器网络请求中出现304状态码&#xff0c;那么是什么情况下出现304呢&#xff1f;下面是关于这一现象的解释&#xff1a; 浏览器如何进行静态资源缓存&#xff1f;—— 强缓存 & 协商缓存 状态码 304浏览器如…

Rust写一个wasm入门并在rspack和vite项目中使用(一)

rust打包wasm文档 文档地址 安装cargo-generate cargo install cargo-generate 安装过程中有问题的话手动安装cargo-generate下载地址 根据自己的系统下载压缩包&#xff0c;然后解压到用户/.cargo/bind目录下&#xff0c;将解压后的文件放到该目录下即可。 创建wasm项目 …

校园闲置物品租售系统|基于springboot框架+ Mysql+Java+B/S架构的校园闲置物品租售系统设计与实现(可运行源码+数据库+设计文档)

推荐阅读100套最新项目 最新ssmjava项目文档视频演示可运行源码分享 最新jspjava项目文档视频演示可运行源码分享 最新Spring Boot项目文档视频演示可运行源码分享 目录 前台功能效果图 管理员功能登录前台功能效果图 ​编辑 用户功能模块 商品购买管理 卖家功能模块 商品…

[密码学]OpenSSL实践篇

背景 最近在写Android abl阶段fastboot工具&#xff0c;需要我在Android代码中实现一些鉴权加解密相关的fastboot命令&#xff0c;里面用到了OpenSSL。我们先来实践一下OpenSSL在Linux系统中的指令。 OpenSSL官方网站&#xff1a;OpenSSL 中文手册 | OpenSSL 中文网 1. 查看…