RabbitMQ消息可靠性等机制详解(精细版三)

目录

七 RabbitMQ的其他操作

7.1 消息的可靠性(发送可靠)

7.1.1 confim机制(保证发送可靠)

7.1.2 Return机制(保证发送可靠)

7.1.3 编写配置文件

7.1.4 开启Confirm和Return

7.2 手动Ack(保证接收可靠)

7.2.1 添加配置文件

7.2.2 手动ack

7.3 避免消息重复消费

7.3.1 导入依赖

7.3.2 编写配置文件

7.3.3 修改生产者

7.3.4 修改消费者


 官方文档  RabbitMQ Documentation | RabbitMQ

MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。

RabbitMQ是一个Erlang开发的AMQP(高级消息排队 协议)(英文全称:Advanced Message Queuing Protocol )的开源实现。-------------接上章 

七 RabbitMQ的其他操作

7.1 消息的可靠性(发送可靠)

7.1.1 confim机制(保证发送可靠)

RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。

RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。

消息传递可靠性

7.1.2 Return机制(保证发送可靠)

Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。

而且exchange是不能持久化消息的,queue是可以持久化消息。

采用Return机制来监听消息是否从exchange送到了指定的queue中

消息传递可靠性

在消息发送方项目上加入下面内容:

7.1.3 编写配置文件
spring:rabbitmq:host: 你的地址port: 5672virtual-host: /tingyiusername: testpassword: testpublisher-confirms: truepublisher-returns: true

7.1.4 开启Confirm和Return
package com.tingyi.rabbitmq.config;
​
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
​
import javax.annotation.PostConstruct;
​
/*** @author 听忆*/
@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback{
​@Autowiredprivate RabbitTemplate rabbitTemplate;
​@PostConstruct  // init-methodpublic void initMethod(){//指定 ConfirmCallbackrabbitTemplate.setConfirmCallback(this);
​//指定 ReturnCallbackrabbitTemplate.setReturnCallback(this);}
​@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(ack){System.out.println("消息已经送达到Exchange");}else{System.out.println("消息没有送达到Exchange");}}
​@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("消息没有送达到Queue");}
}

7.2 手动Ack(保证接收可靠)

7.2.1 添加配置文件
  • 在消费方application.yml文件添加下面配置, 改为手动应答机制.

spring:rabbitmq:host: 你的地址port: 5672virtual-host: /tingyiusername: testpassword: testlistener:simple:acknowledge-mode: manual

7.2.2 手动ack
package com.tingyi.rabbitmq.topic;
​
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
​
/*** @author 听忆*/
@Component
public class Consumer {
​@RabbitListener(queues = "boot-queue")public void getMessage(String msg, Channel channel, Message message) throws IOException {System.out.println("接收到消息:" + msg);try {int i = 1 / 0;/*** 消费者发起成功通知* 第一个参数: DeliveryTag,消息的唯一标识  channel+消息编号* 第二个参数:是否开启批量处理 false:不开启批量* 举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,*          当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。*/channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {e.printStackTrace();/*** 返回失败通知* 第一个参数: DeliveryTag,消息的唯一标识  channel+消息编号* 第二个boolean true所有消费者都会拒绝这个消息,false代表只有当前消费者拒绝* 第三个boolean true消息接收失败重新回到原有队列中*/channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);}
​}
}

7.3 避免消息重复消费

重复消费消息,会对非幂等行操作造成问题

重复消费消息的原因是,消费者没有给RabbitMQ一个ack

重复消费

  1. 为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,

  2. id-0(正在执行业务)

  3. id-1(执行业务成功)

  4. 然后使用ack给RabbitMQ返回消息

  5. 如果RabbitMQack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。

  6. 极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。

备注: java中的方法叫做setIfAbsent, redis中的命令叫做setnx

       作用:如果为空就set值,并返回1, true

​ 如果存在(不为空)不进行操作,并返回0, false​

7.3.1 导入依赖

生产者和消费者都加入下面依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.4.5</version>
</dependency>

7.3.2 编写配置文件
spring:redis:host: 你的地址port: 6379

7.3.3 修改生产者
@Test
public void contextLoads() throws IOException {CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());//第四个参数: 设置消息唯一idrabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","你看听忆哇",messageId);System.in.read();
}

