【MQ05】异常消息处理

异常消息处理

上节课我们已经学习到了消息的持久化和确认相关的内容。但是,光有这些还不行,如果我们的消费者出现问题了,无法确认,或者直接报错产生异常了,这些消息要怎么处理呢?直接丢弃?这就是丢消息了呀。再次处理?一直继续报错怎么办?这条消息就永远都在不停报错的死循环中了。

通常,消息队列系统都会提供一套对于异常消息的处理机制,比如 RabbitMQ 的死信队列。

RabbitMQ死信队列

死信队列,其实就是在满足一定规则的前提下,将消息发送到指定的一个交换机队列中。这些规则包括:

  • 使用者使用basic.reject或basic.nack(重新排队参数设置为false)对消息进行否定确认。

  • 消息过期,根据队列的消息 TTL 过期时间而定。

  • 消息被丢弃,因为队列超过了长度限制。

前面两个好测试,最后一个要修改 RabbitMQ 的配置稍微麻烦点,那么咱们就用前面两个来测。

首先,要定义一个用于接收死信消息的交换机和队列,我们顺便也直接做一个客户端消费者,专门读取死信队列里的消息。这个就相当于是正规队列消费者处理出现问题之后,再由这个消费者来做善后。

// 5.rq.c.deadletter.php
// ……………………
// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel(); // 获取频道// 死信队列及交换机
$channel->exchange_declare('dead_letter', 'direct', false,true,false); // 定义交换机
$channel->queue_declare('dead_letter_queue', false, true); // 定义队列
$channel->queue_bind('dead_letter_queue', 'dead_letter'); // 队列绑定交换机echo "等待死信队列消息,或者使用 Ctrl+C 退出程序。", PHP_EOL;// 定义接收数据的回调函数
$callback = function ($msg) {echo '死信队列接收到数据: ', $msg->body, PHP_EOL;
};// 消费队列,获取到数据将调用 callback 回调函数
$channel->basic_consume('dead_letter_queue', '', false, true, false, false, $callback);while ($channel->is_open()) {$channel->wait();
}

交换机名字和队列名字自己起就好了。前面已经说过了,这个消费者获取到的死信队列数据都是正常消费有问题的,那么善后工作咱们就可以将这些数据记录日志或者记录到数据库,顺便发邮件、发短信提醒,或者做任何你想做的通知及记录工作。出于测试的目的,咱们就是简单打印了一下。

> php 5.rq.c.deadletter.php
等待死信队列消息,或者使用 Ctrl+C 退出程序。

启动之后就等着死信数据的到来吧。接下来,我们要修改一下正常的队列,增加一些参数。

// 5.rq.c.php
// ……………………
$channel->queue_declare('hello', false, true, false, false, false, new AMQPTable(['x-message-ttl'=>10000, // 10秒过期'x-dead-letter-exchange'=>'dead_letter', // 死信到某个交换机'x-dead-letter-routing-key'=>'', // 死信路由
]));// ……………………$callback = function ($msg) {echo '接收到数据: ', $msg->body, PHP_EOL;$msg->nack();
};// ……………………

x-message-ttl 是消息的过期时间,我们后面要用它来测过期时间进入死信队列的情况。x-dead-letter-exchange 用于定义出现问题后,将这个队列的数据放到哪个死信队列交换机中。x-dead-letter-routing-key 这个是指定进入死信队列的哪个路由。关于 RabbitMQ 交换机和路由的内容,如果有不清楚的小伙伴,可以在深入地学习一下 RabbitMQ 的官方文档和示例哦。

然后,在回调函数中,我们直接调用 $msg->nack() 。这个表示的就是手动取消确认,还记得上节课我们学过的是 $msg->ack() 吧,这个表示的是确认处理完成,没问题了,而 nack() 就是反过来的,说明当前的队列处理有问题,没有正常 ACK 。这对应的就是死信队列的第一条规则。

现在,向普通的 hello 消息队列中发送消息,结果死信队列中会接收到数据。

> php 5.rq.c.deadletter.php
等待死信队列消息,或者使用 Ctrl+C 退出程序。
死信队列接收到数据: Hello World!

过期时间

好了,上面测试的结果就是死信队列的第一条规则。接下来我们测试第二条规则。

在 hello 队列的配置中,我们加上的 x-message-ttl 是 10 秒,也就是说,这条消息 10 秒不处理就会进入到死信队列。这个非常好测,直接关掉消费者,然后生产者发送数据,等到过期时间后,死信队列就会显示接收到数据了。

这个规则可以帮我们实现一个非常重要的功能,就是:延时队列。RabbitMQ 中没有直接的延时队列相关的功能,但可以通过死信的这个规则来实现,具体内容我们下节课再说。

