SpringBoot中使用RocketMQ实现事务消息来保证分布式事务的一致性(有代码)

前言

分布式事务是分布式系统中非常常见的问题。是非常必要钱常见的。实现的方式也是多种多样。今天这个视频主要来分享一下RocketMQ实现事务消息来保证分布式事务的一致性。不知道大家使用过这种方式没有。这种分布式事务的原理其实和本地消息表一样。

本地消息表实现分布式事务的基本原理

本地消息表实现分布式事务的基本原理是通过两个阶段的事务处理来保证分布式环境中的数据一致性。以下是其基本步骤:
大致就是将本地消息表和要执行的第一个业务逻辑放在一个事务中,这样就可以一起成功一起失败。当第一阶段成功后。根据本地消息表中的记录去让下游的业务执行成功。扫描本地消息表中的消息然后执行下游业务。执行成功后在删除本地消息表中消息。不成功则重试。

1.本地事务:

在开始分布式事务时,首先执行本地操作。例如,更新某个服务的数据。
如果本地操作成功,事务进入下一步;如果失败,则回滚本地事务,并结束流程。
消息记录:
创建一条消息记录,通常称为“本地消息”,将需要在后续阶段执行的远程操作信息保存在本地数据库的一个消息表中。这个消息记录包含了执行远程操作所需的所有数据。
消息发送:

将本地消息发送到消息队列,如RocketMQ或其他消息中间件。此时,消息队列并不保证消息已经被消费,只是简单地将消息放入队列。
消息消费:

消息队列的消费者监听并处理消息。消费者通常是另一个服务,它接收消息并执行相应的远程操作,比如更新另一个服务的数据。
确认与补偿:

如果远程操作成功,消费者会发送一个确认信号(ACK),通知生产者操作已完成。这时,生产者可以删除本地消息表中的记录。
如果远程操作失败,消费者可能会尝试重新消费消息,或者根据策略回滚本地事务,然后通知生产者消息处理失败。
最终一致性:

尽管可能有短暂的延迟,但最终所有服务的数据状态会达到一致,因为本地操作和远程操作都会成功完成,或者在失败时都会回滚。
异常处理:

为了处理异常情况,系统通常会有超时和重试机制。如果消费者长时间没有确认,生产者可能会重新发送消息,或者在一定时间后回滚本地事务。
本地消息表方案的优点在于它避免了分布式事务的复杂性,实现了最终一致性,而不是强一致性。但是,它也有一些缺点,比如增加了系统的复杂性,需要维护额外的消息表,以及可能出现消息丢失或重复消费的问题。因此,它更适合对实时性要求不高,但对最终一致性有要求的场景。

本地消息表是一种最终一致性方案。并不是强一致性方案。

rocketmq事务消息

今天重点来说一下rocketmq事务消息是怎么做的。先理解一下Rocketmq事务消息
在这里插入图片描述

这种类似的图片挺多的。简单的来看一下 然后一会结合代码看一下。生产者先送消息到MQserve。然后mq去执行本地事务。通过回查的方式来保证第一阶段消息执行的成功。然后下游消费者来消费这个消息。

代码

