微服务——服务异步通讯(MQ高级)

MQ的一些常见问题

消息可靠性

生产者消息确认

返回ack,怎么感觉这么像某个tcp的3次握手。

使用资料提供的案例工程.

在图形化界面创建一个simple.queue的队列,虚拟机要和配置文件里面的一样。

 SpringAMQP实现生产者确认

AMQP里面支持多种生产者确认的类型。

simple是同步等待模式,发了消息之后就一直等待结果,可能会导致代码阻塞。

correlated是异步回调模式,像前段的ajax请求的回调函数。

ApplicationContextAware是bean工厂通知。会在Spring容器创建完后来通知并传一个spring容器到下面的方法。然后从中取到rabbitTemplate的bean并设置ReturnCallback。 

ReturnCallback:消息到了交换机,路由时失败了没有到达消息队列

ConfirmCallback:消息连交换机都没到。

这个不像ReturnCallback只能配置一个,这个可以在每次发消息时设置。

这里在发送消息时多了一个correlationData,这是在配置开关选择的confirm类型为correlated。里面封装了消息的唯一id和callback.

callback里面的result是成功的回调函数,ex是失败的回调函数。这里的失败是指回调都没收到。

实现

先是在生产者的配置文件里要加上前面的配置j

编写returnCallback

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//获取RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);//配置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {//记录日志log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",replyCode,replyText,exchange,routingKey,message.toString());//如果有需要的话,可以重发消息});}
}

编写ConfirmCallback

这里先要在图形界面手动将交换机和消息队列做绑定 

    @Testpublic void testSendMessage2SimpleQueue() throws InterruptedException {//1.准备消息String message = "hello, spring amqp!";//2.准备correlationData//2.1消息IDCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());//2.2准备ConfirmCallbackcorrelationData.getFuture().addCallback(result -> {//判断结果if(result.isAck()){//ACKlog.debug("消息成功投递到交换机!消息ID:{}",correlationData.getId());}else{//NACKlog.error("消息投递到交换机失败!消息ID:{}",correlationData.getId());}}, ex -> {//记录日志log.error("消息发送失败!",ex);//重发消息});//3.发送消息rabbitTemplate.convertAndSend("camq.topic", "simple.test", message,correlationData);}

测试得到

成功的测试情况

 

失败的测试情况

投递交换机失败,交换机不存在

投递队列失败,队列不存在

 

消息持久化

这里通过重启rabbitmq容器发现消息都不见了可以确认,rabbitmq和redis一样都是内存运行的。

甚至我手动加上的消息队列和绑定关系都不见了。这里消息队列不见是因为前面创建队列时选择的是Transient,不持久化。系统默认的交换机都还在,是因为durable为true,持久化。

创建队列或交换机的时候可以设置Durability为Durable即可持久化。

在消费者代码中进行交换机和队列的创建,然后可以看见如下持久化的交换机和队列.

@Configuration
public class CommonConfig {@Beanpublic DirectExchange simpleExchange(){return new DirectExchange("simple.direct",true,false);}@Beanpublic Queue simpleQueue(){return QueueBuilder.durable("simple.queue").build();}
}

手动发送一条消息进行测试

重启之后消息还是消失了。

要想让消息持久化,需要在发送消息时指定。

@Testpublic void testDurableMessage(){//1.准备消息Message message = MessageBuilder.withBody("hello,pop".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久的.build();//2.发送消息rabbitTemplate.convertAndSend("simple.queue",message);}

 重启之后消息就持久化了。

通常在springamqp中这些都是持久化的。

消费者消息确认

在none模式下,消费者拿到消息都就报异常了,然后消息也没了。

在auto模式下,消费者拿到消息后给mq报了个unack,然后消息会重新投递,消费者继续拿消息,tmd,死循环了。 但是这里消息就不会消失了。

@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg) {System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");System.out.println(1/0);log.info("消费者处理消息成功!");}

消费失败重试机制

重试次数耗尽之后会将消息丢弃。

消费者失败消息处理策略

 在消费者代码中

@Configuration
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue",true);}@Beanpublic Binding errorBinding(){return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");}
}

 重新发送消息进行测试,可以看见重试次数耗尽之后就送到了死信队列了。

在里面将异常的堆栈信息也包含了. 

 

死信交换机

初识死信交换机

区别在于,上一个是消费者失败之后寻找交换机路由到error队列,这个是退回到队列,再指定交换机,最后路由。

