Kafka保证消息幂等以及解决方案

1、幂等的基本概念
幂等简单点讲,就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会产生任何副作用。幂等分很多种,比如接口的幂等、消息的幂等,它是分布式系统设计时必须要考虑的一个方面。

查询操作(天然幂等)
查询一次和查询多次,在数据不变的情况下,查询结果是一样的。查询是天然的幂等操作删除操作 (天然幂等) 删除操作也是幂等的,删除一次和删除多次都是把数据删除(注意可能返回结果不一样,删除的数据不存在返回 0,删除的数据多条,返回结果多个)。

删除操作 (天然幂等)
删除操作也是幂等的,删除一次和删除多次都是把数据删除(注意可能返回结果不一样,删除的数据不存在, 返回 0,删除的数据多条,返回结果多个).

新增操作
新增操作,这种情况下多次请求,可能会产生重复数据;

修改操作
修改操作,如果只是单纯的更新数据,比如: update account set money=100 where id=1,是没有问题的,如果还有计算,比如: update account set money=money+100 where id=l,这种情况下多次请求,可能会导致数据错误。

总结:当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这个消费者的处理过程就是幂等的。例如,在支付场景下,消费者消费扣款消息,对一笔订单执行扣款操作,扣款金额为 100 元。如果因网络不稳定等原因导致扣款消息重复投递,消费者重复消费了该扣款消息,但最终的业务结果是只扣款一次扣费 100 元,且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消费幂等。

2、产生消息重复的原因
在可联网应用中,尤其在网络不稳定的情况下,消息队列的消息有可能会出现重复,如果消息重复消费会影响您的业务处理,请对消息做幂等处理。消息重复的可能原因如下:

发送时消息重复 当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者生产者宕机,导致服务端对生产者应答失败。 如果此时生产者 Producer 意识到消息发送失败并尝试再次发送消息,消费者 Consumer 后续会收到两条内容相同的消息。

投递时消息重复 消息消费的场景下,消息已投递到消费者 Consumer 并完成业务处理,当消费者给服务端反馈应答的时候网络闪断,为了保证消息至少被消费一次,消息队列的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者 Consumer 后续会收到两条内容相同的消息;

负载均衡时消息重复 当消息队列的服务端或消费者重启、扩容或缩容时,都有可能会触发 rebalance,此时消费者 Consumer 可能会收到重复消息。