我们需要实现分布式事务的两个服务分别是用户中心的服务以及im业务服务。功能是注册的功能。用户的注册信息基本信息存储在用户中心表。然后其他信息存储在im_user表里面。这个听起来有点奇怪。因为我这套代码是计划用户中心存储多个app的用户信息。通义提供鉴权服务什么的。然后基本信息存储在自己的业务用户表里面。大概是这样的设计思路。可以看代码。

	/*** 使用rocketmq实现事务* @param dto* @return* @throws Exception*/@ApiOperation("使用邮箱和密码注册")@PostMapping("/sys/registByWeb")public GenericResponse registByWebTX(@RequestBody SysRegisterForm dto) throws Exception {String uuid = UUID.randomUUID().toString() + new Random().nextInt();SysUserEntity sysUserEntity = new SysUserEntity();sysUserEntity.setPassword(dto.getPassword());sysUserEntity.setUsername(dto.getUsername());sysUserEntity.setOpenid(uuid);//注册需要的实体类RegisterFeign registerFeign = new RegisterFeign();registerFeign.setOpenid(uuid);registerFeign.setUsername(dto.getUsername());registerFeign.setEmail(dto.getEmail());TransactionSendResult sendResult= rocketMqHelper.transactionSend(Topic.REGISTER,MessageBuilder.withPayload(sysUserEntity).build(),registerFeign);String sendStatus = sendResult.getSendStatus().name();String localTXState = sendResult.getLocalTransactionState().name();logger.info("sendStatus---" + sendStatus);logger.info("localTXState---"+localTXState);// 注意:这里不能立即返回成功,因为事务还未完成,实际应用中可能需要设计异步回调通知客户端事务结果// 以下仅为示例逻辑,实际应用中需根据业务需求调整return GenericResponse.response(ServiceError.NORMAL);}

这里实现注册功能。然后
TransactionSendResult sendResult= rocketMqHelper.transactionSend(Topic.REGISTER,
MessageBuilder.withPayload(sysUserEntity).build(),registerFeign);
这行代码用来发送事务消息;
需要给rocketmq配置一个生产者端的消息监听器

@Slf4j
@RocketMQTransactionListener
public class UserRegistrationTransactionListener implements RocketMQLocalTransactionListener {@Autowiredprivate SysUserService sysUserService;@AutowiredSysUserDao sysUserDao;@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {log.info(">>>> TX message listener execute local transaction, message={},args={} <<<<",msg,arg);// 执行本地事务RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;try {String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
//            OrderEntity order = GSON.fromJson(jsonString, OrderEntity.class);SysUserEntity  sysUserEntity = JSON.parseObject(jsonString, SysUserEntity.class);sysUserService.saveUser(sysUserEntity);} catch (Exception e) {log.error(">>>> exception message={} <<<<",e.getMessage());result = RocketMQLocalTransactionState.UNKNOWN;}
//        return  RocketMQLocalTransactionState.UNKNOWN;return result;}/*** 步骤四* 描述:mq回调检查本地事务执行情况* @param msg* @return*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {log.info(">>>> TX message listener check local transaction, message={} <<<<",msg.getPayload());// 检查本地事务RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;try {String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);SysUserEntity  sysUserEntity = JSON.parseObject(jsonString, SysUserEntity.class);
//            OrderEntity order = GSON.fromJson(jsonString, OrderEntity.class);
//            List<OrderEntity> list = orderService.selectOrder(order);List<Map> list = sysUserDao.queryUserByOpenid(sysUserEntity.getOpenid(),sysUserEntity.getUsername());if(list.size()<=0){result = RocketMQLocalTransactionState.UNKNOWN;}} catch (Exception e) {// 异常就回滚log.error(">>>> exception message={} <<<<",e.getMessage());result = RocketMQLocalTransactionState.ROLLBACK;}return result;}}

@RocketMQTransactionListener注意这个注解不能落下。
然后可以配置一下下游消费者。

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "consumeRegister", topic = "TX_REGISTER_ADD",consumeMode = ConsumeMode.ORDERLY)
public class RegisterListener implements RocketMQListener<RegisterFeign> {@Autowiredprivate WeChatService weChatService;/**** @param dto*/@Overridepublic void onMessage(RegisterFeign dto) {log.info("接收到消息,开始消费..dto" + dto);weChatService.registByOpenid(dto);}}

我们在这个地方来接受一下消息。然后调用这个服务的保存。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/9275.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从零开始学习C语言,常用语法一文打尽

C语言常用语法 前言hello world数据类型变量变量的命名规范变量的分类变量的作用域和生命周期 常量字面常量const修饰的常变量#define定义的标识符常量枚举常量 字符串转义字符注释选择语句if循环语句while函数数组 前言 这篇博客旨在帮助零基础的朋友们入门&#xff0c;我会尽…

知名专业定制线缆知名智造品牌品牌推荐-精工电联:如何实现清扫机器人线缆产品的精益求精

在科技日新月异的今天&#xff0c;智能清扫机器人已经融入我们的日常生活。然而&#xff0c;其背后不可或缺的一部分&#xff0c;就是那些被称为机器人血管的精密线缆。精工电联作为高科技智能化产品及自动化设备专用连接线束和连接器配套服务商&#xff0c;致力于通过精益求精…

WINDOWS下zookeeper突然无法启动但是端口未占用的解决办法(用了WSL)

windows下用着用着时候突然zookeeper启动不了了。netstat查也没有找到端口占用&#xff0c;就是起不来。控制台报错 java.lang.reflect.UndeclaredThrowableException: nullat org.springframework.util.ReflectionUtils.rethrowRuntimeException(ReflectionUtils.java:147) ~…

如何使用phpMyAdmin删除数据库中的表?

本周有一个客户&#xff0c;购买Hostease的Linux虚拟主机&#xff0c;询问我们的在线客服&#xff0c;如何使用phpMyAdmin删除数据库中的表&#xff1f;我们为用户提供相关教程&#xff0c;用户很快解决了遇到的问题。在此&#xff0c;我们分享这个操作教程&#xff0c;希望可以…

细说温度测量—热电偶2

接上一篇《细说温度测量——热电偶1》 目录 1、硬件补偿 2、电压到温度转换 3、实用热电偶测量 &#xff08;1&#xff09;噪声抑制 &#xff08;2&#xff09;不良连接点连接 &#xff08;3&#xff09;标定降级 &#xff08;4&#xff09;并联阻抗 &#xff08;5&am…

GUI-图形化的用户界面

一、概述 所谓GUI&#xff0c;即图形化的用户界面/接口&#xff08;Graphical User Interface&#xff09;&#xff0c;实现了采用图形方式显示的计算机操作用户界面。比如下面的QQ登录界面&#xff1a; 为了不被落下&#xff0c;Java依旧稳定发挥&#xff0c;它也提供了一套…

WebRtc 视频通话,语音通话实现方案

先了解一下流程 和 流程图(chatGpt的回答) 实现 (底层代码实现, 可作为demo熟悉) 小demo <template><div><video ref"localVideo" autoplay muted></video> <!-- 本地视频元素&#xff0c;用于显示本地视频 --><video ref"r…

基于Springboot的线上教学平台

基于SpringbootVue的线上教学平台设计与实现 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringbootMybatis工具&#xff1a;IDEA、Maven、Navicat 系统展示 用户登录 首页 学习资料 交流论坛 试卷列表 公告信息 后台登录 后台首页 学员管理 资料类型…

实习报告怎么写?笔灵AI实习体验报告模版分享:AI产品前端实习生

实习报告怎么写&#xff1f;笔灵AI实习体验报告模版可以帮你 点击即可使用&#xff1a;https://ibiling.cn/scene/inex?fromcsdnsx 下面分享AI产品前端实习生的实习报告 尊敬的导师和领导们&#xff1a;首先&#xff0c;我想对你们表达我的诚挚感谢&#xff0c;感谢你们给我…

FPGA OSD 方案,应用于XBOX游戏机收费等领域

FPGA方案&#xff0c;HDMI IN接收原始HDMI 信号&#xff0c;HDMI OUT输出叠加字符/图片后的HDMI信号 客户应用&#xff1a;XBOX游戏机收费 主要特性&#xff1a; 1.支持多分辨率格式显示 2.支持OSD 叠加多个图层 3.支持字体大小随意配置 4.支持字体格式随意配置 5.零延时&…

多线程典型例子(4)——线程池

文章目录 一、线程池的基本情况1.1、使用线程池的必要性1.2、线程池为什么比直接在系统中创建线程更高效&#xff1f;1.2.1、纯内核态操作1.2.2、纯用户态操作 1.3、那为什么用户态操作比内核态操作更高效&#xff1f;二、如何在Java中使用线程池2.1、ExecutorService2.1、Thre…

OBS插件--自定义着色器

自定义着色器 自定义着色器是一个滤镜插件&#xff0c;可以用于源和场景。插件自带一百多款滤镜效果&#xff0c;支持自己编写效果代码。 下面截图演示下操作步骤&#xff1a; 首先&#xff0c;打开 OBS直播助手 在插件中心左侧导航栏&#xff0c;选择 滤镜 项&#xff0c;然…

【Linux】深浅睡眠状态超详解!!!

1.浅度睡眠状态【S】&#xff08;挂起&#xff09; ——S (sleeping)可中断睡眠状态 进程因等待某个条件&#xff08;如 I/O 完成、互斥锁释放或某个事件发生&#xff09;而无法继续执行。在这种情况下&#xff0c;进程会进入阻塞状态&#xff0c;在阻塞状态下&#xff0c;进程…

C++从入门到入土(二)——初步认识类与对象

目录 前言 类与对象的引入 类的定义 类的访问限定符及封装 访问限定符&#xff1a; 封装&#xff1a; 类的作用域 类的实例化 类的大小 this指针 this指针的特性 前言 各位佬们&#xff0c;在开始本篇文章的内容之前&#xff0c;我想先向大家道个歉&#xff0c;由于…

linux中进程相关概念(一)

什么是程序&#xff0c;什么是进程&#xff0c;有什么区别&#xff1f; 程序是静态的概念&#xff0c;当我们使用gcc xxx.c -o pro进行编译时&#xff0c;产生的pro文件&#xff0c;就是一个程序。 进程是程序的一次运行活动&#xff0c;通俗点就是说程序跑起来了就是进程。 …

利用智能私信软件,快速拓展潜在客户群体

在数字化营销的浪潮中&#xff0c;企业如何快速而有效地触及并吸引潜在客户&#xff0c;已成为一个不可忽视的挑战。随着人工智能技术的不断进步&#xff0c;智能私信软件作为一种新型工具&#xff0c;正逐渐改变着企业的市场拓展方式。本文将探讨如何通过这类软件&#xff0c;…

RBAC 权限设计(五)

序言 本文介绍 RBAC2 模型以及具体的实现方案。本文仅提供实现思路供大家参考&#xff0c;在生产实践中请根据业务场景进行具体设计。 一、RBAC2 模型 RBAC2 是 RBAC&#xff08;基于角色的访问控制&#xff09;模型中的一种&#xff0c;它在 RBAC0 的基础上&#xff0c;对用…

GitHub中Asterank源码python修改成C++(本人python不太会)

GitHub - typpo/asterank: asteroid database, interactive visualizations, and discovery tools 主要目的是在进行多元线性回归的时候将枚举型转换为数值型 python: # # The constants used in calculations for the values of asteroids. ## General constants GENERAL_I…

学习torchmd分子动力学模拟

TorchMD打算提供一种简单易用的API&#xff0c;用于使用PyTorch进行分子动力学。这使研究人员能够更快地进行力场开发研究&#xff0c;并以PyTorch的简单性和强大性将神经网络潜力无缝集成到动力学中。 TorchMD使用与经典MD代码&#xff08;如ACEMD&#xff09;一致的化学单位&…

引入OSS

前置条件 AccessKey 引入依赖 都是官网上的&#xff1a;https://help.aliyun.com/zh/oss/developer-reference/java-installation?spma2c4g.11186623.0.i16 <!--若是创建项目的时候这个依赖勾选了就不用了--><!--不加启动会报错No active profile set, falling back…