# Note carried over from the original post: see the screenshot above.
import importlib
import os
import queue
import subprocess
import sys
import threading
import time
import tkinter as tk
from concurrent.futures import ThreadPoolExecutor, as_completed
from tkinter import ttk, messagebox, scrolledtext

import requests
from urllib.parse import urljoin, urlsplit


class PackageInstallerApp:
    """Tkinter front end that downloads, installs and verifies pip packages.

    The workflow runs in a background thread and has four phases: probe the
    configured index mirrors, ``pip download`` every requested package into
    the install directory, ``pip install`` the downloaded files, and finally
    try to import each package.

    Tkinter widgets must only be touched from the thread that created them,
    so every UI mutation requested by the worker is pushed onto a queue that
    the Tk thread drains periodically.
    """

    # Interval (milliseconds) at which the Tk thread drains pending UI calls.
    UI_POLL_INTERVAL_MS = 50

    # Distributions whose importable module differs from the project name.
    # Keys are normalized with _normalize_name().
    IMPORT_NAME_OVERRIDES = {
        "opencv_python": "cv2",
        "opencv_python_headless": "cv2",
        "opencv_contrib_python": "cv2",
        "pillow": "PIL",
        "scikit_learn": "sklearn",
        "scikit_image": "skimage",
        "pyyaml": "yaml",
        "beautifulsoup4": "bs4",
        "python_dateutil": "dateutil",
        "pyserial": "serial",
    }

    def __init__(self, root):
        """Build the window on *root* and start the UI-queue poller.

        Args:
            root: The ``tk.Tk`` instance that hosts the application.
        """
        self.root = root
        self.root.title("Design By Tim")
        self.root.geometry("800x600")

        # Default settings
        self.install_dir = r"C:\Users\AAA\Python_Package"
        self.default_packages = ["numpy", "opencv-python", "pyttsx3"]
        self.mirrors = [
            "https://pypi.tuna.tsinghua.edu.cn/simple/",
            "https://mirrors.aliyun.com/pypi/simple/",
            "https://pypi.mirrors.ustc.edu.cn/simple/",
            "https://mirrors.bfsu.edu.cn/pypi/web/simple/",
            "https://mirrors.cloud.tencent.com/pypi/simple/",
            "https://mirrors.nju.edu.cn/pypi/web/simple/",
            "https://mirrors.hit.edu.cn/pypi/web/simple/",
            "https://mirror.sjtu.edu.cn/pypi/web/simple/",
            "https://pypi.doubanio.com/simple/",
            "https://mirrors.zju.edu.cn/pypi/web/simple/",
            "https://mirrors.pku.edu.cn/pypi/simple/",
            "https://mirrors.yun-idc.com/pypi/simple/",
            "https://mirrors.neusoft.edu.cn/pypi/web/simple/",
            "https://mirrors.xjtu.edu.cn/pypi/web/simple/",
            "https://mirrors.huaweicloud.com/repository/pypi/simple/",
        ]

        # UI widget references (filled in by create_widgets)
        self.start_button = None
        self.cancel_button = None

        # The constructing thread owns the widgets; other threads hand their
        # UI calls over through this queue.
        self._ui_thread_id = threading.get_ident()
        self._ui_queue = queue.Queue()

        # Build the UI
        self.create_widgets()
        self.root.after(self.UI_POLL_INTERVAL_MS, self._drain_ui_queue)

    def create_widgets(self):
        """Create and lay out every widget of the main window."""
        # Main frame
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.pack(fill=tk.BOTH, expand=True)

        # Package list input
        ttk.Label(main_frame, text="要下载的包(逗号分隔):").grid(row=0, column=0, sticky=tk.W)
        self.pkg_entry = ttk.Entry(main_frame, width=50)
        self.pkg_entry.grid(row=0, column=1, sticky=tk.EW)
        self.pkg_entry.insert(0, ",".join(self.default_packages))

        # Install directory
        ttk.Label(main_frame, text="安装目录:").grid(row=1, column=0, sticky=tk.W)
        self.dir_entry = ttk.Entry(main_frame, width=50)
        self.dir_entry.grid(row=1, column=1, sticky=tk.EW)
        self.dir_entry.insert(0, self.install_dir)

        # Button frame
        btn_frame = ttk.Frame(main_frame)
        btn_frame.grid(row=2, column=0, columnspan=2, pady=10)

        # Start button
        self.start_button = ttk.Button(btn_frame, text="开始下载安装", command=self.start_process)
        self.start_button.pack(side=tk.LEFT, padx=5)

        # Cancel button
        self.cancel_button = ttk.Button(btn_frame, text="取消", command=self.root.quit)
        self.cancel_button.pack(side=tk.LEFT, padx=5)

        # Progress bar
        self.progress = ttk.Progressbar(main_frame, orient=tk.HORIZONTAL, length=500, mode='determinate')
        self.progress.grid(row=3, column=0, columnspan=2, pady=10)

        # Status label
        self.status_label = ttk.Label(main_frame, text="准备就绪")
        self.status_label.grid(row=4, column=0, columnspan=2)

        # Log output
        ttk.Label(main_frame, text="进度日志:").grid(row=5, column=0, sticky=tk.W)
        self.log_text = scrolledtext.ScrolledText(main_frame, width=80, height=20, state='normal')
        self.log_text.grid(row=6, column=0, columnspan=2, sticky=tk.NSEW)

        # Grid weights: let the entry column and the log row absorb resizes.
        main_frame.columnconfigure(1, weight=1)
        main_frame.rowconfigure(6, weight=1)

    # ------------------------------------------------------------------
    # Thread-safe UI plumbing
    # ------------------------------------------------------------------
    def _run_on_ui_thread(self, func, *args):
        """Run ``func(*args)`` on the Tk thread.

        Called from the Tk thread the function runs immediately; called from
        any other thread it is queued and executed by _drain_ui_queue().
        """
        if threading.get_ident() == self._ui_thread_id:
            func(*args)
        else:
            self._ui_queue.put((func, args))

    def _drain_ui_queue(self):
        """Execute all queued UI calls in FIFO order, then reschedule itself."""
        try:
            while True:
                try:
                    func, args = self._ui_queue.get_nowait()
                except queue.Empty:
                    break
                func(*args)
        finally:
            # Reschedule even if a queued call raised, so one bad update
            # cannot freeze all later ones.
            self.root.after(self.UI_POLL_INTERVAL_MS, self._drain_ui_queue)

    def _append_log(self, message):
        """Append *message* to the log widget (Tk thread only)."""
        self.log_text.config(state='normal')
        self.log_text.insert(tk.END, message + "\n")
        self.log_text.config(state='disabled')
        self.log_text.see(tk.END)

    def _set_progress(self, value):
        """Set the progress bar value (Tk thread only)."""
        self.progress['value'] = value

    def _set_status(self, message):
        """Set the status label text (Tk thread only)."""
        self.status_label.config(text=message)

    def _apply_ui_state(self, enabled):
        """Enable or disable both buttons (Tk thread only)."""
        state = tk.NORMAL if enabled else tk.DISABLED
        self.start_button.config(state=state)
        self.cancel_button.config(state=state)

    def log_message(self, message):
        """Append a line to the progress log; safe to call from any thread."""
        self._run_on_ui_thread(self._append_log, message)

    def update_progress(self, value):
        """Set the progress bar to *value* (0-100); safe from any thread."""
        self._run_on_ui_thread(self._set_progress, value)

    def update_status(self, message):
        """Replace the status label text; safe to call from any thread."""
        self._run_on_ui_thread(self._set_status, message)

    def set_ui_state(self, enabled):
        """Enable (True) or disable (False) the buttons; safe from any thread."""
        self._run_on_ui_thread(self._apply_ui_state, enabled)

    # ------------------------------------------------------------------
    # Name helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _normalize_name(name):
        """Return *name* lower-cased with ``-`` and ``.`` replaced by ``_``."""
        return name.lower().replace("-", "_").replace(".", "_")

    @staticmethod
    def _distribution_name(requirement):
        """Strip extras, version specifiers and markers from a requirement.

        ``"numpy==1.26.0"`` and ``"requests[socks]>=2"`` yield ``"numpy"``
        and ``"requests"`` respectively.
        """
        name = requirement.strip()
        for delimiter in ("[", "=", "<", ">", "!", "~", ";", " "):
            name = name.split(delimiter, 1)[0]
        return name.strip()

    @classmethod
    def _matches_distribution(cls, filename, package):
        """Tell whether *filename* is a wheel/sdist of *package*.

        The normalized file name must start with the normalized project name
        followed by a separator and a digit (the start of the version), so
        ``numpy`` does not match ``numpy_financial-1.0.0-...whl``.
        """
        prefix = cls._normalize_name(cls._distribution_name(package)) + "_"
        candidate = cls._normalize_name(filename)
        return candidate.startswith(prefix) and candidate[len(prefix):len(prefix) + 1].isdigit()

    @classmethod
    def _import_candidates(cls, package):
        """Return module names to try when verifying *package*, best first."""
        dist_name = cls._distribution_name(package)
        normalized = cls._normalize_name(dist_name)
        candidates = []
        override = cls.IMPORT_NAME_OVERRIDES.get(normalized)
        if override:
            candidates.append(override)
        # Keep the dotted form (namespace packages) before the fully
        # normalized one.
        for name in (dist_name.replace("-", "_"), normalized):
            if name not in candidates:
                candidates.append(name)
        return candidates

    @staticmethod
    def _trusted_host(mirror):
        """Return the network location of *mirror* for ``--trusted-host``."""
        return urlsplit(mirror).netloc

    @staticmethod
    def _run_pip(arguments):
        """Run ``pip`` with *arguments* under the current interpreter.

        Using ``sys.executable`` guarantees packages are installed into the
        same environment that later tries to import them.

        Returns:
            The ``subprocess.CompletedProcess`` with captured text output.
        """
        return subprocess.run(
            [sys.executable, "-m", "pip", *arguments],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            # pip output may contain bytes the locale codec cannot decode;
            # never let that turn a successful run into a failure.
            errors="replace",
        )

    # ------------------------------------------------------------------
    # Workflow
    # ------------------------------------------------------------------
    def start_process(self):
        """Validate the form, lock the UI and launch the worker thread."""
        # Read user input
        packages = [name.strip() for name in self.pkg_entry.get().split(",") if name.strip()]
        self.install_dir = self.dir_entry.get().strip()

        if not packages:
            messagebox.showerror("错误", "请输入至少一个包名")
            return
        if not self.install_dir:
            messagebox.showerror("错误", "请输入安装目录")
            return
        try:
            os.makedirs(self.install_dir, exist_ok=True)
        except OSError as exc:
            messagebox.showerror("错误", f"无法创建安装目录: {exc}")
            return

        # Disable the UI and reset the log / progress bar
        self.set_ui_state(False)
        self.log_text.config(state='normal')
        self.log_text.delete("1.0", tk.END)
        self.log_text.config(state='disabled')
        self.update_progress(0)

        # Run the long workflow off the Tk thread so the window stays responsive
        threading.Thread(target=self.download_and_install, args=(packages,), daemon=True).start()

    def download_and_install(self, packages):
        """Run the full mirror-test / download / install / verify workflow.

        Intended to run in a worker thread. Individual package failures are
        logged and do not stop the run; any unexpected exception aborts the
        run and is reported to the user. The buttons are always re-enabled.

        Args:
            packages: Non-empty list of requirement strings to process.
        """
        overall_success = True  # tracks success across all phases
        try:
            # 1. Probe mirror speed
            self.update_status("正在测试镜像源速度...")
            self.log_message("=" * 50)
            self.log_message("开始测试镜像源速度")
            fastest_mirrors = self.find_fastest_mirrors()
            if not fastest_mirrors:
                # Without this guard the modulo below divides by zero.
                raise RuntimeError("没有可用的镜像源")

            # 2. Download packages
            self.update_status("开始下载包...")
            self.log_message("=" * 50)
            self.log_message("开始下载包")
            downloaded_files = []
            total_packages = len(packages)
            download_success = True
            for index, package in enumerate(packages):
                self.update_progress((index / total_packages) * 50)
                # Spread the packages round-robin over the fastest mirrors.
                mirror = fastest_mirrors[index % len(fastest_mirrors)]
                success, files = self.download_package(mirror, package)
                if success:
                    downloaded_files.extend(files)
                else:
                    download_success = False
                    overall_success = False
                    self.log_message(f"⚠️ 包 {package} 下载失败,将继续尝试其他包")
            if not download_success:
                self.log_message("警告: 部分包下载失败")

            # 3. Install packages
            self.update_status("开始安装包...")
            self.log_message("=" * 50)
            self.log_message("开始安装包")
            install_success = True
            for index, file in enumerate(downloaded_files):
                self.update_progress(50 + (index / len(downloaded_files)) * 40)
                if not self.install_package(file):
                    install_success = False
                    overall_success = False
                    self.log_message(f"⚠️ 文件 {file} 安装失败")
            if not install_success:
                self.log_message("警告: 部分包安装失败")

            # 4. Verify installation
            self.update_status("验证安装...")
            self.log_message("=" * 50)
            self.log_message("开始验证安装")
            verify_success = True
            for index, package in enumerate(packages):
                self.update_progress(90 + (index / len(packages)) * 10)
                if not self.test_installation(package):
                    verify_success = False
                    overall_success = False
                    self.log_message(f"⚠️ 包 {package} 验证失败")
            if not verify_success:
                self.log_message("警告: 部分包验证失败")

            self.update_progress(100)

            # Report the final result
            if overall_success:
                self.update_status("所有操作成功完成!")
                self.log_message("=" * 50)
                self.log_message("✅ 所有包下载安装完成并验证成功!")
                self._run_on_ui_thread(messagebox.showinfo, "完成", "所有包下载安装完成并验证成功!")
            else:
                self.update_status("操作完成,但有错误发生")
                self.log_message("=" * 50)
                self.log_message("⚠️ 操作完成,但部分步骤失败,请检查日志")
                self._run_on_ui_thread(
                    messagebox.showwarning, "完成但有错误", "操作完成,但部分步骤失败,请检查日志了解详情"
                )
        except Exception as e:
            # Top-level boundary of the worker thread: report, never crash silently.
            self.log_message(f"❌ 发生严重错误: {str(e)}")
            self.update_status("操作因错误中止")
            self._run_on_ui_thread(messagebox.showerror, "错误", f"处理过程中发生严重错误: {str(e)}")
            overall_success = False
        finally:
            # Re-enable the UI (queued after any dialog above, so it runs
            # once the user has dismissed it).
            self.set_ui_state(True)

    def find_fastest_mirrors(self):
        """Find the 3 fastest reachable mirrors.

        All configured mirrors are probed concurrently. If none answers, the
        first 3 configured mirrors are returned untested so the download
        phase can still be attempted.

        Returns:
            A list of up to 3 mirror URLs, fastest first; empty only when no
            mirrors are configured at all.
        """
        results = []
        with ThreadPoolExecutor(max_workers=15) as executor:
            futures = [executor.submit(self.test_mirror_speed, mirror) for mirror in self.mirrors]
            for future in as_completed(futures):
                speed, mirror = future.result()
                if speed == float('inf'):
                    self.log_message(f"镜像源 {mirror} 不可用,已跳过")
                    continue
                results.append((speed, mirror))
                self.log_message(f"测试镜像源 {mirror} 速度: {speed:.2f}秒")

        results.sort()
        fastest_mirrors = [mirror for _, mirror in results[:3]]
        if not fastest_mirrors:
            self.log_message("⚠️ 所有镜像源测速失败,将直接使用默认镜像源")
            return list(self.mirrors[:3])

        self.log_message(f"最快的3个镜像源: {', '.join(fastest_mirrors)}")
        return fastest_mirrors

    def test_mirror_speed(self, mirror):
        """Measure how long one GET request against *mirror* takes.

        Returns:
            ``(seconds, mirror)`` on an HTTP 200 answer, otherwise
            ``(float('inf'), mirror)``.
        """
        # NOTE(review): urljoin resolves "simple/" relative to the mirror
        # URL, so for an index ending in "/simple/" the probed URL is
        # "<mirror>simple/". Kept as in the original; confirm this is the
        # intended probe target.
        probe_url = urljoin(mirror, "simple/")
        started = time.perf_counter()
        try:
            response = requests.get(probe_url, timeout=5)
        except requests.RequestException:
            return float('inf'), mirror
        if response.status_code == 200:
            return time.perf_counter() - started, mirror
        return float('inf'), mirror

    def download_package(self, mirror, package):
        """Download *package* from *mirror* into the install directory.

        Returns:
            ``(True, files)`` on success, where *files* are the archive names
            in the install directory that belong to *package* (only the ones
            created by this call when there are any, otherwise every
            matching file already present); ``(False, [])`` on failure.
        """
        try:
            self.log_message(f"正在从 {mirror} 下载 {package}...")
            existing = set(os.listdir(self.install_dir))
            result = self._run_pip([
                "download", package,
                "-d", self.install_dir,
                "-i", mirror,
                "--trusted-host", self._trusted_host(mirror),
            ])
            if result.returncode != 0:
                self.log_message(f"❌ 下载 {package} 失败: {result.stderr.strip()}")
                return False, []

            # Collect the downloaded files for this package
            matching = sorted(
                name for name in os.listdir(self.install_dir)
                if self._matches_distribution(name, package)
            )
            # Prefer what this run fetched so stale versions left over from
            # earlier runs are not installed again.
            fresh = [name for name in matching if name not in existing]
            files = fresh or matching
            self.log_message(f"✅ 成功下载 {package}: {', '.join(files)}")
            return True, files
        except (OSError, subprocess.SubprocessError) as e:
            self.log_message(f"❌ 下载 {package} 时发生错误: {str(e)}")
            return False, []

    def install_package(self, filename):
        """Install one downloaded archive from the install directory.

        Returns:
            True when pip exits with status 0, False otherwise.
        """
        try:
            filepath = os.path.join(self.install_dir, filename)
            self.log_message(f"正在安装 {filename}...")
            result = self._run_pip(["install", filepath])
            if result.returncode == 0:
                self.log_message(f"✅ 成功安装 {filename}")
                return True
            self.log_message(f"❌ 安装 {filename} 失败: {result.stderr.strip()}")
            return False
        except (OSError, subprocess.SubprocessError) as e:
            self.log_message(f"❌ 安装 {filename} 时发生错误: {str(e)}")
            return False

    def test_installation(self, package):
        """Check that *package* can be imported by the current interpreter.

        Tries each name from _import_candidates() in turn, e.g. ``cv2``
        before ``opencv_python`` for ``opencv-python``.

        Returns:
            True if any candidate module imports, False otherwise.
        """
        self.log_message(f"正在验证 {package} 是否可以导入...")
        # pip ran in a subprocess; make the import system notice new files.
        importlib.invalidate_caches()
        last_error = None
        for import_name in self._import_candidates(package):
            try:
                module = importlib.import_module(import_name)
            except Exception as e:
                # Importing third-party code can raise anything (ImportError,
                # OSError from a missing DLL, ...); treat all as "not usable".
                last_error = e
                continue
            version = getattr(module, "__version__", "未知版本")
            self.log_message(f"✅ 验证成功: {package} (版本: {version})")
            return True
        self.log_message(f"❌ 验证失败: 无法导入 {package} - {str(last_error)}")
        return False


if __name__ == "__main__":
    root = tk.Tk()
    app = PackageInstallerApp(root)
    root.mainloop()