SpringCloud基于RocketMQ实现分布式事务

前言

分布式事务是在微服务开发中经常会遇到的一个问题,之前的文章中我们已经实现了利用Seata来实现强一致性事务,其实还有一种广为人知的方案就是利用消息队列来实现分布式事务,保证数据的最终一致性,也就是我们常说的柔性事务。

消息队列实现分布式事务原理

首先让我们来看一下基于消息队列实现分布式事务的原理方案。

6b4486963889d7414396d5c3f29a0c17.png

柔性事务

发送消息的服务有个OUTBOX数据表,在进行INSERT、UPDATE、DELETE 业务操作时也会给OUTBOX数据表INSERT一条消息记录,这样可以保证原子性,因为这是基于本地的ACID事务。

OUTBOX表充当临时消息队列,然后我们在引入一个消息中继(MessageRelay)的服务,由他从OUTBOX表中读取数据并发布消息到消息组件。

消息中继的实现可以很简单,只需要通过定时任务定期从OUTBOX表中拉取最新未发布的数据,获取到数据后将数据发送给消息组件,最后将完成发送的消息从OUTBOX表中删除即可,对于失败的消息可以根据业务规则进行重试。

RocketMQ的事务消息

RocketMQ本身已经支持事务消息,如果你们项目使用了RocketMQ,可以直接借助RocketMQ的事务消息实现分布式事务,我们先看一下RocketMQ事务消息的原理然后再借助RocketMQ来实现分布式事务。

RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。

outside_default.png

分布式事务

RocketMQ实现事务消息主要分为两个阶段:正常事务的发送及提交、事务信息的补偿流程

整体流程为:

  • 正常事务发送与提交阶段
    1、生产者发送一个半消息给MQServer(半消息是指消费者暂时不能消费的消息)
    2、服务端响应消息写入结果,半消息发送成功
    3、开始执行本地事务
    4、根据本地事务的执行状态执行Commit或者Rollback操作

  • 事务信息的补偿流程
    1、如果MQServer长时间没收到本地事务的执行状态会向生产者发起一个确认回查的操作请求
    2、生产者收到确认回查请求后,检查本地事务的执行状态
    3、根据检查后的结果执行Commit或者Rollback操作
    补偿阶段主要是用于解决生产者在发送Commit或者Rollback操作时发生超时或失败的情况。

RocketMQ事务流程关键

  1. 事务消息在一阶段对用户不可见
    事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的,也就是说消费者不能直接消费。这里RocketMQ的实现方法是原消息的主题与消息消费队列,然后把主题改成 RMQ_SYS_TRANS_HALF_TOPIC ,这样由于消费者没有订阅这个主题,所以不会被消费。

  2. 如何处理第二阶段的失败消息?
    在本地事务执行完成后会向MQServer发送Commit或Rollback操作,此时如果在发送消息的时候生产者出故障了,那么要保证这条消息最终被消费,MQServer会像服务端发送回查请求,确认本地事务的执行状态。
    当然了rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,RocketMQ默认回滚该消息。

  3. 消息状态 事务消息有三种状态:TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息
    TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。
    TransactionStatus.Unknown :中间状态,它代表需要检查消息队列来确定状态。

代码实现

业务需求:用户请求订单微服务 order-service 接口删除订单(退货),删除订单时需要调用 account-service的方法给账户增加余额,一个典型的分布式事务问题。

acc511a1fd3011fd840a08853e55854a.png


基础配置

  • 在Order-Service和Account-Service中引入Rocket消息组件

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
  • 在配置中心添加RocketMQ的相关配置

rocketmq:name-server: xxx.xx.x.xx:9876producer:group: cloud-group
  • 在OrderService服务中建立一张事务日志表rocketmq_transaction_log(作用稍后说)

81dcdd466cb67eee2c1df28abbc49d28.png

发送半消息

Order-Service作为分布式事务开始的入口,在Service层我们给RocketMQ发送一条半消息

  • OrderController入口

/*** 根据订单号删除订单* @param orderNo 订单编号*/
@PostMapping("/order/delete")
public ResultData<String> delete(@RequestParam String orderNo){log.info("delete order id is {}",orderNo);orderService.delete(orderNo);return ResultData.success("订单删除成功");
}

直接调用orderService的delete方法

  • OrderServiceImpl业务逻辑

@Override
public void delete(String orderNo) {Order order = orderMapper.selectByNo(orderNo);//如果订单存在且状态为有效,进行业务处理if (order != null && CloudConstant.VALID_STATUS.equals(order.getStatus())) {String transactionId = UUID.randomUUID().toString();//如果可以删除订单则发送消息给rocketmq,让用户中心消费消息rocketMQTemplate.sendMessageInTransaction("add-amount",MessageBuilder.withPayload(UserAddMoneyDTO.builder().userCode(order.getAccountCode()).amount(order.getAmount()).build()).setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).setHeader("order_id",order.getId()).build(),order);}
}

