如何使用rocketmq实现分布式事务?

什么是rocketmq事务消息

事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

RocketMQ的分布式事务又称为“半消息事务”。

事务消息处理流程

RocketMQ是靠半消息机制实现分布式事务

事务消息:MQ 提供类似 X/Open XA 的分布事务功能,通过 MQ 事务消息能达到分布式事务的最终一致。

半消息:暂不能投递的消息,发送方已经将消息成功发送到了 MQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。

半消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。

事务消息交互流程如下图所示。

图片

1. 生产者将消息发送至Apache RocketMQ服务端。

2. Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。

3. 生产者开始执行本地事务逻辑。

4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

• 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。

• 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。 说明 服务端回查的间隔时间和最大回查次数,请参见参数限制。

6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

事务消息生命周期

图片

事务消息

• 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。

• 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。

• 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。

• 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。

• 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。

• 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

• 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

示例

下面是使用 RocketMQ 实现事务的一个例子:

生产者实现事务监听器:

首先,需要实现一个 RocketMQ 的事务监听器接口RocketMQLocalTransactionListener,这个接口定义了在发送和确认事务消息时的回调方法。您需要根据业务逻辑来实现这些方法。

executeLocalTransaction 方法:

这个方法在发送事务消息时被调用,用于执行本地事务。具体步骤如下:

1. 获取消息中的事务 ID。

2. 根据事务索引来模拟本地事务执行的状态。

3. 将事务状态放入 localTrans 映射中,以备后续 checkLocalTransaction 方法使用。

根据您的代码,executeLocalTransaction 方法中模拟了三种状态:

• 如果状态为 0,表示本地事务成功,返回 RocketMQLocalTransactionState.COMMIT,消息将被提交。

• 如果状态为 1,表示本地事务失败,返回 RocketMQLocalTransactionState.ROLLBACK,消息将被回滚。

• 如果状态为 2,表示本地事务状态未知,返回 RocketMQLocalTransactionState.UNKNOWN

checkLocalTransaction 方法:

这个方法在消息的确认状态时被调用,用于检查本地事务的状态。具体步骤如下:

  1. 获取消息中的事务 ID。

  2. 根据之前保存在 localTrans 映射中的事务状态,决定消息的提交、回滚或未知。

checkLocalTransaction 方法会根据之前在 executeLocalTransaction 方法中保存的状态来返回相应的事务状态。

@RocketMQTransactionListener  
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {  private AtomicInteger transactionIndex = new AtomicInteger(0);  private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<String, Integer>();  @Override  public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {  String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);  System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n",  transId);  int value = transactionIndex.getAndIncrement();  int status = value % 3;  localTrans.put(transId, status);  if (status == 0) {  // Return local transaction with success(commit), in this case,  // this message will not be checked in checkLocalTransaction()  System.out.printf(" # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());  return RocketMQLocalTransactionState.COMMIT;  }  if (status == 1) {  // Return local transaction with failure(rollback) , in this case,  // this message will not be checked in checkLocalTransaction()  System.out.printf(" # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());  return RocketMQLocalTransactionState.ROLLBACK;  }  System.out.printf(" # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n");  return RocketMQLocalTransactionState.UNKNOWN;  }  @Override  public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {  String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);  RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;  Integer status = localTrans.get(transId);  if (null != status) {  switch (status) {  case 0:  retState = RocketMQLocalTransactionState.COMMIT;  break;  case 1:  retState = RocketMQLocalTransactionState.ROLLBACK;  break;  case 2:  retState = RocketMQLocalTransactionState.UNKNOWN;  break;  }  }  System.out.printf("------ !!! checkLocalTransaction is executed once," +  " msgTransactionId=%s, TransactionState=%s status=%s %n",  transId, retState, status);  return retState;  }  
}

消费者

@Service  
@RocketMQMessageListener(topic = "${demo.rocketmq.transTopic}", consumerGroup = "string_trans_consumer")  
public class StringTransactionalConsumer implements RocketMQListener<String> {  @Override  public void onMessage(String message) {  System.out.printf("------- StringTransactionalConsumer received: %s \n", message);  }  
}

这些步骤基本上涵盖了使用 RocketMQ 实现事务的主要过程。可以根据具体的业务需求和环境进行调整和配置。

