🏷️个人主页:牵着猫散步的鼠鼠
🏷️系列专栏:Java全栈-专栏
🏷️个人学习笔记,若有缺误,欢迎评论区指正
目录
前言
1 基础知识回顾
1.1 线程的创建和启动
1.2 线程池的使用
2.运行环境说明
3.核心模块实现
3.1下载线程的实现
3.2日志线程的实现
3.3相关工具类的实现
3.4核心业务实现
4.功能测试
总结
前言
在当今快节奏的数字时代,大文件的下载已经成为我们日常生活中不可或缺的一部分。然而,传统的单线程下载器在面临大文件时往往显得力不从心,下载速度缓慢,用户体验不佳。
老读者应该知道,我最近在研究Java多线程并发编程这一块的内容,故想要编写一个多线程下载工具,一是为了知识的落地实践,二是可以将这个工具运用到平时下载大文件的地方。
1 基础知识回顾
为了照顾一些新来的小伙伴,我这里简单讲解一下在Java中一些常用的多线程实现
1.1 线程的创建和启动
在Java中,线程可以通过以下几种方式创建和启动一个新的线程:
继承Thread类:自定义一个类,继承自Thread类,并重写run()方法。
创建线程对象并调用start()方法启动线程。
public class MyThread extends Thread {@Overridepublic void run() {// 线程执行的代码}
}
实现Runnable接口:自定义一个类,实现Runnable接口,并重写run()方法。
创建Runnable对象,并将其传递给Thread对象,然后调用start()方法。
public class MyRunnable implements Runnable {@Overridepublic void run() {// 线程执行的代码}
}
使用ExecutorService:这是一个更高级的方式,用于管理线程池。本次的多线程下载器主要用的就是这种方式实现
ExecutorService executor = Executors.newFixedThreadPool(5);
executor.execute(new MyRunnable());
1.2 线程池的使用
线程池是一种管理线程的更高效的方式,可以避免频繁创建和销毁线程的开销。Java中使用ExecutorService接口来管理线程池。
固定大小的线程池:
创建一个固定大小的线程池,最多同时运行5个线程。
ExecutorService executor = Executors.newFixedThreadPool(5);
单线程的Executor:
创建一个只有一个线程的线程池,所有任务按顺序执行。
ExecutorService executor = Executors.newSingleThreadExecutor();
缓存线程池:
创建一个可以根据需要创建新线程的线程池,适合执行短期异步任务。
ExecutorService executor = Executors.newCachedThreadPool();
计划任务的ScheduledExecutorService:
创建一个可以定时执行任务的线程池,适合执行周期性任务。
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
scheduler.scheduleAtFixedRate(new MyRunnable(), 0, 10, TimeUnit.SECONDS);
在本次案例中,使用了Executors.newFixedThreadPool(DOWNLOAD_THREAD_NUM + 1)来创建一个固定大小的线程池,用于管理下载任务和日志线程。这种方式确保了线程的复用,并且能够有效地控制线程的数量。
2.运行环境说明
Maven依赖如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.wdbyte</groupId><artifactId>down-bit</artifactId><version>1.0-SNAPSHOT</version><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.1.0</version><configuration><archive><manifest><mainClass>com.wdbyte.downbit.DownloadMain</mainClass></manifest></archive><descriptorRefs><!-- 这个jar-with-dependencies是assembly预先写好的一个,组装描述引用 --><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><!--工程名--><finalName>${project.name}</finalName></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>
项目整体结构如下
3.核心模块实现
3.1下载线程的实现
/*** 多线程下载工具类* @author 牵着猫散步的鼠鼠-LiuShiJie*/
public class DownloadThread implements Callable<Boolean> {/*** 每次读取的数据块大小*/private static int BYTE_SIZE = 1024 * 100;/*** 下载链接*/private String url;/*** 下载开始位置*/private long startPos;/*** 要下载的文件区块大小*/private Long endPos;/*** 标识多线程下载切分的第几部分*/private Integer part;/*** 文件总大小*/private Long contentLenth;public DownloadThread(String url, long startPos, Long endPos, Integer part, Long contentLenth) {this.url = url;this.startPos = startPos;this.endPos = endPos;this.part = part;this.contentLenth = contentLenth;}@Overridepublic Boolean call() throws Exception {if (url == null || url.trim() == "") {throw new RuntimeException("下载路径不正确");}// 文件名String httpFileName = HttpUtls.getHttpFileName(url);if (part != null) {httpFileName = httpFileName + DownloadMain.FILE_TEMP_SUFFIX + part;}// 本地文件大小Long localFileContentLength = FileUtils.getFileContentLength(httpFileName);LogThread.LOCAL_FINISH_SIZE.addAndGet(localFileContentLength);if (localFileContentLength >= endPos - startPos) {LogUtils.info("{} 已经下载完毕,无需重复下载", httpFileName);LogThread.DOWNLOAD_FINISH_THREAD.addAndGet(1);return true;}if (endPos.equals(contentLenth)) {endPos = null;}HttpURLConnection httpUrlConnection = HttpUtls.getHttpUrlConnection(url, startPos + localFileContentLength, endPos);// 获得输入流try (InputStream input = httpUrlConnection.getInputStream(); BufferedInputStream bis = new BufferedInputStream(input);RandomAccessFile oSavedFile = new RandomAccessFile(httpFileName, "rw")) {oSavedFile.seek(localFileContentLength);byte[] buffer = new byte[BYTE_SIZE];int len = -1;// 读到文件末尾则返回-1while ((len = bis.read(buffer)) != -1) {oSavedFile.write(buffer, 0, len);LogThread.DOWNLOAD_SIZE.addAndGet(len);}} catch (FileNotFoundException e) {LogUtils.error("ERROR! 要下载的文件路径不存在 {} ", url);return false;} catch (Exception e) {LogUtils.error("下载出现异常");e.printStackTrace();return false;} finally {httpUrlConnection.disconnect();LogThread.DOWNLOAD_FINISH_THREAD.addAndGet(1);}return true;}}
3.2日志线程的实现
/*** 多线程下载日志记录* @author 牵着猫散步的鼠鼠-LiuShiJie*/
public class LogThread implements Callable<Boolean> {// 本地下载的文件大小public static AtomicLong LOCAL_FINISH_SIZE = new AtomicLong();// 已经下载的文件大小public static AtomicLong DOWNLOAD_SIZE = new AtomicLong();// 下载完成的线程数public static AtomicLong DOWNLOAD_FINISH_THREAD = new AtomicLong();// 待下载的文件总大小private long httpFileContentLength;public LogThread(long httpFileContentLength) {this.httpFileContentLength = httpFileContentLength;}@Overridepublic Boolean call() throws Exception {int[] downSizeArr = new int[5];int i = 0;double size = 0;double mb = 1024d * 1024d;// 文件总大小String httpFileSize = String.format("%.2f", httpFileContentLength / mb);while (DOWNLOAD_FINISH_THREAD.get() != DownloadMain.DOWNLOAD_THREAD_NUM) {double downloadSize = DOWNLOAD_SIZE.get();downSizeArr[++i % 5] = Double.valueOf(downloadSize - size).intValue();size = downloadSize;// 每秒速度double fiveSecDownloadSize = Arrays.stream(downSizeArr).sum();int speed = (int)((fiveSecDownloadSize / 1024d) / (i < 5d ? i : 5d));// 剩余时间double surplusSize = httpFileContentLength - downloadSize - LOCAL_FINISH_SIZE.get();String surplusTime = String.format("%.1f", surplusSize / 1024d / speed);if (surplusTime.equals("Infinity")) {surplusTime = "-";}// 已下大小String currentFileSize = String.format("%.2f", downloadSize / mb + LOCAL_FINISH_SIZE.get() / mb);String speedLog = String.format("> 已下载 %smb/%smb,速度 %skb/s,剩余时间 %ss", currentFileSize, httpFileSize, speed, surplusTime);System.out.print("\r");System.out.print(speedLog);// 一秒更新一次日志Thread.sleep(1000);}System.out.println();return true;}}
3.3相关工具类的实现
文件操作工具类FileUtils ,主要用来获取文件大小长度
public class FileUtils {/*** 获取文件内容长度** @param name* @return*/public static long getFileContentLength(String name) {File file = new File(name);return file.exists() && file.isFile() ? file.length() : 0;}}
网络请求操作工具类HttpUtls,主要是一些常用的Http操作
/*** 网络请求操作工具类* @author 牵着猫散步的鼠鼠-LiuShiJie*/
public class HttpUtls {/*** 获取 HTTP 链接** @param url* @return* @throws IOException*/public static HttpURLConnection getHttpUrlConnection(String url) throws IOException {URL httpUrl = new URL(url);HttpURLConnection httpConnection = (HttpURLConnection)httpUrl.openConnection();httpConnection.setRequestProperty("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36");return httpConnection;}/*** 获取 HTTP 链接** @param url* @param start* @param end* @return* @throws IOException*/public static HttpURLConnection getHttpUrlConnection(String url, long start, Long end) throws IOException {HttpURLConnection httpUrlConnection = getHttpUrlConnection(url);LogUtils.debug("此线程下载内容区间 {}-{}", start, end);if (end != null) {httpUrlConnection.setRequestProperty("RANGE", "bytes=" + start + "-" + end);} else {httpUrlConnection.setRequestProperty("RANGE", "bytes=" + start + "-");}Map<String, List<String>> headerFields = httpUrlConnection.getHeaderFields();for (String s : headerFields.keySet()) {LogUtils.debug("此线程相应头{}:{}", s, headerFields.get(s));}return httpUrlConnection;}/*** 获取网络文件大小 bytes** @param url* @return* @throws IOException*/public static long getHttpFileContentLength(String url) throws IOException {HttpURLConnection httpUrlConnection = getHttpUrlConnection(url);int contentLength = httpUrlConnection.getContentLength();httpUrlConnection.disconnect();return contentLength;}/*** 获取网络文件 Etag** @param url* @return* @throws IOException*/public static String getHttpFileEtag(String url) throws IOException {HttpURLConnection httpUrlConnection = getHttpUrlConnection(url);Map<String, List<String>> headerFields = httpUrlConnection.getHeaderFields();List<String> eTagList = headerFields.get("ETag");httpUrlConnection.disconnect();return eTagList.get(0);}/*** 获取网络文件名** @param url* @return*/public static String getHttpFileName(String url) {int indexOf = url.lastIndexOf("/");return url.substring(indexOf + 1);}
}
日志工具类LogUtils ,主要负责日志格式化输出
/*** 日志工具类,输出日志* @author 牵着猫散步的鼠鼠-LiuShiJie*/
public class LogUtils {public static boolean DEBUG = false;static DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss");public static void info(String msg, Object... arg) {print(msg, " -INFO- ", arg);}public static void error(String msg, Object... arg) {print(msg, " -ERROR-", arg);}public static void debug(String msg, Object... arg) {if (DEBUG) { print(msg, " -DEBUG-", arg); }}private static void print(String msg, String level, Object... arg) {if (arg != null && arg.length > 0) {msg = String.format(msg.replace("{}", "%s"), arg);}String thread = Thread.currentThread().getName();System.out.println(LocalDateTime.now().format(dateTimeFormatter) + " " + thread + level + msg);}
}
迅雷链接转换工具类ThunderUtils ,迅雷链接与普通链接不同,需要转换
/*** 迅雷链接转换工具* @author 牵着猫散步的鼠鼠-LiuShiJie*/
public class ThunderUtils {private static String THUNDER = "thunder://";/*** 判断是否是迅雷链接** @param url* @return*/public static boolean isThunderLink(String url) {return url.startsWith(THUNDER);}/*** 转换成 HTTP URL** @param url* @return*/public static String toHttpUrl(String url) {if (!isThunderLink(url)) {return url;}LogUtils.info("当前链接是迅雷链接,开始转换...");url = url.replaceFirst(THUNDER, "");try {// base 64 转换url = new String(Base64.getDecoder().decode(url.getBytes()), "UTF-8");// url 解码url = URLDecoder.decode(url, "UTF-8");} catch (UnsupportedEncodingException e) {e.printStackTrace();}// 去头去尾if (url.startsWith("AA")) {url = url.substring(2);}if (url.endsWith("ZZ")) {url = url.substring(0, url.length() - 2);}LogUtils.info("当前链接是迅雷链接,转换结果:{}", url);return url;}
}
3.4核心业务实现
/*** 多线程下载* 断点续传下载 demo* @author 牵着猫散步的鼠鼠-LiuShiJie*/
public class DownloadMain {// 下载线程数量public static int DOWNLOAD_THREAD_NUM = 5;// 下载线程池private static ExecutorService executor = Executors.newFixedThreadPool(DOWNLOAD_THREAD_NUM + 1);// 临时文件后缀public static String FILE_TEMP_SUFFIX = ".temp";// 支持的 URL 协议private static HashSet<String> PROTOCAL_SET = new HashSet();static {PROTOCAL_SET.add("thunder://");PROTOCAL_SET.add("http://");PROTOCAL_SET.add("https://");}public static void main(String[] args) throws Exception {Scanner scanner = new Scanner(System.in);System.out.println("请输入要下载的链接:");String url = scanner.nextLine();long count = PROTOCAL_SET.stream().filter(prefix -> url.startsWith(prefix)).count();if (count == 0) {LogUtils.info("不支持的协议类型");return;}LogUtils.info("要下载的链接是:{}", url);new DownloadMain().download(ThunderUtils.toHttpUrl(url));}public void download(String url) throws Exception {String fileName = HttpUtls.getHttpFileName(url);long localFileSize = FileUtils.getFileContentLength(fileName);// 获取网络文件具体大小long httpFileContentLength = HttpUtls.getHttpFileContentLength(url);if (localFileSize >= httpFileContentLength) {LogUtils.info("{}已经下载完毕,无需重新下载", fileName);return;}List<Future<Boolean>> futureList = new ArrayList<>();if (localFileSize > 0) {LogUtils.info("开始断点续传 {}", fileName);} else {LogUtils.info("开始下载文件 {}", fileName);}LogUtils.info("开始下载时间 {}", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss")));long startTime = System.currentTimeMillis();// 任务切分splitDownload(url, futureList);LogThread logThread = new LogThread(httpFileContentLength);Future<Boolean> future = executor.submit(logThread);futureList.add(future);// 开始下载for (Future<Boolean> booleanFuture : futureList) {booleanFuture.get();}LogUtils.info("文件下载完毕 {},本次下载耗时:{}", fileName, (System.currentTimeMillis() - startTime) / 1000 + "s");LogUtils.info("结束下载时间 {}", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss")));// 文件合并boolean merge = merge(fileName);if (merge) {// 清理分段文件clearTemp(fileName);}LogUtils.info("本次文件下载结束,下载位置为" + fileName);System.exit(0);}/*** 切分下载任务到多个线程** @param url* @param futureList* @throws IOException*/public void splitDownload(String url, List<Future<Boolean>> futureList) throws IOException {long httpFileContentLength = HttpUtls.getHttpFileContentLength(url);// 任务切分long size = httpFileContentLength / DOWNLOAD_THREAD_NUM;long lastSize = httpFileContentLength - (httpFileContentLength / DOWNLOAD_THREAD_NUM * (DOWNLOAD_THREAD_NUM - 1));for (int i = 0; i < DOWNLOAD_THREAD_NUM; i++) {long start = i * size;Long downloadWindow = (i == DOWNLOAD_THREAD_NUM - 1) ? lastSize : size;Long end = start + downloadWindow;if (start != 0) {start++;}DownloadThread downloadThread = new DownloadThread(url, start, end, i, httpFileContentLength);Future<Boolean> future = executor.submit(downloadThread);futureList.add(future);}}public boolean merge(String fileName) throws IOException {LogUtils.info("开始合并文件 {}", fileName);byte[] buffer = new byte[1024 * 10];int len = -1;try (RandomAccessFile oSavedFile = new RandomAccessFile(fileName, "rw")) {for (int i = 0; i < DOWNLOAD_THREAD_NUM; i++) {try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(fileName + FILE_TEMP_SUFFIX + i))) {while ((len = bis.read(buffer)) != -1) { // 读到文件末尾则返回-1oSavedFile.write(buffer, 0, len);}}}LogUtils.info("文件合并完毕 {}", fileName);} catch (Exception e) {e.printStackTrace();return false;}return true;}public boolean clearTemp(String fileName) {LogUtils.info("开始清理临时文件 {}{}0-{}", fileName, FILE_TEMP_SUFFIX, (DOWNLOAD_THREAD_NUM - 1));for (int i = 0; i < DOWNLOAD_THREAD_NUM; i++) {File file = new File(fileName + FILE_TEMP_SUFFIX + i);file.delete();}LogUtils.info("临时文件清理完毕 {}{}0-{}", fileName, FILE_TEMP_SUFFIX, (DOWNLOAD_THREAD_NUM - 1));return true;}/*** 使用CheckedInputStream计算CRC*/public static Long getCRC32(String filepath) throws IOException {InputStream inputStream = new BufferedInputStream(new FileInputStream(filepath));CRC32 crc = new CRC32();byte[] bytes = new byte[1024];int cnt;while ((cnt = inputStream.read(bytes)) != -1) {crc.update(bytes, 0, cnt);}inputStream.close();return crc.getValue();}}
4.功能测试
启动main程序,输入下载链接,我们这里使用腾讯QQ安装包的CDN链接(https://dldir1.qq.com/qqfile/qq/PCQQ9.7.17/QQ9.7.17.29225.exe)来测试下载功能
文件下载成功,在项目上级目录文件完整,能够正常使用
总结
本文通过详细解析一个多线程下载器的实现,深入探讨了Java多线程编程的基础知识、工具类的使用以及核心模块的实现。我们学习了如何创建和启动线程、线程的同步和互斥、以及线程池的使用,这些都是多线程下载器的基础。同时,我们了解了如何使用Java的标准库来实现文件操作、网络请求、日志记录等功能。
希望文章对您的学习有帮助,有时间会继续出Java并发编程相关的内容~