首先校验一下订单状态,然后使用 rocketMQTemplate.sendMessageInTransaction()发送事务消息。

sendMessageInTransaction方法有三个参数:

  • destination:目的地(主题),这里发送给add-amount 这个topic

  • message:发送给消费者的消息体,需要使用MessageBuilder.withPayload() 来构建消息

  • arg:参数

注意,这里我们生成了一个transactionId,并放在header中跟消息一起发送(这里实际也可以构造成一个对象,放在arg里进行发送),作用后面再讲!

  • 消息封装实体UserAddMoneyDTO

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class UserAddMoneyDTO {/*** 用户编码*/private String userCode;/*** 金额*/private BigDecimal amount;
}

这个类生产者和消费者都需要用到,所以我直接丢到common包中,大家根据项目实际情况决定放哪。

执行本地事务与回查

MQServer收到半消息后会告诉生产者order-service确认收到半消息,这时候order-service需要执行本地事务,执行完本地事务后再告诉MQServer本地事务的执行状态,确认此消息究竟是Commit还是Rollback。

RocketMQ提供了 RocketMQLocalTransactionListener 接口,本地事务监听器,这个接口类的实现如下:

b2db8ffd14bab6b89c5b9d938e10b46c.png

第一个方法 executeLocalTransaction 为执行本地事务;第二个方法 checkLocalTransaction 为检查本地事务的执行状态,也就是回查动作。

我们需要实现 RocketMQLocalTransactionListener接口,在 executeLocalTransaction方法中执行本地事务,在执行 checkLocalTransaction回查方法时告诉RocketMQ到底该提交还是回滚。

这里大家思考一个问题,本地事务已经执行完成了,怎么去回查本地事务的执行结果呢?

答案如下:我们可以在执行本地事务的时候同时生成一条事务日志,让本地事务与日志事务在同一个方法中,同时添加 @Transactional 注解,保证两个操作事务是一个原子操作。

这样如果事务日志表中有这个本地事务的信息,那就代表本地事务执行成功,需要Commit,相反如果没有对应的事务日志,则表示执行失败,需要Rollback。这就是为什么我们上面在OrderService中需要建立一张事务日志表的原因。

  • 实现RocketMQLocalTransactionListener接口,完成事务执行逻辑

/*** 监听事务消息* @author javadaily*/
@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddUserAmountListener implements RocketMQLocalTransactionListener {private final OrderService orderService;private final RocketMqTransactionLogMapper rocketMqTransactionLogMapper;/*** 执行本地事务*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {log.info("执行本地事务");MessageHeaders headers = message.getHeaders();//获取事务IDString transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);Integer orderId = Integer.valueOf((String)headers.get("order_id"));log.info("transactionId is {}, orderId is {}",transactionId,orderId);try{//执行本地事务,并记录日志orderService.changeStatuswithRocketMqLog(orderId, CloudConstant.INVALID_STATUS,transactionId);//执行成功,可以提交事务return RocketMQLocalTransactionState.COMMIT;}catch (Exception e){return RocketMQLocalTransactionState.ROLLBACK;}}/*** 本地事务的检查,检查本地事务是否成功*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {MessageHeaders headers = message.getHeaders();//获取事务IDString transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);log.info("检查本地事务,事务ID:{}",transactionId);//根据事务id从日志表检索QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();queryWrapper.eq("transaction_id",transactionId);RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);if(null != rocketmqTransactionLog){return RocketMQLocalTransactionState.COMMIT;}return RocketMQLocalTransactionState.ROLLBACK;}
}
  • 本地事务执行逻辑

@Transactional(rollbackFor = RuntimeException.class)
@Override
public void changeStatuswithRocketMqLog(Integer id,String status,String transactionId){orderMapper.changeStatus(id,status);rocketMqTransactionLogMapper.insert(RocketmqTransactionLog.builder().transactionId(transactionId).log("执行删除订单操作").build());
}

修改订单状态为删除状态,同时往事务日志表中插入一条事务日志,用@Transactional注解保证事务。

Account-Service消费消息

  • 监听消息并处理给用户增加余额逻辑

@Slf4j
@Service
@RocketMQMessageListener(topic = "add-amount",consumerGroup = "cloud-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired) )
public class AddUserAmountListener implements RocketMQListener<UserAddMoneyDTO> {private final AccountMapper accountMapper;/*** 收到消息的业务逻辑*/@Overridepublic void onMessage(UserAddMoneyDTO userAddMoneyDTO) {log.info("received message: {}",userAddMoneyDTO);accountMapper.increaseAmount(userAddMoneyDTO.getUserCode(),userAddMoneyDTO.getAmount());log.info("add money success");}
}