总结

使用半消息实现分布式事务在提供分布式事务支持和保证消息传递的原子性方面具有优势,但需要引入MQ并提供查询事务接口。在选择是否使用半消息实现分布式事务时,需要根据具体的业务需求和系统性能要求来进行权衡和选择。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/696789.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring之AOP源码解析(上)

Aop相关注解 EnableTransactionManagementEnableAspectJAutoProxyEnableAsync... 从注解切入来看看这些注解都干了什么 Import注解作用简述 注入的类一般继承ImportSelector或者ImportBeanDefinitionRegistrar接口 继承ImportSelector接口&#xff1a;selectImports方法返回…

pandas/geopandas 笔记:判断地点在不在路网上 不在路网的点和路网的距离

0 导入库 import osimport pandas as pd pd.set_option(display.max_rows,5)import osmnx as oximport geopandas as gpd from shapely.geometry import Point 1 读取数据 假设我们有 如下的数据&#xff1a; 1.1 新加坡室外基站位置数据 cell_stationpd.read_csv(outdoor…

TSINGSEE青犀AI智能分析网关V4初始配置与算法相关配置介绍

TSINGSEE青犀AI智能分析网关V4内置了近40种AI算法模型&#xff0c;支持对接入的视频图像进行人、车、物、行为等实时检测分析&#xff0c;上报识别结果&#xff0c;并能进行语音告警播放。硬件管理平台支持RTSP、GB28181协议、以及厂家私有协议接入&#xff0c;可兼容市面上常见…

linux下ffmpeg调用GPU硬件解码(VDPAU/VAAPI)保存文件

本文讲解在linux下面&#xff0c;如何通过ffmpeg调用GPU硬件解码&#xff0c;并保存解码完的yuv文件。 其实&#xff0c;ffmpeg自带的例子hw_decode.c这个文件&#xff0c;就已经能满足要求了&#xff0c;因此&#xff0c;本文就尝试讲解以下hw_decode这个例子。hw_decode.c可以…

watchpoint

前言 内存被踩&#xff0c;通过 watchpoint 找到真凶 实例 以 smsc911x 网卡驱动为基体&#xff0c;进行实验&#xff0c;和网卡本身功能无关&#xff0c; 每执行一次 ifconfig eth0 up&#xff0c;就会调用一次 smsc911x_open()&#xff0c;我在这里设计了一段代码&#xf…

数学知识(四)(容斥原理、博弈论)

一、容斥原理 容斥原理公式 一共加或者减的式子个数 &#xff08;一&#xff09;利用容斥原理解决求能被质数整除的数的个数 890计算能被整除的数的个数 因为一共有2^n-1种选法&#xff0c;可以用位运算的方式枚举&#xff0c;对于得到的每一种选法&#xff0c;根据存在的数…

六、回归与聚类算法 - 逻辑回归与二分类

线性回归欠拟合与过拟合线性回归的改进 - 岭回归分类算法&#xff1a;逻辑回归模型保存与加载无监督学习&#xff1a;K-means算法 1、应用场景 2、原理 2.1 输入 2.2 激活函数 3、损失以及优化 3.1 损失 3.2 优化 4、逻辑回归API 5、分类的评估方法 5.1 精确率和召回率 5.2…

【Spring】IoC容器 控制反转 与 DI依赖注入 配置类实现版本 第四期

文章目录 基于 配置类 方式管理 Bean一、 配置类和扫描注解二、Bean定义组件三、高级特性&#xff1a;Bean注解细节四、高级特性&#xff1a;Import扩展五、基于注解配置类方式整合三层架构组件总结 基于 配置类 方式管理 Bean Spring 完全注解配置&#xff08;Fully Annotatio…

Kotlin学习 6

