mq幂等mysql_膜拜!看完这篇你还不懂RocketMQ算我输

RocketMQ 介绍

Apache RocketMQ 是一款 低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

RocketMQ 概念Topic:消息主题,用于将一类的消息进行归类,比如订单主题,就是所有订单相关的消息都可以由这个主题去承载,生产者向这个主题发送消息。

生产者:负责生产消息并发送消息到 Topic 的角色。

消费者:负责从 Topic 接收并消费消息 的角色。

消息:生产者向 Topic 发送的内容,会被消费者消费。

消息属性:生产者发送的时候可以为消息自定义一些业务相关的属性,比如 Message Key 和 Tag 等。

Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。

为什么要使用 RocketMQ?

异步解耦

随着微服务架构的流行,服务之间的关系梳理非常重要。异步解耦可以降低服务之间的耦合程度,同时也能提高服务的吞吐量。

使用异步解耦的业务场景非常多,因为每个行业的业务都会不太一样,以一些比较通用的业务来说明相信大家都能理解。

比如电商行业的下单业务场景,以最简单的下单流程来说,下单流程如下:锁库存

创建订单

用户支付

扣减库存

给用户发送购买短信通知

给用户增加积分

通知商家发货

我们以下单成功后,用户进行支付,支付完成会有个逻辑叫支付回调,在回调里面需要去做一些业务逻辑。首先来看下同步处理需要花费的时间,如下图:

8c3c3392638d31d3d9871c0e7e2142b6.png

同步流程

上面的下单流程从 3 到 5 都是可以采用异步流程进行处理,对于用户来说,支付完成后他就不需要关注后面的流程了。后台慢慢处理就行了,这样就能简化三个步骤,提高回调的处理时间。

bbd9473317713c6595913390b046d3d4.png

异步流程

削峰填谷

削峰填谷指的是在大流量的冲击下,利用 RocketMQ 可以抗住瞬时的大流量,保护系统的稳定性,提升用户体验。

在电商行业,最常见的流量冲击就是秒杀活动了,利用 RocketMQ 来实现一个完整的秒杀业务还是与很多需要做的工作,不在本文的范围内,后面有机会可以单独跟大家聊聊。想告诉大家的是像诸如此类的场景可以利用 RocketMQ 来扛住高并发,前提是业务场景支持异步处理。

d87ce00cde0152c4c95f82f95d6a7cb8.png

削峰填谷

分布式事务最终一致性

众所周知,分布式事务有 2PC,TCC,最终一致性等方案。其中使用消息队列来做最终一致性方案是比较常用的。

在电商的业务场景中,交易相关的核心业务一定要确保数据的一致性。通过引入消息队列 RocketMQ 版的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。

数据分发

数据分发指的是可以将原始数据分发到多个需要使用这份数据的系统中,实现数据异构的需求。最常见的有将数据分发到 ES, Redis 中为业务提供搜索,缓存等服务。

除了手动通过消息机制进行数据分发,还可以订阅 Mysql 的 binlog 来分发,在分发这个场景,需要使用 RocketMQ 的顺序消息来保证数据的一致性。

6d61921bd8628664ead094c92cf3ec20.png

数据分发

RocketMQ 架构

967fea57f17abaebc61054fec4bc0899.png

图片来源阿里云官方文档Name Server:是一个几乎无状态节点,可集群部署,在消息队列 RocketMQ 版中提供命名服务,更新和发现 Broker 服务。就是一个注册中心。

Broker:消息中转角色,负责存储消息,转发消息。分为 Master Broker 和 Slave Broker,一个 Master Broker 可以对应多个 Slave Broker,但是一个 Slave Broker 只能对应一个 Master Broker。Broker 启动后需要完成一次将自己注册至 Name Server 的操作;随后每隔 30s 定期向 Name Server 上报 Topic 路由信息。

生产者:与 Name Server 集群中的其中一个节点(随机)建立长链接(Keep-alive),定期从 Name Server 读取 Topic 路由信息,并向提供 Topic 服务的 Master Broker 建立长链接,且定时向 Master Broker 发送心跳。

消费者:与 Name Server 集群中的其中一个节点(随机)建立长连接,定期从 Name Server 拉取 Topic 路由信息,并向提供 Topic 服务的 Master Broker、Slave Broker 建立长连接,且定时向 Master Broker、Slave Broker 发送心跳。Consumer 既可以从 Master Broker 订阅消息,也可以从 Slave Broker 订阅消息,订阅规则由 Broker 配置决定。