测试

  • 测试数据

2a59ff5cf84c2e079080e429ad1b0d51.png

订单表

f25075f72024101fe2a99b0cbe25e9f4.png

用户表

53b1052144744ccbf829efd4bb955003.png

事务日志表

如果事务消息成功消费最终用户表中jianzh5这个用户的amount应该变成300(100+200)

  • 测试准备

a32ef3c8ec87c9b3d71ae41f6500ab5c.png

我们在执行本地事务成功并需要通知消息队列提交事务处打个断点,然后在执行到此处时手动模拟异常

  • 模拟异常

d246d23e565d2f58a332ecc05f23bd71.png

7ff96c503345922e0580b0b8bbdbe304.png

在准备提交事务时我们通过命令 taskkill /pid 10116 -t -f命令强制杀掉OrderService进程。(先通过jps获取OrderService进程ID)

  • 重启服务器,检查是否会执行回查方法

6c774600fbabb256420c291968575149.png

重启OrderService程序会自动执行回查方法,结合事务日志表判断是否提交事务。

  • 运行后的结果

20db0478b97621c4b18b300f2f114571.png

小结

本篇文章我们介绍了使用消息队列实现柔性事务的方案,重点剖析了RocketMQ事务消息的原理,并通过Demo案例实现了分布式事务(柔性事务)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/544035.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

漫画:怎么证明sleep不释放锁,而wait释放锁?

