如何通过事务消息保障抢购业务的分布式一致性?

简介: 在柔性事务的多种实现中,事务消息是最为优雅易用的一种。基于阿里云RocketMQ高性能、高可用的特点,完全可以胜任抢购业务这类高并发大流量的场景。但引入事务消息机制在实现高性能的同时,也增加了整体的业务复杂度。我们需要对业务场景进行充分评估,选择与自身业务特点最为匹配的一种,才能更好地发挥柔性事务的价值。

前言

在电商领域,抢购和秒杀是非常普遍业务模式,抢购类业务在快速拉升用户流量并为消息者带来实惠的同时,也给电商系统带来了巨大考验。在高并发、大流量的冲击下,系统的性能和稳定性至关重要,任何一个环节出现故障,都会影响整体的购物体验,甚至造成电商系统的大面积崩溃。和电商领域抢购场景极为类似的业务模式还有很多,比如大型赛事和在线教育的报名系统,以及各类购票系统等。

针对抢购类业务在技术上带来的挑战,业界有一系列解决方案,通过不同维度来提升系统的性能与稳定性,包括动静分离、定时上架、异步处理、令牌队列、多级缓存、作弊行为侦测、流量防护、全链路压测等。

本文重点聚焦在如何确保抢购类业务的一致性上,分布式事务一直是IT界老大难的问题,而抢购业务所具备的高并发、大流量特征,更是成倍增加了分布式一致性的实现难度。以下将介绍如何通过事务消息构建满足抢购类业务要求的高性能高可用分布式一致性机制。

事务一致性原理回顾

事务是应用程序中一系列严密的操作,这一系列操作是一个不可分割的工作单位,它们要么全部执行成功,要么一个都不做。事务具有四个特征:原子性( Atomicity )、一致性( Consistency )、隔离性( Isolation )和持续性( Durability ),这四个特性简称为 ACID 特性。

在非并发状态下,保证事务的ACID特性是轻而易举的事情,如果某一个操作执行不成功,把前面的操作全部回滚就OK了。而在并发状态下,由于有多个事务同时操作同一个资源,对于事务ACID特性的保证就会困难一些,如果考虑得不周全,就会遇到如下几个问题:

  1. 脏读:事务A读到了事务B还没有提交的数据。
  2. 不可重复读:在一个事务里面对某个数据读取了两次,读出来的数据不一致。
  3. 幻读: 在一个事务对某个数据集用同样的方式读取了两次,数据集的条目数量不一致。

为了应对上述并发情况下出现的问题,就需要通过一定的事务隔离级别来解决。当事务的隔离级别越高的时候,上述问题发生的机会就越小,但是性能消耗也会越大。所以在实际生产过程中,要根据实际需求去确定隔离级别:

  1. READ_UNCOMMITTED(读未提交):最低的隔离级别,可以读到未提交的数据,无法解决脏读、不可重复读、幻读中的任何一种。
  2. READ_COMMITED (读已提交):能够防止脏读,但是无法解决不可重复读和幻读的问题。
  3. REPEATABLE_READ (重复读取):对同一条数据的多次重复读取能保持一致,解决了脏读、不可重复读的问题,但是幻读的问题还是无法解决。
  4. SERLALIZABLE ( 串行化):最高的事务隔离级别,避免了事务的并行执行,解决了脏读、不可重复读和幻读的问题,但性能最低。

关系型数据库提供了对于事务的支持,能够通过不同隔离级别的设置,确保并发状态下事务的ACID特性。但关系型数据库提供的能力是基于单机事务的,一旦遇到分布式事务场景,就需要通过更多其他技术手段来解决问题。

抢购业务中的分布式事务

有如下三种情况可能会产生分布式事务:

  1. 一个事务操作包含对两个数据库的操作:数据库所提供的事务保证仅能局限在对于自身的操作上,无法跨越到其他数据库。

 

2、一个事务包含对多个数据分片的操作:具体的分片规则由分库分表中间件或者分库分表SDK来实现,有可能跨越多个数据库或同一个数据库的多个表。对于业务逻辑而言,底层的数据分片情况是不透明的,因此也没有办法依赖于数据库提供的单机事务机制。

 

3、一个事务包括对多个服务的调用:在微服务领域,这是极为常见的场景,不同的服务使用不同的数据资源,甚至涉及到更为复杂的调用链路。在这种情况下,数据库提供的单机事务机制,仅仅能保证其中单一环节的ACID特性,没有办法延伸到全局。

 

微服务技术在电商领域的普及程度是非常高的,比较大型的电商应用还会通过中台思想将共性业务能力进行沉淀,因此抢购业务中的很多环境都属于跨服务的分布式调用,会涉及到上述第三种分布式事务形态。比如在订单支付成功后,交易中心会调用订单中心的服务把订单状态更新,并调用物流中心的服务通知商品发货,同时还要调用积分中心的服务为用户增加相应的积分。如何保障分布式事务一致性,成为了确保抢购业务稳定运行的核心诉求之一。

 

分布式事务的实现方式

传统分布式事务