RocketMQ 消息类型

RocketMQ 支持丰富的消息类型,可以满足多场景的业务需求。不同的消息有不同的应用场景,下面为大家介绍常用的四种消息类型。

普通消息

普通消息是指 RocketMQ 中无特性的消息。当没有特殊的业务场景,使用普通消息就够了。如果有特殊的场景,就可以使用特殊的消息类型,比如顺序,事务等。

同步发送

同步发送:消息发送方发送出去一条消息,会同步得到服务端返回的结果。

异步发送

异步发送:消息发送方发出去一条消息,不用等待服务端返回结果,可以接着发送下一条消息。发送方可以通过回调接口接收服务端响应,并处理响应结果。

单向发送

单向发送:消息发送方只负责发送消息,发送出去后就不管了,这种方式发送速度非常快,存在丢失消息的风险。

顺序消息

顺序消息是指生产者按照一定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即先发布的消息一定会先被消费者接收到。

比如数据分发的场景,如果我们订阅了 Mysql 的 binlog 来进行数据异构。消息要是没有顺序,就会出现数据错乱问题。

比如新增一条 id=1 的数据,然后马上删除。这样就产生了两条消息。正常的消费顺序是先新增,然后删除,此时数据是没有的。如果消息没有顺序,删除的先被消费了,然后消费新增的,此时数据还在,没被删除掉,就会导致不一致。

定时消息

定时消息是指消息具备定时发送的功能,当消息发送到服务端后,不会立即投递给消费者。而是要等到消息指定的时间后才会投递给消费者进行消费。

延迟消息也就是定时消息,定时消息是定在某个时间点进行发送,比如 2020-11-11 12:00:00 发送。

延迟消息一般是在当前发送时间的基础上延迟多久进行发送,比如当前时间是 2020-09-10 12:00:00,延迟 10 分钟,那么消息发送成功后将在 2020-09-10 12:10:00 进行投递给消费者。

定时消息可以在订单超时未支付自动取消等场景使用。

事务消息

RocketMQ 提供类似 X/Open XA 的分布式事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致。

交互流程:

ca83887f357723f5311d2ea3615f858d.png

图片来源阿里云官方文档发送方首先发送半事务消息到 RocketMQ 服务端。

RocketMQ 服务端接收到消息,然后将消息持久化成功之后,向发送方返回 Ack 确认消息已经发送成功,此时消息为半事务消息,不会投递给消费方。

收到半事务消息的 Ack 后,发送方开始执行本地事务逻辑。

发送方根据本地事务执行结果向服务端提交二次确认,如果本地事务执行成则进行消息的 Commit,如果执行失败则进行消息的 Rollback,服务端收到 Commit 状态则将半事务消息标记为可投递,消费方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,消费方将不会收到该消息。

如果出现意外情况,步骤 4 没有进行消息的二次确认,等待固定时间后服务端将对该消息发起消息回查。

发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行操作。

最佳实践

消息重试

消息在消费方消费失败后,RocketMQ 服务端会重新进行消息的投递,知道消费者成功消费消息,当然重试有次数限制,默认 16 次。

消息重试在一定程度上保证了消息不丢失,通过重试来达到最终被消费的目的。需要注意的是消费者在消费的时候一定要等本地业务成功后才能进行 ACK(消费确认),不然就会出现消费失败,但是已经 ACK,消息将不会重复投递。

如果采取异步消费的方式,需要进行异步转同步,等异步操作完才进行 ACK,具体可以参考我之前写的一篇文章https://mp.weixin.qq.com/s/Bb...。

最后需要做好对应的监控,如果重试了 4,5 次还是失败的,基本上后面重试也是失败的。这个时候需要让开发人员知道,该人工处理的就人工介入。或者直接监控死信队列。

消息过滤

消息主题,一般用于一类消息的统一分类。比如订单主题,但是订单下的消息会分为很多种。比如创建订单,取消订单等。

不同类型的消息有不同的业务处理,我们可以统一定义消息格式,然后通过一个字段去区分消息类型来做不同的业务逻辑。不好的点在于所有消息都会推送到消费方,不能按需消费。

在 RocketMQ 中可以给消息指定 tag,通过 tag 来区分消息类型。消费者可以根据 Tag 在 RocketMQ 服务端完成消息过滤,以确保消费者最终只消费到其关注的消息类型。

