RabbitMQ知识掌握 【进阶篇】

一、如何保证消息的可靠性 🍉

1.保证消息的可靠性投递 🥝

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

消息从生产者到消费者的经历哪些组件:
生产者–交换机—队列—消费者

  • confirm 确认模式
  • return 退回模式
  • 消息从 producer 到 exchange 则会返回一个 confirmCallback 。
  • 消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。

默认rabbitmq不开启上面两种模式。

我们将利用这两个 callback 控制消息的可靠性投递

在这里插入图片描述
confirm和return的实现

  1. 设置ConnectionFactory的publisher-confirm-type: correlated开启 确认模式。

  2. 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。

  3. 设置ConnectionFactory的publisher-returns=“true” 开启 退回模式。

  4. 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后执行回调函数returnedMessage。

confirm机制

(1)开启confirm

在这里插入图片描述

#开启confirm机制  默认为none会自动删除数据  开启手动模式 correlated
spring.rabbitmq.publisher-confirm-type=correlated

(2)设置rabbitTemplate的confirmCallback回调函数

在这里插入图片描述

  @Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void text01(){rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {if (!b)System.out.println("系统繁忙请重新发送");}});rabbitTemplate.convertAndSend("zt_exchange","b.aaa", "你好,阿娇");}

return机制

在这里插入图片描述

#开启return机制  用来捕捉虚拟主机向队列中传递信息错误
spring.rabbitmq.publisher-returns=true

(2)设置回调

在这里插入图片描述

