微服务day07

MQ高级

发送者可靠性,MQ的可靠性,消费者可靠性。

发送者可靠性

发送者重连

连接重试的配置文件:

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数

关闭MQ。

运行测试用例结果,进行了三次重连。

11-12 10:15:41:975  INFO 23824 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.21.101:5672]
11-12 10:15:43:997  INFO 23824 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.21.101:5672]
11-12 10:15:46:002  INFO 23824 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.21.101:5672]

发送者确认

总结如下:

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功

  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功

  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功

  • 其它情况都会返回NACK,告知投递失败

其中acknack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。

默认两种机制都是关闭状态,需要通过配置文件来开启

开启发送者确认:

在publisher模块的application.yaml中添加配置:

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制

  • simple:同步阻塞等待MQ的回执

  • correlated:MQ异步回调返回回执

一般我们推荐使用correlated,回调机制。

定义ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

package com.itheima.publisher.config;import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;@Configuration
@Slf4j
@RequiredArgsConstructor
public class MQconfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数

这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆

  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

我们新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback

    @Testpublic void testObgect1() throws InterruptedException {CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {//failure出现异常时的处理逻辑,基本不会触发。log.error("发送失败",ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {//判断是否成功发送到服务器if (result.isAck()){log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});//准备Map数据Map map = new HashMap();map.put("name","jack");map.put("age",21);rabbitTemplate.convertAndSend("obgect.queue1",map,cd);//用来接受返回的值Thread.sleep(2000);}

可以看到,由于传递的RoutingKey是错误的,路由失败后,触发了return callback,同时也收到了ack。

当我们修改为正确的RoutingKey以后,就不会触发return callback了,只收到ack。

而如果连交换机都是错误的,则只会收到nack。

注意

开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致

  • 交换机名称错误:同样是编程错误导致

  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。

MQ的可靠性

数据持久化

默认情况下springamqp发送的消息就是持久化的,不需要做特殊处理。

    @Testpublic void testTopic3(){//自定义消息为非持久化Message build = MessageBuilder.withBody("这是一个大新闻啊".getBytes()).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();//修改交换机的名字为hm.topicString name = "hm.topic";//修改路由键为bluerabbitTemplate.convertAndSend(name,"china.goods",build);}

注意:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。

不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

Lazy Queue

MQ可靠性小结

消费者可靠性

消费者确认机制

消费者失败重试策略

相关文档:

哈哈哈

业务幂等性

 修改发送者代码,修改消息转换器:

    @Beanpublic MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}

修改消费者代码来获取消息id:

    @RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(Message message) throws InterruptedException {log.info("spring 消费者接收到消息:【" + message.getBody().toString() + "】");log.info("spring 消费者接收到消息id为:【" + message.getMessageProperties().getMessageId().toString() + "】");if (true) {throw new RuntimeException("故意的");}log.info("消息处理完成");}

 运行结果: 由于前面的步骤设置了报错,因此重发了3次。

11-12 16:20:12:977  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息:【[B@6139d571】
11-12 16:20:12:977  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息id为:【fc7c6a0c-06cb-4a7e-b158-91d1675dd83b】
11-12 16:20:13:978  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息:【[B@6139d571】
11-12 16:20:13:978  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息id为:【fc7c6a0c-06cb-4a7e-b158-91d1675dd83b】
11-12 16:20:14:979  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息:【[B@6139d571】
11-12 16:20:14:979  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息id为:【fc7c6a0c-06cb-4a7e-b158-91d1675dd83b】
11-12 16:20:14:985  WARN 27400 --- [ntContainer#1-1] o.s.a.r.retry.RepublishMessageRecoverer  : Republishing failed message to exchange 'error.direct' with routing key error

只需要修改消费者的处理逻辑部分即可:

package com.hmall.trade.listener;import com.hmall.trade.domain.po.Order;
import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
@RequiredArgsConstructor
public class Orderlisten {private final IOrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "trade.pay.success.queue", durable = "true"),exchange = @Exchange(name = "pay.topic"),key = "pay.success"))public void listenPaySuccess(Long orderId){//1、根据id获取订单信息Order order = orderService.getById(orderId);//判断订单状态,只有待支付才修改if (order == null || order.getStatus() != 1){//条件错误直接结束return;}orderService.markOrderPaySuccess(orderId);}
}

小结,小小面试题

延迟消息

死信交换机

消息的消费者代码:

创建普通交换机和信息队列,将消息队列的死信绑定死信交换机

dleConfigtasion.classpackage com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class dleConfigrasion {//交给Bean注解来进行处理//创建交换机@Beanpublic DirectExchange ttlExchange(){//参数:交换机名称,是否持久化,是否自动删除,持久化默认为开启(持久化就是是否保存到磁盘)return ExchangeBuilder.directExchange("ttl.direct").build();}//创建消息队列@Beanpublic Queue ttltQueue(){
//        return new Queue("fanout.queue1");//使用build来创建消息队列//指定该队列的死信交换机return QueueBuilder.durable("ttl.queue").deadLetterExchange("dlt.direct").build();}// 绑定队列和交换机@Beanpublic Binding bindingfanoutQueue1red(Queue ttltQueue, DirectExchange ttlExchange){return BindingBuilder.bind(ttltQueue).to(ttlExchange).with("hi");}}

死信交换机和消息队列:

leatinMq.class   @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dlt.queue"),exchange = @Exchange(name = "dlt.direct",type = ExchangeTypes.DIRECT),key = {"hi"}))public void Directlisten1dlt(String msg){System.err.println("消费者接收到队列dlt.direct的消息:"+msg+"_"+ LocalTime.now());}

消息发送方:需要指定消息的存活时间:

    @Testpublic void testTopic(){//修改路由键为bluerabbitTemplate.convertAndSend("ttl.direct", "hi", "这是一个大新闻啊",new MessagePostProcessor() {//可以对生成的message进行处理@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//设置过期时间为10秒钟message.getMessageProperties().setExpiration("1000");return message;}});}

代码结束。

延时插件:延时插件下载地址

插件官方文档:延时插件官方文档

详细操作文档: 黑马文档

取消超时订单

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/61014.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

i春秋-EXEC(命令执行、nc传输文件、带外通道传输数据)

练习平台地址 竞赛中心 题目描述 题目内容 小猫旁边有一个no sign F12检查页面 没有提示 检查源代码 发现使用了vim编辑器 进而联想到vim编辑器的临时交换文件.xxx.swp 访问.index.php.swp&#xff0c;成功下载文件 使用vim -r 查看文件内容 vim -r index.php.swp <?p…

【Web前端】Promise的使用

Promise是异步编程的核心概念之一。代表一个可能尚未完成的操作&#xff0c;并提供了一种机制来处理该操作最终的成功或失败。具体来说&#xff0c;Promise是由异步函数返回的对象&#xff0c;能够指示该操作当前所处的状态。 当Promise被创建时&#xff0c;它会处于“待定”&a…

YOLO系列基础(六)YOLOv1原理详解,清晰明了!

系列文章地址 YOLO系列基础&#xff08;一&#xff09;卷积神经网络原理详解与基础层级结构说明-CSDN博客 YOLO系列基础&#xff08;二&#xff09;Bottleneck瓶颈层原理详解-CSDN博客 YOLO系列基础&#xff08;三&#xff09;从ResNet残差网络到C3层-CSDN博客 YOLO系列基础…

硬石电机学习2024116

F4 概况 共模抑制线圈作用是滤波 LD3.3是将5v转为芯片用的3.3V CH340用于板子和电脑通讯 光耦隔离保护主控 16M的外部flash 1M的芯片内部的flash 10kHZ高速的光耦隔离&#xff0c;1M的低俗光耦隔离 F4 stm32概况 stm8和51都是一次可以运算处理8位的 32表示一次处理32位…

基于Python爬虫大屏可视化的热门旅游景点数据分析系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏&#xff1a;…

RAG经验论文《FACTS About Building Retrieval Augmented Generation-based Chatbots》笔记

《FACTS About Building Retrieval Augmented Generation-based Chatbots》是2024年7月英伟达的团队发表的基于RAG的聊天机器人构建的文章。 这篇论文在待读列表很长时间了&#xff0c;一直没有读&#xff0c;看题目以为FACTS是总结的一些事实经验&#xff0c;阅读过才发现FAC…

解析传统及深度学习目标检测方法的原理与具体应用之道

深度学习目标检测算法 常用的深度学习的目标检测算法及其原理和具体应用方法&#xff1a; R-CNN&#xff08;Region-based Convolutional Neural Networks&#xff09;系列1&#xff1a; 原理&#xff1a; 候选区域生成&#xff1a;R-CNN 首先使用传统的方法&#xff08;如 Se…

boost之property

简介 property在boost.graph中有使用&#xff0c;用于表示点属性或者边属性 结构 #mermaid-svg-56YI0wFLPH0wixrJ {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-56YI0wFLPH0wixrJ .error-icon{fill:#552222;}#me…

Oracle 19c PDB克隆后出现Warning: PDB altered with errors受限模式处理

在进行一次19c PDB克隆过程中&#xff0c;发现克隆结束&#xff0c;在打开后出现了报错&#xff0c;PDB变成受限模式&#xff0c;以下是分析处理过程 09:25:48 SQL> alter pluggable database test1113 open instancesall; Warning: PDB altered with errors. Elapsed: 0…

AndroidStudio-Activity的生命周期

一、Avtivity的启动和结束 从当前页面跳到新页面&#xff0c;跳转代码如下&#xff1a; startActivity(new Intent(源页面.this&#xff0c;目标页面.class))&#xff1b; 从当前页面回到上一个页面&#xff0c;相当于关闭当前页面&#xff0c;返回代码如下&#xff1a; finis…

ubuntu20.04 解决Pycharm没有写入权限,无法通过检查更新更新的问题

ubuntu20.04 解决Pycharm没有写入权限&#xff0c;无法通过检查更新更新的问题 您提供的截图显示了一个关于PyCharm更新的问题&#xff0c;其中提到了&#xff1a;“PyCharm 没有 /opt/pycharm-community-2024.1.2 的写入权限&#xff0c;请通过特权用户运行以更新。” 这表明…

云原生之运维监控实践-使用Telegraf、Prometheus与Grafana实现对InfluxDB服务的监测

背景 如果你要为应用程序构建规范或用户故事&#xff0c;那么务必先把应用程序每个组件的监控指标考虑进来&#xff0c;千万不要等到项目结束或部署之前再做这件事情。——《Prometheus监控实战》 去年写了一篇在Docker环境下部署若依微服务ruoyi-cloud项目的文章&#xff0c;当…

WinDefender Weaker

PPL Windows Vista / Server 2008引入 了受保护进程的概念&#xff0c;其目的不是保护您的数据或凭据。其最初目标是保护媒体内容并符合DRM &#xff08;数字版权管理&#xff09;要求。Microsoft开发了此机制&#xff0c;以便您的媒体播放器可以读取例如蓝光&#xff0c;同时…

计算机视觉 1-8章 (硕士)

文章目录 零、前言1.先行课程&#xff1a;python、深度学习、数字图像处理2.查文献3.环境安装 第一章&#xff1a;概论1.计算机视觉的概念2.机器学习 第二章&#xff1a;图像处理相关基础1.图像的概念2.图像处理3.滤波器4.卷积神经网络CNN5.图像的多层表示&#xff1a;图像金字…

实习冲刺练习 第二十三天

每日一题 回文链表. - 力扣&#xff08;LeetCode&#xff09; class Solution { public:bool isPalindrome(ListNode* head) {if(headnullptr) return false;vector<int> v;while(head!nullptr){//将链表的值存入数组中v.push_back(head->val);headhead->next;}in…

报错 No available slot found for the embedding model

报错内容 Server error: 503 - [address0.0.0.0:12781, pid304366] No available slot found for the embedding model. We recommend to launch the embedding model first, and then launch the LLM models. 目前GPU占用情况如下 解决办法: 关闭大模型, 先把 embedding mode…

RabbitMQ介绍和快速上手案例

文章目录 1.引入1.1同步和异步1.2消息队列的作用1.3rabbitMQ介绍 2.安装教程2.1更新软件包2.2安装erlang2.3查看这个erlang版本2.4安装rabbitMQ2.5安装管理页面2.6浏览器测试2.7添加管理员用户 3.rabbitMQ工作流程4.核心概念介绍4.1信道和连接4.2virtual host4.3quene队列 5.We…

数据结构(初阶4)---循环队列详解

循环队列 1.循环队列的结构  1).逻辑模式 2.实现接口  1).初始化  2).判断空和满  3).增加  4).删除  5).找头  6).找尾 3.循环队列的特点 1.循环队列的结构 1).逻辑模式 与队列是大同小异的&#xff0c; 其中还是有一个指向队列头的head指针&#xff0c; 也有一个指向尾…

java中volatile 类型变量提供什么保证?能使得一个非原子操作变成原子操作吗?

大家好&#xff0c;我是锋哥。今天分享关于【java中volatile 类型变量提供什么保证&#xff1f;能使得一个非原子操作变成原子操作吗&#xff1f;】面试题。希望对大家有帮助&#xff1b; java中volatile 类型变量提供什么保证&#xff1f;能使得一个非原子操作变成原子操作吗&…