我曾经遇到过一个 tag 没有正确使用的方式,只有一个 MQ 实例,用 tag 来区分环境。所有消息都在一个主题中,测试环境消费测试环境的 tag,线上消费线上的 tag。

这种方式的问题在于消息没做隔离,线上线下的消息都在一起。另一个就是 tag 被固定成了环境的区分,无法用于消息类型场景,导致只能建多个 topic 来承载多个业务消息类型。

7823df000ff28ff92c1a61657ed8e6ee.png

消息过滤

消费模式

RocketMQ 消费模式有两种,集群消费和广播消费。

集群消费:

cb9cc5bda01dcda595f46d42919fcab0.png

集群消费

消费者部署了多个实例我们称之为一个集群,集群消费只会被其中的某一个实例进行消费。

适合大部分的业务场景,大部分的场景我们的消息只允许被消费一次,而且只能有一个消费者去消费,比如支付回调场景,如果一个消息被多个实例同时消费,那么就会出现同时去修改订单状态,同时去扣减库存的情况。

广播消费:

5aefcac7ef076ba31ea44a63ec44cd47.png

广播消费

广播消费会让集群中每个实例都消费一次。

比如我们使用了本地缓存,当数据变更的时候,我们需要刷新每个节点本地的缓存,所以每个节点都需要收到消息。

消费幂等

幂等问题,无论是在 API 请求场景还是在消息消费场景,都会遇到。一条消息不能重复消费多次这个肯定是要保证的,因为我们不能保证消息发送方不发送多次,也不能保证消息不重复投递。

RocketMQ 的 Exactly-Once 投递语义,就是用于解决幂等问题。Exactly-Once 是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。

最佳的幂等处理方式还是需要有一个唯一的业务标识,虽然每条消息都有 MessageId,但是不建议用 MessageId 来做幂等判断,在发送消息的时候,可以为每条消息设置一个 MessageKey,这个 MessageKey 就可以用来做业务的唯一标识。

dc5305d7c62f25c1f4426dfde4c26d93.png

本地事务消息封装

上面介绍了事务消息,RocketMQ 的事务消息采用了二阶段提交的方式。并且结合了消息反查的机制来确保最终一致性。

从使用层面来说,每个业务场景都要去实现一个反查的逻辑,有点烦。

下面介绍另一种经常被使用的方式,就是本地事务消息。本地消息表这个方案最初是 ebay 提出的,本地事务消息需要在服务对应的数据库中创建一个消息表,发送消息的时候不是真正的将消息发送给 MQ,而是往消息表中插入一条消息数据。

插入的动作跟本地的业务逻辑是同一个事务,如果本地事务执行成功,消息才会落表成功,才会发送给 MQ, 本地事务失败,消息数据回滚。

然后需要有一个专门的程序去拉取消息表中未发送的消息投递给 MQ,如果投递失败,可以一直重试,直到成功或者人工介入。

5e4cd99fbadbbe386d428bca13ddf19d.png

本地事务消息

消息写到消息表,然后会一直给 MQ 发送,这个步骤没问题。如果 MQ 收到消息后,消息还在 PageCache 中的时候,Broker 宕机了,这个时候是会出现消息丢失。当然你也可以使用同步刷盘等方式来避免丢失。假如我们就是异步刷盘,有办法保证消息不丢失吗?

前面我们提到,RocketMQ 的事务消息会有回查的机制,消息表的方式,也需要有一个机制来保证消息被消费了,否则就需要不断的重试去发送消息,直到消息被消费。

在消息表中需要有一个字段来标识当前这条消息的状态,比如 未发送,已发送,已消费。当消息还是未发送的时候就会被发送到 MQ, 如果发送成功了,状态就是已发送。但是过了几分钟,状态还是已发送,这个时候就要去做一些动作了。

这个场景下,有可能是消费者跟不上生产的速度,消息堆积了,导致消息一直没被消费。另一种可能就是消息是不是丢失了?

可以获取对应的消息堆积数据来判断是否消息堆积了,如果不是就重新发送消息给 MQ,知道消息被消费。

问题是消息被消费了,我怎么知道?

像我使用的云服务,是有对应的 Open API 可以直接查询消息轨迹。开源的应该也有,没有仔细去研究,跟商业版应该差不多。

