(项目实战)业务场景中学透RocketMQ5.0-事务消息在预付卡系统中的应用

在这里插入图片描述

1 什么是事务消息

RocketMQ中事务消息主要是解决分布式场景下各业务系统事务一致性问题,常见的分布式事务解决方案有传统XA事务方案、TCC、本地消息表、MQ事务等。今天我们基于RocketMQ事务消息解决预付卡系统资金账户子系统和会员积分子系统、短信子系统分布式事务业务场景
在这里插入图片描述

1.1 事务消息功能原理

事务消息是 RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

事务消息处理流程
在这里插入图片描述

  1. 生产者将半事务消息发送至RocketMQ服务端。
  2. RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

2 分布式事务的应用场景

分布式系统调用的特点为一个核心业务逻辑的执行,同时需要调用多个下游业务进行处理。因此,如何保证核心业务和多个下游业务的执行结果完全一致,是分布式事务需要解决的主要问题。接下来我们基于预付卡系统-资金账户子系统和会员积分子系统、短信子系统业务流程来对分布式应用场景进行分析
在这里插入图片描述

  1. 预付卡系统-资金账户子系统为整个分布式应用场景中的上游系统,上游系统资金账户的变动会影响下游系统(会员积分子系统,短信子系统)事务的执行
  2. 在上游系统事务执行成功后会提交事务消息到下游系统,这时下游系统(会员积分和短信子系统)执行自己的业务逻辑
  3. 如果上游系统未执行成功最终状态为rollback则不会发送消息到下游系统,这时下游系统就不会执行相关业务逻辑保证了事务的一致性

3 基于RocketMQ事务消息解决卡系统分布式事务

在这里插入图片描述

  1. 预付卡系统-资金账户模块向RocketMQ服务端发送半事务消息,该消息暂时不会投递下游系统
  2. RocketMQ服务端向资金账户返回接收成功
  3. 执行资金账户本地事务
  4. 向RocketMQ二次提交事务执行结果(commit或rollback)
  5. 如果RocketMQ服务端出现异常情况会在资金账户系统(生产者)回查状态再执行第4步
  6. 检查资金账户本地事务最终执行结果
  7. 根据资金账户本地事务最终状态再次进行二次提交
  8. 如果事务状态最终为commit则会交付消息到会员积分子系统,短信子系统(消息订阅者),如果状态为rollback则不交付消息到订阅者

4 RocketMQ事务消息核心代码实现

本技术文档采用SpringCloud2021.x和RocketMQ5.0进行代码实现

Spring Cloud Alibaba VersionSpring Cloud VersionSpring Boot Version
2021.xSpring Cloud 2021.x2.7.18

4.1 在SpringCloud中集成RocketMQ流程

4.1.1 引入RocketMQ Stream Starter
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
4.1.2 修改application.properties配置文件
server.port=1001
spring.application.name=ecard-tx-pay
spring.cloud.stream.function.definition=consumer;
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
spring.cloud.stream.rocketmq.bindings.producer-out-0.producer.group=output_1
spring.cloud.stream.rocketmq.bindings.producer-out-0.producer.transactionListener=ecardTransactionListener
spring.cloud.stream.rocketmq.bindings.producer-out-0.producer.producerType=Trans
spring.cloud.stream.bindings.producer-out-0.destination=tx
spring.cloud.stream.bindings.consumer-in-0.destination=tx
spring.cloud.stream.bindings.consumer-in-0.group=tx-group
logging.level.org.springframework.context.support=debug
4.1.3 代码实现

1 事务消息生产者

package cn.itbeien.ecard.tx;import java.util.function.Consumer;import cn.itbeien.ecard.common.SimpleMsg;
import com.alibaba.cloud.stream.binder.rocketmq.constant.RocketMQConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;/*** @author beien* @date 2024-06-20 22:15* Copyright© 2024 beien* 发送半事务消息到RocketMQ服务端*/
@Service
@Slf4j
public class EcardTXService {@Autowiredprivate StreamBridge streamBridge;//生产半事务消息@Beanpublic void producer() {String orderId = "10001";MessageBuilder builder = MessageBuilder.withPayload(new SimpleMsg("Hello Tx msg " + orderId));builder.setHeader("test", String.valueOf(i)).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);builder.setHeader(RocketMQConst.USER_TRANSACTIONAL_ARGS, "binder");Message<SimpleMsg> msg = builder.build();streamBridge.send("producer-out-0", msg);log.info("send Msg:" + msg.toString());}
}

2 事务消息监听

