RabbitMQ消息可靠性等机制详解(精细版三)

目录

七 RabbitMQ的其他操作

7.1 消息的可靠性(发送可靠)

7.1.1 confim机制(保证发送可靠)

7.1.2 Return机制(保证发送可靠)

7.1.3 编写配置文件

7.1.4 开启Confirm和Return

7.2 手动Ack(保证接收可靠)

7.2.1 添加配置文件

7.2.2 手动ack

7.3 避免消息重复消费

7.3.1 导入依赖

7.3.2 编写配置文件

7.3.3 修改生产者

7.3.4 修改消费者


 官方文档  RabbitMQ Documentation | RabbitMQ

MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。

RabbitMQ是一个Erlang开发的AMQP(高级消息排队 协议)(英文全称:Advanced Message Queuing Protocol )的开源实现。-------------接上章 

七 RabbitMQ的其他操作

7.1 消息的可靠性(发送可靠)

7.1.1 confim机制(保证发送可靠)

RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。

RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。

消息传递可靠性

7.1.2 Return机制(保证发送可靠)

Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。

而且exchange是不能持久化消息的,queue是可以持久化消息。

采用Return机制来监听消息是否从exchange送到了指定的queue中

消息传递可靠性

在消息发送方项目上加入下面内容:

7.1.3 编写配置文件
spring:rabbitmq:host: 你的地址port: 5672virtual-host: /tingyiusername: testpassword: testpublisher-confirms: truepublisher-returns: true

7.1.4 开启Confirm和Return
package com.tingyi.rabbitmq.config;
​
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
​
import javax.annotation.PostConstruct;
​
/*** @author 听忆*/
@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback{
​@Autowiredprivate RabbitTemplate rabbitTemplate;
​@PostConstruct  // init-methodpublic void initMethod(){//指定 ConfirmCallbackrabbitTemplate.setConfirmCallback(this);
​//指定 ReturnCallbackrabbitTemplate.setReturnCallback(this);}
​@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(ack){System.out.println("消息已经送达到Exchange");}else{System.out.println("消息没有送达到Exchange");}}
​@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("消息没有送达到Queue");}
}

7.2 手动Ack(保证接收可靠)

7.2.1 添加配置文件
  • 在消费方application.yml文件添加下面配置, 改为手动应答机制.

spring:rabbitmq:host: 你的地址port: 5672virtual-host: /tingyiusername: testpassword: testlistener:simple:acknowledge-mode: manual

7.2.2 手动ack
package com.tingyi.rabbitmq.topic;
​
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
​
/*** @author 听忆*/
@Component
public class Consumer {
​@RabbitListener(queues = "boot-queue")public void getMessage(String msg, Channel channel, Message message) throws IOException {System.out.println("接收到消息:" + msg);try {int i = 1 / 0;/*** 消费者发起成功通知* 第一个参数: DeliveryTag,消息的唯一标识  channel+消息编号* 第二个参数:是否开启批量处理 false:不开启批量* 举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,*          当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。*/channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {e.printStackTrace();/*** 返回失败通知* 第一个参数: DeliveryTag,消息的唯一标识  channel+消息编号* 第二个boolean true所有消费者都会拒绝这个消息,false代表只有当前消费者拒绝* 第三个boolean true消息接收失败重新回到原有队列中*/channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);}
​}
}

7.3 避免消息重复消费

重复消费消息,会对非幂等行操作造成问题

重复消费消息的原因是,消费者没有给RabbitMQ一个ack

重复消费

  1. 为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,

  2. id-0(正在执行业务)

  3. id-1(执行业务成功)

  4. 然后使用ack给RabbitMQ返回消息

  5. 如果RabbitMQack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。

  6. 极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。

备注: java中的方法叫做setIfAbsent, redis中的命令叫做setnx

       作用:如果为空就set值,并返回1, true

​ 如果存在(不为空)不进行操作,并返回0, false​

7.3.1 导入依赖

生产者和消费者都加入下面依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.4.5</version>
</dependency>

7.3.2 编写配置文件
spring:redis:host: 你的地址port: 6379

7.3.3 修改生产者
@Test
public void contextLoads() throws IOException {CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());//第四个参数: 设置消息唯一idrabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","你看听忆哇",messageId);System.in.read();
}