根据消息轨迹就可以知道消息有没有被消费,到此为止流程结束。消息发送给 MQ 如果失败会重试,消息如果长时间没消费,也会重新发送,即使最后进入了死信队列,也可以通过死信队列的监控来人工干预,一定会是最终一致性。

跟自带的事务消息比,本地消息表的方式不需要实现回查逻辑,但是要增加消息表,同时也要配套各种发送,检查等逻辑,也挺麻烦了。特别是当消息量大的时候,如何快速的将消息表中的消息发送出去,也需要做很多处理,简单的查表轮询在量大的情况下不太适用。

两种方式都可以使用,能实现我们要的目的即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/429108.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从一个数组中找出最接近目标_LeetCode每日一题 | 转变数组后最接近目标值的数组和...

题目来源:LC1300这道题目是一道比较经典的二分查找题。我们注意到,当value越大时,数组之和越大,当value越小时,数组之和越小。因此,我们可以利用数组之和是value的单调递增函数这个性质来进行二分查找。最大…

C++ 初始化列表初始化列表性能问题的简单的探索

C 初始化列表性能问题的简单的探索 从概念上来讲&#xff0c;构造函数的执行可以分成两个阶段&#xff0c;初始化阶段和计算阶段&#xff0c;初始化阶段先于计算阶段。 在执行构造函数时&#xff0c;如果没有给定初始值&#xff0c;那系统就会自动进行初始化。 #include <st…

mysql 5.5半同步复制_(5.5)mysql高可用系列——MySQL半同步复制(实践)

关键词&#xff0c;mysql半同步复制【0】实验环境操作系统&#xff1a;CentOS linux 7.5数据库版本&#xff1a;5.7.24数据库架构&#xff1a;主从复制&#xff0c;主库用于生产&#xff0c;从库用于数据容灾和主库备机&#xff0c;采用默认传统的异步复制。主库IP&#xff1a;…

Java+Jmeter接口测试

一、创建工程、引包1、创建JAVA工程2、引入Jmeter中lib\ext基础包&#xff1a;ApacheJMeter_java.jar、ApacheJMeter_core.jar3、引入Jmeter日志包&#xff1a;jorphan.jar&#xff0c;logkit-2.0.jar&#xff0c;commons-logging-1.1.1.jar&#xff0c;avalon-framework-4.1.4…

Discuz3.3精仿小米风格整站模板制作——1、新建模板方案

术语说明&#xff1a; 模板——模板是一堆按照规定命名方式的html文件&#xff0c;用于指定整个论坛不同页面的外观。标签——标签和模板共同作用以实现论坛换肤功能&#xff0c;其中标签主要控制页面显示什么数据&#xff0c;显示多少条等。风格——风格是一个xml配置文件&…

普元连接mysql_普元EOS 案例 - 还有一行代码的个人空间 - OSCHINA - 中文开源技术交流社区...

1 创建项目1) 进入EOS Studio开发透视图&#xff1b;单击主菜单项“文件”选择“新建”->“空EOS项目2) 在弹出的“创建空EOS项目”视图中&#xff0c;配置相关参数。“项目名称”&#xff1a;EOSTriping&#xff0c;3) 单击【完成】按钮&#xff0c;系统完成该项目框架…

十年WEB技术发展历程

Ajax03年的时候我上六年级。那时候网吧刚在小县城的角落萌生。传奇&#xff0c;大话西游第一代网游一时风靡。我抱着试一试的心态给了网吧老板两块钱想申请个号玩玩&#xff0c;然后接下来的一个小时我一直在&#xff0c;注&#xff0c;冊&#xff0c;账。号。彼时网吧用的512k…

Python之编写登陆接口

1.输入用户名密码&#xff1b; 2.认证成功后显示欢迎信息&#xff1b; 3.错误三次后&#xff0c;账号被锁定。 账号文件&#xff1a;user.txt 锁定文件&#xff1a;locked.txt 流程图如下&#xff1a; # -*- coding:utf-8 -*- # Author Caoxl import sysaccount_fileE:\user…

好全的Android面试题

转载&#xff1a;http://www.jianshu.com/p/84ee896c3329 需求描述 各种新技术接触渠道比较狭窄, 面试没有底气。 常见面试知识点及回答没有较系统的准备&#xff0c;就业指导提供的资料没时间看&#xff0c;看了记不住。 面试没有经验, 不清楚面试前要做的各种准备、面试过程…

