RocketMQ学习笔记:分布式事务

这是本人学习的总结,主要学习资料如下

  • 马士兵教育
  • rocketMq官方文档

目录

  • 1、分布式事务的难题
  • 2、解决方式
    • 2.1、半事务消息和事务回查
    • 2.2、代码样例
      • 2.2.1、TransactionListener
      • 2.2.2、TransactionMQProducer
      • 2.2.3、MessageListenerConcurrently
      • 2.2.4、流程图


1、分布式事务的难题

现有两个系统,A向B转钱。A系统扣钱和B系统加钱就应该属于同一个事务,任何一个失败都要回滚。两个系统之间唯一的通信方式就是RocketMQ
请添加图片描述

以最朴素的想法,现在就有两个实现分布式事务的方案。但这两个都有比较大的不可靠性。

  • A系统先扣钱再发送MQ:这样的弊端是无法确定消息有没有发送到MQ,或者消息有没有被MQ保存。总之这做法缺少一些回查的机制。
  • A系统先发送MQ再扣钱:这样的弊端是发送消息后,A系统可能出现错误回滚。而B收到了消息就正常消费,完全不知道A那边出了问题。

2、解决方式

2.1、半事务消息和事务回查

  • 半事务消息:半事务消息是指向RocketMQ发送一条消息,但这个消息只存放在CommitLog中,并不在ConsumeQueue展示。也就是说该消息被RocketMQ接收了,但是消费者却无法消费到这条消息。
  • 事务回查:在半事务消息发送成功后。A系统执行事务,如果成功则MQ将消息变成正常消息,失败则不发送消息。这里如果业务太复杂还不能确定事务是否完成的话,还可以发送UNKNOWN给MQ,这样MQ就会有定时器去检查事务是否完成。
    RocketMQ会向生产者询问是否可以把半事务变成正常的消息让消费者可以消费到。在这篇文章的例子就是询问A系统扣款有没有扣成功。如果成功了那就让B系统消费消息。

请添加图片描述

所以呢,通过半事务消息事务回查就能保证A系统和发送消息具有事务,即扣款失败则不发送消息,扣款成功则发送消息。所以半事务消息至少保证了生产者和MQ之间的原子性。MQ和消费者之间的原子性需要另外处理。

消费者需要保证幂等性,失败后重试,即使称为死信后也特殊处理等操作来保证事务。这个例子中B系统成功加钱的话那交易结束,如果尝试多次后还是失败,那就需要一个机制来通知A系统,让他把扣掉的钱加回去。

2.2、代码样例

2.2.1、TransactionListener

一个接口规范,我们需要实现这个接口来定义本地事务和事务回查。

就是本地事务具体执行,成功后怎么办,失败了怎么办。定时的事务回查如何检查事务有没有完成。这些东西都要定义在TransactionListener的实现中。


