RocketMQ学习笔记:分布式事务

这是本人学习的总结,主要学习资料如下

  • 马士兵教育
  • rocketMq官方文档

目录

  • 1、分布式事务的难题
  • 2、解决方式
    • 2.1、半事务消息和事务回查
    • 2.2、代码样例
      • 2.2.1、TransactionListener
      • 2.2.2、TransactionMQProducer
      • 2.2.3、MessageListenerConcurrently
      • 2.2.4、流程图


1、分布式事务的难题

现有两个系统,A向B转钱。A系统扣钱和B系统加钱就应该属于同一个事务,任何一个失败都要回滚。两个系统之间唯一的通信方式就是RocketMQ
请添加图片描述

以最朴素的想法,现在就有两个实现分布式事务的方案。但这两个都有比较大的不可靠性。

  • A系统先扣钱再发送MQ:这样的弊端是无法确定消息有没有发送到MQ,或者消息有没有被MQ保存。总之这做法缺少一些回查的机制。
  • A系统先发送MQ再扣钱:这样的弊端是发送消息后,A系统可能出现错误回滚。而B收到了消息就正常消费,完全不知道A那边出了问题。

2、解决方式

2.1、半事务消息和事务回查

  • 半事务消息:半事务消息是指向RocketMQ发送一条消息,但这个消息只存放在CommitLog中,并不在ConsumeQueue展示。也就是说该消息被RocketMQ接收了,但是消费者却无法消费到这条消息。
  • 事务回查:在半事务消息发送成功后。A系统执行事务,如果成功则MQ将消息变成正常消息,失败则不发送消息。这里如果业务太复杂还不能确定事务是否完成的话,还可以发送UNKNOWN给MQ,这样MQ就会有定时器去检查事务是否完成。
    RocketMQ会向生产者询问是否可以把半事务变成正常的消息让消费者可以消费到。在这篇文章的例子就是询问A系统扣款有没有扣成功。如果成功了那就让B系统消费消息。

请添加图片描述

所以呢,通过半事务消息事务回查就能保证A系统和发送消息具有事务,即扣款失败则不发送消息,扣款成功则发送消息。所以半事务消息至少保证了生产者和MQ之间的原子性。MQ和消费者之间的原子性需要另外处理。

消费者需要保证幂等性,失败后重试,即使称为死信后也特殊处理等操作来保证事务。这个例子中B系统成功加钱的话那交易结束,如果尝试多次后还是失败,那就需要一个机制来通知A系统,让他把扣掉的钱加回去。

2.2、代码样例

2.2.1、TransactionListener

一个接口规范,我们需要实现这个接口来定义本地事务和事务回查。

就是本地事务具体执行,成功后怎么办,失败了怎么办。定时的事务回查如何检查事务有没有完成。这些东西都要定义在TransactionListener的实现中。