JavaSE--类加载器

参考&#xff1a;http://www.importnew.com/6581.html Java 编译器会为虚拟机转换源指令。虚拟机代码存储在以 .class 为扩展名的类文件中&#xff0c;每个类文件都包含某个类或者接口的定义和代码实现。这些类文件必须由一个程序进行解释&#xff0c;该程序能够将虚拟机的指令…

java netbeans 教程_NetBeans 教程

NetBeans IDE Java 快速入门教程第一章本章通过指导您创建一个简单的 "Hello World" Java 控制台应用程序&#xff0c;简要介绍 NetBeans IDE 工作流。学习完本教程后&#xff0c;您将对如何在 IDE 中创建和运行应用程序有一个基本了解。学习完本教程所需的时间不到 …

几个python小程序

python小程序 1-100求和 1 def Sum(x, y):2 return xy3 print reduce(lambda x,y:xy,range(1,101))4 5 i 16 j 07 while i < 101:8 j i j9 i 1 10 print j View Code输出1-100之间的奇偶数1 i 1 2 a [] 3 while i < 100: 4 if i%2 0: 5 …

Linux下git使用

一、安装 本人使用的是centos 7&#xff0c;首先安装git 1.下载git&#xff1a;wget https://Github.com/Git/Git/archive/v2.3.0.tar.gz 2.下载之后解压&#xff1a;tar xvf v2.3.0.tar.gz 3.进入解押文件目录&#xff1a;cd git-2.3.0 4.依次运行如下命令&#xff1a; ./conf…

java 水表识别_一种水表数字的AI智能识别方法与流程

本发明涉及模式识别与人工智能技术领域&#xff0c;特别涉及一种直观的水表数字的AI智能识别方法。背景技术&#xff1a;深度学习在目标检测的应用发展迅速&#xff0c;在YOLO(You Only Look Once)之后又出现了升级版本YOLOv2&#xff0c;采用的是Darknet-19作为基础网络&#…

mysql.w002_mysql简单例子

登陆数据库&#xff1a;mysql -u用户名 -p密码 -P端口 -h数据库地址‍‍‍‍‍‍修改mysql提示符&#xff1a;(仅本次连接有效)‍‍‍‍‍‍方法一&#xff1a;执行mysql -uroot -proot -prompt \h结果为&#xff1a;localhost方法二&#xff1a;进入mysql后&#xff0c;执行PR…

JQuery中的Deferred-详解和使用

首先&#xff0c;为什么要使用Deferred&#xff1f; 先来看一段AJAX的代码&#xff1a; 1 var data; 2 $.get(api/data, function(resp) { 3 data resp.data; 4 }); 5 doSomethingFancyWithData(data); View Code 这段代码极容易出问题&#xff0c;请…

【Eclipse】eclipse在线安装反编译插件

1.help->install new software 2.Add Name&#xff1a;jd-eclipse_update_site Location&#xff1a;http://jd.benow.ca/jd-eclipse/update 3.等待加载出来 4.持续点击下一步&#xff0c;直到完成。 转载于:https://www.cnblogs.com/flydkPocketMagic/p/7170283.html

Spring Boot 系列(一)快速入门

简介 Spring Boot是由Pivotal团队提供的全新框架&#xff0c;其设计目的是用来简化新Spring应用的初始搭建以及开发过程。该框架使用了特定的方式来进行配置&#xff0c;从而使开发人员不再需要定义样板化的配置。通过这种方式&#xff0c;Spring Boot致力于在蓬勃发展的快速应…

1. 在虚拟机中 添加内容

步骤&#xff1a; 1. 找到要添加的内容&#xff0c;按住 ctrl c 复制 &#xff0c;例如&#xff1a;复制 飞秋 2. 打开 虚拟机&#xff0c;找到 要复制文件的位置。 3. 将 复制的文件添加到 共享文件夹下面。 4. 打开虚拟机&#xff0c;安装飞秋 5&#xff0c; 最后就完成了…

java web scala_spring boot+scala编写web接口

本人是Java开发者&#xff0c;有面向对象的基础&#xff0c;而Scala也是面向对象的语言&#xff0c;学习后可快速入门。通过学习Scala的面向对象(和java面向对象类似)、Scala的高级函数(map,reduce等&#xff0c;和Java8中的stream编程类似)、Scala的隐式转换(在Java中可通过sp…