传统的分布式事务通过XA模型实现,通过一个事务协调者,站在全局的角度将多个子事务合并成一个分布式事务。XA模型之所以能在分布式事务领域得到广泛使用,是因为其具有如下两个方面的优势:

  1. 提供了强一致性保证,在业务执行的任何时间点都能确保事务一致性。
  2. 使用简单。常见的关系型数据库都提供了对XA协议的支持,通过引入事务协调器,业务代码跟使用单机事务相比基本上没有差别。

但是在互联网领域,XA模型的分布式事务实现存在很多局限性,在抢购业务这样的高并发大流量场景中更是被完全弃用。我们拿XA分布式协议中最普遍的两阶式提交方案,来说明为什么XA模型并不适合互联网场景。

 

  1. 性能问题。在两段式提交的执行过程中,所有参与节点都是事务阻塞型的,需要长时间锁定资源。这会导致系统整体的并发吞吐量变低,在抢购业务中是不可接受的。
  2. 单点故障问题。事务协调者在链路中有着至关重要的作用,一旦协调者发生故障,参与者会一直阻塞下去,整个系统将无法工作,因此需要投入巨大的精力来保障事务协调者的高可用性。
  3. 数据不一致问题。在阶段二中,如果协调者向参与者发送commit请求之后,发生了网络异常,会导致只有一部分参与者接收到了commit请求,没有接收到commit请求的参与者最终会执行回滚操作,从而造成数据不一致现象。在抢购业务中,这样的数据不一致有可能会对企业或消费者造成巨大的经济损失。

 

因此XA模型是一个理想化的分布式事务模型,并没有考虑到高并发、网络故障等实际因素,即便是在两阶段提交的基础上,诞生了三阶段提交这样的实现方式,也没有办法从根本上解决性能和数据不一致的问题。

柔性事务

针对传统分布式事务方案在互联网领域的局限性,业界提出了CAP理论以及BASE理论,在此基础上诞生了在大型互联网应用中广泛使用的柔性事务。柔性事务的核心思想是放弃传统分布式事务中对于严格一致性的要求,允许在事务执行过程中存在数据不一致的中间状态,在业务上需要容忍中间状态的存在。柔性事务会提供完善的机制,保证在一段时间的中间状态后,系统能走向最终一致状态。

遵循BASE理论的柔性事务放弃了隔离性,减小了事务中锁的粒度,使得应用能够更好的利用数据库的并发性能,实现吞吐量的线性扩展。异步执行方式可以更好地适应分布式环境,在网络抖动、节点故障的情况下能够尽量保障服务的可用性。因此在高并发、大流量的抢购业务中,柔性事务是最佳的选择。

 

传统分布式事务

柔性事务

业务改造

一致性

强一致性

最终一致

回滚

支持

实现回退接口

隔离性

支持

放弃隔离性或实现资源锁定接口

并发性能

适合场景

低并发、短事务

高并发、长事务

 

柔性事务有多种实现方式,包括TCC、Saga、事务消息、最大努力通知等,本文将重点介绍通过事务消息实现柔性事务。

事务消息原理分析

抢购业务场景拆解

我们结合抢购业务的真实场景,分析如何通过事务消息实现分布式一致性。在抢购业务中,有两个非常关键的阶段,需要引入分布式事务机制,分别是订单创建阶段和付款成功阶段。

从字面含义来看,抢购业务就隐含了一个重要的前提:库存是有限的。因此在订单创建的时候,需要预先检查库存情况,并相对应的库存进行锁定,以防止商品超卖。如果库存锁定操作失败,代表库存不足,必须确保订单不能被成功创建。在锁定库存后,如果因为某种异常情况导致订单创建失败,必须及时将之前锁定的库存进行释放操作,以便让其他用户可以重新争夺对应的商品。

如果抢购系统实现了购物车机制,在订单创建的同时,则需要从购买车中将相应的条目删除。

 

基于微服务架构的业务拆分,订单创建阶段的3个行为很有可能通过3个不同的微服务应用完成,因此需要通过分布式事务来保证数据一致性。

 

订单创建完成后,会等待用户付款,一旦付款成功,就会触发付款成功阶段的执行逻辑。这个阶段同样是通过分布式事务来完成,包含修改订单状态、扣减库存、通知发货、增加积分这4个子事务,它们要么全部不执行,要么全部执行成功。

 

当然,在真实的抢购业务中,情况有可能会更加的复杂,本文列出的只是其中最具代表性的几类业务行为。

引入消息异步通知机制

传统的分布式事务存在一个很大的弊端是参与节点都是事务阻塞型的,需要长时间锁定资源。以锁定库存 ->创建订单这个流程为例,借助于Redis等缓存系统,单纯锁定库存的操作只需要花费毫秒级的时间,可以承载非常高的并发量。但如果把创建订单的操作也考虑进来,加上不同微服务应用之间相互通讯的时候,整体耗时有可能超过1秒,导致性能急剧下降。