7.3.4 修改消费者
package com.tingyi.rabbitmq.topic;
​
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
import java.util.concurrent.TimeUnit;
​
/*** @author 听忆*/
/*** java中的方法叫做setIfAbsent, redis中的命令叫做setnx* 作用:*      如果为空就set值,并返回1, true*      如果存在(不为空)不进行操作,并返回0, false*/
@Component
public class Consumer {
​@Autowiredprivate StringRedisTemplate redisTemplate;
​@RabbitListener(queues = "boot-queue")public void getMessage(String msg, Channel channel, Message message) throws IOException {//0. 获取MessageId, 消息唯一idString messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");//1. 设置key到Redisif(redisTemplate.opsForValue().setIfAbsent(messageId,"0", 10, TimeUnit.SECONDS)) {
​//2. 消费消息System.out.println("接收到消息:" + msg);
​//3. 设置key的value为1redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
​//4.  手动ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
​}else {
​//5. 获取Redis中的value即可 如果是1,手动ackif("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
​}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/864237.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C语言】23.文件操作

由于要对数据进行持久化保存&#xff0c;我们就有了文件。 一、程序文件与数据文件 磁盘&#xff08;硬盘&#xff09;上的文件是文件。 但是在程序设计中&#xff0c;我们⼀般谈的文件有两种&#xff1a;程序文件、数据文件&#xff08;从文件功能的角度来分类的&#xff09…

“论云上自动化运维及其应用”写作框架,软考高级论文,系统架构设计师论文

论文真题 云上自动化运维是传统IT运维和DevOps的延伸&#xff0c;通过云原生架构实现运维的再进化。云上自动化运维可以有效帮助企业降低IT运维成本&#xff0c;提升系统的灵活度&#xff0c;以及系统的交付速度&#xff0c;增强系统的可靠性&#xff0c;构建更加安全、可信、…

2024年最适合Python小白的零基础入门教程!

伴随着云计算、大数据、AI等技术的迅速崛起&#xff0c;市场对Python人才的需求和市场人才的匮乏&#xff0c;让长期沉默的Python语言一下子备受众人的关注&#xff0c;再加上简单易学&#xff0c;使得Python一跃成为TIOBE排行榜的第一。 准备学Python或者想学Python的小伙伴们…

13 Redis-- 数据一致性模型、MySQL 和 Redis 的数据一致性

数据一致性模型 根据一致性的强弱分类&#xff0c;可以将一致性模型按以下顺序排列&#xff1a; 强一致性 > 最终一致性 > 弱一致性 数据一致性模型一般用于分布式系统中&#xff0c;目的是定义多个节点间的同步规范。 在这里&#xff0c;我们将其引入数据库和缓存组…

【正点原子K210连载】第十四章 按键输入实验 摘自【正点原子】DNK210使用指南-CanMV版指南

1&#xff09;实验平台&#xff1a;正点原子ATK-DNK210开发板 2&#xff09;平台购买地址https://detail.tmall.com/item.htm?id731866264428 3&#xff09;全套实验源码手册视频下载地址&#xff1a; http://www.openedv.com/docs/boards/xiaoxitongban 第十四章 按键输入实…

Vue3 登录成功,浏览器存在toke,再次访问/login路由到/index 首页页面

文章目录 目录 文章目录 流程 小结 概要流程技术细节小结 概要 首先需要清楚知道浏览器localstorage和Session storage的区别 localStorage 和 sessionStorage 是 HTML5 提供的两种客户端存储数据的方法&#xff0c;它们在使用和生命周期上有一些区别&#xff1a; 1. 生命周期…

@Cacheable解决复杂对象形参导致的缓存失效问题(如Map参数)

在Spring中使用 Cacheable 注解可以非常方便地实现方法的自动缓存机制。如以下代码&#xff1a; Cacheable(value "YwtbToken", key "#p0") public String createToken(String dlzh) {...}但当Cacheable 注解修饰的方法参数使用了复杂对象&#xff0c;如…

物联网数据可视化利器:云组态设计器全新升级

数据可视化已成为数据展示与分析领域非常重要的工具。由多种图表、3D图形组成的大屏能够帮助用户非常直观简洁地了解数据。在物联网环境下,用户在制作数据展示大屏时,对数据可视化工具提出了更高的要求,例如能够展示3D组件、灵活的图层结构、支持多种数据源、实时的数据更新、图…

函数创建单链表---无n型,需要 while 循环 + scanf

题目&#xff1a; #include <stdlib.h> struct link{int data;struct link *next; }; struct link* creatLink(); int main(){struct link *head,*p;headcreatLink();for(phead->next ;p;pp->next )printf("%d ",p->data );return 0; }/* 请在这里填…

软考《信息系统运行管理员》-2.1信息系统运维的管理

2.1信息系统运维的管理 信息系统运维管理体系框架 信息系统运维管理主要流程的目标 标准化&#xff1a;通过流程框架&#xff0c;构件标准的运维流程流程化&#xff1a;将大部分运维工作流程化&#xff0c;确保工作可重复&#xff0c;并且这些工作都有质量的完成&#xff0c;…

线性代数|机器学习-P20鞍点和极值

文章目录 1 . 瑞利商的思考1.1 瑞利商的定义1.2 投影向量 2. 拉格朗日乘子法3. 鞍点4. 线性拟合4.1 范德蒙矩阵线性拟合4.2 python 代码4.3 范德蒙矩阵缺点 5. 均值和方差5.1 样本均值和方差5.2 总体期望 μ \mu μ,总体方差 σ 2 \sigma^2 σ2 1 . 瑞利商的思考 1.1 瑞利商…

MySQL学习(6):SQL语句之数据控制语言:DCL

DCL用来管理数据库用户&#xff0c;控制数据库的访问权限 1.管理用户 1.1查询用户 use mysql; select * from user; #用户信息都存放在系统数据库mysql的user表中 在user表中&#xff0c;一个用户是由用户名和主机名共同决定的&#xff0c;上图中的host一栏就是用户的主机名…

常用组件详解(二):torchsummary

文章目录 一、基本使用二、常见指标2.1Input size2.2Forward/backward pass size 一、基本使用 torchsummary库是一个好用的模型可视化工具&#xff0c;用于帮助开发者把握每个网络层级的细节&#xff0c;包括其中的连接和维度。使用方法&#xff1a; from torchsummary impor…

ubuntu 安装docker

目录 docker 打包与加载 加载 Docker 镜像&#xff1a; ubuntu 安装docker 系统版本 检查卸载老版本docker 安装步骤 运行docker docker 打包与加载 打包成 .tar 文件&#xff1a; tar -cvf my-docker-image.tar * 加载 Docker 镜像&#xff1a; docker load -i my-d…

Access Levels in Swift

Access Levels (访问级别) Swift provides six different access levels for entities(实体) within your code. These access levels are relative to the source file in which an entity is defined, the module(模块) that source file belongs to, and the package that …

分享一个超级实用的东西——巴比达远程访问

前言 &#x1f388;家人们&#xff0c;今天我要和你们分享一个超级实用的东西——巴比达远程访问&#xff01;&#x1f389; &#x1f4bb;有了它&#xff0c;无论你身在何处&#xff0c;都能轻松访问家中的电脑&#x1f4bb;&#xff0c;就像在身边一样方便&#xff01;&…

短视频电商源码如何选择

在数字时代的浪潮下&#xff0c;短视频电商以其直观、生动、互动性强的特点&#xff0c;迅速崛起成为电商行业的一股新势力。对于有志于进军短视频电商领域的创业者来说&#xff0c;选择一款合适的短视频电商源码至关重要。本文将从多个角度探讨如何选择短视频电商源码&#xf…

携程礼品卡能转出去吗?

携程的卡好像只能在旅游的时候用 像买车票啊&#xff0c;机票啊&#xff0c;酒店&#xff0c;景点门票啥的&#xff0c;但是如果我没有出游计划的话 这个卡是不是就要被闲置下来&#xff1f; 这个问题一直让我感到很苦恼&#xff0c;还好有收卡云&#xff0c;不然我的携程卡…

模型部署:C++libtorch实现全连接模型10分类和卷积模型ResNet18的四分类的模型部署推理

Clibtorch实现模型部署推理 模型 全连接模型&#xff1a;公开mnist手写识别数字的十分类卷积模型&#xff1a;自行采集的鲜花四分类 部署 语言环境&#xff1a;C 对比Python python是解释性语言&#xff0c;效率很慢&#xff0c;安全性很低 系统开发一般是java、C/C&…

在 CentOS 上安装 Docker Engine

前言 Docker 是啥之类的就不必多说了&#xff0c;直接上安装步骤。 官网安装教程地址&#xff1a;https://docs.docker.com/engine/install/centos/ 1.Uninstall old versions &#xff08;卸载旧版本&#xff09; Older versions of Docker went by docker or docker-engin…