package cn.itbeien.ecard.tx;import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;import org.springframework.stereotype.Component;/*** @author beien* @date 2024-06-20 22:09* Copyright© 2024 beien* 执行本地事务和事务状态检查*/
@Component("ecardTransactionListener")
@Slf4j
public class TransactionListenerImpl implements TransactionListener {/*** @param msg messages* @param arg message args* @return Transaction state* 执行本地事务方法 对应事务消息执行流程中的第3步*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {Object num = msg.getProperty("test");if ("1".equals(num)) {log.info("executer: " + new String(msg.getBody()) + " unknown");return LocalTransactionState.UNKNOW;}else if ("2".equals(num)) {log.info("executer: " + new String(msg.getBody()) + " rollback");return LocalTransactionState.ROLLBACK_MESSAGE;}log.info("executer: " + new String(msg.getBody()) + " commit");return LocalTransactionState.COMMIT_MESSAGE;}/*** @param msg messages* @return Transaction state* 检查本地事务方法,根据事务执行结果返回事务状态*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {log.info("check: " + new String(msg.getBody()));return LocalTransactionState.COMMIT_MESSAGE;}}

3 事务消息订阅(消费)者

package cn.itbeien.pay.tx;import java.util.function.Consumer;import cn.itbeien.ecard.common.SimpleMsg;
import com.alibaba.cloud.stream.binder.rocketmq.constant.RocketMQConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;/*** @author beien* @date 2024-06-20 22:20* Copyright© 2024 beien* 该部分代码为会员积分子系统、短信子系统用来处理预付卡系统账户扣款状态的方法*/
@Service
@Slf4j
public class PayConsumerService {@Beanpublic Consumer<Message<SimpleMsg>> consumer() {/***会员积分子系统、短信子系统根据订阅自动执行此业务逻辑* 根据消费金额记录积分,下发短信通知用户预付卡余额变更或下发消息到小程序*/return msg -> {Object arg = msg.getHeaders();log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload().getMsg() + " ARG:"+ arg.toString());};}
}

5 关注我

欢迎关注我的视频号和公众号,视频号有相关技术和业务视频可学习支付业务/文旅行业数字化。探讨技术(系统架构、微服务、容器化、云原生、分布式事务),支付系统实战。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/31523.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

光伏、储能一体化监控及运维解决方案

前言&#xff1a;今年以来&#xff0c;在政策利好推动下光伏、风力发电、电化学储能及抽水蓄能等新能源行业发展迅速&#xff0c;装机容量均大幅度增长&#xff0c;新能源发电已经成为新型电力系统重要的组成部分&#xff0c;同时这也导致新型电力系统比传统的电力系统更为复杂…

某联参考答案

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title></title> </head> <body> <form action""><h2>智能招聘注册</h2><table width1000 align"cen…

SQLITE日期类型问题:该字符串未被识别为有效的 DATETIME

使用SQLite抛出异常&#xff1a; 该字符串未被识别为有效的 DateTime 错误 解决方法&#xff1a; 1. 查询的列createdate的数据类型为date 1&#xff09;在SQLite数据库管理器SQLiteStudio能正常查询到。 2&#xff09;在.net中&#xff0c;就会抛出异常&#xff1a;该字符串未…

如何通过AI进行智能日志异常检测

智能日志异常检测是一种利用人工智能&#xff08;AI&#xff09;技术来自动识别日志数据中异常模式或行为的方法。传统日志监控依赖于预定义规则&#xff0c;而智能日志异常检测可以适应不同的日志模式和异常类型&#xff0c;提高检测准确性和效率。下面是一个完整的步骤指南&a…

想要成为程序员,首先你需要掌握这这三种编程语言!

作为程序员&#xff0c;掌握多种编程语言是非常有价值的&#xff0c;因为不同的编程语言有不同的优势和适用场景。然而&#xff0c;要指定“必须掌握”的三种编程语言是相当主观的&#xff0c;因为这取决于个人的职业目标、所在行业的需求以及技术趋势。不过&#xff0c;以下三…

【深海王国】小学生都能做的APP?AppInventor、BLE蓝牙、Arduino联合开发你的第一个手机远程控制程序(7)

Hi~ (o^^o)♪, 各位深海王国的同志们&#xff0c;早上下午晚上凌晨好呀~ 辛勤工作的你今天也辛苦啦(/≧ω) 今天大都督依旧为大家带来小学生都能学会的APP制作教程&#xff0c;帮你一周内快速开发一款可以和单片机无线通讯的手机蓝牙APP&#xff0c;let’s go&#xff01; &a…

error: the type ‘const zjloc::<lambda(const Vec2i, const Vec2i)>’

catkin_make 编译时遇到这个问题&#xff1a; /home/robot/ct_lio/src/ct-lio/src/common/eigen_types.h:114:20: error: the type ‘const zjloc::<lambda(const Vec2i&, const Vec2i&)>’ of ‘constexpr’ variable ‘zjloc::less_vec2i’ is not literal 114…

AI落地不容乐观-从神话到现实

开篇 在这儿我不是给大家泼冷水&#xff0c;而是我们一起来看一下从2022年11月左右GPT3.0掀起了一股“AI狂潮”后到现在&#xff0c;AI在商用、工业、军用下到底有没有得到了大规模应用呢&#xff1f; 这个答案每一个参与者其实心里有数那就是&#xff1a;没有。 但是呢它的…