假设存在一种异步消息机制,让分布式事务的第一个参与方在执行完本地事务后,通过触发一笔消息通知事务的其他参与方完成后续的操作,就能将大事务拆解为多个本地子事务来分开执行。在这种模式下,事务的多个参与方之间之间并不需要彼此阻塞和等待,就能极大程度地提升并发吞吐能力。对于库存中心而言,在高并发场景下,只需要不断的执行锁定库存记录操作,并不断通过异步消息通知订单中心创建订单,只要异步消息机制能确保消息一定送达,并得到正确处理,就能够实现分布式最终一致性。

 

先执行本地事务,还是先发送异步消息?

在这个模型中,异步消息的发送交给了分布式事务的第一个参与方来完成,这个参与方就拥有了两个职责:执行本地事务发送异步消息。到底应该先执行本地事务,还是先发异步消息呢?

第一种方案是先发送异步消息,再执行本地事务。这样做肯定是不行的,如果本地事务没有执行成功,异步消息已经发出去了,其他事务参与方收到消息后会执行对应的远程事务,造成数据不一致。

第二种方案是先执行本地事务,再发送异步消息。这样做能够解决本地事务执行失败导致的数据不一致问题,因为只有在本地事务执行成功的情况下,才会发送异步消息。但如果事务的参与方在执行本地事务成功后,自己宕机了,就再有没有机会发送异步消息了,因此这样做同样会造成数据不一致的问题。记住:在真实场景中,任何一个应用节点都不是100%可靠的,都存在宕机的可能性。

一个可行的方案是引入可以处理事务消息的消息队列集群,用于异步消息的中转。一个事务消息包含两种形态:首先,事务的参与方发送一笔半事务消息到消息队列,表示自己即将执行本地事务,消息队列集群在收到这个半事务消息后,不会马上进行投递,而是进行暂存。在执行完本地事务后,事务的参考方再发送一笔确认消息到消息队列集群,告知本地事务的执行状态。如果本地事务执行成功,消息队列集群会将之前收到的半事务消息进行投递;如果本地事务执行失败,消息队列集群直接删除之前收到的半事务消息,这样远程事务就不会被执行,从而保证了最终一致性。

同样,如果事务参与方在执行完本地事务后宕机了怎么办呢?这就需要消息队列集群具备回查机制:如果收到半事务消息后,在特定时间内没有再收到确认消息,会反过来请求事务参与方查询本地事务的执行状态,并给予反馈。这样,即便错过了确认消息,消息队列集群也有能力了解到本地事务的执行状态,从而决定是否将消息进行投递。在一个微服务应用中,会存在多个对等的应用实例,这也就代表着即便一个事务参与方的实例在执行完本地事务后宕机了,消息队列集群依然可以通过这个实例的兄弟实例了解到本地事务执行的最终状态

 

如何确保远程事务能执行成功?

 

如果一切本地事务的执行,以及异步消息的投递都一切顺利的话,接下来还会存在另外两种数据不一致的可能性:

 

  1. 消息队列集群在将异步消息投递到远程事务参与方的时候,由于网络不稳定,消息没能投递成功。
  2. 消息投递成功了,但远程事务参与方还没来得及执行远程事务,就宕机了。

 

这两种情况都会导致远程事务执行失败,所以需要建立一种消息重试机制,让远程事务参与方在完成任务后(实际上对远程事务参与方而言,这个任务是它要执行的本地任务),给予消息队列集群一个反馈,告知异步消息已经得到了正确的处理。否则,消息队列会在一定时间后,周期性的重复投递消息,直到它收到了来自远程事务参与方的反馈,以确保远程事务一定能执行成功。

 

和事务回查机制类似,远程事务参与方也有多个对等的微服务实例,即便某个实例在没来得及执行远程事务的时候宕机,消息队列也可以将任务交给这个实例的兄弟实例来完成。

 

完整流程

 

 

事务消息实战

了解到事务消息的原理后,我们不难得出一个结论:消息队列集群在整个流程中起着至关重要的作用,如果消息队列集群不可用,所有涉及到分布式事务的业务都将中止!因此,我们需要一个高可用的消息队列集群,能够始终保持在工作状态,即便其某个组件出现故障,也能够在短时间内自动恢复,不会影响业务,还能确保接收到的消息不丢失。

消息队列RocketMQ

消息队列 RocketMQ 版是阿里云基于 Apache RocketMQ 构建的低延迟、高并发、高可用、高可靠的分布式消息中间件。该产品最初由阿里巴巴自研并捐赠给 Apache 基金会,服务于阿里集团 13 年,覆盖全集团所有业务,包括种类金融级场景。作为双十一交易核心链路的官方指定产品,支撑千万级并发、万亿级数据洪峰,历年刷新全球最大的交易消息流转记录。

 

阿里云消息队列RocketMQ提供了对于事务消息机制最完整实现,包括半事务消息、确认消息、事务回查机制、消息重试等重要功能。此外,消息队列RocketMQ还提供了极强的高可用能力以及数据可靠性,可以确保在各种极端场景下都能提供稳定的服务,并确保消息不丢失。

 

