学成在线 - 第3章任务补偿机制实现 + 分块文件清理

7.9 额外实现

7.9.1 任务补偿机制

问题:如果有线程抢占了某个视频的处理任务,如果线程处理过程中挂掉了,该视频的状态将会一直是处理中,其它线程将无法处理,这个问题需要用补偿机制。

单独启动一个任务找到待处理任务表中超过执行期限但仍在处理中的任务,将任务的状态改为执行失败。

任务执行期限是处理一个视频的最大时间,比如定为30分钟,通过任务的启动时间去判断任务是否超过执行期限。

大家思考这个sql该如何实现?

大家尝试自己实现此任务补偿机制。

数据库表结构

media_process:

在这里插入图片描述

根据status字段判断视频文件是否正在处理,如果status值为4(正在处理),且当前时间减去create_date超过30分钟,就把status的值修改为3(处理失败),并更新其他字段的值。

具体实现流程
  1. 编写检查超时的sql

    SELECT * FROM `media_process_history`
    WHERE TIMESTAMPDIFF(MINUTE,create_date,finish_date) < 30
    
  2. 在media_process的mapper接口中添加相应的接口

    MediaProcessMapper.java

    /*** 查询是否有执行超过30分钟的视频处理任务* @param nowDate 当前任务执行的时间* @return List<MediaProcess>*/
    @Select("SELECT * FROM media_process_history WHERE TIMESTAMPDIFF(MINUTE,create_date,#{date}) > 30")
    List<MediaProcess> selectTimeoutProcess(@Param("date")Date nowDate);
    
  3. 实现service代码

    /*** 超时任务列表* @param shardIndex 分片序号* @param shardTotal 分片总数* @param count 获取记录数(cpu核心数)* @return List<MediaProcess>*/
    @Override
    public List<MediaProcess> getTimeoutMediaProcessList(int shardIndex, int shardTotal, int count) {List<MediaProcess> mediaTimeoutProcessList = mediaProcessMapper.selectTimeoutProcess(LocalDateTime.now(), shardIndex, shardTotal, count);return mediaTimeoutProcessList;
    }
    
  4. 业务逻辑在task中实现

    查询相应超时记录,把status和errMsg字段更新后保存到media_process表里。

    /*** 处理视频超时任务* @throws Exception*/
    @XxlJob("videoTimeoutJobHandler")
    public void videoTimeoutJobHandler() throws Exception {// 分片参数int shardIndex = XxlJobHelper.getShardIndex();int shardTotal = XxlJobHelper.getShardTotal();List<MediaProcess> mediaTimeoutProcessList = null;int size = 0;try {// 取出cpu核心数作为一次检查超时的记录条数int availableProcessors = Runtime.getRuntime().availableProcessors();mediaTimeoutProcessList = mediaFileProcessService.getTimeoutMediaProcessList(LocalDateTime.now(), shardIndex, shardTotal, availableProcessors);size = mediaTimeoutProcessList.size();log.debug("取出的超时任务数量是 {} 条", size);if (size <= 0) {return ;}} catch (Exception e) {log.error("取出的超时任务超时", size);e.printStackTrace();}// 启动size个线程的线程池ExecutorService fixedThreadPool = Executors.newFixedThreadPool(size);// 定义size个插销CountDownLatch countDownLatch = new CountDownLatch(size);mediaTimeoutProcessList.forEach(mediaProcess -> {// 将任务加入线程池fixedThreadPool.execute(() -> {try {// 把status值设置为3,并把errMsg更新Long taskId = mediaProcess.getId();String fileId = mediaProcess.getFileId();mediaFileProcessService.saveProcessFinishStatus(taskId, "3", fileId, null, "处理视频超时(30分钟)");log.debug("删除超时任务");} catch (Exception e) {e.printStackTrace();log.error("删除超时任务失败,失败原因: {}", e.getMessage());} finally {countDownLatch.countDown();}});});// 等待,给一个充裕的超时事件,防止无限等待,到达超时时间还没有处理完成则结束任务countDownLatch.await(30, TimeUnit.MINUTES);
    }
    

7.9.2 达到最大失败次数(暂时未实现)

问题:需要找到前端对应的接口。

当任务达到最大失败次数时一般就说明程序处理此视频存在问题,这种情况就需要人工处理,在页面上会提示失败的信息,人工可手动执行该视频进行处理,或通过其它转码工具进行视频转码,转码后直接上传mp4视频。

7.9.3 分块文件清理问题

上传一个文件进行分块上传,上传一半不传了,之前上传到minio的分块文件要清理吗?怎么做的?

1、在数据库中有一张文件表记录minio中存储的文件信息。