7.3.4 修改消费者
package com.tingyi.rabbitmq.topic;
​
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
import java.util.concurrent.TimeUnit;
​
/*** @author 听忆*/
/*** java中的方法叫做setIfAbsent, redis中的命令叫做setnx* 作用:*      如果为空就set值,并返回1, true*      如果存在(不为空)不进行操作,并返回0, false*/
@Component
public class Consumer {
​@Autowiredprivate StringRedisTemplate redisTemplate;
​@RabbitListener(queues = "boot-queue")public void getMessage(String msg, Channel channel, Message message) throws IOException {//0. 获取MessageId, 消息唯一idString messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");//1. 设置key到Redisif(redisTemplate.opsForValue().setIfAbsent(messageId,"0", 10, TimeUnit.SECONDS)) {
​//2. 消费消息System.out.println("接收到消息:" + msg);
​//3. 设置key的value为1redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
​//4.  手动ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
​}else {
​//5. 获取Redis中的value即可 如果是1,手动ackif("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
​}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/864237.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python(63): dict: del/pop不释放内存

Python中的字典&#xff0c;只有不再使用的时候才会释放对应的内存。在使用 pop 或者 delete 删除字典中的item(或者说entry)后&#xff0c;为了保证hash table 探测链的完整&#xff0c;那个被删除的entry只是被标记成了空&#xff0c;并没有真正被删除掉&#xff0c;所以该字…

如何解决安卓模拟器无法上网的问题

如何使安卓模拟器上网 在使用Android模拟器进行开发时&#xff0c;有时会遇到模拟器无法联网的问题。这篇博客将分享我解决这个问题的步骤和方法。 看了网上很多解决android studio内置模拟器无法联网的问题&#xff0c;基本上都是在模拟器手机上配置dns&#xff0c;个人试了多…

【C语言】23.文件操作

由于要对数据进行持久化保存&#xff0c;我们就有了文件。 一、程序文件与数据文件 磁盘&#xff08;硬盘&#xff09;上的文件是文件。 但是在程序设计中&#xff0c;我们⼀般谈的文件有两种&#xff1a;程序文件、数据文件&#xff08;从文件功能的角度来分类的&#xff09…

“论云上自动化运维及其应用”写作框架,软考高级论文,系统架构设计师论文

论文真题 云上自动化运维是传统IT运维和DevOps的延伸&#xff0c;通过云原生架构实现运维的再进化。云上自动化运维可以有效帮助企业降低IT运维成本&#xff0c;提升系统的灵活度&#xff0c;以及系统的交付速度&#xff0c;增强系统的可靠性&#xff0c;构建更加安全、可信、…

go语言里怎么使用kafka怎么拉取消息?

Apache Kafka 是一个分布式流处理平台&#xff0c;它允许你发布和订阅记录流。在 Go 语言中&#xff0c;你可以使用第三方库如 segmentio/kafka-go 或 Shopify/sarama 来与 Kafka 进行交互。 以下是一个使用 segmentio/kafka-go 库的简单示例&#xff0c;说明如何在 Go 语言中从…

MySQL中update语法的使用(超详细)

在MySQL中&#xff0c;UPDATE 语句用于修改已存在的表中的记录。以下是对 UPDATE 语句的详细解释和使用方法&#xff1a; 语法 UPDATE table_name SET column1 value1, column2 value2, ... WHERE condition; table_name&#xff1a;要更新的表名。SET&#xff1a;用于…

2024年最适合Python小白的零基础入门教程!

伴随着云计算、大数据、AI等技术的迅速崛起&#xff0c;市场对Python人才的需求和市场人才的匮乏&#xff0c;让长期沉默的Python语言一下子备受众人的关注&#xff0c;再加上简单易学&#xff0c;使得Python一跃成为TIOBE排行榜的第一。 准备学Python或者想学Python的小伙伴们…

13 Redis-- 数据一致性模型、MySQL 和 Redis 的数据一致性

数据一致性模型 根据一致性的强弱分类&#xff0c;可以将一致性模型按以下顺序排列&#xff1a; 强一致性 > 最终一致性 > 弱一致性 数据一致性模型一般用于分布式系统中&#xff0c;目的是定义多个节点间的同步规范。 在这里&#xff0c;我们将其引入数据库和缓存组…

【正点原子K210连载】第十四章 按键输入实验 摘自【正点原子】DNK210使用指南-CanMV版指南

1&#xff09;实验平台&#xff1a;正点原子ATK-DNK210开发板 2&#xff09;平台购买地址https://detail.tmall.com/item.htm?id731866264428 3&#xff09;全套实验源码手册视频下载地址&#xff1a; http://www.openedv.com/docs/boards/xiaoxitongban 第十四章 按键输入实…

Vue3 登录成功,浏览器存在toke,再次访问/login路由到/index 首页页面

文章目录 目录 文章目录 流程 小结 概要流程技术细节小结 概要 首先需要清楚知道浏览器localstorage和Session storage的区别 localStorage 和 sessionStorage 是 HTML5 提供的两种客户端存储数据的方法&#xff0c;它们在使用和生命周期上有一些区别&#xff1a; 1. 生命周期…

单机的redis安装

前些天发现了一个人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;最重要的屌图甚多&#xff0c;忍不住分享一下给大家。点击跳转到网站。 单机的redis安装很简单 安装EPEL&#xff08;Extra Packages for Enterprise Linux&#xff09;存储库 sudo y…

【嵌入式——FreeRTOS】延时函数

相对延时&#xff1a;每次延时都是从执行函数vTaskDelay()开始&#xff0c;直到延时指定的时间结束&#xff1b; 绝对延时&#xff1a;将整个任务的运行周期看成一个整体&#xff0c;适用于需要按照一定频率运行的任务。 延时函数解析 判断延时时间是否大于0&#xff0c;大于…

@Cacheable解决复杂对象形参导致的缓存失效问题(如Map参数)

在Spring中使用 Cacheable 注解可以非常方便地实现方法的自动缓存机制。如以下代码&#xff1a; Cacheable(value "YwtbToken", key "#p0") public String createToken(String dlzh) {...}但当Cacheable 注解修饰的方法参数使用了复杂对象&#xff0c;如…

物联网数据可视化利器:云组态设计器全新升级

数据可视化已成为数据展示与分析领域非常重要的工具。由多种图表、3D图形组成的大屏能够帮助用户非常直观简洁地了解数据。在物联网环境下,用户在制作数据展示大屏时,对数据可视化工具提出了更高的要求,例如能够展示3D组件、灵活的图层结构、支持多种数据源、实时的数据更新、图…

函数创建单链表---无n型,需要 while 循环 + scanf

题目&#xff1a; #include <stdlib.h> struct link{int data;struct link *next; }; struct link* creatLink(); int main(){struct link *head,*p;headcreatLink();for(phead->next ;p;pp->next )printf("%d ",p->data );return 0; }/* 请在这里填…

软考《信息系统运行管理员》-2.1信息系统运维的管理

2.1信息系统运维的管理 信息系统运维管理体系框架 信息系统运维管理主要流程的目标 标准化&#xff1a;通过流程框架&#xff0c;构件标准的运维流程流程化&#xff1a;将大部分运维工作流程化&#xff0c;确保工作可重复&#xff0c;并且这些工作都有质量的完成&#xff0c;…

线性代数|机器学习-P20鞍点和极值

文章目录 1 . 瑞利商的思考1.1 瑞利商的定义1.2 投影向量 2. 拉格朗日乘子法3. 鞍点4. 线性拟合4.1 范德蒙矩阵线性拟合4.2 python 代码4.3 范德蒙矩阵缺点 5. 均值和方差5.1 样本均值和方差5.2 总体期望 μ \mu μ,总体方差 σ 2 \sigma^2 σ2 1 . 瑞利商的思考 1.1 瑞利商…

MySQL学习(6):SQL语句之数据控制语言:DCL

DCL用来管理数据库用户&#xff0c;控制数据库的访问权限 1.管理用户 1.1查询用户 use mysql; select * from user; #用户信息都存放在系统数据库mysql的user表中 在user表中&#xff0c;一个用户是由用户名和主机名共同决定的&#xff0c;上图中的host一栏就是用户的主机名…

CvT:将卷积引入Vision Transformer

1. 引言 Vision Transformer (ViT)[10]是第一个完全依赖Transformer架构来获得大规模图像分类性能的计算机视觉模型。ViT设计以最小的修改从语言理解适应Transformer架构[9]。首先,将图像分割成离散的不重叠的小块(例如16 16)。然后,这些补丁被当作标记(类似于NLP中的标记)…

常用组件详解(二):torchsummary

文章目录 一、基本使用二、常见指标2.1Input size2.2Forward/backward pass size 一、基本使用 torchsummary库是一个好用的模型可视化工具&#xff0c;用于帮助开发者把握每个网络层级的细节&#xff0c;包括其中的连接和维度。使用方法&#xff1a; from torchsummary impor…