TransactionListener transactionListener = new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {// 执行本地事务,A扣100块// 如果成功// return LocalTransactionState.COMMIT_MESSAGE;// 如果失败// return LocalTransactionState.ROLLBACK_MESSAGE;//或者业务比较复杂,不想在这个阶段就关闭事务,可以返回Unknown,之后就需要MQ定时事务回查return LocalTransactionState.UNKNOW;}@Override// 事务回查,默认一分钟一次public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {System.out.println("事务回查, " + new SimpleDateFormat("yyyyMMdd, HH:mm:ss").format(new Date()));// 如果成功// return LocalTransactionState.COMMIT_MESSAGE;// 如果失败// return LocalTransactionState.ROLLBACK_MESSAGE;// 业务比较长,还不确定成功或失败,返回unknown,下次再查return LocalTransactionState.UNKNOW;}};

2.2.2、TransactionMQProducer

半事务消息的生产者,在DefaultMQProducer的基础上新增了一个重要的参数,类型是ExecutorService。这个线程池是用来生产线程去完成事务回查。

但是事务回查的逻辑不需要定义在线程的run()方法中,这一部分放在TransactionListener中。

 TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");producer.setNamesrvAddr("localhost:9876");// build a thread pool used to for MQ to call back to check transactionExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10), (r) -> {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;});producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();try{Message msg = new Message("transaction_producer", null, "A give B 100 dollar".getBytes());SendResult sendResult = producer.sendMessageInTransaction(msg, null);}catch(Exception e) {// rollbackSystem.out.println("rollback");}producer.shutdown();

2.2.3、MessageListenerConcurrently

消费者部分就比较简单,只要listener是MessageListenerConcurrently就好。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TransactionalTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {try{for(MessageExt msg: list) {// simulate DB actionSystem.out.println("update B where transactionId" + msg.getTransactionId());System.out.println("Success consume msg: " + msg.getMsgId());}} catch (Exception e) {e.printStackTrace();System.out.println("Failed to consume meg, try more times");// means that failed to consume this msg. In next time will still consume this msg.return ConsumeConcurrentlyStatus.RECONSUME_LATER;}// means that success to consume this msg. In the next time will consume next msg.return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});consumer.start();
while(true){
}

2.2.4、流程图

请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/773544.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++判断点是否在三角形内部

1.问题 判断点是否在三角形内部。 2.思路 计算向量AB和AP的叉积、向量BC和BP的叉积、向量CA和CP的叉积&#xff0c;如果所有的叉积符号相同&#xff0c;则点在三角形内部。 3.代码实现和注释 #include <iostream> #include <vector>// 计算两个二维向量的叉积 …

数据结构-队列-005

1链式队列 运行结果如下&#xff1a; 1.1链式队列结点定义 /*自定义一个数据类型*/ typedef struct student {char name[32];char sex;int age; }DATA_TYPE;/*定义一个链式队列结点*/ typedef struct link_queue_node {DATA_TYPE data;//数据域struct link_queue_node *pne…

什么是甲状腺相关眼病,四川眼科医院院长孙丰源教授这么说!

近年来&#xff0c;随着人们健康意识的逐渐增强&#xff0c;越来越多人开始关注甲状腺疾病。甲状腺是人体最大的内分泌腺&#xff0c;是维护人体健康的关键&#xff0c;它一旦发生异常&#xff0c;则会危害到多个器官和组织。不同的甲状腺疾病会呈现不同的症状&#xff0c;比如…

C# 快速将数据写入 Excel 单元格

目录 性能问题 Excel元素结构及写入原理 范例运行环境 配置Office DCOM 实现代码 组件库引入 核心代码 WriteArrayToExcel 神奇的 911 事件 小结 性能问题 将生成或查询到的数据&#xff0c;导出到 Excel 是应用中常用的一项功能。其中一些标准的写入单元格的方法如…

智慧公厕,让数据和技术更好服务社会生活

智慧公厕&#xff0c;作为智慧城市建设中不可忽视的一部分&#xff0c;正逐渐受到越来越多人的关注。随着科技的不断进步&#xff0c;智能化公厕已经成为一种趋势&#xff0c;通过数据的流转和技术的整合&#xff0c;为社会生活带来了更好的服务。本文以智慧公厕源头实力厂家广…

基于51单片机的酒精检测警报系统Proteus仿真

地址&#xff1a;https://pan.baidu.com/s/1gddplAxS_ZKyrHaWE93dog 提取码&#xff1a;1234 仿真图&#xff1a; 芯片/模块的特点&#xff1a; AT89C52简介&#xff1a; AT89C52是一款经典的8位单片机&#xff0c;是意法半导体&#xff08;STMicroelectronics&#xff09;公…

最新的Flutter3.x版本获取应用包名的方法

以前的flutter项目可以在 AndroidManifest.xml 中获取应用包名&#xff0c; 最新的Flutter3.x版本要获取应用包名可以找到build.gradle 更多内容参考&#xff1a;最新的Flutter3.x版本如何获取应用包名

Linux:Jenkins全自动持续集成持续部署(4)

在上一章部署好了之后&#xff0c;还需要点击一下才能进行部署&#xff0c;本章的效果是&#xff1a;当gitlab上的代码发生了变化后&#xff0c;我们不需要做任何事情不需要去点击构建按钮&#xff0c;Jenkins直接自动检测变化&#xff0c;然后自动去集成部署Linux&#xff1a;…

(一)手把手教你如何通过ARM DesignStart计划在FPGA上搭建一个Cortex-M3软核

&#xff08;一&#xff09;手把手教你如何通过ARM DesignStart计划在FPGA上搭建一个Cortex-M3软核 一、ARM DesignStart计划 1.1 如何下载ARM DesignStart Cortex-M3相关文件 ​ 关于ARM DesignStart计划的介绍:ARM DesignStart计划——私人定制一颗ARM处理器 - 知乎 (zhih…

docker在线安装centos7(windows版)

目录 1、docker本地安装2、拉取centos7镜像3、启动容器4、配置SSH以访问centos7 1、docker本地安装 windows安装docker比较简单&#xff0c;官网搜索有个docker desktop装上就完事。 2、拉取centos7镜像 可以登录到docker hub上拉&#xff0c;也可以搜出来对应的centos7镜像…

Kindling the Darkness:A Practical Low-light Image Enhancer

Abstract 在弱光条件下拍摄的图像通常会出现&#xff08;部分&#xff09;可见度较差的情况。,除了令人不满意的照明之外&#xff0c;多种类型的退化也隐藏在黑暗中&#xff0c;例如由于相机质量有限而导致的噪点和颜色失真。,换句话说&#xff0c;仅仅调高黑暗区域的亮度将不…

蓝桥杯练习系统(算法训练)ALGO-966 自行车停放

资源限制 内存限制&#xff1a;256.0MB C/C时间限制&#xff1a;1.0s Java时间限制&#xff1a;3.0s Python时间限制&#xff1a;5.0s 问题描述 有n辆自行车依次来到停车棚&#xff0c;除了第一辆自行车外&#xff0c;每辆自行车都会恰好停放在已经在停车棚里的某辆自行…

对标开源3D建模软件blender,基于web提供元宇宙3D建模能力的dtns.network德塔世界是否更胜一筹?

对标开源3D建模软件blender&#xff0c;基于web提供元宇宙3D建模能力的dtns.network德塔世界是否更胜一筹&#xff1f; blender是一款优秀的3D建模开源软件&#xff0c;拥有免费开源、功能强大、渲染速度优秀的优点。而开源的dtns.network德塔世界&#xff0c;亦是专业级的元宇…

FPGA之组合逻辑与时序逻辑

数字逻辑电路根据逻辑功能的不同&#xff0c;可以分成两大类&#xff1a;组合逻辑电路和时序逻辑电路&#xff0c;这两种电路结构是FPGA编程常用到的&#xff0c;掌握这两种电路结构是学习FPGA的基本要求。 1.组合逻辑电路 组合逻辑电路概念&#xff1a;任意时刻的输出仅仅取决…

2024年福建事业单位招聘详细流程

2024年福建事业单位招聘详细流程&#xff0c;速速查收&#xff01;

使用Nginx1.25.4版本做负载均衡、搭建Nacos2.3.0服务集群

关于使用版本问题上&#xff0c;其实小白更喜欢使用新的版本&#xff0c;因为新的版本功能更多&#xff0c;肯定优化方面不言而喻&#xff0c;懂得都懂&#xff0c;但是新的版本&#xff0c;肯定使用起来更加的速度&#xff0c;性能&#xff0c;也是不言而喻的啊&#xff0c;那…

US-T65 DM蓝牙5.2双模热插拔PCB

键盘使用说明索引&#xff08;均为出厂默认值&#xff09; 一些常见问题解答&#xff08;FAQ&#xff09;注意首次使用步骤蓝牙配对&#xff08;重要&#xff09;蓝牙和USB切换键盘默认层默认触发层0的FN键配置的功能默认功能层1配置的功能默认的快捷键 蓝牙参数蓝牙MAC地址管理…

求助:配置脚手架代理,跨域问题proxyTable配置无效,访问后显示404?

已经在这里卡了一天了。找了很多解决办法&#xff0c;比如重启&#xff0c;修改proxytable等等&#xff0c;但是每次但是404&#xff0c;求助各位大佬&#xff0c;怎么解决&#xff1f; 1、代码 &#xff08;1&#xff09;config的index.js &#xff08;2&#xff09; App.v…

python爬虫-----输入输出与流程控制语句(第四天)

&#x1f388;&#x1f388;作者主页&#xff1a; 喔的嘛呀&#x1f388;&#x1f388; &#x1f388;&#x1f388;所属专栏&#xff1a;python爬虫学习&#x1f388;&#x1f388; ✨✨谢谢大家捧场&#xff0c;祝屏幕前的小伙伴们每天都有好运相伴左右&#xff0c;一定要天天…

.NET6.0智慧医院手术室麻醉管理平台源码

目录 一、手麻系统概述 二、系统开发环境 三、手麻系统功能 手术进程 手术排班 手术记录 术前访视与评估 术中麻醉记录 麻醉总结 术后访视 模版配置 自动评分 文书模板 ​​​​​​​ 一、手麻系统概述 手术麻醉临床信息系统有着完善的临床业务功能&#xff0c;…