对于开发者而言,使用云上的消息队列RocketMQ,可以免除消息队列集群的搭建和维护工作,将更多的精力投入到实现业务逻辑的工作中。当消息队列集群的性能不能满足要求时,还可以非常方便的进行集群一键扩容,以获得更高的并发吞吐量。

 

开通RocketMQ服务

 

在阿里云官方网站开通消息队列服务后方可开始使用消息队列RocketMQ,如果使用RAM用户访问RocketMQ,还必须先为RAM用户进行授权。在完成阿里云账户注册以及实名认证后,打开消息队列RocketMQ版产品页,点击免费开通,页面跳转至消息队列RocketMQ版控制台,在弹出的提示对话框中,完成RocketMQ服务的开通。

 

接下来,登录RAM控制台,在左侧导航栏选择人员管理 > 用户,用户页面,单击目标RAM用户操作列的添加权限,在添加权限面板,单击需要授予RAM用户的权限策略,单击确定。消息队列RocketMQ提供多种系统策略,可以根据权限范围为RAM用户授予相关权限。为了简单起见,我们先开通AliyunMQFullAccess权限策略,授予该RAM用户所有消息收发权限和控制台所有功能操作权限。

 

创建资源

 

在调用SDK收发消息前,需在消息队列RocketMQ控制台创建相关资源,在调用SDK时需填写这些资源信息。首先,我们要创建RocketMQ实例,实例是用于消息队列RocketMQ服务的虚拟机资源,相当于一个独立的消息队列集群,会存储消息Topic和客户端Group ID信息。我们还需要注意,只有在同一个地域下的同一个实例中的Topic和Group ID才能互通,例如,某Topic创建在华东1(杭州)地域的实例A中,那么该Topic只能被在华东1(杭州)地域的实例A中创建的Group ID对应的生产端和消费端访问。

 

登录到消息队列RocketMQ控制台,在左侧导航栏,单击实例列表,在顶部菜单栏,选择地域,如华东1(杭州),实例列表页面,单击创建实例,创建 RocketMQ 实例面板,完成实例的创建。

 

接下来,在实例所在页面的左侧导航栏,单击Topic 管理。在Topic 管理页面,单击创建 Topic,创建 Topic面板,输入名称描述,选择该Topic的消息类型事务消息,完成Topic的创建。

 

Topic是消息队列RocketMQ版里对消息的一级归类,例如创建Topic_Trade这一Topic来识别交易类消息,消息生产者将消息发送到Topic_Trade,而消息消费者则通过订阅该Topic来获取和消费消息。

 

创建完实例和Topic后,需要为消息的消费者和或生产者创建客户端ID,即Group ID作为标识。在事务消息的场景中,需要创建2个不同的Group ID,分别代表本地事务参与方和远程事务参与方。在实例所在页面的左侧导航栏,单击Group 管理,Group 管理页面,选择TCP 协议 > 创建 Group ID,创建可用于 TCP 协议的 Group面板,完成本地事务客户端Group ID的创建。重复此操作,完成远程事务参与方Group ID的创建。

 

本地事务参与方的业务代码

 

本文将通过Java代码介绍如何实现事务消息相关的业务逻辑,为了简化业务逻辑,我们继续基于锁定库存 - > 创建订单这个流程来演示,在这个流程中,仅有2个事务参与方。

初始化TransactionProducer

 

我们先通过Maven引入消息队列RocketMQ的SDK,优先使用阿里云官方提供的TCP版SDK。

<dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</artifactId><version>1.8.7.2.Final</version>
</dependency>

顺利引入Log4j2用于日志输出。

<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.7</version>
</dependency>
<dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>2.13.1</version>
</dependency>

在库存中心的代码中,我们需要初始化一个TransactionProducer,用于异步消息的发送,需要填入如下信息:

  • Group ID:之前创建的用于本地事务参与方的Group ID。
  • Access key和Secret Key:RAM用户对应的密钥信息,从RAM用户控制台获得。
  • Nameserver Address:RocketMQ实例的接入点信息,从RocketMQ控制台获得。
Properties properties = new Properties();
// 您在控制台创建的Group ID。注意:事务消息的Group ID不能与其他类型消息的Group ID共用。
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。
properties.put(PropertyKeyConst.AccessKey, "XXX");
// AccessKey Secret阿里云身份验证,在阿里云RAM控制台创建。
properties.put(PropertyKeyConst.SecretKey, "XXX");
// 设置TCP接入域名,进入消息队列RocketMQ版控制台的实例详情页面的TCP协议客户端接入点区域查看。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
// LocalTransactionCheckerImpl本地事务回查类的实现
TransactionProducer producer = ONSFactory.createTransactionProducer(properties,new LocalTransactionCheckerImpl());
producer.start();

TransactionProducer是线程安全的,启动后能在多线程环境中复用。

 

获取全局唯一的交易流水号

 

在发送半事务消息以及执行本地事务之前,我们需要先获取一个全局唯一的交易流水号,订单与交易流水号一一对应,接下来的事务消息机制都会依赖于这个这个交易流水号。我们可以通过引入第三方ID生成组件,或者在本地通过Snowflake算法实现。

 

实现本地事务回查逻辑

 