TransactionListener transactionListener = new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {// 执行本地事务,A扣100块// 如果成功// return LocalTransactionState.COMMIT_MESSAGE;// 如果失败// return LocalTransactionState.ROLLBACK_MESSAGE;//或者业务比较复杂,不想在这个阶段就关闭事务,可以返回Unknown,之后就需要MQ定时事务回查return LocalTransactionState.UNKNOW;}@Override// 事务回查,默认一分钟一次public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {System.out.println("事务回查, " + new SimpleDateFormat("yyyyMMdd, HH:mm:ss").format(new Date()));// 如果成功// return LocalTransactionState.COMMIT_MESSAGE;// 如果失败// return LocalTransactionState.ROLLBACK_MESSAGE;// 业务比较长,还不确定成功或失败,返回unknown,下次再查return LocalTransactionState.UNKNOW;}};

2.2.2、TransactionMQProducer

半事务消息的生产者,在DefaultMQProducer的基础上新增了一个重要的参数,类型是ExecutorService。这个线程池是用来生产线程去完成事务回查。

但是事务回查的逻辑不需要定义在线程的run()方法中,这一部分放在TransactionListener中。

 TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");producer.setNamesrvAddr("localhost:9876");// build a thread pool used to for MQ to call back to check transactionExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10), (r) -> {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;});producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();try{Message msg = new Message("transaction_producer", null, "A give B 100 dollar".getBytes());SendResult sendResult = producer.sendMessageInTransaction(msg, null);}catch(Exception e) {// rollbackSystem.out.println("rollback");}producer.shutdown();

2.2.3、MessageListenerConcurrently

消费者部分就比较简单,只要listener是MessageListenerConcurrently就好。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TransactionalTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {try{for(MessageExt msg: list) {// simulate DB actionSystem.out.println("update B where transactionId" + msg.getTransactionId());System.out.println("Success consume msg: " + msg.getMsgId());}} catch (Exception e) {e.printStackTrace();System.out.println("Failed to consume meg, try more times");// means that failed to consume this msg. In next time will still consume this msg.return ConsumeConcurrentlyStatus.RECONSUME_LATER;}// means that success to consume this msg. In the next time will consume next msg.return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});consumer.start();
while(true){
}

2.2.4、流程图

请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/773544.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++判断点是否在三角形内部

1.问题 判断点是否在三角形内部。 2.思路 计算向量AB和AP的叉积、向量BC和BP的叉积、向量CA和CP的叉积&#xff0c;如果所有的叉积符号相同&#xff0c;则点在三角形内部。 3.代码实现和注释 #include <iostream> #include <vector>// 计算两个二维向量的叉积 …

数据结构-队列-005

1链式队列 运行结果如下&#xff1a; 1.1链式队列结点定义 /*自定义一个数据类型*/ typedef struct student {char name[32];char sex;int age; }DATA_TYPE;/*定义一个链式队列结点*/ typedef struct link_queue_node {DATA_TYPE data;//数据域struct link_queue_node *pne…

图像识别中的特征提取技术

图像识别是计算机视觉领域的一个重要分支&#xff0c;它的基本任务是从图像中提取出有助于分类或识别的信息&#xff0c;这些信息通常称为“特征”。特征提取是图像识别中的关键技术之一&#xff0c;它决定了识别系统性能的好坏。以下是几种常见的特征提取技术&#xff1a; 边…

什么是甲状腺相关眼病,四川眼科医院院长孙丰源教授这么说!

近年来&#xff0c;随着人们健康意识的逐渐增强&#xff0c;越来越多人开始关注甲状腺疾病。甲状腺是人体最大的内分泌腺&#xff0c;是维护人体健康的关键&#xff0c;它一旦发生异常&#xff0c;则会危害到多个器官和组织。不同的甲状腺疾病会呈现不同的症状&#xff0c;比如…

C# 快速将数据写入 Excel 单元格

目录 性能问题 Excel元素结构及写入原理 范例运行环境 配置Office DCOM 实现代码 组件库引入 核心代码 WriteArrayToExcel 神奇的 911 事件 小结 性能问题 将生成或查询到的数据&#xff0c;导出到 Excel 是应用中常用的一项功能。其中一些标准的写入单元格的方法如…

数据库学习案例20240326-mysql主从复制对trigger,event是否会导致数据重复测试

1 MASTER -SLAVE TRRGER测试 binlog_formatROW 测试环境为master-master双主模式&#xff0c;配置的双向复制。 11:25: [(none)]> show variables like %binlog_format%; ---------------------- | Variable_name | Value | ---------------------- | binlog_format | RO…

智慧公厕,让数据和技术更好服务社会生活

智慧公厕&#xff0c;作为智慧城市建设中不可忽视的一部分&#xff0c;正逐渐受到越来越多人的关注。随着科技的不断进步&#xff0c;智能化公厕已经成为一种趋势&#xff0c;通过数据的流转和技术的整合&#xff0c;为社会生活带来了更好的服务。本文以智慧公厕源头实力厂家广…

基于51单片机的酒精检测警报系统Proteus仿真

地址&#xff1a;https://pan.baidu.com/s/1gddplAxS_ZKyrHaWE93dog 提取码&#xff1a;1234 仿真图&#xff1a; 芯片/模块的特点&#xff1a; AT89C52简介&#xff1a; AT89C52是一款经典的8位单片机&#xff0c;是意法半导体&#xff08;STMicroelectronics&#xff09;公…

最新的Flutter3.x版本获取应用包名的方法

以前的flutter项目可以在 AndroidManifest.xml 中获取应用包名&#xff0c; 最新的Flutter3.x版本要获取应用包名可以找到build.gradle 更多内容参考&#xff1a;最新的Flutter3.x版本如何获取应用包名

Linux:Jenkins全自动持续集成持续部署(4)

在上一章部署好了之后&#xff0c;还需要点击一下才能进行部署&#xff0c;本章的效果是&#xff1a;当gitlab上的代码发生了变化后&#xff0c;我们不需要做任何事情不需要去点击构建按钮&#xff0c;Jenkins直接自动检测变化&#xff0c;然后自动去集成部署Linux&#xff1a;…

(一)手把手教你如何通过ARM DesignStart计划在FPGA上搭建一个Cortex-M3软核

&#xff08;一&#xff09;手把手教你如何通过ARM DesignStart计划在FPGA上搭建一个Cortex-M3软核 一、ARM DesignStart计划 1.1 如何下载ARM DesignStart Cortex-M3相关文件 ​ 关于ARM DesignStart计划的介绍:ARM DesignStart计划——私人定制一颗ARM处理器 - 知乎 (zhih…

go中validate包使用教程

文章目录 前言安装简单使用错误处理翻译器Validator库介绍校验语法常用标记自定义校验需求【校验车身颜色】前言 在go项目中,经常有校验数据合法性的需求,比如邮箱、年龄、车牌号、网址、字符串长度、金额、枚举范围等。一个好的校验包能帮我们少写很多ifelse,提高系统的可…

docker在线安装centos7(windows版)

目录 1、docker本地安装2、拉取centos7镜像3、启动容器4、配置SSH以访问centos7 1、docker本地安装 windows安装docker比较简单&#xff0c;官网搜索有个docker desktop装上就完事。 2、拉取centos7镜像 可以登录到docker hub上拉&#xff0c;也可以搜出来对应的centos7镜像…

sqlite删除数据表

1.如何删除表 在SQLite中&#xff0c;删除表的SQL语句是DROP TABLE。如果你想要在Python中使用SQLite库&#xff08;如sqlite3&#xff09;来删除一个表&#xff0c;你可以按照以下步骤操作&#xff1a; 连接到SQLite数据库。创建一个cursor对象。执行DROP TABLE语句。提交事…

Kindling the Darkness:A Practical Low-light Image Enhancer

Abstract 在弱光条件下拍摄的图像通常会出现&#xff08;部分&#xff09;可见度较差的情况。,除了令人不满意的照明之外&#xff0c;多种类型的退化也隐藏在黑暗中&#xff0c;例如由于相机质量有限而导致的噪点和颜色失真。,换句话说&#xff0c;仅仅调高黑暗区域的亮度将不…

蓝桥杯练习系统(算法训练)ALGO-966 自行车停放

资源限制 内存限制&#xff1a;256.0MB C/C时间限制&#xff1a;1.0s Java时间限制&#xff1a;3.0s Python时间限制&#xff1a;5.0s 问题描述 有n辆自行车依次来到停车棚&#xff0c;除了第一辆自行车外&#xff0c;每辆自行车都会恰好停放在已经在停车棚里的某辆自行…

对标开源3D建模软件blender,基于web提供元宇宙3D建模能力的dtns.network德塔世界是否更胜一筹?

对标开源3D建模软件blender&#xff0c;基于web提供元宇宙3D建模能力的dtns.network德塔世界是否更胜一筹&#xff1f; blender是一款优秀的3D建模开源软件&#xff0c;拥有免费开源、功能强大、渲染速度优秀的优点。而开源的dtns.network德塔世界&#xff0c;亦是专业级的元宇…

3D程序中保证交互流畅性的常用技巧-备忘

opengl绘制一帧会绘制场景中会有多个模型&#xff0c;随着模型三角面片数量增加&#xff0c;GPU绘制一帧就会变慢&#xff0c;这将不能快速响应鼠标消息&#xff0c;例如鼠标旋转缩放视图&#xff0c;所以出现了很多牺退而求其次的交互技术&#xff0c;例如LOD技术&#xff0c;…

spring boot项目对接阿里云的RocketMq5

要在Spring Boot项目中实现一个通用的消息消费服务&#xff0c;可以将前面的概念整合并利用Spring的依赖注入特性来创建一个更灵活、可配置的服务。下面是如何创建这样的服务&#xff0c;包括通过application.properties来配置连接信息&#xff0c;以及使用Service注解定义消费…

【手写AI代码目录】准备发布的教程

文章目录 1. tensorboard2. F.cross_entropy(input_tensor, target) F.log_softmax() F.nll_loss() 1. tensorboard from torch.utils.tensorboard import SummaryWriter# TensorBoard writer SummaryWriter(runs/mnist_experiment_1) ...if i % 100 99: # 每 100 个 b…