代码如下:
# main.py
import functools
import itertools
import json
import os
import pickle
import sys
import time
from pathlib import Path
from typing import Callable, NoReturn, TypeVar
# pip install asynctor httpx rich fastdfs-client tqdm
import tqdm
from asynctor import bulk_gather, run, timeit
from asynctor.tasks import ThreadGroup
from httpx import AsyncClient
from rich import print
from fastdfs_client import FastdfsClient
T = TypeVar("T")


def catch_cost(func: Callable[..., T]) -> Callable[..., tuple[float, T]]:
    """Wrap *func* so that every call also reports how long it took.

    Args:
        func: The callable to time.

    Returns:
        A wrapper that forwards all positional and keyword arguments to
        *func* and returns ``(cost, result)``, where ``cost`` is the elapsed
        time in seconds rounded to one decimal place and ``result`` is
        whatever *func* returned. Exceptions raised by *func* propagate
        unchanged.
    """

    @functools.wraps(func)
    def wrapper(*args, **kw) -> tuple[float, T]:
        # perf_counter() is monotonic, so the measured interval cannot be
        # distorted (or even go negative) when the system clock is adjusted
        # while the call is running, which can happen with time.time().
        start = time.perf_counter()
        rv = func(*args, **kw)
        cost = round(time.perf_counter() - start, 1)
        return cost, rv

    return wrapper
@timeit
async def show_result(output: Path, dfs: FastdfsClient) -> None:
    """Show the saved upload results and verify the returned URLs.

    Reads the JSON file at *output*, prints its content, and takes the second
    item of every entry as the file URL. Unless the ``NO_FETCH`` environment
    variable is set to something other than ``"0"``, every URL is requested
    concurrently and one line per response is printed; otherwise only the
    number of results is printed. When ``-d`` or ``--delete`` is present in
    ``sys.argv``, the URLs are then handed to ``delete_all`` together with
    *dfs*.
    """
    results = json.loads(output.read_bytes())
    print("Upload result:")
    print(results)
    urls = [entry_url for _, entry_url in results]

    no_fetch = os.getenv("NO_FETCH")
    skip_fetch = bool(no_fetch) and no_fetch != "0"
    if skip_fetch:
        print(f"{len(results) = }")
    else:
        # Request the image URLs concurrently with coroutines to verify that
        # the uploaded files can be fetched as expected.
        async with AsyncClient(follow_redirects=True, timeout=80) as client:
            pending = (client.get(entry_url) for entry_url in urls)
            # At most 50 coroutines run at the same time.
            responses = await bulk_gather(pending, limit=50)
        print("URL concurrency result:\nidx\tstatus_code\telapsed\turl\tContentLength")
        for idx, resp in enumerate(responses, 1):
            # Last column: body size on HTTP 200, otherwise the body text.
            if resp.status_code == 200:
                last_column = len(resp.content)
            else:
                last_column = resp.text
            print(idx, resp.status_code, resp.elapsed, resp.url, last_column)

    wants_delete = "-d" in sys.argv or "--delete" in sys.argv
    if wants_delete:
        print("=" * 20)
        delete_all(urls, dfs)
@timeit
def delete_all(urls: list[str], dfs: FastdfsClient) -> None:
    """Delete the remote files behind *urls* in bulk using multiple threads.

    Each URL is passed to ``dfs.delete_file`` (wrapped by ``catch_cost``) as
    its own ``ThreadGroup`` task. Every collected result is printed, followed
    by a summary line in which a result counts as a success when it is a
    tuple, i.e. the ``(cost, return_value)`` pair produced by ``catch_cost``.
    """
    with ThreadGroup() as group:
        for remote_url in urls:
            timed_delete = catch_cost(dfs.delete_file)
            group.soonify(timed_delete)(remote_url)
    outcomes = group.results
    for outcome in outcomes:
        print(outcome)
    succeeded = sum(isinstance(outcome, tuple) for outcome in outcomes)
    print(f"total={len(outcomes)}; success={succeeded}")
def abort(msg: str) -> NoReturn:
    """Print *msg* as an error line and terminate with exit status 1.

    The ``[red]...[/red]`` tags in the printed line are markup for the
    ``print`` imported from ``rich`` at the top of the file.
    """
    error_line = f"[red]ERROR:[/red] {msg}"
    print(error_line)
    raise SystemExit(1)
@timeit
def main() -> None:
    """Upload local JPEG images concurrently and save the results as JSON.

    Command-line handling (``sys.argv[1:]``):

    * first argument is all digits: number of uploads to perform (default 10);
    * first argument is an existing file: treat it as a saved result file,
      pass it to ``show_result`` and return without uploading;
    * any other first argument: abort with an error message;
    * ``--show`` anywhere in the arguments: call ``show_result`` on the
      freshly written ``output.json`` after uploading.

    Images are collected recursively from ``~/Pictures`` and cycled through
    until the requested number of uploads has been scheduled. Results are
    written to ``output.json``; when they cannot be JSON-encoded, the raw
    results are pickled to ``err.pickle`` and only tuple results are saved.
    """
    total = 10
    client = FastdfsClient(["dfs.waketzheng.top"])
    if args := sys.argv[1:]:
        if (a1 := args[0]).isdigit():
            total = int(a1)
        elif (p := Path(a1)).is_file():
            run(show_result(p, client))
            return
        else:
            # The f-prefix is required so the offending argument is shown
            # instead of the literal text "{a1}".
            abort(f"Invalid argument `{a1}`! Must be int or filepath.")

    d = Path.home() / "Pictures"
    # Explicit checks instead of `assert`, which is stripped under `python -O`
    # and would let the script silently upload nothing.
    if not d.exists():
        abort(f"文件夹({d})不存在")
    # dict.fromkeys drops duplicates while keeping order: both patterns can
    # match the same file when pattern matching is case-insensitive.
    images = list(dict.fromkeys([*d.rglob("*.jp*g"), *d.rglob("*.JP*G")]))
    if not images:
        abort(f"{d}中没有jpeg图片")

    # Upload the files concurrently with multiple threads.
    with ThreadGroup() as tg:
        picked = itertools.islice(itertools.cycle(images), total)
        # Passing `total` lets tqdm render a real progress bar.
        for image in tqdm.tqdm(picked, total=total):
            tg.soonify(catch_cost(client.upload_as_url))(image.read_bytes())

    try:
        res = json.dumps(tg.results)
    except TypeError:
        # At least one result is not JSON-serializable: keep the raw results
        # in a pickle file for inspection and save only the tuple results.
        print(tg.results)
        success = [i for i in tg.results if isinstance(i, tuple)]
        print(f"total={len(tg.results)}; success={len(success)}")
        p = Path("err.pickle")
        size = p.write_bytes(pickle.dumps(tg.results))
        print(f"Failed to dump results: Write err info to {p} with {size=}")
        res = json.dumps(success)

    (p := Path("output.json")).write_text(res)
    print(f"{total = }\nSave results to '{p}'.")
    if "--show" in args:
        run(show_result(p, client))
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
使用:
python main.py 120 --show