@Testpublic void text02(){rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {//当交换机发送消息到队列过程中失败启动当前方法System.out.println(replyCode);}});rabbitTemplate.convertAndSend("zt_exchange","b.v", "你好,阿娇");}

2.如何保证消息在队列中不丢失🥝

(1)设置队列为持久化

在这里插入图片描述

(2)设置消息的持久化

生产消息不设置过期时间默认为持久化

在这里插入图片描述

3.确保消息能可靠的被消费掉🥝

ACK确认机制

多个消费者同时收取消息,收取消息到一半,突然某个消费者挂掉,要保证此条消息不丢失,就需要acknowledgement机制,就是消费者消费完要通知服务端,服务端才将数据删除

这样就解决了,即使一个消费者出了问题,没有同步消息给服务端,还有其他的消费端去消费,保证了消息不丢的case。

ACK的实现

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:
自动确认:acknowledge=“none”
手动确认:acknowledge=“manual”
根据异常情况确认:acknowledge=“auto”,(这种方式使用麻烦,并且不常用,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息队列中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

在这里插入图片描述

#修改消费端  手动确认消息  默认自动确认 none
spring.rabbitmq.listener.simple.acknowledge-mode=manual

(2)修改代码

在这里插入图片描述

package com.lzq.listener;import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@Component//交于容器创建并管理
public class MyListener {@RabbitListener(queues={"zt_queue01"})
//queues:表示你监听的队列名public void hello01(Message message , Channel channel) throws IOException { //把监听到的消息封装到Mmessage类对象中long deliveryTag = message.getMessageProperties().getDeliveryTag();byte[] body = message.getBody();String s= new String(body);System.out.println("123");// Map map = JSON.parseObject(s, HashMap.class);try {System.out.println("消息的内容:"+s);//basicAck:确认消息 -- rabbit服务端删除/** long deliveryTag,第一个参数为消息的标志* boolean multiple 第二个参数为是否把该消费之前未确认的消息一起确认掉* */channel.basicAck(deliveryTag,true);}catch (Exception e){//basicNack:服务继续发送消息/** long deliveryTag, boolean multiple, 前两个参数与上面的意义一样* boolean requeue 是否要求rabbitmq服务器重新发送该消息* */channel.basicNack(deliveryTag,true,true);}
//业务操作}
}

总结: 如何保证消息的可靠性?

[1] 保证消息的可靠性投递: confirm机制和return机制

[2] 队列中:—持久化

[3]使用ack机制保证消费者的可靠性消费。

二、延迟队列🍉

TTL🥝

TTL 全称 Time To Live(存活时间/过期时间)。

当消息到达存活时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
根据设置的时间十秒后会消失

 @Testpublic void test04(){/*** String exchange, String routingKey, Object message,* CorrelationData correlationData*/Message message =new Message("咻咻咻".getBytes());message.getMessageProperties().setExpiration("10000");rabbitTemplate.send("bbb","a.b",message);}

小结:

设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。

设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。

如果两者都进行了设置,以时间短的为准。

死信队列🥝

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

在这里插入图片描述

什么样的消息会成为死信消息

  1. 队列消息长度到达限制;

  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

队列绑定死信交换机:

在这里插入图片描述

在这里插入图片描述

延迟队列🥝

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

需求:

  1. 下单后,30分钟未支付,取消订单,回滚库存。

  2. 新用户注册成功7天后,发送短信问候。

实现方式:

  1. 定时器:性能差—每隔一段时间要进行数据库查询。

  2. 延迟队列

通过消息队列完成延迟队列的功能

很可惜,在RabbitMQ中并未提供延迟队列功能。

但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

在这里插入图片描述

三、 如何防止消费者重复消费消息🍉

消息的幂等性—无论操作几次结果都是一样。

1、生成全局id,存入redis或者数据库,在消费者消费消息之前,查询一下该消息是否有消费过。

2、如果该消息已经消费过,则告诉mq消息已经消费,将该消息丢弃(手动ack)。

3、如果没有消费过,将该消息进行消费并将消费记录写进redis或者数据库中。

在这里插入图片描述

简单描述一下需求,如果订单完成之后,需要为用户累加积分,又需要保证积分不会重复累加。那么再mq消费消息之前,先去数据库查询该消息是否已经消费,如果已经消费那么直接丢弃消息。

生产者 🥝

package com.ykq.score.producer;import com.alibaba.fastjson.JSONObject;
import com.xiaojie.score.entity.Score;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;import java.util.UUID;@Component
@Slf4j
public class ScoreProducer implements RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;//定义交换机private static final String SCORE_EXCHANGE = "ykq_score_exchaneg";//定义路由键private static final String SCORE_ROUTINNGKEY = "score.add";/*** @description: 订单完成* @param:* @return: java.lang.String* @author xiaojie* @date: 2023/7/10 22:30*/public String completeOrder() {String orderId = UUID.randomUUID().toString();System.out.println("订单已完成");//发送积分通知Score score = new Score();score.setScore(100);score.setOrderId(orderId);String jsonMSg = JSONObject.toJSONString(score);sendScoreMsg(jsonMSg, orderId);return orderId;}@Asyncpublic void sendScoreMsg(String jsonMSg, String orderId) {this.rabbitTemplate.setConfirmCallback(this);rabbitTemplate.convertAndSend(SCORE_EXCHANGE, SCORE_ROUTINNGKEY, jsonMSg, message -> {//设置消息的id为唯一message.getMessageProperties().setMessageId(orderId);return message;});}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {if (ack) {log.info(">>>>>>>>消息发送成功:correlationData:{},ack:{},s:{}", correlationData, ack, s);} else {log.info(">>>>>>>消息发送失败{}", ack);}}
}

消费者🥝

package com.xiaojie.score.consumer;import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.xiaojie.score.entity.Score;
import com.xiaojie.score.mapper.ScoreMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Map;@Component
@Slf4j
public class ScoreConsumer {@Autowiredprivate ScoreMapper scoreMapper;@RabbitListener(queues = {"ykq_score_queue"})public void onMessage(Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException {String orderId = message.getMessageProperties().getMessageId();if (StringUtils.isBlank(orderId)) {return;}log.info(">>>>>>>>消息id是:{}", orderId);String msg = new String(message.getBody());Score score = JSONObject.parseObject(msg, Score.class);if (score == null) {return;}//执行前去数据库查询,是否存在该数据,存在说明已经消费成功,不存在就去添加数据,添加成功丢弃消息Score dbScore = scoreMapper.selectByOrderId(orderId);if (dbScore != null) {//证明已经消费消息,告诉mq已经消费,丢弃消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);return;}Integer result = scoreMapper.save(score);if (result > 0) {//积分已经累加,删除消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);return;} else {log.info("消费失败,采取相应的人工补偿");} }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1361.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微服务Gateway网关(自动定位/自定义过滤器/解决跨域)+nginx反向代理gateway集群

目录 Gateway网关 1.0.为什么需要网关&#xff1f; 1.1.如何使用gateway网关 1.2.网关从注册中心拉取服务 1.3.gateway自动定位 1.4.gateway常见的断言 1.5.gateway内置的过滤器 1.6.自定义过滤器-全局过滤器 1.7.解决跨域问题 2.nginx反向代理gateway集群 2.1.配置…

什么是 TCP 和 UDP?Java 中如何实现 TCP 和 UDP 协议

在计算机网络中&#xff0c;TCP&#xff08;传输控制协议&#xff09;和UDP&#xff08;用户数据报协议&#xff09;是两种最常用的传输层协议。它们都用于在网络上传输数据&#xff0c;但是它们之间有很多不同之处。本文将介绍TCP和UDP的基本概念&#xff0c;以及在Java中如何…

ubuntu20.04配置vscode

下载&#xff1a; https://az764295.vo.msecnd.net/stable/660393deaaa6d1996740ff4880f1bad43768c814/code_1.80.0-1688479026_amd64.debhttps://az764295.vo.msecnd.net/stable/660393deaaa6d1996740ff4880f1bad43768c814/code_1.80.0-1688479026_amd64.deb 安装&#xff1a…

前端三大框架的生命周期最底层原理解析

引言 在现代前端开发中,React、Angular和Vue.js等三大框架已经成为了行业中最受欢迎和广泛使用的工具。这些框架的核心功能之一是生命周期管理,通过生命周期方法,我们可以在这些关键点执行特定的操作,以实现更好的控制和管理前端应用程序的行为。然而,你是否好奇这些生命…

Ubuntu 放弃了战斗向微软投降

导读这几天看到 Ubuntu 放弃 Unity 和 Mir 开发&#xff0c;转向 Gnome 作为默认桌面环境的新闻&#xff0c;作为一个Linux十几年的老兵和Linux桌面的开发者&#xff0c;内心颇感良多。Ubuntu 做为全世界Linux界的桌面先驱者和创新者&#xff0c;突然宣布放弃自己多年开发的Uni…

Linux环境:ifconfig命令查看结果:网卡信息说明

ifconfig命令输出结果包含了当前系统中所有网络接口的详细信息&#xff0c;主要包括&#xff1a; 网络接口名称&#xff1a;如“eth0”表示第一块以太网卡。MAC地址&#xff1a;每个网卡都有唯一的MAC地址&#xff0c;用于在局域网内寻址。IP地址&#xff1a;网卡的IP地址&…

回首2023上半年:成长、思考、感恩

文章目录 每日一句正能量前言一、目标达成情况总结二、工作和学习成果总结三、下半年规划总结四、个人想法 后记附录 每日一句正能量 做一个向日葵族&#xff0c;面对阳光&#xff0c;不自艾自怜&#xff0c;每天活出最灿烂的自己。曾经拥有的&#xff0c;不要忘记。不能得到的…

机器学习朴素贝叶斯笔记

朴素贝叶斯&#xff08;Naive Bayes&#xff09;是一种基于贝叶斯定理和特征独立性假设的简单但有效的分类算法。它常用于文本分类、垃圾邮件过滤和情感分析等任务。下面我将详细解释朴素贝叶斯的原理和步骤。 首先&#xff0c;我们需要了解几个重要的概念&#xff1a; 贝叶斯…

day52

思维导图 比较指令结果的条件码 练习 汇编实现1-100的累加 .text .global _strat _start: mov r0,#0mov r1,#0 add_fun:add r0,r0,#1cmp r0,#100addls r1,r1,r0bls add_fun .end

Vue 项目路由、自定义指令、api方法自动引入资源(require.context使用)

前端项目&#xff08;当前我以Vue项目为例&#xff09;当我们把api挂载在main上后 // 将api挂载到vue的原型上 import api from /api Vue.prototype.$apiapi在src下会有一个api文件夹&#xff0c;结构如下&#xff1a; 通常情况下&#xff0c;api文件夹的index.js文件我们通常…

ChatGPT 最佳实践指南之:使用外部工具

Use external tools 使用外部工具 Compensate for the weaknesses of GPTs by feeding them the outputs of other tools. For example, a text retrieval system can tell GPTs about relevant documents. A code execution engine can help GPTs do math and run code. If a …

8.postgresql--Update join 和 Delete using

Update join Update join用于基于另一张表更新表数据&#xff0c;语法如下&#xff1a; UPDATE t1 SET t1.c1 new_value FROM t2 WHERE t1.c2 t2.c2;CREATE TABLE product_segment (id SERIAL PRIMARY KEY,segment VARCHAR NOT NULL,discount NUMERIC (4, 2) );INSERT INTO…

基于C/S架构工作原理序号工作步骤和理论的区别

基于C/S架构工作原理序号工作步骤和理论的区别 SSH 概念 对称加密linux 系统加密&#xff0c;就是加密和揭秘都是使用同一套密钥。 非对称加密有两个密钥&#xff1a;“私钥”和“公钥”。私钥加密后的密文&#xff0c;只能通过对应的公钥进行揭秘。而通过私钥推理出公钥的…

不满足于RPC,详解Dubbo的服务调用链路

系列文章目录 【收藏向】从用法到源码&#xff0c;一篇文章让你精通Dubbo的SPI机制 面试Dubbo &#xff0c;却问我和Springcloud有什么区别&#xff1f; 超简单&#xff0c;手把手教你搭建Dubbo工程&#xff08;内附源码&#xff09; Dubbo最核心功能——服务暴露的配置、使用…

数据可视化——用python绘制简单的折线图

文章目录 前言JSON使用 pyecharts 模块绘制折线图下载 pyecharts 模块使用 pyecharts 模块绘制简单的折线图添加配置选项 前言 前面我们已经学习了python的基础语法和面向对象&#xff0c;那么接下来我们将学习python编程语言的过人之处——数据的可视化之折线图。 JSON 说到…

LeetCode第354场周赛

题目一 特殊元素平方和 给你一个下标从 1 开始、长度为 n 的整数数组 nums 。 对 nums 中的元素 nums[i] 而言&#xff0c;如果 n 能够被 i 整除&#xff0c;即 n % i 0 &#xff0c;则认为 num[i] 是一个 特殊元素 。 返回 nums 中所有 特殊元素 的 平方和 。 直接模拟就好了…

C/C++内存泄漏原因分析与应对方法

内存泄漏 一、内存泄漏的危害&#xff1a; 内存泄漏会导致当前应用程序消耗更多的内存&#xff0c;使得其他应用程序可用的内存更少了。 如果有个进程可用的内存不够&#xff0c;就会触发Linux操作系统的直接/后台内存回收&#xff08;即将一些内存页的数据写到磁盘里&#…

springboot服务端接口公网远程调试,并实现HTTP服务监听

文章目录 前言1. 本地环境搭建1.1 环境参数1.2 搭建springboot服务项目 2. 内网穿透2.1 安装配置cpolar内网穿透2.1.1 windows系统2.1.2 linux系统 2.2 创建隧道映射本地端口2.3 测试公网地址 3. 固定公网地址3.1 保留一个二级子域名3.2 配置二级子域名3.2 测试使用固定公网地址…

Argo CD 入门扫盲使用

目录 一、什么是 argo cd 二、为什么使用 argo cd 三、argo cd 架构图 四、Argo CD 使用 1、安装 Argo CD 2、安装 Argo CD CLI 3、发布 Argo CD 服务 4、获取 Argo CD 密码 5、准备 Git 仓库 6、创建 Argo CD App 7、版本升级 8、版本回滚 一、什么是 argo cd A…

K210开发实例-通用异步收发传输器(UART)使用

通用异步收发传输器(UART)使用 文章目录 通用异步收发传输器(UART)使用1、UART介绍2、UART驱动API介绍3、UART通用使用方式4、UART中断方式使用5、UART通过DMA接收发送数据6、UART通过DMA及中断方式接收发送数据1、UART介绍 UART分为高速UART和通用UART。 高速UART为UARTHS(U…