代码如下:
# main.py
import functools
import itertools
import json
import os
import sys
import time
from pathlib import Path
from typing import Callable, TypeVar# pip install asynctor httpx rich fastdfs-client
from asynctor import bulk_gather, run, timeit
from asynctor.tasks import ThreadGroup
from httpx import AsyncClienttry:from rich import print
except ImportError:...from fastdfs_client import FastdfsClientT = TypeVar("T")def catch_cost(func: Callable[..., T]) -> Callable[..., tuple[float, T]]:@functools.wraps(func)def wrapper(*args, **kw) -> tuple[float, T]:start = time.time()rv = func(*args, **kw)cost = round(time.time() - start, 1)return cost, rvreturn wrapper@timeit
async def show_result(output: Path, dfs: FastdfsClient) -> None:"""展示上传结果,验证返回的URL"""results = json.loads(output.read_text())print("Upload result:")print(results)urls = [url for _, url in results]if not os.getenv("NO_FETCH"):group_name = "group1"group_name_new = os.getenv("NEW_GROUP_NAME", group_name)changes = (group_name, group_name_new)# 使用协程并发请求图片URL,验证是否能按预期拿到图片async with AsyncClient(follow_redirects=True) as client:rs = await bulk_gather(client.get(url.replace(*changes)) for url in urls)print("URL concurrency result:\nstatus_code\telapsed\turl\tContentLength")for r in rs:print(r.status_code,r.elapsed,r.url,len(r.content) if r.status_code == 200 else r.text,)if "-d" in sys.argv or "--delete" in sys.argv:print("=" * 20)delete_all(urls, dfs)@timeit
def delete_all(urls: list[str], dfs: FastdfsClient) -> None:"""使用多线程批量删除远程文件"""with ThreadGroup() as tg:for url in urls:tg.soonify(catch_cost(dfs.delete_file))(url)for result in tg.results:print(result)@timeit
def main() -> None:total = 10client = FastdfsClient(["dfs.waketzheng.top"])if args := sys.argv[1:]:if (a1 := args[0]).isdigit():total = int(a1)elif (p := Path(a1)).is_file():run(show_result(p, client))returnd = Path.home() / "Pictures"assert d.exists(), f'文件夹({d})不存在'images = list(d.rglob("*.jp*g")) + list(d.rglob("*.JP*G"))assert images, f"{d}中没有jpeg图片"# 多进程并发上传文件with ThreadGroup() as tg:for index, p in zip(range(total), itertools.cycle(images)):tg.soonify(catch_cost(client.upload_as_url))(p.read_bytes())(p := Path("output.json")).write_text(json.dumps(tg.results))print(f"{total = }")if "--show" in args:run(show_result(p, client))if __name__ == "__main__":main()
使用:
python main.py --show