Redis 队列在 Laravel 框架中处理异常消息

好了,看完 RabbitMQ 的相关异常处理功能之后,我们马上会联想到,Redis 有这样的功能吗?

抱歉,真的没有,但是,Laravel 和 TP 框架的队列功能都通过业务代码的形式实现了类似的功能。我们还是以 Laravel 为例进行学习。

在 Laravel 中,异常的消息队列数据最后会保存到 MySQL 数据库中,我们需要执行数据迁移来创建表,使用下面这两个命令。

php artisan queue:failed-table
php artisan migrate

操作成功之后,会在数据库中创建一个名为 failed_jbs 的表。接下来,还是继续拿上次课创建的那个最后会报异常的 Job 来进行测试,直接调用生产者的命令插入队列。

> php artisan q:p4

然后,我们不使用 --tries ,这样就不会进行重试了,一次失败就会进入到异常处理流程中,也就是插入到数据库中。

> php artisan queue:work
[2023-01-03 01:58:12][gAfPH03MvGWQXjjjk5nAFWtU7b3gkhqD] Processing: App\Jobs\Queue4
接收到了消息:测试 1672711092
[2023-01-03 01:58:22][gAfPH03MvGWQXjjjk5nAFWtU7b3gkhqD] Failed:     App\Jobs\Queue4

好了,去数据库看一眼吧,数据是不是插入进来了。

36aa3a7d22ca85b13e1dd118f614099f.png

从截图上可以看到,不仅有原始的队列信息,还有异常信息、队列使用的连接以及队列名、uuid 和失败时间这些字段。在结构上,还为 uuid 创建了一个唯一索引,这个 uuid 的作用我们后面马上就会看到。

接下来,使用命令行,我们还可以看到所有失败队列的信息。

> php artisan queue:failed+--------------------------------------+------------+---------+-----------------+---------------------+
| ID                                   | Connection | Queue   | Class           | Failed At           |
+--------------------------------------+------------+---------+-----------------+---------------------+
| c8774e27-1284-4657-a13a-7e5665789d74 | redis      | default | App\Jobs\Queue4 | 2023-01-03 01:58:22 |
| 4a38d37b-86e7-4755-a29a-f6843b7289cc | redis      | default | App\Jobs\Queue4 | 2022-12-31 03:57:23 |
| f4fa4cfd-cee2-4199-b048-c21dbbc188ca | redis      | default | App\Jobs\Queue4 | 2022-12-31 03:57:03 |
| 210c9716-89f1-438b-b252-32cc6552a68f | redis      | default | App\Jobs\Queue4 | 2022-12-31 03:52:57 |
| c971ddef-9098-4399-a404-bbce08bd97af | redis      | default | App\Jobs\Queue4 | 2022-12-31 03:52:41 |
+--------------------------------------+------------+---------+-----------------+---------------------+

然后,可以利用 uuid ,也就是上面命令中返回的 ID ,来让数据进行消息重试,使用 queue:retry 命令。

> php artisan queue:retry c8774e27-1284-4657-a13a-7e5665789d74 
The failed job [c8774e27-1284-4657-a13a-7e5665789d74] has been pushed back onto the queue!

执行之后,这条失败的数据又塞回之前的队列里了,消费者又会开始对它进行消费。这就是 uuid 的作用。另外,我们还可以批量执行重试,直接在命令后面写多个 uuid 就行。也可以一次性全部执行重试,只需要使用 all 参数即可,这个大家可以去官方文档再详细看一下。

我们还可以删除或者整个清除所有的失败任务数据,其实也就是删除 failed_jobs 中的数据。queue:forget 用于根据指定的 uuid 进行删除,而 queue:flush 则会清除所有数据。除了这两个命令之外,还有一个根据时间来清除失败任务的命令 queue:prune-failed 。它默认是默认 24 小时,可以用 --hours=xxx 来设置具体的时间。

> php artisan queue:prune-failed
4 entries deleted!

最后,我们还可以禁用失败存储。直接通过 .env 配置文件进行配置就行了,设置对应的属性值为 null 即可。

QUEUE_FAILED_DRIVER=null

任务错误处理

除了上面的失败处理之外,在 Laravel 中,还可以在出现错误的时候马上去执行一个方法,就像是失败事件后的回调函数一样。通过这个方法,我们可以在任务失败的时候马上就进行邮件、短信通知,或者也可以记录错误日志,甚至也可以不使用上面默认的异常处理功能以及相关的表,直接在这里用我们自己自定义的表来存储失败任务的信息。总之就是,任务失败后你想怎么处理都行。

