如何处理消费过程中的重复消息?

欢迎大家到我的博客浏览。如何处理消费过程中的重复消息? | YinKai's Blog
本文来聊一聊消息队列过程中消息重复怎么办?<!--more-->

在消息传递过程中,如果出现消息传递失败,发送方就会进行重试,重试过程中就有可能产生重复的消息。对于使用消息队列的业务系统来说,如果没有对重复消息进行处理,就有可能会导致系统的数据出现错误。

比如一个消费订单的系统,统计下单金额的微服务,如果没有正确处理重复消息,就会出现重复统计,导致结果错误。

处理重复消息可以从两个方面进行考虑:

  1. 消息队列本身保证消息不重复

  2. 业务实现幂等

消息重复的情况是必然存在的

在 MQTT 协议中,针对传递消息能够提供的服务质量标准,提供了三种不同服务质量的定义,这三种服务质量从低到高依次是:

  • At most once: 至多一次。在消息传递时,最多被送达一次,这意味着无法保证消息可靠性,允许丢数据。适用于对消息可靠性要求不高的监控场景,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。

  • At least once:至少一次。消息在传递时,至少会被送达一次,即不允许丢消息,但允许有少量重复消息出现。例如,金融交易处理系统通常采用至少一次传递,因为在这种场景中,任何一笔交易都不能丢失,但允许某些交易重复。虽然会有一些额外的重复处理工作,但系统必须保证每笔交易都会被处理。

  • Exactly once:恰好一次。消息在传递时,只会被送达一次。这个是最高等级。比如电信行业的通信系统可能采用恰好一次传递标准,以确保不仅数据不会丢失,而且不会发送重复的短信或通话记录。

上述的服务质量标准不仅适用于 MQTT,对所有的消息队列都是适用的。我们常用的消息队列提供的服务都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka。所以消息队列是很难保证消息不重复的。

因此我们想要保证消息不重复,就需要我们的代码中能接受 “ 消息可能会重复” 这一现状,然后通过一些方法来消除重复消息对业务的影响。

有人会说,“Kafka 的文档中说它是支持 Exactly once 的。”,但其实 Kafka 所支持的 “Exactly once” 和我们刚刚提到的服务质量标准 “Exactly once” 是不一样的,它是 Kafka 提供的另一个特性。

用幂等性解决消息重复问题

通常解决重复消息的办法是:在消费端,让消费消息的操作具备幂等性。

幂等性:幂等原本是数学上的一个概念,后来被拓展到计算机领域,被用来描述一个操作、方法或者服务。一个幂等操作的特点是:其任意执行多次所产生的影响均与执行一次的影响相同。

一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。

举个例子,在不考虑并发的情况下,“将账户 X 的余额设置为 100 元”,执行一次和多次的最终结果都是 “账户 X 的余额设置为 100 元”。只要提供的参数 100 元不变,这个操作就是一个幂等的操作。

再举一个例子,“将账户 X 的余额加 100 元”,这个操作它就不是幂等的,每执行一次,账户余额就会增加 100 元,执行多次和执行一次对系统的影响(也就是账户的余额)是不一样的。

因此我们只要保证消费消息的逻辑是幂等的,就不用担心重复消息的执行会对系统造成任何改变。

于是我们可以总结出一个 “公式” :

从对系统的影响结果来说:At least once + 幂等消费 = Exactly once。

由于并不是所有的业务都能设计成天然幂等的,因此需要一些方法和技巧来实现幂等,下面就来介绍几种常用的设计幂等操作的方法:

1、利用数据库的唯一约束实现幂等

例如上面的那个不具备幂等特性的转账的例子:将账户 X 的余额加 100 元。我们可以通过修改业务逻辑,让它具备幂等性。

我们可以通过限定,每个账单每个用户只能执行一次变更操作。在分布式系统中,这个限制实现的方法非常多,最简单的是我们在数据库中建一张转账流水表,这个表有三个字段:转账单 ID、账户 ID 和变更金额,然后给账单 ID 和账户 ID 这两个字段创建一个联合的唯一约束,这样对于相同的转账单 ID 和账户 ID,表里只能存在一条记录。

这样我们消费信息的逻辑就变成了:” 在流水表中增加一条转账记录,然后根据转账记录,异步更新用户余额即可 。“ 在流水表增加一条记录这个操作,由于存在唯一约束,故对于同一个账单同一个账户 只能插入一条记录,后续重复插入都会失败,这样就实现了幂等的操作。我们只要写一个 SQL,正确实现它就可以了。

2、为更新的数据设置前置条件

另一种实现幂等的思路就是,给数据变更设置一个前置条件,如果满足条件就更新数据,否则就拒绝更新,在更新数据的时候,同时变更前置条件中需要判断的数据。后续执行重复操作的时候,由于第一次更新的时候已经修改了前置条件中的判断依据,不满足前置条件,则不会重复执行更新数据操作。

还是上面说的那个例子中,”将账户 X 的余额增加 100 元“ 这个操作不具备幂等性,我们可以给这个操作加上一个前置条件,变为:”如果账户余额为 500 元,就将余额加 100 元“,这下该操作就具备幂等性了。同样对应到消息队列中的使用时,可以在发消息时在消息体中带上当前的余额,在消费的时候进行判断数据库中,当前余额是否满足前置条件,只有满足,才能执行变更操作。