TTL

这个的应用场景比如说订单超时未支付然后自动取消。

实现  

          

准备 代码部分

    @RabbitListener(bindings = @QueueBinding(value=@Queue(name = "dl.queue",durable = "true"),exchange=@Exchange(name="dl.direct"),key = "dl"))public void listenDlQueue(String msg){log.info("接收到 dl.queue的延迟消息:{}",msg);}

@Configuration
public class TTLMessageConfig {@Beanpublic DirectExchange ttlExchange(){return new DirectExchange("ttl.direct");}@Beanpublic Queue ttlQueue(){return QueueBuilder.durable("ttl.queue").ttl(10000).deadLetterExchange("dl.direct").deadLetterRoutingKey("dl").build();}@Beanpublic Binding simpleBinging(){return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");}
}

 测试代码

    @Testpublic void testTTLMessage(){//1.准备消息Message message = MessageBuilder.withBody("hello,ttl".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久的.build();//2.发送消息rabbitTemplate.convertAndSend("ttl.direct","ttl",message);//3.记录日志log.info("消息成功发送!");}

10s之后在消费者那里就可以看见

 

 然后这里会以短的优先,5s后消费者就可以收到消息。

延迟队列

1.重装rabbitmq容器 

这个插件需要找到mq内部的插件文件夹,所以需要在创建容器的时候进行数据卷挂载。

docker run \-e RABBITMQ_DEFAULT_USER=itcast \-e RABBITMQ_DEFAULT_PASS=123321 \-v mq-plugins:/plugins \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3.8-management

 2.安装DelayExchange插件

官方的安装指南地址为:Scheduling Messages with RabbitMQ | RabbitMQ - Blog

上述文档是基于linux原生安装RabbitMQ,然后安装插件。

2.1.下载插件

RabbitMQ有一个官方的插件社区,地址为:Community Plugins — RabbitMQ

大家可以去对应的GitHub页面下载3.8.9版本的插件,地址为Release v3.8.9 · rabbitmq/rabbitmq-delayed-message-exchange · GitHub这个对应RabbitMQ的3.8.5以上版本。 

查看挂载的数据卷.

docker volume inspect mq-plugins

接下来的看着好麻烦,以后看文档吧.

还真的麻烦的一批,真不想再搞这玩意,文件搞来搞去。

不知道为什么,挂载数据卷时一直报错,不能用自己定义的文件夹来挂载。

 

 

在消费者中如下声明

    @RabbitListener(bindings = @QueueBinding(value=@Queue(name = "delay.queue",durable = "true"),exchange=@Exchange(name="delay.direct",delayed = "true"),key = "delay"))public void listenDelayQueue(String msg){log.info("接收到 delay.queue的延迟消息:{}",msg);}

 在生产者中如下定义

    @Testpublic void testSendDelayMessage(){//1.准备消息Message message = MessageBuilder.withBody("hello,ttl".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久的.setHeader("x-delay",5000).build();//2.准备correlationDataCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());//3.发送消息rabbitTemplate.convertAndSend("delay.direct", "delay", message,correlationData);log.info("发送消息成功");}

测试结果如下 成功实现延迟5秒。但是会被报错,理论上说交换机应该立即转发,不会延迟,但是这里的延迟交换机可以帮忙保存消息延迟发送,所以这里才会报错,not_router,消息没有到达队列

 为了解决这个报错,需要修改生产者代码

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//获取RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);//配置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {//判断是否是延迟消息if (message.getMessageProperties().getReceivedDelay()>0) {//是一个延迟消息,忽略错误提示return;}//记录日志log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",replyCode,replyText,exchange,routingKey,message.toString());//如果有需要的话,可以重发消息});}
}

惰性队列

消息堆积问题

问题解决

消费者中声明两个队列。 

@Configuration
public class LazyConfig {@Beanpublic Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy().build();}@Beanpublic Queue normalQueue(){return QueueBuilder.durable("normal.queue").build();}
}

 测试,准备两个队列之后分别向两个队列发消息。