只需要在任务类中实现 failed() 方法。

// /app/Jobs/Queue4.php
// ……………………
// ……………………
public function failed($exception = null)
{echo '如果发生错误就进入到这里了,错误信息是:'.$exception->getMessage(), PHP_EOL;
}

然后看一下效果。

> php artisan queue:work
[2023-01-03 02:19:00][d5MlKJkjnez3MBnqzrYCUkG1HSg1LJtC] Processing: App\Jobs\Queue4
接收到了消息:测试 1672712340
如果发生错误就进入到这里了,错误信息是:
[2023-01-03 02:19:10][d5MlKJkjnez3MBnqzrYCUkG1HSg1LJtC] Failed:     App\Jobs\Queue4

这个方法的执行是同步的,不是异步的,就像我们前面说的,任务失败了马上就会调用这个方法。因此,就可以安全有效地在这个方法中进行我们任何想要善后处理的操作了。

总结

好了,到现在为止,我们的队列系统其实已经相当安全可靠了。上一篇文章通过持久化和 ACK 机制解决了消息丢失的问题,这次即使是消费者出现了异常,我们也可以保证消息能够通过死信队列或者框架机制保存下来。从这里我们也可以看出,即使原生的系统不支持,也可以通过框架代码的形式来实现类似的功能,这也是队列这种系统灵活性的表现。补充一点,BLMOVE 这类 Redis 命令其实也可以实现消息备份,但和上面死信那种触发条件还是有区别,这是主动备份。再有,Redis 的 Stream 类型其实也已经是很完备的一套消息队列功能机制了,未应答 ACK 的数据是可以重复执行的,这也可以当成是一种异常处理的形式,只不过也一样需要我们自己编码干预进行转移,可以参考我们之前 Redis 系列中相关的内容:【Redis09】Redis基础:Stream操作 https://mp.weixin.qq.com/s/DsSnyU9xuJEijm9t-DVs2A 。

接下来,我们再看两种常见的队列形式,分别是延时队列和优先级队列,它们在 RabbitMQ 和 Laravel+Redis 中的实现又是怎样的呢?

测试代码:

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/5.rq.c.deadletter.php

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/5.rq.c.php

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/5.rq.p.php

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/707313.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

带大家做一个,易上手的家常蒜香菠菜

一捆 菠菜 四瓣蒜 蒜去皮切末 菠菜切段 多清洗几次 因为菠菜上面的土真的是太多了 菠菜下锅 加水煮一分钟左右 因为菠菜内的草酸成分非常高 所以这一步肯定是要的 然后将菠菜捞出来 干和叶子分开 锅中水倒掉 清洗一下 然后起锅烧油 下蒜末炒香 然后 下菠菜干 因为干熟的…

Python + Selenium —— 网页元素定位之标签名和链接文本定位

tag name tag name 为标签名定位,使用网页元素的标签名如a, div, input, span 等。 但是有一个问题,常见的标签名比如 在同一个页面上有非常多。会不会觉得 tag name 没什么用呢? 当然普通的模拟操作是不大有用,这个重复性实在…

笔记:GO1.19 带来的优化(重新编译juicefs)

## 背景 go编写的应用程序(juicefs)在k8s(docker)中运行,时不时出现 OOM Killed。 ## 分析 发现某些应用使用juicefs会导致内存使用飙升; k8s的pod给的内存资源:request 2G,limit…

基于springboot实现线上阅读系统项目【项目源码+论文说明】计算机毕业设计

基于springboot实现线上阅读系统演示 摘要 随着社会发展速度的愈来愈快,以及社会压力变化的越来越快速,致使很多人采取各种不同的方法进行解压。大多数人的稀释压力的方法,是捧一本书籍,心情地让自己沉浸在情节里面,以…

基于沁恒微 ch643q 多通道采集 adc 驱动层实现

一、代码 #include "main.h"/********************************************************************** fn ADC_Function_Init** brief Initializes ADC collection.** return none*/ void ADC_Function_Init(void) {ADC_InitTypeDef ADC_InitStructure …

pdffactory pro 8中文破解版

详细介绍 PdfFactory,PDF文档虚拟打印机,无须Acrobat即可创建Adobe PDF文件,创建PDF文件的方法比其他方法更方便和高效。支持将多个文档整合到一个PDF文件、增加字体和便签、PDF加密、去水印、压缩优化。 FinePrint,Windows虚拟…

【踩坑】修复xrdp无法关闭Authentication Required验证窗口

转载请注明出处:小锋学长生活大爆炸[xfxuezhang.cn] 问题如下,时不时出现,有时还怎么都关不掉,很烦: 解决方法一:命令行输入 dbus-send --typemethod_call --destorg.gnome.Shell /org/gnome/Shell org.gn…

