什么是断点续传
简单来说断点续传指的是文件在上传或下载的过程中,由于网络差断开了,那么下次上传或下载时应该从断点处开始。
怎么实现
- 前端对文件进行分块
- 前端使用多线程一块一块上传,上传前给服务端发一个消息检验该分块是否上传,如果已上传则不再上传。
- 等所有分块上传完毕,服务端合并所有分块,校验文件的完整性。(
因为分块全部上传到了服务器,服务器将所有分块按顺序进行合并,就是写每个分块文件内容按顺序依次写入一个文件中。) - 前端给服务传一个md5值,服务端合并文件后计算合并后的文件的md5是否一样,一样说明完整,否则不完整,需要重新上传。
此外针对文件上传一半不传了,之前上传到的minio 分块文件需要清理
- 在数据库中记录minio存储的文件信息的文件表
- 文件开始上传时写入文件表,状态为上传中,上传成功更新状态为上传完成
- 当一个文件传了一半不在上传了说明该文件没有上传完成,会有定时任务去查询文件表中的记录,如果文件未上传完成则删除minio 中没有上传成功的文件目录。
需要注意的是 minio 合并 每一分块文件最少是5MB 否则会报错,此外上传分块的文件如果大于1MB,SpringBoot不允许 默认是1MB 因此通过以下配置修改为50MB。
#文件上传大小默认1M 改为50M
spring:servlet:multipart:max-file-size: 50MBmaxrequest-size: 50MB
大文件分块 合并 本地单元测试
package com.jhj.media;import io.minio.ComposeObjectArgs;
import io.minio.ComposeSource;
import io.minio.MinioClient;
import io.minio.UploadObjectArgs;
import io.minio.errors.*;
import org.apache.commons.codec.digest.DigestUtils;
import org.junit.jupiter.api.Test;import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;/*** @author jhj* @version 1.0.0* @ClassName BigFileTest.java* @Description TODO* @createTime 2024年06月23日 00:35:00*/
public class BigFileTest {MinioClient minioClient = MinioClient.builder().endpoint("http://192.168.56.200:49160").credentials("minio", "12345678").build();//分块测试@Testpublic void testChunk() throws IOException {//源文件File sourceFile = new File("G:\\1.mp4");//分块文件存储路径String chunkFilePath = "G:\\chunk\\";//分块文件大小int chunkSize = 1024 * 1024 * 5;//分块文件的个数int chunkNum = (int) Math.ceil(sourceFile.length() * 1.0 / chunkSize);//使用流对源文件读数据,向分块文件中写数据RandomAccessFile raf_r = new RandomAccessFile(sourceFile, "r");//缓存区 临时存文件数据的byte[] bytes = new byte[1024];for (int i = 0; i < chunkNum; i++) {File chunkFile = new File(chunkFilePath + i);//分块文件的写入流RandomAccessFile raf_rw = new RandomAccessFile(chunkFile, "rw");int len = -1;while ((len = raf_r.read(bytes)) != -1) {raf_rw.write(bytes, 0, len);if (chunkFile.length() >= chunkSize) {break;}}raf_rw.close();}raf_r.close();}@Testpublic void testMerge() throws IOException {//分块路径String chunkFilePath = "G:\\chunk\\";File chunkFolder = new File(chunkFilePath);//源文件File sourceFile = new File("G:\\1.mp4");//合并后的文件File mergeFile = new File("G:\\2.mp4");//取出所有的分块文件File[] files = chunkFolder.listFiles();//将数组转为listList<File> fileList = Arrays.asList(files);Collections.sort(fileList, new Comparator<File>() {@Overridepublic int compare(File o1, File o2) {return Integer.parseInt(o1.getName()) -Integer.parseInt(o2.getName());}});RandomAccessFile raf_rw = new RandomAccessFile(mergeFile, "rw");byte[] bytes = new byte[1024];for (File file : fileList) {//读RandomAccessFile raf_r = new RandomAccessFile(file, "r");int len = -1;while ((len = raf_r.read(bytes)) != -1) {raf_rw.write(bytes, 0, len);}raf_r.close();}raf_rw.close();//合并文件完成,进行校验,判断有没有丢包String s = DigestUtils.md5Hex(new FileInputStream(mergeFile));String s1 = DigestUtils.md5Hex(new FileInputStream(sourceFile));if (s.equals(s1)) {System.out.println("合并成功");}}//将分块文件上传到minio@Testpublic void uploadChunk() throws IOException, ServerException, InsufficientDataException, ErrorResponseException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {for (int i = 0; i <= 254; i++) {minioClient.uploadObject(UploadObjectArgs.builder().bucket("testbucket").filename("G:/chunk/" + i)//指定本地文件路径.object("chunk/" + i) //桶下路径.build());System.out.println("上传分块" + i + "成功");}}//调用minio 接口合并分块@Testpublic void testMergeFile() throws ServerException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {//指定分块信息List<ComposeSource> sources = Stream.iterate(0, i -> ++i).limit(255).map(i -> {return ComposeSource.builder().bucket("testbucket").object("chunk/" + i).build();}).collect(Collectors.toList());//合并信息 minio默认合并大小为5MComposeObjectArgs testbucket = ComposeObjectArgs.builder().bucket("testbucket").object("merge01.mp4").sources(sources).build();//合并文件minioClient.composeObject(testbucket);}}
按照业务要求 前端分块 检查分块是否上传 上传分块 合并分块 检验完整性 清除分块 service 实现断点续传
保证文件只被上传一次利用 加密流的md5作为唯一值
package com.jhj.media.service.impl;import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.j256.simplemagic.ContentInfo;
import com.j256.simplemagic.ContentInfoUtil;
import com.jhj.base.exception.UltimateException;
import com.jhj.base.model.PageParams;
import com.jhj.base.model.PageResult;
import com.jhj.base.model.RestResponse;
import com.jhj.media.mapper.MediaFilesMapper;
import com.jhj.media.model.dto.QueryMediaParamsDto;
import com.jhj.media.model.dto.UploadFileParamsDto;
import com.jhj.media.model.dto.UploadFileResultDto;
import com.jhj.media.model.po.MediaFiles;
import com.jhj.media.service.MediaFileService;
import io.minio.*;
import io.minio.messages.DeleteError;
import io.minio.messages.DeleteObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.compress.utils.IOUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import java.io.*;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;/*** @author jhj* @version 1.0* @description TODO* @date 2022/9/10 8:58*/
@Service
@Slf4j
public class MediaFileServiceImpl implements MediaFileService {@AutowiredMediaFilesMapper mediaFilesMapper;@AutowiredMinioClient minioClient;@Autowired@LazyMediaFileService currentProxy;//存储普通文件@Value("${minio.bucket.files}")private String bucket_mediafiles;//存储视频@Value("${minio.bucket.videofiles}")private String bucket_video;@Overridepublic PageResult<MediaFiles> queryMediaFiels(Long companyId, PageParams pageParams, QueryMediaParamsDto queryMediaParamsDto) {//构建查询条件对象LambdaQueryWrapper<MediaFiles> queryWrapper = new LambdaQueryWrapper<>();//分页对象Page<MediaFiles> page = new Page<>(pageParams.getPageNo(), pageParams.getPageSize());// 查询数据内容获得结果Page<MediaFiles> pageResult = mediaFilesMapper.selectPage(page, queryWrapper);// 获取数据列表List<MediaFiles> list = pageResult.getRecords();// 获取数据总数long total = pageResult.getTotal();// 构建结果集PageResult<MediaFiles> mediaListResult = new PageResult<>(list, total, pageParams.getPageNo(), pageParams.getPageSize());return mediaListResult;}@Override//为什么不在这里加事务 是因为防止minio 网络连接时间长而导致数据库连接超时 占用数据库事务资源public UploadFileResultDto uploadFile(Long companyId, UploadFileParamsDto uploadFileParamsDto, String localFilePath) {//文件名String filename = uploadFileParamsDto.getFilename();//拿到扩展名String extension = filename.substring(filename.lastIndexOf("."));//拿到mimeTypeString mimeType = getMimeType(extension);//文件的md5String fileMd5 = getFileMd5(new File(localFilePath));//目录String defaultFolderPath = getDefaultFolderPath();String objectName = defaultFolderPath + fileMd5 + extension;//将文件上传Minioboolean result = addMediaFilesToMinio(localFilePath, mimeType, bucket_mediafiles, objectName);if(!result){UltimateException.cast("上传文件失败");}//将文件信息保存到数据库MediaFiles mediaFiles = currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucket_mediafiles, objectName);if(mediaFiles==null){UltimateException.cast("文件上传后,保存信息失败");}//返回对象UploadFileResultDto uploadFileResultDto = new UploadFileResultDto();BeanUtils.copyProperties(mediaFiles,uploadFileResultDto);return uploadFileResultDto;}//根据扩展名获取mimeTypeprivate String getMimeType(String extension) {if (extension == null) {extension = "";}ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(extension);String mimeType = MediaType.APPLICATION_OCTET_STREAM_VALUE; //通用mimeTypeif (extensionMatch != null) {mimeType = extensionMatch.getMimeType();}return mimeType;}/*** 将文件上传到minio* @param localFilePath 本地路径* @param mimeType 媒体类型* @param bucket 桶* @param objectName 文件名* @return*/private boolean addMediaFilesToMinio(String localFilePath, String mimeType, String bucket, String objectName) {try {minioClient.uploadObject(UploadObjectArgs.builder().bucket(bucket).filename(localFilePath).object(objectName).contentType(mimeType).bucket(bucket).build());log.debug("上传文件到minio成功,bucket:{},objectName:{}",bucket,objectName);return true;} catch (Exception e) {e.printStackTrace();log.error("上传文件出错,bucket:{},objectName:{},错误信息:{}",bucket,objectName,e.getMessage());}return false;}private String getDefaultFolderPath(){SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");String folder = sdf.format(new Date()).replace("-", "/") + "/";return folder;}private String getFileMd5(File file){try(FileInputStream fileInputStream=new FileInputStream(file)){String fileMd5 = DigestUtils.md5Hex(fileInputStream);return fileMd5;}catch (Exception e){e.printStackTrace();return null;}}/*** 将文件信息保存到数据库* @param companyId 机构id* @param fileMd5 md5* @param uploadFileParamsDto 文件信息* @param bucket 桶信息* @param objectName 路径* @return*/@Transactional//事务只能用于public 方法 原理是Aop jdk cglib 两种方式 访问不了private//注意看是不是被代理对象调用 如果不是 则事务不会生效//同serive 调用同类的事务方法 事务是无法控制的 因为用的是this 并不是代理对象//注入进去的都是代理对象 this就是对象本身 解决办法 把service 在本类中进行注入public MediaFiles addMediaFilesToDb(Long companyId, String fileMd5, UploadFileParamsDto uploadFileParamsDto,String bucket,String objectName){MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5);if (mediaFiles == null){mediaFiles=new MediaFiles();BeanUtils.copyProperties(uploadFileParamsDto,mediaFiles);//文件idmediaFiles.setId(fileMd5);mediaFiles.setFileId(fileMd5);//机构idmediaFiles.setCompanyId(companyId);//桶mediaFiles.setBucket(bucket);//file_pathmediaFiles.setFilePath(objectName);//urlmediaFiles.setUrl("/"+bucket+"/"+objectName);//上传时间mediaFiles.setCreateDate(LocalDateTime.now());//状态mediaFiles.setStatus("1");//审核状态mediaFiles.setAuditStatus("002003");//插入数据库int insert = mediaFilesMapper.insert(mediaFiles);int i=1/0;if(insert<0){log.error("向数据库保存文件失败,bucket:{},objectName:{}",bucket,objectName);return null;}return mediaFiles;}return mediaFiles;}@Overridepublic RestResponse<Boolean> checkFile(String fileMd5) {//先查询数据库MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5);//如果数据库存在再查询 minioif(mediaFiles!=null){String bucket = mediaFiles.getBucket();String filePath = mediaFiles.getFilePath();GetObjectArgs getObjectArgs = GetObjectArgs.builder().bucket(bucket).object(filePath).build();try {FilterInputStream inputStream = minioClient.getObject(getObjectArgs);if(inputStream!=null){//文件已经存在了return RestResponse.success(true);}} catch (Exception e) {e.printStackTrace();}}return RestResponse.success(false);}@Overridepublic RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex) {String chunkFileFolderPath = getChunkFileFolderPath(fileMd5)+chunkIndex;//md5前两位为两个目录,chunk存储分块文件GetObjectArgs getObjectArgs = GetObjectArgs.builder().bucket(bucket_video).object(chunkFileFolderPath).build();try {FilterInputStream inputStream = minioClient.getObject(getObjectArgs);if(inputStream!=null){//文件已经存在了return RestResponse.success(true);}} catch (Exception e) {e.printStackTrace();}return RestResponse.success(false);}@Overridepublic RestResponse uploadChunk(String fileMd5, int chunk, String localChunkFilePath) {String mimeType = getMimeType(null);boolean b = addMediaFilesToMinio(localChunkFilePath, mimeType, bucket_video, getChunkFileFolderPath(fileMd5) + chunk);if(!b){return RestResponse.validfail("上传分块文件失败",false);}return RestResponse.success(true);}@Overridepublic RestResponse mergechunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) {// 找到分块文件调用minio的sdk进行文件合并// 分块文件所在目录String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);// 找到所有的分块文件List<ComposeSource> sources = Stream.iterate(0, i -> ++i).limit(chunkTotal).map(i -> {return ComposeSource.builder().bucket(bucket_video).object(chunkFileFolderPath + i).build();}).collect(Collectors.toList());String filename = uploadFileParamsDto.getFilename();String extension = filename.substring(filename.lastIndexOf("."));//合并后的文件名String filePathByMd5 = getFilePathByMd5(fileMd5, extension);// 合并信息 minio默认合并大小为5MComposeObjectArgs testbucket = ComposeObjectArgs.builder().bucket(bucket_video).object(filePathByMd5) //合并后的文件.sources(sources).build();//合并文件try {minioClient.composeObject(testbucket);} catch (Exception e) {e.printStackTrace();log.error("合并文件出错,bucket:{},onjectName:{},错误信息:{}",bucket_video,filePathByMd5,e.getMessage());return RestResponse.validfail("合并文件异常",false);}//校验合并后的和源文件是否一致,视频上传才成功File file = downloadFileFromMinIo(bucket_video, filePathByMd5);try(FileInputStream fileInputStream = new FileInputStream(file)){String mergeFile_md5 = DigestUtils.md5Hex(fileInputStream);//比较原始md5和合并后的if(!fileMd5.equals(mergeFile_md5)){log.error("校验合并文件md5值不一致,原始文件:{},合并文件:{}",fileMd5,mergeFile_md5);return RestResponse.validfail("文件校验失败",false);}//文件大小uploadFileParamsDto.setFileSize(file.length());}catch (Exception e){return RestResponse.validfail("文件校验失败",false);}//将文件信息入库MediaFiles mediaFiles = currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucket_video, filePathByMd5);if(mediaFiles==null){return RestResponse.validfail("文件入库失败",false);}//清理分块文件clearChunkFiles(chunkFileFolderPath,chunkTotal);return RestResponse.success(true);}//得到分块目录private String getChunkFileFolderPath(String fileMd5){return fileMd5.substring(0,1)+"/"+fileMd5.substring(1,2)+"/"+fileMd5+"/"+"chunk"+"/";}//得到合并后的文件名private String getFilePathByMd5(String fileMd5,String fileExt){return fileMd5.substring(0,1)+"/"+fileMd5.substring(1,2)+"/"+fileMd5+"/"+fileMd5+fileExt;}//下载minio 文件public File downloadFileFromMinIo(String bucket,String objectName){File minioFile=null;FileOutputStream outputStream=null;try {InputStream inputStream = minioClient.getObject(GetObjectArgs.builder().bucket(bucket).object(objectName).build());minioFile=File.createTempFile("minio",".merge");outputStream = new FileOutputStream(minioFile);IOUtils.copy(inputStream,outputStream);return minioFile;} catch (Exception e) {e.printStackTrace();}finally {try {outputStream.close();} catch (IOException e) {e.printStackTrace();}}return minioFile;}//清除分块文件private void clearChunkFiles(String chunkFileFolderPath, int chunkTotal){Iterable<DeleteObject> objects=Stream.iterate(0,i->++i).limit(chunkTotal).map(i->{return new DeleteObject(chunkFileFolderPath+i);}).collect(Collectors.toList());RemoveObjectsArgs removeObjectArgs = RemoveObjectsArgs.builder().bucket(bucket_video).objects(objects).build();Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectArgs);//要想真正的删除results.forEach(f->{try {DeleteError deleteError = f.get();} catch (Exception e){e.printStackTrace();}});}
}
作者声明
如有问题,欢迎指正!