另一种更为通用的方法是,给数据增加一个版本号属性,每次更新数据前,判断当前版本号和消息中的版本号是否一致,如果不一致就拒绝更新数据,更新数据的时候同时将版本号 + 1,这样也可以实现幂等。

3、记录并检查操作

上面两种方法其实并不是万能的,如果恰好也不适用于你的场景,还有一种通用性最强,适用范围最广的实现幂等方法:记录并检查操作,也称 ”Token 机制或 GUID(全局唯一 ID)机制“,实现的思路很简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。

具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一 ID,消费时先检查这个 ID 是否被消费过,如果没有被消费过,才更新数据,然后将该 ID 的消费状态设置为已消费。

原理和实现看似都很简单,但在分布式系统中是非常难实现的。首先,需要给每个消息都指定一个全局唯一 ID,这件事本身就不简单,方法有很多,但都不太好同时满足简单、高可用和高性能,或多或少都会有些牺牲。

更麻烦的是,在 ”检查消费状态,然后更新数据并设置消费状态“ 中,三个操作必须为一组且具备原子性,才能真正实现幂等。对于这个问题,当然我们可以用事务来实现,也可以用锁来实现,但是在分布式系统中,无论是分布式事务还是分布式锁都是比较难解决问题。

小结

这篇文章,我们了解到消息队列在使用中可能会出现消息重复的问题,并介绍了几种通过实现幂等的操作来避免消息重复给系统带来的影响,可以利用数据库的唯一约束防止重复更新数据,也可以为数据更新设置一次性的前置条件,来防止重复消息,如果这两种方法都不适用,还可以用 ”记录并检查操作“ 的方式来保证幂等,这种方法适用范围最广,但是实现难度和复杂度也比较高,一般不推荐使用。

这些实现幂等的方法,不仅可以用于解决重复消息的问题,也同样适用于,在其他场景中来解决重复请求或者重复调用的问题。比如,我们可以将 HTTP 服务设计成幂等的,解决前端或者 APP 重复提交表单数据的问题;也可以将一个微服务设计成幂等的,解决 RPC 框架自动重试导致的重复调用问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/170646.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

redis基本数据结构(String,Hash,Set,List,SortedSet)【学习笔记】

redis数据结构介绍 redis是一个key-value的数据库&#xff0c;key一般是String类型&#xff0c;但是value的类型多种多样。 redis 通用命令 keys : 查看符合模板的所有key &#xff08;keys partten ,匹配表达式支持一些特殊字符 * &#xff1f;&#xff09;del&#xff1a;删…

项目实战详细讲解带有条件响应的 SQL 盲注、MFA绕过技术、MFA绕过技术、2FA绕过和技巧、CSRF绕过、如何寻找NFT市场中的XSS漏洞

项目实战详细讲解带有条件响应的 SQL 盲注、MFA绕过技术、MFA绕过技术、2FA绕过和技巧、CSRF绕过、如何寻找NFT市场中的XSS漏洞。 带有条件响应的 SQL 盲注 这篇文章的核心要点如下: 漏洞发现:作者在Portswigger提供的实验室中发现了一个盲SQL注入漏洞。这个漏洞存在于一个应…

paho mqtt的keepAliveInterval

一、keepAliveInterval 所用的版本为1.3.12 实验一、 这个值设置的30&#xff0c;打开mqtt的trace&#xff0c;发现每隔33s发送一次pingreq note&#xff1a; 期间&#xff0c;client和server一直保持qos0的消息交互&#xff08;client->server&#xff09; 实验二、 …

力扣:提莫攻击

代码&#xff1a; class Solution { public:int findPoisonedDuration(vector<int>& timeSeries, int duration){//根据数组中给出的元素的值来进行判断&#xff01;//若后面元素-前面元素>d 中了d秒&#xff01;// <d 中了差的秒数&…

cesium轨迹线(闪烁轨迹线)

cesium轨迹线(闪烁轨迹线) 下面有源码 实现思路 使用ellipse方法加载圆型,修改polyline中‘material’方法重写glsl来实现当前效果(cesium版本1.109) 示例代码 index.html <!DOCTYPE html> <html lang="en"><head

opencv入门1.1:从视频或摄像头读取图像

cv::VideoCapture是 OpenCV 中用于从视频文件或摄像头捕获图像帧的类。它提供了各种方法和函数&#xff0c;用于读取和处理视频数据。 以下是对 cv::VideoCapture类的详细解释和说明&#xff1a; 1. 打开视频源 为了使用 cv::VideoCapture&#xff0c;我们首先需要打开一个视…

Java多态:多态多态,多么变态

&#x1f451;专栏内容&#xff1a;Java⛪个人主页&#xff1a;子夜的星的主页&#x1f495;座右铭&#xff1a;前路未远&#xff0c;步履不停 目录 一、重写1、重写的规则2、重写与重载的区别 二、多态1、多态的概念2、多态的实现3、向上转移和向下转型Ⅰ、向上转型Ⅱ、向下转…