OrangePi连接Wi-Fi步骤

下面介绍的是用终端命令行的方式配置WIFI&#xff1a; 首先输入以下命令用于扫描并查看周围的WiFi热点。也可以直接连接。 nmcli dev wifi之后会在终端打出周围所有可以连接的WiFi&#xff0c;按方向键上下可以查看显示更多&#xff0c;按q键退出。 然后同样使用nmcli命令连接…

提升教学效率的全方位解决方案

在现代教育环境中&#xff0c;教学管理的复杂性与日俱增。如何高效管理教学活动、优化教师资源、提升教学质量&#xff0c;是每个教育机构面临的重要挑战。搭贝教务教学管理系统提供了一套全面的解决方案&#xff0c;涵盖了巡检、调课代课、生源登记、监考、外派、作业发布、听…

机器学习(V)--无监督学习(六)流形学习

title: 机器学习(V)–无监督学习(二)流形学习 date: katex: true categories: Artificial IntelligenceMachine Learning tags:机器学习 cover: /img/ML-unsupervised-learning.png top_img: /img/artificial-intelligence.jpg abbrlink: 26cd5aa6 description: 流形学习 【降…

L54--- 404.左叶子之和(深搜)---Java版

1.题目描述 2.思路 递归遍历左子树 &#xff0c;然后再把左子树的和相加 3.代码实现 /*** Definition for a binary tree node.* public class TreeNode {* int val;* TreeNode left;* TreeNode right;* TreeNode() {}* TreeNode(int val) { this.val …

代码随想录算法训练营第四十五天|LeetCode337 打家劫舍Ⅲ

题1&#xff1a; 指路&#xff1a;337. 打家劫舍 III - 力扣&#xff08;LeetCode&#xff09; 思路与代码&#xff1a; 方法一&#xff1a;暴力 暴力解法&#xff1a;我们讨论根结点处的偷取形式。此时情况分为两种&#xff1a;考虑偷取根节点&#xff0c;和不考虑偷取根节…

深度学习调参笔记

就是因为增加了下面这个代码片段&#xff0c;使得训练过程耗时增加了3倍。。。 def set_seed(seed):random.seed(seed)np.random.seed(seed)torch.manual_seed(seed)torch.cuda.manual_seed(seed)torch.cuda.manual_seed_all(seed)cudnn.benchmark Falsecudnn.deterministic …

【Git】--Part4--多人协作

在之前的Git博客中&#xff0c;已经把Git本地相关的操作以及远程操作的介绍完了。如下&#xff1a; Git–Part1–基础操作 - 掘金 (juejin.cn)Git–Part2–分支管理 - 掘金 (juejin.cn)Git–Part3–远程操作 & 配置 & 标签管理 - 掘金 (juejin.cn) 这篇文章会介绍两种…

iptables(5)常用扩展模块

简介 之前我们已经介绍过扩展模块的简单使用,比如使用-m tcp/udp ,-m multiport参数通过--dports,--sports可以设置连续和非连续的端口范围。那么我们如何匹配其他的一些参数呢,比如源地址范围,目的地址范围,时间范围等,这就是我们这篇文章介绍的内容。 iprange扩展模块…

Eclipse使用TFS(Team Foundation Server) 超详细

Eclipse使用TFS 1、什么是TFS2、TFS和Git的区别3、签出代码4、签入代码4.1、签出以进行编辑4.2、修改本地代码4.3、签入挂起的更改4.4、签入 如果不能 签入挂起的更改&#xff0c;则先 签出以进行编辑如果 签入挂起的更改不可选中&#xff0c;则 如下操作 1、什么是TFS Team F…

thinkphp5模型的高级应用

ThinkPHP5 是一个基于 PHP 的轻量级框架&#xff0c;它提供了许多便利的功能来简化 Web 开发。在 ThinkPHP5 中&#xff0c;模型&#xff08;Model&#xff09;是 MVC&#xff08;Model-View-Controller&#xff09;架构中的重要组成部分&#xff0c;负责处理数据逻辑。以下是一…

音视频开发—FFmpeg 打开摄像头进行RTMP推流

实验平台&#xff1a;Ubuntu20.04 摄像头&#xff1a;普通USB摄像头&#xff0c;输出格式为YUV422 1.配置RTMP服务器推流平台 使用Nginx 配置1935端口即可&#xff0c;贴上教程地址 ubuntu20.04搭建Nginxrtmp服务器) 2.配置FFmpeg开发环境 过程较为简单&#xff0c;这里不…

决战技术管理转型:决策之道-管理中的智慧与策略

文章目录 引言一、决策的重要性二、常见的决策方式1. 理性决策&#xff08;Rational Decision Making&#xff09;2. 有限理性&#xff08;Bounded Rationality&#xff09;3. 直觉决策&#xff08;Intuitive Decision Making&#xff09;4. 循证管理&#xff08;Evidence-Base…