    @Testpublic void testLazyMessage(){for(int i=0;i<1000000;i++){//1.准备消息Message message = MessageBuilder.withBody("hello,ttl".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) //持久的.build();//3.发送消息rabbitTemplate.convertAndSend("lazy.queue", message);}}@Testpublic void testnormalMessage(){for(int i=0;i<1000000;i++){//1.准备消息Message message = MessageBuilder.withBody("hello,ttl".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) //持久的.build();//3.发送消息rabbitTemplate.convertAndSend("normal.queue", message);}}

可以看见惰性队列的消息全部到paged out 刷出磁盘了?????、,为什么非惰性队列的也是刷出磁盘了。

 

MQ集群

集群个屁,不搞了.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/231008.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【华为】文档中命令行约定格式规范(命令行格式规范、命令行行为规范、命令行参数格式、命令行规范)

文章目录 命令行约定格式**粗体&#xff1a;命令行关键字***斜体&#xff1a;命令行参数*[ ]&#xff1a;可选配置{ x | y | ... } 和 [ x | y | ... ]&#xff1a;选项{ x | y | ... }* 和 [ x | y | ... ]*&#xff1a;多选项&<1-n>&#xff1a;重复参数#&#xff…

kill编译异常处理

当kill编译时出现如下警告 Build target Target 1 linking... *** WARNING L16: UNCALLED SEGMENT, IGNORED FOR OVERLAY PROCESSSEGMENT: ?PR?_LCD_SHOWCHAR?LCD1602 *** WARNING L16: UNCALLED SEGMENT, IGNORED FOR OVERLAY PROCESSSEGMENT: ?PR?_LCD_SHOWSTRING?LCD…

SpringSecurity 手机号登录

一、工作流程 1.向手机发送验证码&#xff0c;第三方短信发送平台&#xff0c;如阿里云短信。 2.手机获取验证码后&#xff0c;在表单中输入验证码。 3.使用自定义过滤器​SmsCodeValidateFilter​。 4.短信校验通过后&#xff0c;使用自定义手机认证过滤器​SmsCodeAuthentic…

UE4/UE5 日志插件(基于spdlog)

1 解决问题 对于高频日志序列化到本地的需求&#xff0c;spdlog肯定完美满足。 源码地址&#xff1a;https://github.com/gabime/spdlog 博主下载的版本为 spdlog-1.12.0&#xff0c;各位大佬可以根绝自己爱好选择。 2 过程介绍 大概目录&#xff1a; SpdlogLibC目录下是对…

Qt/C++音视频开发60-坐标拾取/按下鼠标获取矩形区域/转换到视频源真实坐标

一、前言 通过在通道画面上拾取鼠标按下的坐标&#xff0c;然后鼠标移动&#xff0c;直到松开&#xff0c;根据松开的坐标和按下的坐标&#xff0c;绘制一个矩形区域&#xff0c;作为热点或者需要电子放大的区域&#xff0c;拿到这个坐标区域&#xff0c;用途非常多&#xff0…

C语言之文件操作(下)

C语言之文件操作&#xff08;下&#xff09; 文章目录 C语言之文件操作&#xff08;下&#xff09;1. 文件的顺序读写1.1 文件的顺序读写函数1.1.1 字符输入/输出函数&#xff08;fgetc/fputc&#xff09;1.1.2 ⽂本⾏输⼊/输出函数&#xff08;fgets/fputs&#xff09;1.1.3 格…

工业应用新典范,飞凌嵌入式FET-D9360-C核心板发布!

来源&#xff1a;飞凌嵌入式官网 当前新一轮科技革命和产业变革突飞猛进&#xff0c;工业领域对高性能、高可靠性、高稳定性的计算需求也在日益增长。为了更好地满足这一需求&#xff0c;飞凌嵌入式与芯驰科技&#xff08;SemiDrive&#xff09;强强联合&#xff0c;基于芯驰D9…

SI24R03国产自主可控RISC-V架构MCU低功耗2.4GHz收发芯片SoC

目录 RISC-V架构的优势SI24R03/04特性射频收发器模块特征MCU 模块特征 其他特征 RISC-V架构的优势 相对于目前主流的英特尔X86架构及ARM等架构来说&#xff0c;RISC-V架构具有指令精简、模块化、可扩展、开源、免费等优点。RISC-V的基础指令集只有40多条&#xff0c;加上其他基…

Kafka--从Zookeeper数据理解Kafka集群工作机制

从Zookeeper数据理解Kafka集群工作机制 这一部分主要是理解Kafka的服务端重要原理。但是Kafka为了保证高吞吐&#xff0c;高性能&#xff0c;高可扩展的三高架构&#xff0c;很多具体设计都是相当复杂的。如果直接跳进去学习研究&#xff0c;很快就会晕头转向。所以&#xff0c…

Echarts相关配置

title&#xff1a;标题组件 tooltip:提示框组件 legend:图例组件 toolbox:工具栏 grid&#xff1a;直角坐标系内绘图网格 xAxis:直角坐标系grid中的x轴 yAxis&#xff1a;直角坐标系grid中的y轴 series:系列列表。每个系列通过type决定自己的图表类型 color&#xff1a;调色…

如何用 Cargo 管理 Rust 工程系列 戊

以下内容为本人的学习笔记&#xff0c;如需要转载&#xff0c;请声明原文链接 微信公众号「ENG八戒」https://mp.weixin.qq.com/s/-OiWtUCUc3FmKIGMBEYfHQ 单元和集成测试 Rust 为单元测试提供了非常好的原生支持。 创建库工程时&#xff0c;cargo 生成的源码文件 lib.rs 自带…

【C语言】自定义类型——枚举、联合体

引言 对枚举、联合体进行介绍&#xff0c;包括枚举的声明、枚举的优点&#xff0c;联合体的声明、联合体的大小。 ✨ 猪巴戒&#xff1a;个人主页✨ 所属专栏&#xff1a;《C语言进阶》 &#x1f388;跟着猪巴戒&#xff0c;一起学习C语言&#x1f388; 目录 引言 枚举 枚举…

06. Python模块

目录 1、前言 2、什么是模块 3、Python标准库模块 3.1、os模块 3.2、datetime 模块 3.3、random模块 4、自定义模块 4.1、创建和使用 4.2、模块命名空间 4.3、作用域 5、安装第三方依赖 5.1、使用 pip 安装单个依赖 5.2、从 requirements.txt 安装依赖 5.3、安装指…

还在为学MyBatis发愁?史上最全,一篇文章带你学习MyBatis

文章目录 前言一、&#x1f4d6;MyBatis简介1.Mybatis历史2.MyBatis特性3.对比&#xff08;其他持久化层技术&#xff09; 二、&#x1f4e3;搭建MyBatis1.开发环境2.创建maven工程3.创建MyBatis核心配置文件4.创建mapper接口5.创建MyBatis的映射文件6.通过junit测试功能7.加入…

OpenCV4工业缺陷检测的六种方法

机器视觉 机器视觉是使用各种工业相机&#xff0c;结合传感器跟电气信号实现替代传统人工&#xff0c;完成对象识别、计数、测量、缺陷检测、引导定位与抓取等任务。其中工业品的缺陷检测极大的依赖人工完成&#xff0c;特别是传统的3C制造环节&#xff0c;产品缺陷检测依赖于人…

python+torch线性回归模型机器学习

程序示例精选 pythontorch线性回归模型机器学习 如需安装运行环境或远程调试&#xff0c;见文章底部个人QQ名片&#xff0c;由专业技术人员远程协助&#xff01; 前言 这篇博客针对《pythontorch线性回归模型机器学习》编写代码&#xff0c;代码整洁&#xff0c;规则&#xf…

USB2.0 Spec

USB System Description A USB system is described by three definitional areas: • USB interconnect • USB devices • USB host USB interconnect The USB interconnect is the manner in which USB devices are connected to and communicate with the host. USB Ho…

docker基本命令

1.docker命令图解 2. 从仓库拉取镜像 #下载最新版 docker pull nginx # 镜像名:版本名&#xff08;标签&#xff09; docker pull nginx:1.20.1docker rmi 镜像名:版本号/镜像id3. 容器启动及停止 docker run [OPTIONS] IMAGE [COMMAND] [ARG...] docker run [设置项] 镜…

R语言【rgbif】——occ_search对待字符长度大于1500的WKT的特殊处理真的有必要吗?

一句话结论&#xff1a;只要有网有流量&#xff0c;直接用长WKT传递给参数【geometry】、参数【limit】配合参数【start】获取所有记录。 当我在阅读 【rgbif】 给出的用户手册时&#xff0c;注意到 【occ_search】 强调了 参数 【geometry】使用的wkt格式字符串长度。 文中如…

使用数组模拟栈的相关操作【栈1.1】

public class ArrayStackDemo {public static void main(String[] args) {ArrayStack arrayStack new ArrayStack(4);Scanner sc new Scanner(System.in);boolean loop true;char key ;while (loop) {System.out.println("栈操作菜单项");System.out.println(&q…