分布式事务AP控制方案(上)

分布式事务控制方案

本篇文章给出一种要求高可用性(AP思想)的分布式事务控制方案

下篇新鲜出炉:点我查看

  • 分布式事务控制方案
  • 1、业务背景
  • 2、本地消息表的设计
  • 3、对消息表的操作
  • 4、任务调度
  • 5、任务流程控制的抽象类
  • 6、课程发布的实现类
  • 7、总结

1、业务背景

业务背景:在线学习平台,教学机构在上传课程时,需要将课程内容同步到数据库,缓存,文件系统,搜索系统,这里需要用到分布式事务,来确保四个组件的业务顺利完成。

CAP理论中,分布式系统只能满足一致性C、可用性A、分区容错性P三者中的两个,由于分布式系统天然要求分区容错,否则就是单体项目,所以只能选CP或AP

其中CP可以使用Seata框架基于AT和TCC模式去实现,AP也有多种实现方式。

我们的业务背景中这四个组件并不要求强一致性,而是要求高可用性,如果其中某个组件没有完成数据同步,那之前已经完成的组件不必回退到事务开始前的状态,所以我们实现AP思想,采用本地消息表+任务调度完成最终一致性

具体的项目环境:SpringBoot框架,数据库MySQL,使用MyBatis-Plus快速开发,缓存Redis,分布式文件系统MinIO,搜索系统ES

2、本地消息表的设计

在业务背景中,如果用户要进行课程发布,我们向MySQL中的消息表里插入一条记录,记录中应当包含四个组件(MySQL,Redis,MinIO,ES)完成的状态,如果四个组件全部完成,就删除这条记录,向历史消息表中插入一条记录,如果四个组件有哪个没有完成,通过查询记录就可以从未完成的地方重新进行数据同步,从而实现最终一致性。

我们除了课程这个业务场景,还会在其他业务场景执行相似的业务,所以我们要考虑如何进行代码的抽象、封装和复用。

我们发现在消息表中没有必要注明每个具体的组件,而是通过小任务一,小任务二…的方式设计数据表,具体的业务逻辑由具体的业务代码实现,而具体的业务代码通过继承抽象的类,来实现对数据库的控制。

设计一个抽象的类,这个类应当实现对数据表的处理,并提供一个接口,让具体的业务代码实现这个接口。

3、对消息表的操作

首先创建数据库和数据表,表中字段包括业务相关字段(消息类型代码,关联业务信息、代码等等),小任务的状态、上一次成功失败时间、重试次数(暂定五个小任务,提高适用性)。

其次创建一个微服务模块,添加MyBatis-Plus依赖和配置

<!-- MySQL 驱动 -->
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope>
</dependency>
<!-- mybatis plus的依赖 -->
<dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>

spring:application:name: servicedatasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/table?serverTimezone=UTC&userUnicode=true&useSSL=false&username: xxxxpassword: xxxx

实现DAO和service层开发
在这里插入图片描述

在实现流程控制的抽象类之前我们要思考一个问题,具体的业务代码在实现了任务后,如何开启调用任务的执行?

首先肯定不是在发布课程的方法中直接调用方法,这是同步调用,并且只能执行一次,不适合AP思想的分布式事务。对于数据同步实时性要求不高的技术解决方案有很多,例如MQ、Canal、Logstash、任务调度等等

我们可以在插入数据表后,向消息队列添加一条消息,消费者收到消息后检查数据库是否存在对应记录,没有就执行一次任务,如果任务执行失败,就像消息队列添加一条消息。

我们也可以使用中间件canal,解决耦合性问题,canal通过模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 请求,得到 binglog 日志,

我们也可以通过任务调度的方式执行任务,在任务流程代码中查询数据库,根据数据库中的记录执行任务。

这里我们采用任务调度的方式执行任务。

4、任务调度

任务调度是指对计算任务进行合理安排和调度的过程。分布式任务调度是指在分布式系统中,将任务分割成若干份,根据调度规则交由不同的实例并行执行。

XXL-JOB,是一个轻量级分布式任务调度平台,开发迅速,学习简单,易扩展。包含调度中心(管理执行器、任务、日志,监控运维),执行器(注册服务,执行服务,执行结果上报,记录日志)和任务(具体的业务代码)。