1.接口 interface Movable {var maxSpeed: Intvar wheels: Intfun move(movable: Movable): String}class Car(var name: String, override var wheels: Int 4, _maxSpeed: Int) : Movable {override var maxSpeed: Int _maxSpeedget() fieldset(value) {field value}overr…

C语言读取 ini 配置文件,修改/添加键值对

C语言读取 ini 配置文件&#xff0c;修改/添加键值对 C语言读取 ini 配置文件&#xff0c;对section中的键值对进行修改/添加&#xff0c;如果section不存在&#xff0c;则在末尾将新的section/key/value 添加进去。 一、了解什么是INI文件&#xff1f; ini 文件是Initializ…

【大数据】Flink 之部署篇

Flink 之部署篇 1.概述和参考架构2.可重复的资源清理3.部署模式3.1 Application 模式3.2 Per-Job 模式&#xff08;已废弃&#xff09;3.3 Session 模式 Flink 是一个多用途框架&#xff0c;支持多种不同的混合部署方案。下面&#xff0c;我们将简要介绍 Flink 集群的构建模块、…

【html学习笔记】3.表单元素

1.文本框 1.1 语法 <input type "text">表示文本框。且只能写一行 1.2 属性 使用属性size 设置文本框大小 <input type"text" size"10">2. 使用属性value 来设置文本框的默认文字 <input type"text" size"…

Vue状态管理库-Pinia

一、Pinia是什么&#xff1f; Pinia 是 Vue 的专属状态管理库&#xff0c;它允许支持跨组件或页面共享状态&#xff0c;即共享数据&#xff0c;他的初始设计目的是设计一个支持组合式API的 Vue 状态管理库&#xff08;因为vue3一个很大的改变就是组合式API&#xff09;,当然这…

PFA三角烧瓶实验室PFA锥形瓶本底纯净耐腐蚀性强

PFA三角烧瓶外观呈平底圆锥状&#xff0c;下阔上狭&#xff0c;有一圆柱形颈部&#xff0c;上方有一较颈部阔的开口&#xff0c;可用塞子封闭。PFA三角烧瓶也称PFA锥形瓶&#xff0c;PFA反应瓶&#xff0c;PFA三角烧瓶、PFA依氏烧瓶、PFA锥形烧瓶&#xff0c;PFA鄂伦麦尔瓶等。…

普中51单片机学习(串口通信)

串口通信 原理 计算机通信是将计算机技术和通信技术的相结合&#xff0c;完成计算机与外部设备或计算机与计算机之间的信息交换 。可以分为两大类&#xff1a;并行通信与串行通信。并行通信通常是将数据字节的各位用多条数据线同时进行传送 。控制简单、传输速度快&#xff1…

【Python】Python实现串口通信(Python+Stm32)

&#x1f389;欢迎来到Python专栏~Python实现串口通信 ☆* o(≧▽≦)o *☆嗨~我是小夏与酒&#x1f379; ✨博客主页&#xff1a;小夏与酒的博客 &#x1f388;该系列文章专栏&#xff1a;Python学习专栏 文章作者技术和水平有限&#xff0c;如果文中出现错误&#xff0c;希望…

springboot208基于springboot物流管理系统

基于spring boot物流管理系统设计与实现 摘 要 社会发展日新月异&#xff0c;用计算机应用实现数据管理功能已经算是很完善的了&#xff0c;但是随着移动互联网的到来&#xff0c;处理信息不再受制于地理位置的限制&#xff0c;处理信息及时高效&#xff0c;备受人们的喜爱。…

maven工程打包引入本地jar包

1、通过maven生成本地区仓库包 mvn install:install-file --settings D:\lkx\download\apache-maven-3.6.3\conf\settings.xml -Dfileaspose-cad-21.8.jar -DartifactIdaspose-cad -DgroupIdsystem.core -Dversion21.8 -Dpackagingjar -DgeneratePomtrue # --settings&#xf…

进程线程间的通信:2024/2/22

作业1&#xff1a;代码实现线程互斥机制 代码&#xff1a; #include <myhead.h>//临界资源 int num10;//创建一个互斥锁 pthread_mutex_t mutex;//任务一 void *task1(void *arg) {//获取锁资源pthread_mutex_lock(&mutex);num123;sleep(3);printf("task1:num…

PacketSender-用于发送/接收 TCP、UDP、SSL、HTTP 的网络实用程序

PacketSender-用于发送/接收 TCP、UDP、SSL、HTTP 的网络实用程序 PacketSender是一款开源的用于发送/接收 TCP、UDP、SSL、HTTP 的网络实用程序&#xff0c;作者为dannagle。 其官网地址为&#xff1a;https://packetsender.com/&#xff0c;Github源代码地址&#xff1a;htt…