wait 加锁示例public class WaitDemo {private static Object locker new Object();public static void main(String[] args) throws InterruptedException {WaitDemo waitDemo new WaitDemo();// 启动新线程&#xff0c;防止主线程被休眠new Thread(() -> {try {waitDemo…

就国内某个程序员问答网站的简单的分析

为什么80%的码农都做不了架构师&#xff1f;>>> 一、数据抓取 分析页面数据&#xff0c;设计数据表结构数据只要包含投票、回答数、问题状态、最后谁回答过、浏览数、问题标题、标签&#xff0c;数据样例如下&#xff1a;由于一开只打算爬问题标题&#xff0c;问题…

iOS开发中 常用枚举和常用的一些运算符(易错总结)

1、色值的随机值&#xff1a;#define kColorValue arc4random_uniform(256)/255.0 // arc4random_uniform(256)/255.0; 求出0.0~1.0之间的数字view.backgroundColor [UIColor colorWithRed:kColorValue green: kColorValue blue: kColorValue alpha: 0.5]; 2、定时器的使用&…

明明加了唯一索引,为什么还是产生重复数据?

前段时间我踩过一个坑&#xff1a;在mysql8的一张innodb引擎的表中&#xff0c;加了唯一索引&#xff0c;但最后发现数据竟然还是重复了。到底怎么回事呢&#xff1f;本文通过一次踩坑经历&#xff0c;聊聊唯一索引&#xff0c;一些有意思的知识点。1.还原问题现场前段时间&…

nmap入门之主机发现

2019独角兽企业重金招聘Python工程师标准>>> #主机发现&#xff08;HOST DISCOVERY&#xff09; ##仅列出IP&#xff0c;不扫描 nmap -sL 192.168.70.0/24 > nmap_result.txt 2>&1##仅ping扫描&#xff0c;不扫描端口 nmap -sn 192.168.70.0/24##不ping扫…

面试官:为什么ConcurrentHashMap要放弃分段锁?

今天我们来讨论一下一个比较经典的面试题就是 ConcurrentHashMap 为什么放弃使用了分段锁&#xff0c;这个面试题阿粉相信很多人肯定觉得有点头疼&#xff0c;因为很少有人在开发中去研究这块的内容&#xff0c;今天阿粉就来给大家讲一下这个 ConcurrentHashMap 为什么在 JDK8 …

面试突击72:输入URL之后会执行什么流程?

作者 | 磊哥来源 | Java面试真题解析&#xff08;ID&#xff1a;aimianshi666&#xff09;转载请联系授权&#xff08;微信ID&#xff1a;GG_Stone&#xff09;在浏览器中输入 URL 之后&#xff0c;它会执行以下几个流程&#xff1a;执行 DNS 域名解析&#xff1b;封装 HTTP 请…

面试必备:TCP 经典 15 连问!

TCP协议是大厂面试必问的知识点。整理了15道非常经典的TCP面试题&#xff0c;希望大家都找到理想的offer呀1. 讲下TCP三次握手流程开始客户端和服务器都处于CLOSED状态&#xff0c;然后服务端开始监听某个端口&#xff0c;进入LISTEN状态第一次握手(SYN1, seqx)&#xff0c;发…

ISP QoS Lab

ISP QoS Lab1-PQ优先级队列&#xff08;PQ&#xff0c;Priority Queue&#xff09;中&#xff0c;有高、中、普通、低优先级四个队列。数据包根据事先的定义放在不同的队列中&#xff0c;路由器按照高、中、普通、低顺序服务&#xff0c;只有高优先级的队列为空后才为中优先级的…

面渣逆袭:JVM经典五十问,这下面试稳了!

引言1.什么是JVM?JVM——Java虚拟机&#xff0c;它是Java实现平台无关性的基石。Java程序运行的时候&#xff0c;编译器将Java文件编译成平台无关的Java字节码文件&#xff08;.class&#xff09;,接下来对应平台JVM对字节码文件进行解释&#xff0c;翻译成对应平台匹配的机器…

操作系统大内核和微内核_操作系统中的内核类型

操作系统大内核和微内核As we have already studied about the Kernels, we know that the Kernel is a program which is the main component of the Operating System. Now let us study about the types of Kernels. 正如我们已经研究了内核一样 &#xff0c;我们知道内核是…

【论文解读】Learning based fast H.264 to H.265 transcoding

时间&#xff1a; 2015 年 级别&#xff1a;APSIPA 机构&#xff1a; 上海电力大学 摘要 新提出的视频编码标准HEVC (High Efficiency video coding)以其比H.264/AVC更好的编码效率&#xff0c;被工业界和学术界广泛接受和采用。在HEVC实现了约40%的编码效率提升的同时&#…

面试必备:聊聊sql优化的15个小技巧

sql优化是一个大家都比较关注的热门话题&#xff0c;无论你在面试&#xff0c;还是工作中&#xff0c;都很有可能会遇到。如果某天你负责的某个线上接口&#xff0c;出现了性能问题&#xff0c;需要做优化。那么你首先想到的很有可能是优化sql语句&#xff0c;因为它的改造成本…

面试突击73:IoC 和 DI 有什么区别?

作者 | 磊哥来源 | Java面试真题解析&#xff08;ID&#xff1a;aimianshi666&#xff09;转载请联系授权&#xff08;微信ID&#xff1a;GG_Stone&#xff09;IoC 和 DI 都是 Spring 框架中的重要概念&#xff0c;就像玫瑰花与爱情一样&#xff0c;IoC 和 DI 通常情况下也是成…

MyBatis整合Spring的实现(13)

2019独角兽企业重金招聘Python工程师标准>>> 本章中分析insert元素的解析。 1 配置文件 <insert id"insert" parameterType"cn.vansky.schedule.time.menu.bo.Menu"><!--WARNING - mbggeneratedThis element is automatically generat…

麻了,代码改成多线程,竟有9大问题

很多时候&#xff0c;我们为了提升接口的性能&#xff0c;会把之前单线程同步执行的代码&#xff0c;改成多线程异步执行。比如&#xff1a;查询用户信息接口&#xff0c;需要返回用户基本信息、积分信息、成长值信息&#xff0c;而用户、积分和成长值&#xff0c;需要调用不同…

浅聊一下线程池的10个坑

日常开发中&#xff0c;为了更好管理线程资源&#xff0c;减少创建线程和销毁线程的资源损耗&#xff0c;我们会使用线程池来执行一些异步任务。但是线程池使用不当&#xff0c;就可能会引发生产事故。今天跟大家聊聊线程池的10个坑。大家看完肯定会有帮助的~线程池默认使用无界…

Linux命令行上执行操作,不退回命令行的解决方法

问题描述&#xff1a; 如果你现在登录Centos执行了某个操作&#xff0c;但是操作一直占用命令行&#xff0c;命令行显示的也都是这个命令相关的操作&#xff0c;我想做其它事情 &#xff0c;该怎么办呢 &#xff1f; 解决方法&#xff1a; 根据《Linux命令行与Shell编程大全第2…

SpringBoot 读取配置文件的 5 种方法!

作者 | 磊哥来源 | Java面试真题解析&#xff08;ID&#xff1a;aimianshi666&#xff09;转载请联系授权&#xff08;微信ID&#xff1a;GG_Stone&#xff09;Spring Boot 中读取配置文件有以下 5 种方法&#xff1a;使用 Value 读取配置文件。使用 ConfigurationProperties 读…

使用阿里巴巴 Druid 轻松实现加密!

作者 | 磊哥来源 | Java中文社群&#xff08;ID&#xff1a;javacn666&#xff09;转载请联系授权&#xff08;微信ID&#xff1a;GG_Stone&#xff09;为什么要加密&#xff1f;现在的开发习惯&#xff0c;无论是公司的项目还是个人的项目&#xff0c;都会选择将源码上传到 Gi…