如何确保任务不重复执行?任务调度采用分片广播的方式,查询数据表得到任务的id(自增id),模上分片总数,如果等于当前执行器的任务号,就执行该任务,否则不执行,对于任务分配超出执行器执行能力的情况,通过合理设置任务广播频率,以及设置任务拒绝策略为丢弃任务来确保没有任务被重复执行。

执行流程:

启动XXL-JOB调度中心,创建执行器和任务,任务执行实现通过cron表达式设置为每小时一次,设置分片广播和丢弃策略。

在课程发布微服务中添加XXL-JOB的依赖、配置文件和配置类,创建任务方法,在该方法前添加注解@XxlJob(“JobHandler”)

@XxlJob("JobHandler")
public void coursePublishJobHandler() throws Exception {// 分片参数int shardIndex = XxlJobHelper.getShardIndex();int shardTotal = XxlJobHelper.getShardTotal();// 在下一节中实现process,这里仅是测试方法System.out.println("XXL-JOB任务调度测试成功");// process(......);
}

启动微服务,可以在物理地址中看到一个实例,执行一次任务,在输出窗口看到
在这里插入图片描述

我们可以发现,任务的触发是根据创建任务时设置的执行时间来完成的,但是从用户的角度出发,有些用户允许在一段时间以内完成数据同步,例如一到两个工作日内完成,但是有的用户希望在发布课程后能及时的完成数据同步,例如一小时内完成数据同步,这就需要在代码端对xxl-job的控制中心进行通知,xxl-job也提供了这个接口

打开从github下载的xxl-job项目,在JobInfoController的接口中有体现,通过调用start方法,传入任务的id,来触发一次任务。

下面是代码逻辑

@Overridepublic ReturnT<String> start(int id) {XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);// validScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(xxlJobInfo.getScheduleType(), ScheduleTypeEnum.NONE);if (ScheduleTypeEnum.NONE == scheduleTypeEnum) {return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("schedule_type_none_limit_start")) );}// next trigger time (5s后生效,避开预读周期)long nextTriggerTime = 0;try {Date nextValidTime = JobScheduleHelper.generateNextValidTime(xxlJobInfo, new Date(System.currentTimeMillis() + JobScheduleHelper.PRE_READ_MS));if (nextValidTime == null) {return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("schedule_type")+I18nUtil.getString("system_unvalid")) );}nextTriggerTime = nextValidTime.getTime();} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("schedule_type")+I18nUtil.getString("system_unvalid")) );}xxlJobInfo.setTriggerStatus(1);xxlJobInfo.setTriggerLastTime(0);xxlJobInfo.setTriggerNextTime(nextTriggerTime);xxlJobInfo.setUpdateTime(new Date());xxlJobInfoDao.update(xxlJobInfo);return ReturnT.SUCCESS;}

上述代码逻辑:
首先根据传入的任务id获取任务,判断任务的触发方式,如果为空,就报错,返回异常码500。
然后设置任务下一次的触发时间,为当前时间的5s后,更新触发状态,返回success。

我们可以在课程发布的逻辑中中调用这个方法,来实现及时同步课程内容。

注意,需要在调用方法头添加注解@PermissionLimit(limit = false),来绕开登录验证,但是这增加了代码的不安全性,需要对这种权限的使用进行限制。

@RequestMapping("/startJob")@ResponseBody@PermissionLimit(limit = false)public ReturnT<String> startJob(@RequestBody XxlJobInfo jobInfo) {return xxlJobService.start(jobInfo.getId());}

5、任务流程控制的抽象类

接下来实现抽象类,在这个类中需要提供任务执行的流程,而非具体的代码,提供一个抽象方法,业务代码通过实现这个抽象方法,在这个方法中实现具体的业务执行代码

此时我们已经得到分片总数和分片号,通过查询数据库中记录,自增id模上分片总数等于分片号的方式判断是否由当前执行实例实行

MyBatis-Plus没有提供查询方法,在Mapper中进行实现