js 面试题--事件循环event loop--宏任务和微任务

1 事件循环event loop概念: js 是非阻塞单线程语言,js在执行过程中会产生执行环境,执行环境会按顺序添加到执行栈中,先执行同步栈中的任务,当遇到异步任务时会添加到task队列中,同步栈执行完后&#xff0c…

一文读懂什么是 OCR 识别

在数字化时代,信息处理和数据管理是企业运营的重要环节。然而,手工输入信息存在效率低和准确性低的问题,严重影响了企业的工作流程和决策过程。因此,OCR(Optical Character Recognition)识别技术的应用变得…

【Unity】导入IAP插件后依赖冲突问题 com.android.billingclient冲突

【Unity】Attribute meta-data#com.google.android.play.billingclient.version 多版本库冲突_unity billingclient-CSDN博客 打开mainTemplate.gradle 找到dependencies { } 在里面末尾加上如下: configurations.all {exclude group: com.android.billingclien…

uni-app 实现拍照后给照片加水印功能

遇到个需求需要实现&#xff0c;研究了一下后写了个demo 本质上就是把拍完照后的照片放到canvas里&#xff0c;然后加上水印样式然后再重新生成一张图片 代码如下&#xff0c;看注释即可~使用的话记得还是得优化下代码 <template><view class"content"&g…

单词倒排——c语言解法

以下是题目&#xff1a; 这个题中有三个点&#xff0c; 一个是将非字母的字符转换为空格&#xff0c; 第二是如果有两个连续的空格&#xff0c; 那么就可以将这两个连续的空格变成一个空格。 第三个点就是让单词倒排。 那么我们就可以将这三个点分别封装成三个函数。 还有就是…

特征融合篇 | YOLOv8 引入通用高效层聚合网络 GELAN | YOLOv9 新模块

今天的深度学习方法专注于如何设计最合适的目标函数,以使模型的预测结果最接近真实情况。同时,必须设计一个合适的架构,以便为预测提供足够的信息。现有方法忽视了一个事实,即当输入数据经过逐层特征提取和空间转换时,会丢失大量信息。本文将深入探讨数据通过深度网络传输…

linux系统Jenkins的安装

Jenkins安装 安装上传安装包解压包首次登录要去服务器查看密码&#xff0c;更改密码选择需要安装的插件设置Admin用户和密码安装完成 安装 上传安装包 上传 jdk17 tomcat jenkins.war的安装包 . 上传 tomcat安装包解压包 解压jdk tar xf jdk-11.0.18_linux-x64_bin.tar.gz解…

静态链表(1)

什么叫静态链表&#xff1f;——用顺序表模拟链表&#xff0c;就叫做静态链表 第一列相当于数据域data&#xff0c;第二列相当于指针域next&#xff0c; 第一行&#xff08;0&#xff09;相当于头结点&#xff08;头结点的数据域不放数据&#xff09; &#xff08;a&#xff…

【Unity每日一记】角色控制器Character Contorller

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;Uni…

校园微社区微信小程序源码/二手交易/兼职交友微信小程序源码

云开发校园微社区微信小程序开源源码&#xff0c;这是一款云开发校园微社区-二手交易_兼职_交友_项目微信小程序开源源码&#xff0c;可以给你提供快捷方便的校园生活&#xff0c;有很多有趣实用的板块和功能&#xff0c;如&#xff1a;闲置交易、表白交友、疑问互答、任务兼职…

解决内嵌帆软报表出现重定向问题

最近收到反馈&#xff0c;某些程序的前端通过iframe标签内嵌finebi帆软报表时&#xff0c;出现一系列问题。 问题1: 如下图所示&#xff0c;单点登录(单点登录地址schema是https)后service地址的schema协议是http, 浏览器内核的安全策略不允许http访问https。 解决方案&#xf…

UI自动化测试:playwright工具(一):python环境下安装、UI录制使用(需要些代码能力)

一、python环境下安装playwright工具 1. 安装playwright库 pip install playwright -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com#至于镜像源,可以选,也可不选:#阿里云 http://mirrors.aliyun.com/pypi/simple/ #中国科技大学 https://py…

如何鉴别医疗设备防漏费系统的实用性

19339904493&#xff08;康溪&#xff09; 市场上有很多医疗设备防漏费系统&#xff0c;鱼龙混杂。有些系统安装后不但效果不好&#xff0c;还增加了医生的负担&#xff0c;不但需要手动输号&#xff0c;本来需要做一次的动作&#xff0c;安装系统后需要重复两次。还有的是效果…