3、解决方案及案例分析
既然消息可能会产生重复,那如何解决消息幂等的问题呢?我们需要从生产者、中间件、消费者这几个不同层面,来保证消息的幂等,[消息的幂等业界有很多种方案,我这里列出常见的几种方案供大家参考

3.1 设置业务唯一 key 方案 (应用最广泛)
业务唯一key 可以是单个字段或者组合字段,这个方案是怎么实现的呢?

生产者消息休构造业务唯一 key,消息端针对这个 key 加分布式锁;

在消费端,创建一个消息防重表,利用插入记录唯一健约束控制, 但是这会与业务有一定的耦合,另外高并发下频繁对消息防重表进行操作,性能比较低,不太建议使用,我们通常是在消费端加一个redis分布式锁,防止短期内消息的重复投递;

数据库业务表加唯一索引 (数据库)

以用户在积分商城下单为例,具体业务流程如下:

1、客户发起支付流程;

2、生产者生产消息构造一个订单号作为消息体幂等的唯一 key;

3、发送消息给 broker,broker 持久化消息到磁盘;

4、消费者开始消费消息,在消费逻辑中加一个分布式锁,key为订单号,防止短时间内消息重复投递;

5、当加锁成功后,执行核心业务逻辑,然后释放分布式锁,当加锁失败,直接结束;

6、最后,为了防止后续生产者重复推送相同唯一key 的消息我们需要在数据库的业务表中给这个订单号加一个唯一索引,通过唯一健约束来保证数据库表不会出现两条相同的记录,从而实现消息幂等

3.2 设置业务唯一id方案
这个其实跟上一个方案类似,只是唯一id是需要我们通过 分布式id服务 生成,其他的处理方法跟上一个方案一样。

3.3 基于业务的状态机方案
在设计单据相关的业务,或者是任务相关的业务,肯定会涉及到状态机(状态变更图),我们以业务单据为例:在业务单据上面会有个状态,状态在不同的情况下会发生变更,一般情况下存在有限状态机,当消费业务消息的时候,如果状态机已经处于下一个状态,这时候来了一个上一个状态的消息,直接丢弃消息不处理,保证了有限状态机的幂等。

3.4 基于version版本号的乐观锁方案
此方案一般是适用于更新业务的场景,更新表的时候通过版本号对比来保证消息的幂等

具体业务流程
1、客户购买商品,完成后准备发送一条扣减账户200 的消息;

2、生产端开始生产消息,构造消息体{ id=1,money=200,version=1 };

3、发送消息给 broker,broker 持久化这条消息后,返回确认消息给生产者,此时时出现了网络闪断或者生产者宕机,导致 broker 对生产者响应失败, 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同的消息;

4、消费者收到相同的消息,开始消费第一条消息{id=1,money=200,version=1},根据 version=1 更新记录更新成功;

5、接着开始消费第二条消息{id=1,money=200,version=l },根据 version=1更新记录,但是此时 version 已经被更新为 2,条件不满足更新失败;

6、消费者通过基于version的乐观锁保证了消息幂等。

3.5 insert ... on duplicate key update 方案
此方案一般适合一些统计更新类的业务或者定时同步第三方平台数据到自己数据库的场景,例如: 定时同步企业微信的成员数据到自已企微库的成员表,就可以采用这种方案实现。

on dupdate key update 语句基本功能是:当表中没有原来记录时,就插入,有的话就更新。

1. on duplicate key update 语句根据主键id或唯一键来判断当前插入是否已存在。
2. 记录已存在时,只会更新on duplicate key update之后指定的字段。
3. 如果同时传递了主键和唯一键,以主键为判断存在依据,唯一键字段内容可以被修改。

具体业务流程:
1、张三关注某 A 公司企业微信,加人深圳区企微群;
2、管理员在深圳区企微群投放一个活动,张三第一次点击这个活动,这个时候活动模块发送一条 kafka 消息;
3、在数据库创建一张活动效果统计表,act_code 和 usr_id 两个字段作为联合唯一索引;
4、数据处理模块消费这条消息,通过 insert on duplicate key update 插人一条记录;
5、过了几小时,张三第二次点击这个活动,这个时候活动模块再发送一条 kafka 消息,数据处理模块再次消费这条消息,通过 insert...on duplicate key update 更新对应唯一索引的这条记录的更新时间字段;
6、通过 insert...on duplicate key update 命令,可以实现数据库表不会出现重复的记录,还能实现业务的更新逻辑。

补充:

消息消费失败的时候,可以做好监控报警,以便进行人工干预;
消费消息的方法,确保在同一个事务,以便消费失败的时候,可以回滚;
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/104916.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【VR开发】【Unity】0-课程简介和概述

【说明】 这是我录制的一套VR基础开发课程的文字版本,更加便于快速参考。 应大家在后台所提的需求,从今天开始,我计划带给大家一套完整达40课时的VR开发基础课程。 在开始学习前需要注意如下几点: 本教程基于Unity2022.2.1f1版…

SqlServer安装教程

百度网盘地址: 链接:https://pan.baidu.com/s/1ntqoK9uVc6fBVTm7twh8kw 提取码:grdt 安装: 双击:SQLEXPRADV_x64_CHS.exe ,等待;点击计划,系统配置检查器,根据要求修改(我被要求重启了)点击安装,全新SQL Server独立安装或向现有安装添加功能,接受功能选…

2023年09月 C/C++(七级)真题解析#中国电子学会#全国青少年软件编程等级考试

C/C编程(1~8级)全部真题・点这里 Python编程(1~6级)全部真题・点这里 第1题:红与黑 有一间长方形的房子,地上铺了红色、黑色两种颜色的正方形瓷砖。你站在其中一块黑色的瓷砖上,只能向相邻的黑色…

kafka详解(三)

2.2 Kafka命令行操作 2.2.1 主题命令行操作 1)查看操作主题命令参数 [aahadoop102 kafka]$ bin/kafka-topics.sh2)查看当前服务器中的所有topic (配置了环境变量不需要写bin/) [aahadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop10…

SpringBoot-黑马程序员-学习笔记(五)

74.自定义bean属性绑定以及第三方bean属性绑定 自定义bean属性绑定 1.自定义一个bean Data Component public class ServerConfig {private String ipAddress;private int port;private long timeout; } 2.在yml配置文件中中定义一组值 3.在bean中进行属性绑定 加上这个注…

mysql中的几种排名函数

mysql中的排名函数 mysql里面的排名函数&#xff0c;涉及有以下几个&#xff1a; rank()、dense_rank()、row_number() 1、rank() 函数 RANK() OVER (PARTITION BY <expression>[{,<expression>...}]ORDER BY <expression> [ASC|DESC], [{,<expression…

【Nginx32】Nginx学习:随机索引、真实IP处理与来源处理模块

Nginx学习&#xff1a;随机索引、真实IP处理与来源处理模块 完成了代理这个大模块的学习&#xff0c;我们继续其它 Nginx 中 HTTP 相关的模块学习。今天的内容都比较简单&#xff0c;不过最后的来源处理非常有用&#xff0c;可以帮我们解决外链问题。另外两个其实大家了解一下就…

C#开发的OpenRA游戏之金钱系统(1)

C#开发的OpenRA游戏之金钱系统(1) 设计一个游戏,肯定要有一个唯一的资源,用这个资源来管理整个游戏的进度,以及相互争夺的焦点。在OpenRA里,就是使用矿产资源。所以在地图上分布几个矿场,玩家就需要相互争夺矿场,谁开采多谁就更有钱,谁有钱了就可以升级更好的科技,以…

Linux Kernel 4.13 RC6发布:正式版9月3日发布

美国当地时间上周末&#xff0c;大神Linus Torvalds发布了Linux Kernel 4.13内核的又一候选版本。上周发布的RC5版本更新幅度也要比上上周的RC4要小&#xff0c;Linus Torvalds表示本周发布的RC6版本属于常规更新&#xff0c;在过去一周的开发过程中并没有出现任何意外。RC6版本…

Spring MVC 十一:@EnableWebMvc

我们从两个角度研究EnableWebMvc&#xff1a; EnableWebMvc的使用EnableWebMvc的底层原理 EnableWebMvc的使用 EnableWebMvc需要和java配置类结合起来才能生效&#xff0c;其实Spring有好多Enablexxxx的注解&#xff0c;其生效方式都一样&#xff0c;通过和Configuration结合…

Linux 64位 C++协程池原理分析及代码实现

导语 本文介绍了协程的作用、结构、原理&#xff0c;并使用C和汇编实现了64位系统下的协程池。文章内容避免了协程晦涩难懂的部分&#xff0c;用大量图文来分析原理&#xff0c;适合新手阅读学习。 GitHub源码 1. Web服务器问题 现代分布式Web后台服务逻辑通常由一系列RPC请…

【java学习—七】单继承和多层继承(30)

文章目录 1. 相关概念2. 从代码中理解 1. 相关概念 Java 只支持单继承&#xff0c;不允许多重继承&#xff1a; &#xff08;1&#xff09;一个子类只能有一个父类 &#xff08;2&#xff09;一个父类可以派生出多个子类      举例区分&#xff1a; class SubDemo extend…

Hermes - 指尖上的智慧:自定义问答系统的崭新世界

在希腊神话中&#xff0c;有一位智慧与消息的传递者神祇&#xff0c;他就是赫尔墨斯&#xff08;Hermes&#xff09;。赫尔墨斯是奥林匹斯众神中的一员&#xff0c;传说他是乌尔阿努斯&#xff08;Uranus&#xff09;和莫伊拉&#xff08;Maia&#xff09;的儿子&#xff0c;同…

Git纯操作版 项目添加和提交、SSH keys添加、远程仓库控制、冲突解决、IDEA连接使用

Git 文章目录 Git项目简单克隆通用操作添加和提交回滚分支变基分支优选 远程项目推送认证抓取、拉取和冲突解决 IEDA类软件连接 最近学原理学的快头秃了&#xff0c;特此想出点不讲原理的纯操作版&#xff0c;不过还是放个图吧 项目简单克隆 git在本人日常中最重要的功能还是…

Linux中怎么启动Zookeeper

首先进入Zookeeper安装目录下的bin目录 比如&#xff1a; cd /root/zookeeper-3.4.9/bin 然后在此目录下执行命令。 1. 启动Zookeeper Server端 ./zkServer.sh start 2.启动Zookeeper Client端 ./zkCli.sh 启动Zookeeper Client端后如下&#xff1a;

Electron基础学习笔记

Electron基础学习笔记 官网&#xff1a; https://www.electronjs.org/ 文档&#xff1a;https://www.electronjs.org/zh/docs/latest/ Electon概述 Electron 是由 Github开发的开源框架它允许开发者使用Web技术构建跨平台的桌面应用 Electron Chromium Node.js Native AP…

面部检测与特征分析:视频实时美颜SDK的核心组件

随着视频直播、社交媒体和在线会议的流行&#xff0c;人们对于美颜工具的需求不断增加。无论是自拍照片还是视频聊天&#xff0c;美颜技术已经成为现代应用程序的不可或缺的一部分。本文将深入探讨视频实时美颜SDK的一个核心组件——面部检测与特征分析。 一、面部检测技术 …

C++内存管理(new和delete)

目录 1.C的内存分布 2.C内存管理方式 1.C的内存分布 在内存里面是分好几个区的 1. 栈又叫堆栈--非静态局部变量/函数参数/返回值等等&#xff0c;栈是向下增长的。 2. 内存映射段是高效的I/O映射方式&#xff0c;用于装载一个共享的动态内存库。用户可使用系统接口 创建共享…

AI换脸之Faceswap技术原理与实践

目录 1.方法介绍 2.相关资料 3.实践记录 ​4.实验结果 1.方法介绍 Faceswap利用深度学习算法和人脸识别技术&#xff0c;可以将一个人的面部表情、眼睛、嘴巴等特征从一张照片或视频中提取出来&#xff0c;并将其与另一个人的面部特征进行匹配。主要应用在图像/视频换脸&am…

数字图像处理实验记录一(图像基本灰度变换)

文章目录 基础知识图像是什么样的&#xff1f;1&#xff0c;空间分辨率&#xff0c;灰度分辨率2&#xff0c;灰度图和彩色图的区别3&#xff0c;什么是灰度直方图&#xff1f; 实验要求1&#xff0c;按照灰度变换曲线对图像进行灰度变换2&#xff0c;读入一幅图像&#xff0c;分…