@Select("SELECT t.* FROM mq_message t WHERE t.id % #{shardTotal} = #{shardindex} and t.state='0' and t.message_type=#{messageType} limit #{count}")
List<MqMessage> selectListByShardIndex(@Param("shardTotal") int shardTotal, @Param("shardindex") int shardindex, @Param("messageType") String messageType,@Param("count") int count);

在得到消息记录后,这是一个列表的形式,我们开启线程池,使用newFixedThreadPool,线程总数就是任务数,没有临时线程,使用CountDownLatch控制线程完成情况,每个线程中执行process方法,process是一个抽象方法,由具体的实现类进行实现,返回一个boolean变量,表示任务是否完成,并记录日志。


public abstract class MessageProcessAbstract {@AutowiredMqMessageService mqMessageService;/*** @param mqMessage 执行任务内容* @return boolean true:处理成功,false处理失败* @description 任务处理* @author zkp15* @date 2023/9/21 19:47*/public abstract boolean execute(MqMessage mqMessage);/*** @description 扫描消息表多线程执行任务* @param shardIndex 分片序号* @param shardTotal 分片总数* @param messageType  消息类型* @param count  一次取出任务总数* @param timeout 预估任务执行时间,到此时间如果任务还没有结束则强制结束 单位秒* @return void* @author zkp15* @date 2023/9/21 20:35*/public void process(int shardIndex, int shardTotal,  String messageType,int count,long timeout) {try {//扫描消息表获取任务清单List<MqMessage> messageList = mqMessageService.getMessageList(shardIndex, shardTotal,messageType, count);//任务个数int size = messageList.size();if(size<=0){return ;}//创建线程池ExecutorService threadPool = Executors.newFixedThreadPool(size);//计数器CountDownLatch countDownLatch = new CountDownLatch(size);messageList.forEach(message -> {threadPool.execute(() -> {//处理任务try {boolean result = execute(message);if(result){//更新任务状态,删除消息表记录,添加到历史表int completed = mqMessageService.completed(message.getId());if (completed>0){log.debug("任务执行成功:{}",message);}else{log.debug("任务执行失败:{}",message);}}} catch (Exception e) {e.printStackTrace();log.debug("任务出现异常:{},任务:{}",e.getMessage(),message);}//计数countDownLatch.countDown();});});//等待,给一个充裕的超时时间,防止无限等待,到达超时时间还没有处理完成则结束任务countDownLatch.await(timeout,TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}}
}

6、课程发布的实现类

在这个实现类中,继承上一节的抽象类,以及抽象方法execute,在execute中,分别执行数据库操作,建立缓存,上传分布式文件系统,建立搜索索引。


@Component
public class CoursePublishTask extends MessageProcessAbstract {......//任务调度入口@XxlJob("CoursePublishJobHandler")public void coursePublishJobHandler() throws Exception {// 分片参数int shardIndex = XxlJobHelper.getShardIndex();int shardTotal = XxlJobHelper.getShardTotal();log.debug("shardIndex="+shardIndex+",shardTotal="+shardTotal);//参数:分片序号、分片总数、消息类型、一次最多取到的任务数量、一次任务调度执行的超时时间process(shardIndex,shardTotal,"course_publish",30,60);}//课程发布任务处理@Overridepublic boolean execute(MqMessage mqMessage) {//获取消息相关的业务信息String businessKey1 = mqMessage.getBusinessKey1();long courseId = Integer.parseInt(businessKey1);// 课程发布表saveCourseToMQ(mqMessage, courseId);// 课程缓存saveCourseCache(mqMessage, courseId);// 课程静态化generateCourseHtml(mqMessage, courseId);// 课程索引saveCourseIndex(mqMessage, courseId);return true;}......}

7、总结

本文在实际开发业务场景的基础上,给出了一种遵循AP思想的分布式事务控制方案,通过本地消息表+任务调度的方式实现。

项目亮点有:

  • 本地消息表通过任务123代替具体的任务,结合流程控制抽象类,只给出流程控制的代码,具体的业务实现由具体的实现类完成,从而实现解耦合,提高代码复用。

  • 任务流程控制中开启多实例和多线程,并行高效的执行任务。