创建一个实现了LocalTransactionChecker接口的
LocalTransactionCheckerImpl类,实现其中的check(Message)方法,该方法返回本地事务的最终状态。至于具体的业务逻辑如何实现,不在本文讨论的范围之前,我们将其封装在BusinessService类中。

package transaction;import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class LocalTransactionCheckerImpl implements LocalTransactionChecker {private static Logger LOGGER = LoggerFactory.getLogger(LocalTransactionCheckerImpl.class);private static BusinessService businessService = new BusinessService();@Overridepublic TransactionStatus check(Message msg) {// 从消息体中获得的交易IDString transactionKey = msg.getKey();TransactionStatus transactionStatus = TransactionStatus.Unknow;try {boolean isCommit = businessService.checkbusinessService(transactionKey);if (isCommit) {transactionStatus = TransactionStatus.CommitTransaction;} else {transactionStatus = TransactionStatus.RollbackTransaction;}} catch (Exception e) {LOGGER.error("Transaction Key:{}", transactionKey, e);}LOGGER.warn("Transaction Key:{}transactionStatus:{}", transactionKey, transactionStatus.name());return transactionStatus;}
}

 

执行本地事务并发送异步消息

 

我们先组装一条异步消息,其中包含了全局交易ID,消息将要发往的Topic,以及消息体。远程事务参与方将通过这个消息体中获取执行远程事务所必须的数据信息。

 

接下来,将这条异步消息连同一个实现了LocalTransactionExecuter接口的匿名类,通过send方法进行发送,这就是本地事务参与方所需要实现的所有业务代码了。当然,这个匿名类实现了TransactionStatus execute.execute()方法,其中包含了对于本地事务的执行。完整代码如下:

package transaction;import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Properties;
import java.util.concurrent.TimeUnit;public class TransactionProducerClient {private static Logger LOGGER = LoggerFactory.getLogger(TransactionProducerClient.class);private static final BusinessService businessService = new BusinessService();private static final String TOPIC = "create_order";private static final TransactionProducer producer = null;static {Properties properties = new Properties();// 您在控制台创建的Group ID。注意:事务消息的Group ID不能与其他类型消息的Group ID共用。properties.put(PropertyKeyConst.GROUP_ID, "XXX");// AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。properties.put(PropertyKeyConst.AccessKey, "XXX");// AccessKey Secret阿里云身份验证,在阿里云RAM控制台创建。properties.put(PropertyKeyConst.SecretKey, "XXX");// 设置TCP接入域名,进入消息队列RocketMQ版控制台的实例详情页面的TCP协议客户端接入点区域查看。properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");// LocalTransactionCheckerImpl本地事务回查类的实现TransactionProducer producer = ONSFactory.createTransactionProducer(properties,new LocalTransactionCheckerImpl());producer.start();}public static void main(String[] args) throws InterruptedException {String transactionKey = getGlobalTransactionKey();String messageContent = String.format("lock inventory for: %s", transactionKey);Message message = new Message(TOPIC, null, transactionKey, messageContent.getBytes());try {SendResult sendResult = producer.send(message, (msg, arg) -> {// 此处用Lambda表示,实际是实现TransactionStatus execute(final Message msg, final Object arg)方法TransactionStatus transactionStatus = TransactionStatus.Unknow;try {boolean localTransactionOK = businessService.execbusinessService(transactionKey);if (localTransactionOK) {transactionStatus = TransactionStatus.CommitTransaction;} else {transactionStatus = TransactionStatus.RollbackTransaction;}} catch (Exception e) {LOGGER.error("Transaction Key:{}", transactionKey, e);}LOGGER.warn("Transaction Key:{}", transactionKey);return transactionStatus;}, null);LOGGER.info("send message OK, Transaction Key:{}, result:{}", message.getKey(), sendResult);} catch (Exception e) {LOGGER.info("send message failed, Transaction Key:{}", message.getKey());}// demo example防止进程退出TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE);}private static String getGlobalTransactionKey() {// TODOreturn "";}
}

得益于RocketMQ SDK优秀的封装,发送半事务消息、发送确认消息、事务回查等重要步骤都已经完整实现,不需要开发者再编写代码了,这将为用户带来特别顺畅开发体验。

 

远程事务参与方的业务代码

 

相对本地事务参与方而言,远程事务参与方的代码更加简单,只需要从异步消息中提取出对应信息,完成对远程事务的执行即可。

package transaction;import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Properties;public class TransactionConsumerClient {private static Logger LOGGER = LoggerFactory.getLogger(TransactionProducerClient.class);private static final BusinessService businessService = new BusinessService();private static final String TOPIC = "create_order";private static final Consumer consumer = null;static {Properties properties = new Properties();// 在控制台创建的Group ID,不同于本地事务参与方使用的Group IDproperties.put(PropertyKeyConst.GROUP_ID, "XXX");// AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。properties.put(PropertyKeyConst.AccessKey, "XXX");// Accesskey Secret阿里云身份验证,在阿里云服RAM控制台创建。properties.put(PropertyKeyConst.SecretKey, "XXX");// 设置TCP接入域名,进入控制台的实例详情页面的TCP协议客户端接入点区域查看。properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");Consumer consumer = ONSFactory.createConsumer(properties);consumer.start();}public static void main(String[] args) {consumer.subscribe(TOPIC, "*", (message, context) -> {LOGGER.info("Receive: " + message);businessService.doBusiness(message);// 返回CommitMessage,代表给予消息队列集群异步消息已经得到正常处理的回馈return Action.CommitMessage;});}
}

 

事务回滚

 

是否存在这样的情况:当本地事务执行成功后,因为远程事务没有办法执行,而导致本地事务需要进行回滚操作呢?在事务消息原理分析一节,我们已经介绍过如何通过消息重试,确保远程事务能够执行成功,这是不是已经说明只要异步消息被确认,远程事务就一定可以执行成功,从而不存在对本地事务的回滚呢?

 

实际生产情况下,确实存在远程事务无法正常执行的情况。比如在付款成功阶段,当本地事务“修改订单状态”执行完成后,在执行远程事务“通知发货”的时,因为订单地址有误而被物流公司拒绝,这种情况下就必须对订单状态进行回退操作,并发起退款流程。

 

所以在执行远程事务的时候,我们有必要区分如下两种完全不同的异常:

 

  1. 技术异常:远程事务参与方宕机、网络故障、数据库故障等。
  2. 业务异常:远程逻辑在业务上无法执行、代码业务逻辑错误等。

 

简单来讲,当远程事务执行失败的时候,能够通过消息重试的方式解决问题的,属于技术异常;否则,属于业务异常。基于事务消息的分布式事务机制不能实现自动回滚,当业务异常发生的时候,必须通过回退流程确保已经完成的本地事务得到恢复。比如在修改订单状态 -> 通知发货这个场景中,如果由于业务异常导致无法发货的时候,需要通过额外的回退流程,将订单状态设置为“已取消”,并执行退款流程。

 

在事务消息机制中,回退流程相当于远程事务参与方和本地事务参与方调换了角色,和正常流程一样,同样也可以通过事务消息来完成分布式事务。由于正常流程和回退流程的业务逻辑是完全不一样的,所以最理想的方式是建立另外一个Topic来实现。这也就说明,我们在创建事务消息Topic的时候,要充分考虑到这个Topic背后的业务含义,并在Topic命名上尽可能的与真实业务相匹配

多个事务参与方

本节展示的示例中,都只涉及到2个事务参与方,但在真实世界中,分布式事务往往涉及到更多的事务参与方,比如之前提到的付款成功环节,有修改订单状态->扣减库存->通知发货->增加积分这4个需要同时进行的操作,涉及到4个事务参与方。这种情况下如何通过事务消息来实现分布式事务呢?

我们依然可以继续使用之前的架构,只需要加入多个远程事务参与方就行了。可以通过RocketMQ的多消费组关联多个远程事务参与方,每一个参与方对应一个Group ID,在这种情况下,同一个异步消息会复制成多份投递给不同的事务参与方。

需要特别引起注意的是,当某个远程事务参与方遇到业务异常的时候,需要通知其他所有事务参与方执行回退流程,这无疑会增加业务逻辑的整体复杂度。为了简化事务消息的执行流程,我们可以对业务逻辑预先进行梳理,将子事务分为如下两类:

  1. 有可能发生业务异常的:比如锁定库存的操作,有可能因为库存不足而执行失败。又比如扣除积分的操作,有可能因为用户积分不足而无法扣除。
  2. 不太可能发生业务异常的:比如删除购物车条目的操作,除非是技术类故障,一定可以执行成功,即便对应的条目并不存在,也没有关系。又比如积分增加的操作,只要对应的用户没有注销,是不可能遇到业务异常的。

 

我们尽量将第一类事务作为本地事务而实现,将第二类事务作为远程事务而实现,这样就可以最大程度避免回退流程。

其他注意事项

消息幂等

RocketMQ能保证消息不丢失,但不能保证消息不重复,所以消费者在接收到消息后,有必要根据业务上的唯一Key对消息做幂等处理。在抢购业务中,唯一Key当然就是全局唯一的交易流水号,具体幂等处理方法在互联网上有很多文章供读者参考。当然,不是每一种业务远程事务都需要确保消息的幂等性,比如删除购物车指定条目这样的操作,在业务上是可以容忍多次反复执行的,就没有必要引入额外的幂等处理了。

每日对账

不同于传统事务的强一致性保证,柔性事务需要经历一个中间状态,才到达成事务的最终一致性。有某些特殊情况下,这个中间状态会持续非常长的时间,甚至需要人工主动介入才能实现最终一致性:

  1. 消息重试多次后,依然不成功:当消费者完全无法正常工作的时候,RocketMQ不可能永无止境地重试消息,事实上,如果16次重试后异步消息依然没有办法被正常处理,RocketMQ会停止尝试,将消息放到一个特殊的队列中。
  2. 未处理的业务异常:比如给某个账号加积分的时候,发现此账号被注销了,这是一个非常罕见的业务现象,有可能事先对此并没有健壮的处理机制。
  3. 幂等校验失败:处理幂等所依赖的系统比如Redis发生了故障,导致某些消息被重复处理。
  4. 其他严重的系统故障:比如网络长时间中断,留下了大量执行到一半的事务。
  5. 其他漏网之鱼。

这些情况下,我们都有需要通过定期对账机制来进行排查,在必要的时候发起人工主动介入流程,修复不一致的数据。事实上,在任何柔性事务的实现中,每日对账都是必不可少的数据安全保障性手段。

总结

在柔性事务的多种实现中,事务消息是最为优雅易用的一种。基于阿里云RocketMQ高性能、高可用的特点,完全可以胜任抢购业务这类高并发大流量的场景。在阿里巴巴自身的业务中,事务消息也广泛使用于双11这样的大型营销活动中,有着非常高的通用性。

但在IT领域,没有任何一种技术是银弹,引入事务消息机制需要针对性的修改业务逻辑,还需要借助于每日对账等额外的手段确保数据安全,在实现高性能的同时,也增加了整体的业务复杂度。我们需要对业务场景进行充分评估,对比多种不同的技术实现方案,从中挑选与自身业务特点最为匹配的一种,才能更好地发挥柔性事务的价值。

作者:山猎,阿里云解决方案架构师

原文链接

本文为阿里云原创内容,未经允许不得转载

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/514193.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

海量秋招面试资料等你来拿!你离大厂也许并不远

秋招在即&#xff0c;你还在为秋招如何准备而发愁吗&#xff1f;你还在为拿不到大厂offer而苦恼吗&#xff1f;工欲善其事&#xff0c;必先利其器。金秋开学季&#xff0c;CSDN助力你的技术学习与成长&#xff0c;为你免费提供海量大厂面试资料&#xff0c;让你的秋招不再慌乱&…

基于Ganos百行代码实现亿级矢量空间数据在线可视化

简介&#xff1a; 本文介绍如何使用RDS PG或PolarDB&#xff08;兼容PG版或Oracle版&#xff09;的Ganos时空引擎提供的数据库快显技术&#xff0c;仅用百行代码实现亿级海量几何空间数据的在线快速显示和流畅地图交互&#xff0c;且无需关注切片存储和效率问题。 01 引言 如何…

流批一体生产应用!Bigo 实时计算平台建设实践

简介&#xff1a; 本文由 Bigo 计算平台负责人徐帅分享&#xff0c;主要介绍 Bigo 实时计算平台建设实践的介绍 本文由 Bigo 计算平台负责人徐帅分享&#xff0c;主要介绍 Bigo 实时计算平台建设实践的介绍。内容包括&#xff1a; Bigo 实时计算平台的发展历程特色与改进业务场…

一部手机是否能用 7 年?苹果、三星、Google:三年差不多!

整理 | 苏宓出品 | CSDN&#xff08;ID&#xff1a;CSDNnews&#xff09;一部手机如果可以流畅地使用 7 年&#xff0c;是种什么样的感觉&#xff1a;有人说&#xff0c;这对于 iPhone 而言&#xff0c;或许会很轻松做到&#xff0c;但也会给一些平价的 Android 手机制造商带来…

五福背后的 Web 3D 引擎 Oasis Engine 正式开源

简介&#xff1a; Oasis 从开源走向新的起点&#xff0c;用 3D 化的交互和表达让世界变得更美好。 相信大家已经体验了今年支付宝五福的活动&#xff0c;无论是今年的五福首页还是打年兽游戏都是由蚂蚁互动图形引擎&#xff08;代号&#xff1a;Oasis Engine&#xff09;驱动的…

我用 Python 自制成语接龙小游戏,刺激!

作者&#xff1a;小小明原文链接&#xff1a;https://blog.csdn.net/as604049322/article/details/118154687本文为读者投稿在 https://github.com/pwxcoo/chinese-xinhua 项目中可以下载到中华成语的语料库&#xff0c;该项目收录包括 14032 条歇后语&#xff0c;16142 个汉字…

基于SLS构建RDS审计合规监控

简介&#xff1a; 数据库是企业业务的数据核心&#xff0c;其安全方面的问题在传统环境中已经成为泄漏和被篡改的重要根源。因此&#xff0c;对数据库的操作行为尤其是全量 SQL 执行记录的审计日志&#xff0c;就显得尤为重要。 背景 数据库是企业业务的数据核心&#xff0c;其…

云效DevOps实践-如何基于云效实现测试自动化集成和分析

简介&#xff1a; 对于现代软件研发来说&#xff0c;持续、快速、高质量、低风险地交付需求特性&#xff0c;是业务对研发的主要诉求。而要做到这一点&#xff0c;除了要有良好的架构设计、卓越的工程能力&#xff0c;快速可靠的测试反馈也是其非常重要的一环&#xff0c;达到这…

spring 使用其他类protected方法_Java操作bean、属性、方法的使用工具类

在实际的项目开发中&#xff0c;反射操作类的实例、属性赋值、执行方法是常规的操作&#xff0c;虽然spring提供了比较完整的API来执行上述操作&#xff0c;不过在实际的应用中&#xff0c;spring的函数隐藏比较深&#xff0c;比较分散&#xff0c;小伙伴们可能懒得花时间去寻找…

2021年阿里云采购季大促主会场全攻略

在疫情的影响下&#xff0c;企业都在谋求各种转机&#xff0c;探寻各种转型之路&#xff0c;为助力企业复工复产低成本上云&#xff0c;日前阿里云开年采购季优惠活动于3月1日正式开启。 从主会场页面来看&#xff0c;活动分为三个阶段&#xff1a; 3月1日-3月16日&#xff1a…

应云而生,幽灵的威胁 - 云原生应用交付与运维的思考

简介&#xff1a; 过去的 2020 是充满不确定性的一年&#xff0c;但也是充满机遇的一年。突发的新冠疫情为全社会的数字化转型按下加速键。云计算已经不再是一种技术&#xff0c;而是成为支撑数字经济发展和业务创新的关键基础设施。在利用云计算重塑企业 IT 的过程中&#xff…

技术干货 | mPaaS 小程序高玩带你起飞:客户端预置小程序无视网络质量

简介&#xff1a; 弱网拉包无障碍&#xff0c;深度提升用户体验 传统的小程序技术容易受到网络环境影响&#xff0c;当网络质量不佳时可能导致拉取不到小程序包的情况。通过预置小程序&#xff0c;即可规避该问题。本文介绍了预置小程序的原理和预置小程序的实现过程。 什么是预…

Delta Lake在Soul的应用实践

简介&#xff1a; 传统离线数仓模式下&#xff0c;日志入库前首要阶段便是ETL&#xff0c;我们面临如下问题&#xff1a;天级ETL任务耗时久&#xff0c;影响下游依赖的产出时间&#xff1b;凌晨占用资源庞大&#xff0c;任务高峰期抢占大量集群资源&#xff1b;ETL任务稳定性不…

亚马逊云科技中国线上峰会开幕,发力汽车产业链、少年人工智能等

亚马逊云科技于9月9日-14日举办以“构建新格局 重塑云时代”为主题的中国线上峰会&#xff0c;推出涵盖行业视野、技术创新、开发者和开源、云安全、以及人工智能的5大主题演讲、覆盖云计算各细分领域的8大技术分论坛&#xff0c;以及汇聚各行业上云趋势及创新实践的10大行业分…

【产品能力深度解读】连续入围Gartner魔力象限的Quick BI有何魔力?

简介&#xff1a; 国际权威分析机构Gartner发布2021年商业智能和分析平台魔力象限报告&#xff0c;阿里云Quick BI再度入选&#xff0c;并继续成为该领域魔力象限唯一入选的中国企业。 Quick BI凭借在增强分析能力上的持续投入、数据中台矩阵化产品优势和电商行业的专业度&…

mysql的json函数与实例_Mysql实例详解Mysql中的JSON系列操作函数

《Mysql实例详解Mysql中的JSON系列操作函数》要点&#xff1a;本文介绍了Mysql实例详解Mysql中的JSON系列操作函数&#xff0c;希望对您有用。如果有疑问&#xff0c;可以联系我们。MYSQL必读前言MYSQL必读JSON是一种轻量级的数据交换格式,采用了独立于语言的文本格式,类似XML,…

256变4096:分库分表扩容如何实现平滑数据迁移?

简介&#xff1a; 本文作者就一个高德打车弹外订单系统进行了一次扩分库分表和数据库迁移。 一、 背景 2020年&#xff0c;笔者负责的一个高德打车弹外订单系统进行了一次扩分库分表和数据库迁移。该订单系统整体部署在阿里云上&#xff0c;服务使用阿里云ECS部署&#xff0c;…

OpenYurt 如何 “0 侵入” 攻破云边融合难点

简介&#xff1a; 随着 5G、IoT、直播、CDN 等行业和业务的发展&#xff0c;越来越多的算力和业务开始下沉到距离数据源或者终端用户更近的位置&#xff0c;以期获得很好的响应时间和成本&#xff0c;这是一种明显区别于传统中心模式的计算方式——边缘计算。 随着 5G、IoT、直…

Python - 深夜数据结构与算法之 Graph

目录 一.引言 二.图的简介 1.Graph 图 2.Undirected graph 无向图 3.Directed Graph 有向图 4.DFS / BFS 遍历 三.经典算法实战 1.Num-Islands [200] 2.Land-Perimeter [463] 3.Largest-Island [827] 四.总结 一.引言 Graph 无论是应用还是算法题目在日常生活中比较…

Docker Desktop宣布收费;腾讯7月已申请注册WECHAT CLOUD商标;MongoDB成为当前最具价值开源软件公司...

NEWS本周新闻回顾Docker Desktop 宣布收费近日 Docker 官方宣布一项新的动作&#xff0c;即将产品订阅划分为个人、专业、团队和商业不同版本。如果企业规模在 250 名员工以上或年收入超过 1000 万美元的公司想要使用 Docker Desktop&#xff0c;那么必须使用付费订阅。付费订阅…