基于python+Django+SVM算法模型的文本情感识别系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介1. 简介2. 技术栈3. 系统架构4. 关键模块介绍5. 如何运行 二、功能三、系统四. 总结 一项目简介 # 基于 Python Django SVM 算法模型的文本情感识别系统介…

DeepWalk代码实战-维基百科词条图嵌入可视化

准备工作&#xff1a; 从爬虫网站中爬取维基百科See also关联词条&#xff1a;https://densitydesign.github.io/strumentalia-seealsology/ 维基百科网站&#xff1a;https://www.wikipedia.org/ 爬取过程&#xff1a; 下载 tsv 文件&#xff1a; import networkx as nx # 图…

【数据结构】D : 图的顶点可达闭包

D : 图的顶点可达闭包 Description 给定有向图的邻接矩阵A&#xff0c;其元素定义为&#xff1a;若存在顶点i到顶点j的有向边则A[i,j]1&#xff0c;若没有有向边则A[i,j] 0。试求A的可达闭包矩阵A*&#xff0c;其元素定义为&#xff1a;若存在顶点i到顶点j的有向路径则A*[i,j…

pat实现基于邻接矩阵表示的深度优先遍历[含非递归写法]

文章目录 1.递归2.非递归 1.递归 void DFS(Graph G, int v) {visited[v] 1;printf("%c ", G.vexs[v]);for (int i 0; i < G.vexnum; i) {if (!visited[i] && G.arcs[v][i]) DFS(G, i);} }2.非递归 #include <stack> #include <iostream> …

Faster R-CNN源码解析(三)

目录 todaytorch.meshgrid()函数 today 今天我们主要来捋一捋AnchorsGenerator这部分代码,对应在network_files文件夹中的rpn_function文件中&#xff0c;从RegionProposalNetwork()类的forward()函数开始看&#xff0c;首先会进入head部分也就是我们看到的RPNHead部分,也就是…

继承性和多态性实验

继承性和多态性实验 一、实验题目二、实验目的三、实验内容与实现1&#xff1a;【实验内容】2:【实验实现】雇员类&#xff08;Employee&#xff09;的实现&#xff0c;如下图所示&#xff1a;2&#xff1a;经理类&#xff08;Manager&#xff09;的实现&#xff0c;如下图所示…

Windows从源码构建tensorflow(离线编译)

由一开始的在线编译&#xff0c;到后面的离线编译&#xff0c;一路踩坑无数&#xff0c;历经整整6个半小时&#xff0c;终于编译成功&#xff01;在此记录一下参考过的文章&#xff0c;有时间整理一下踩坑记录。 一、环境配置 在tensorflow官网上有版本对应关系 win10 bazel …

30系列显卡在ubuntu下不能满血运行的问题

之前发现在ubuntu下&#xff0c;我的3080只能跑115w最高&#xff0c;而这在win下是可以跑165w的。于是乎google了所有结果&#xff0c;无解… 现已经过去一年&#xff0c;显卡价格飞涨&#xff0c;无奈只能使用笔记本跑自己的代码了。结果发现nvidia推了Linux下的动态加速&…

Postgresql WAL日志解析挖掘(walminer 4.0)

1.下载walminer https://gitee.com/movead/XLogMiner/releases 2.安装walminer ## 解压缩 [rootpg soft]# su - postgres [postgrespg soft]$ tar -zxvf walminer_x86_64_v4.4.2.tar.gz## 创建 walminer 运行目录 [postgrespg soft]# mkdir -p /usr/local/walminer [postgre…

SpringBoot应用手册

工作内容,不对外开放 文章目录 一、ApplicationContextInitializer实现向容器中注入属性实现方式一:使用spring.factories实现方式二:主启动类上添加实现方式三:配置文件中配置注意点:二、自定义监听器第一种方式:使用spring.factories第二种方式:主启动类上添加第三种方…

宝塔面板安装搭建DiscuzQ论坛教程与小程序上架发布后的展示效果

DiscuzQ论坛小程序上架发布后的展示效果&#xff1a; 1、需要用到的环境&#xff1a; php7.2 mysql5.7或者MariaDB 10.2(我安装用的mysql8.0) php除了必要的一些扩展外&#xff0c;还需要启用readlink、symlink函数等&#xff0c;具体看官方说明&#xff0c;安装的时候也会提醒…

Centos开机启动Java程序

Centos开机启动Java程序 创建一个服务文件&#xff1a;使用文本编辑器创建一个新的服务文件&#xff0c;例如 BunnyBBS-web.service。 在服务文件中添加以下内容&#xff1a; [Unit] DescriptionBunnyBBS web Afternetwork.target[Service] ExecStart/usr/bin/java -jar /www/…

xilinx FPGA multi boot之镜像切换

最近做的了一个无线通信的项目&#xff0c;需要在同一套设备上实现两套不同的波形软件&#xff0c;因为FPGA的逻辑资源不够同时放下两套代码&#xff0c;因此采用了镜像切换的方式来实现&#xff0c;xilinx的专业术语叫multi boot功能 。意思是在一片Flash中的不同地址放两个代…