  • 使用任务调度XXL-JOB进行任务执行,采用分片广播的方式,保证了任务执行的幂等性。其中控制中心提供了两种任务调度的规则,按照Cron的定时执行策略,和非登录任务执行通知的及时执行策略,为用户提供了多样化的体验

由于篇幅原因,四个小任务的实现,数据库、缓存、文件系统、搜索系统的数据同步,我们放在下一篇继续论述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/25947.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

优质免费的 5 款翻译 API 接口推荐

当谈到翻译API时&#xff0c;我们通常指的是一种编程接口&#xff0c;它允许开发者将文本从一种语言翻译成另一种语言。这些API通常由专业的翻译服务提供商提供&#xff0c;如谷歌翻译 API、实时翻译API、腾讯翻译API、DeepL翻译API、Azure翻译API等。 这些API通常提供多种语言…

使用Redis的优势以及会引发的问题

优势 ①使用redis代表着高性能还有高并发&#xff0c;高性能很好理解&#xff0c;redis会缓存我们访问的数据。他是基于内存的&#xff0c;第一次访问数据库我们可能需要800ms&#xff0c;但是访问后如果使用redis进行缓存&#xff0c;第二次乃至后面访问相同的数据就只需要去…

使用opencv在图像上画带刻度线的对角线,以图像中心点为0点

使用OpenCV在图像上绘制带刻度线的对角线&#xff0c;可以通过以下步骤实现。我们将首先找到图像的中心点&#xff0c;然后绘制对角线线&#xff0c;并在这些线的适当位置绘制刻度线。以下是详细的C代码示例&#xff1a; void Draw_diagonal(cv::Mat& mat, double dFactor…

ViT:2 理解CLIP

大模型技术论文不断&#xff0c;每个月总会新增上千篇。本专栏精选论文重点解读&#xff0c;主题还是围绕着行业实践和工程量产。若在某个环节出现卡点&#xff0c;可以回到大模型必备腔调或者LLM背后的基础模型新阅读。而最新科技&#xff08;Mamba,xLSTM,KAN&#xff09;则提…

《永生之后》读后

文章以2120年背景创作&#xff0c;人类进入永生之年&#xff0c;发现了延长寿命的药物。停滞的死亡&#xff0c;新生的继续造生了人口大爆炸&#xff0c;于是分成两个阵营-长生区&#xff08;不再繁衍后代&#xff09;与生死区&#xff08;不服用药物&#xff0c;仍然生老病死&…

PySpark教程(001):基础准备与数据输入

PySpark 学习目标 了解什么是Spark、PySpark了解为什么学习PySpark了解如何和大数据开发方向进行衔接 Spark是什么&#xff1f; Apache Spark是用于大规模数据处理的统一分析引擎。 简单来说&#xff0c;Spark是一款分布式的计算框架&#xff0c;用于调度成百上千的服务器…

MyBatis总结(2)- MyBatis实现原理(一)

Mybatis实现原理&#xff1a; 概括一句话&#xff1a;约定配置参数mybatis-config.xml&#xff0c;映射关系JavaBean-mapper.xml&#xff0c;用SqlSessionFactoryBuilder构建应用程序运行期间需要的SqlSessionFactory实例对象&#xff0c;当请求或方法需要执行CURD操作时&…

初识volatile

volatile&#xff1a;可见性、不能保证原子性(数据不安全)、禁止指令重排 可见性&#xff1a;多线程修改共享内存的变量的时候&#xff0c;修改后会通知其他线程修改后的值&#xff0c;此时其他线程可以读取到修改后变量的值。 指令重排&#xff1a;源代码的代码顺序与编译后字…

基于STM32开发的智能空气质量监控系统

⬇帮大家整理了单片机的资料 包括stm32的项目合集【源码开发文档】 点击下方蓝字即可领取&#xff0c;感谢支持&#xff01;⬇ 点击领取更多嵌入式详细资料 问题讨论&#xff0c;stm32的资料领取可以私信&#xff01; 目录 引言环境准备智能空气质量监控系统基础代码实现&…

三十七篇:大数据架构革命:Lambda与Kappa的深度剖析

大数据架构革命:Lambda与Kappa的深度剖析 1. 引言 在这个数据驱动的时代,我们面临着前所未有的挑战和机遇。随着数据量的爆炸性增长,传统的数据处理方法已无法满足现代业务的需求。大数据处理不仅涉及数据量的增加,还包括数据类型的多样化、数据来源的广泛性以及对实时数据…

Policy-Based Reinforcement Learning(1)

之前提到过Discount Return&#xff1a; Action-value Function &#xff1a; State-value Function: &#xff08;这里将action A积分掉&#xff09;这里如果策略函数很好&#xff0c;就会很大&#xff1b;反之策略函数不好&#xff0c;就会很小。 对于离散类型&#xff1a; …

深度学习之文本分类模型-基于transformer

1、transformer transformer就是大名鼎鼎的论文《Attention Is All You Need》[1]&#xff0c;其在一些翻译任务上获得了SOTA的效果。其模型整体结构如下图所示 encoder和decoder 其整体结构由encoder和decoder组成&#xff0c;其中encoder由6个相同的block组成&#xff0c;…

【设计模式】结构型-桥接模式

当抽象与实现&#xff0c;各自独立&#xff0c; 桥接模式&#xff0c;如彩虹桥&#xff0c;连接两岸。 文章目录 一、类爆炸与代码重复二、桥接模式三、桥接模式的核心组成四、运用桥接模式五、桥接模式的应用场景六、小结推荐阅读 一、类爆炸与代码重复 场景假设&#xff1a…

单片机嵌入式计算器(带程序EXE)

单片机嵌入式计算器 主要功能&#xff1a;完成PWM占空比计算&#xff0c;T溢出时间&#xff08;延时&#xff09;&#xff1b; [!NOTE] 两个程序EXE&#xff1b; [!CAUTION] 百度网盘链接&#xff1a;链接&#xff1a;https://pan.baidu.com/s/1VJ0G7W5AEQw8_MiagM7g8A?pwdg8…

代码随想录算法训练营第五十四 | ● 392.判断子序列 ● 115.不同的子序列

392.判断子序列 https://programmercarl.com/0392.%E5%88%A4%E6%96%AD%E5%AD%90%E5%BA%8F%E5%88%97.html class Solution { public:bool isSubsequence(string s, string t) {if(s.size()0 )return true;if(t.size()0)return false;vector<vector<int>> dp(s.size(…

为什么选择海外服务器?

如何选择跨境电商服务器&#xff1a;详细指南 选择合适的服务器是跨境电商企业成功的基础。服务器的性能和稳定性直接影响着网站的访问速度、用户体验和安全性&#xff0c;进而影响着企业的销量和利润。那么&#xff0c;跨境电商企业该如何选择服务器呢&#xff1f; ​​​​​…

Jenkins构建 Maven项目(微服务)并自动发布

前面讲了docker 安装Jenkins和gitlab代码管理工具&#xff0c;接下来我们讲一下Jenkins怎么构建 Maven项目。 1. 首先Jenkins配置下面3中工具类 首先是在本地安装三个jenkins自动配置相关的工具 1.1 JDK 由于我们使用docker来启动jenkins&#xff0c;其自带有jdk&#xff0c;…

oracle 12.1 rac to rac adg(maa)搭建保姆级教程

目录 资源配置 一、主库集群操作 1.主库增加standbylog 2.主库开启force logging及归档 3.主库配置参数 4.生成参数文件并将参数文件、密码文件拷贝至备库 4.1参数文件处理 4.2密码文件处理 二、备库操作 1.备库修改参数文件 1.1创建adump目录并在参数文件修改&#…

02-JAVA面向对象编程

一、面向对象编程 1、面向过程编程思想&#xff08;Process Oritented Programming&#xff09; 将实现一个功能的一组指令组合在一起&#xff0c;成为一个函数。这个函数就能实现这一个功能&#xff0c;是对功能实现的一种抽象。通过这种抽象方式&#xff0c;将代码实现复用。…

代码随想录算法训练营第三十一天| 455.分发饼干,376. 摆动序列 ,53. 最大子序和

455. 分发饼干 - 力扣&#xff08;LeetCode&#xff09; class Solution {public int findContentChildren(int[] g, int[] s) {Arrays.sort(g); //递增Arrays.sort(s); int result 0;//遍历&#xff0c;先满足小的int i0,j0;for(;i<g.length && j<s.length;i){…