2、文件开始上传时会写入文件表,状态为上传中,上传完成会更新状态为上传完成。(实现过程中并没有往文件表中插入正在上传的文件记录,只是在media_minio_files临时保存了分块信息)

3、当一个文件传了一半不再上传了说明该文件没有上传完成,会有定时任务去查询文件表中的记录,如果文件未上传完成则删除minio中没有上传成功的文件目录。

实现思路

难点:怎么判断文件上传了一半不再上传了?上传分块文件只在上传视频时有用,上传分块文件时不会往media_files表里添加记录,只有所有分块上传完成,并合并成功后,才会往media_files里插入数据。所以要建立一个media_minio_files表,每上传一个分块就往media_minio_files表里插入一条分块信息,记录包括分块的上传时间。如果超过30分钟分块记录还没有被删除,说明上传到一半不传了,把minio里的分块目录删除,并删除对应的数据库里的记录。在文件合并成功后,数据库里的分块文件上传记录也要删除。

media_minio_files表结构

在这里插入图片描述

具体实现流程
  1. 上传文件块的时候,上传一个文件块完毕要把这个块的信息保存到表里。

    /*** 把文件分块信息入库* @param fileMd5 文件md5* @param fileName 文件名称* @param bucket minio桶* @param objectName minio中块的存储路径* @param chunkSize 块大小* @return MediaMinioFiles*/
    @Transactional
    @Override
    public MediaMinioFiles addMediaChunkToDb(String fileMd5, String fileName, String bucket, String objectName, Long chunkSize) {MediaMinioFiles mediaMinioFile = new MediaMinioFiles();mediaMinioFile.setFileId(fileMd5);mediaMinioFile.setFilename(fileName);mediaMinioFile.setBucket(bucket);mediaMinioFile.setFilePath(objectName);mediaMinioFile.setFileSize(chunkSize);mediaMinioFile.setStatus("4");  // 上传中int insert = mediaMinioFilesMapper.insert(mediaMinioFile);if (insert < 0) {log.error("保存分块文件信息到数据库失败,{}", mediaMinioFile.toString());XueChengPlusException.cast("保存分块文件信息失败");}log.debug("保存分块文件信息到数据库成功,{}", mediaMinioFile.toString());return mediaMinioFile;
    }/*** 上传分块* @param fileMd5 文件md5* @param chunk 分块序号* @param localChunkFilePath 分块文件本地路径* @return RestResponse*/
    @Override
    public RestResponse uploadChunk(String fileMd5, int chunk, String localChunkFilePath) {// 得到分块文件的目录路径String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);// 得到分块文件的路径String chunkFilePath = chunkFileFolderPath + chunk;// mimeTypeString mimeType = getMimeType(null);// 将文件存储到 minioboolean b = addMediaFilesToMinIO(localChunkFilePath, mimeType, bucketVideoFiles, chunkFilePath);// 将分块信息存到数据库中currentProxy.addMediaChunkToDb(fileMd5, null, bucketVideoFiles, chunkFilePath, 1024 * 5L);if (!b) {log.debug("上传分块文件失败: {}", chunkFilePath);return RestResponse.validfail(false, "上传分块失败");}log.debug("上传分块文件成功: {}", chunkFilePath);return RestResponse.success(true);
    }
    
  2. 如果没有上传失败,在成功合并块文件之后,要把上面数据库中记录删除。

    /*** 删除数据库中的分块文件上传记录* @param fileMd5 文件md5*/
    @Transactional
    public void clearChunkFromDb(String fileMd5) {LambdaQueryWrapper<MediaMinioFiles> deleteQueryWrapper = new LambdaQueryWrapper<>();deleteQueryWrapper.eq(MediaMinioFiles::getFileId, fileMd5);try {mediaMinioFilesMapper.delete(deleteQueryWrapper);log.debug("删除分块上传记录成功");} catch (Exception e) {e.printStackTrace();log.error("删除分块上传记录失败");}
    }
    

    在合并分块的最后几行:

    // 文件入库
    currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucketVideoFiles, mergeFilePath);
    // 清除文件分块
    clearChunkFiles(chunkFileFolderPath, chunkTotal);
    // 清除数据库中文件分块上传信息
    currentProxy.clearChunkFromDb(fileMd5);
    return RestResponse.success(true);
    
  3. 如果上传文件一半失败,要根据数据库中media_minio_files表中存的块的路径删除所有超时块。

    首先查询所有的超时块,得到上传的超时块列表:

    /*** 拿到数据库中所有上传超时的文件分块信息* @param time 当前任务执行时间* @return List<MediaMinioFiles>*/
    @Override
    public List<MediaMinioFiles> getChunkTimeoutFiles(LocalDateTime time) {List<MediaMinioFiles> mediaMinioFiles = mediaMinioFilesMapper.selectTimeoutChunks(DateUtil.toDateTime(time));return mediaMinioFiles;
    }
    

    删除minio中所有超时块,并且删除所有超时记录

    @XxlJob("videoChunkTimeoutJobHandler")
    public void videoChunkTimeoutJobHandler() {log.debug(">>>>>>>>>> 开始执行检查上传超时块任务");// 拿到所有的超时分块任务List<MediaMinioFiles> chunkTimeoutFiles = mediaFileService.getChunkTimeoutFiles(LocalDateTime.now());if (chunkTimeoutFiles == null) {  // 没有超时任务log.debug("没有超时任务");return ;}// 根据记录中的file_path,删除minio中的所有文件chunkTimeoutFiles.forEach(chunk -> {String filePath = chunk.getFilePath();mediaFileService.clearSingleChunkFile(filePath);});// 删除数据库中对应的记录mediaFileService.clearChunkFromDb(null, LocalDateTime.now());
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/833299.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java+SpringBoot+JSP实现在线心理评测与咨询系统

前言介绍 随着互联网技术的高速发展&#xff0c;人们生活的各方面都受到互联网技术的影响。现在人们可以通过互联网技术就能实现不出家门就可以通过网络进行系统管理&#xff0c;交易等&#xff0c;而且过程简单、快捷。同样的&#xff0c;在人们的工作生活中&#xff0c;也就…

一体化设计的ATA(FXS网关)设计——电源插头、WiFi、双网口、S口、USB等接口集于一身

目录 集成电源插头集成WiFi集成USB两个网口FXS接口&#xff08;Phone&#xff09;集成创新 ATA&#xff08;FXS网关&#xff09;已经走过几十年的发展&#xff0c;很难有创新。 下面介绍的这款ATA&#xff08;FXS网关&#xff09;通过一体化设计的集成创新&#xff0c;成为一款…

大数据Scala教程从入门到精通第三篇:Scala和Java的关系

一&#xff1a;Scala和Java的关系 1&#xff1a;详解 一般来说&#xff0c;学 Scala的人&#xff0c;都会 Java&#xff0c;而 Scala 是基于 Java 的&#xff0c;因此我们需要将 Scala和 Java 以及 JVM 之间的关系搞清楚&#xff0c;否则学习 Scala 你会蒙圈 Scala可以使用SDK…

爬虫学习:XPath匹配网页数据

目录 一、安装XPath 二、XPath的基础语法 1.选取节点 三、使用XPath匹配数据 1.浏览器审查元素 2.具体实例 四、总结 一、安装XPath 控制台输入指令&#xff1a;pip install lxml 二、XPath的基础语法 XPath是一种在XML文档中查找信息的语言&#xff0c;可以使用它在HTM…

Pycharm导入自定义模块报红

文章目录 Pycharm导入自定义模块报红1.问题描述2.解决办法 Pycharm导入自定义模块报红 1.问题描述 Pycharm 导入自定义模块报红&#xff0c;出现红色下划线。 2.解决办法 打开【File】->【Setting】->【Build,Execution,Deployment】->【Console】->【Python Con…

五分钟解决Springboot整合Mybaties

SpringBoot整合Mybaties 创建maven工程整合mybaties逆向代码生成 创建maven工程 1.通过idea创建maven工程如下图 2.生成的工程如下 以上我们就完成了一个maven工程&#xff0c;接下来我们改造成springboot项目。 这里主要分为三步&#xff1a;添加依赖&#xff0c;增加配置&…

运行一个jar包

目录 传送门前言一、Window环境二、Linux环境1、第一步&#xff1a;环境配置好&#xff0c;安装好jdk2、第二步&#xff1a;打包jar包并上传到Linux服务器3、第三步&#xff1a;运行jar包 三、docker环境1、Linux下安装docker和docker compose2、Dockerfile方式一运行jar包2.1、…

牛客网刷题 | BC80 奇偶统计

目前主要分为三个专栏&#xff0c;后续还会添加&#xff1a; 专栏如下&#xff1a; C语言刷题解析 C语言系列文章 我的成长经历 感谢阅读&#xff01; 初来乍到&#xff0c;如有错误请指出&#xff0c;感谢&#xff01; 描述 任意输入一个正整数…

迅饶科技 X2Modbus 网关 AddUser 任意用户添加漏洞复现

0x01 免责声明 请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息而造成的任何直接或者间接的后果及损失&#xff0c;均由使用者本人负责&#xff0c;作者不为此承担任何责任。工具来自网络&#xff0c;安全性自测&#xff0c;如有侵权请联系删…

Python运维-文本处理、系统和文件信息监控、外部命令

本节主要目录如下&#xff1a; 一、文本处理 1.1、Python编码解码 1.2、文件操作 1.3、读写配置文件 1.4、解析XML文件 二、系统信息监控 2.1、监控CPU信息 2.2、监控内存信息 2.3、监控磁盘信息 2.4、监控网络信息 2.5、获取进程信息 2.6、实例&#xff1a;常见的…

【知识点随笔分享 | 第十篇】快速介绍一致性Hash算法

前言&#xff1a; 在分布式系统中&#xff0c;数据的分布和负载均衡是至关重要的问题。一致性哈希算法是一种解决这些挑战的有效工具&#xff0c;它在分布式存储、负载均衡和缓存系统等领域得到了广泛应用。 随着互联网规模的不断扩大&#xff0c;传统的哈希算法在面对大规模…

cmake进阶:变量的作用域(目录作用域与全局作用域)

一. 简介 前面从函数作用域方面学习了变量的作用域&#xff0c;本文从目录作用域方面来学习变量的作用域。 二. cmake进阶&#xff1a;从目录作用域方面学习变量的作用域 1. 目录作用域 什么是目录作用域&#xff1f; 我把这个作用域叫做目录作用域。子目录会将父目录的所…

Web3 ETF软件系统的主要功能

下面是Web3 ETF系统软件的主要功能&#xff0c;这些功能共同构成了Web3 ETF系统软件的核心&#xff0c;使其能够有效地为投资者提供Web3技术相关的投资机会&#xff0c;同时确保合规性、安全性和透明度。北京木奇移动软件有限公司&#xff0c;专业的软件外包开发公司&#xff0…

【Git】Git学习-10-11:GitHub,SHH配置,克隆仓库

学习视频链接&#xff1a;【GeekHour】一小时Git教程_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1HM411377j/?vd_source95dda35ac10d1ae6785cc7006f365780 创建仓库 配置SSH密钥可以更加安全&#xff0c;方便地推送、拉取代码 根目录下&#xff0c;进入.ssh文件&am…

【C语言】——联合体与枚举

【C语言】——联合体与枚举 一、联合体1.1、联合体类型的声明1.2、联合体的特点1.3、相同成员的结构体和联合体对比1.4、联合体的大小计算1.5、联合体的应用举例 二、枚举2.1、枚举类型的声明2.2、枚举类型的优点 一、联合体 1.1、联合体类型的声明 联合体也叫做共用体   与…

学习R语言第五天

文章目录 语法学习创建数据的方式绘制图形的方式图形添加颜色如何操作数据的方式数据进行验算的判断加付值的方式修改变量名称的方式判断是否存在缺失值在计算的方式忽略缺失值通过函数的方式忽略缺失值日期处理的方式字符串转化成日期的方式格式化数据框中数据返回当前的日期的…

19_Scala集合概述

文章目录 集合回顾javaScala集合三大类String & StringBuilderScala集合两大类 集合 回顾java scala与Java有所不同 函数式编程语言更侧重集合本身提供的哪些功能&#xff1b; Scala集合三大类 1.Seq 存储有序数据可重复 类比 List 2.Set 存储无序数据不可重复 3.Map…

【算法系列】字符串

目录 leetcode题目 一、最长公共前缀 二、最长回文子串 三、二进制求和 四、字符串相加 五、字符串相乘 六、仅仅反转字母 七、字符串最后一个单词的长度 八、验证回文串 九、反转字符串 十、反转字符串 II 十一、反转字符串中的单词 III leetcode题目 一、最长公…

frp内网穿透服务搭建与使用

frp内网穿透服务搭建与使用 1、frp简介 frp 是一个专注于内网穿透的高性能的反向代理应用&#xff0c;支持 TCP、UDP、HTTP、HTTPS 等多种协议。 可以将内网服务以安全、便捷的方式通过具有公网 IP 节点的中转暴露到公网。frp工作原理 服务端运行&#xff0c;监听一个主端口…

Parts2Whole革新:多参照图定制人像,创新自定义肖像生成框架!

DeepVisionary 每日深度学习前沿科技推送&顶会论文分享&#xff0c;与你一起了解前沿深度学习信息&#xff01; Parts2Whole革新&#xff1a;多参照图定制人像&#xff0c;创新自定义肖像生成框架&#xff01; 引言&#xff1a;探索多条件人像生成的新篇章 在数字内容创作…