Created
October 21, 2021 07:43
-
-
Save SaidBySolo/6da83b466f8c05504c53964cac9726da to your computer and use it in GitHub Desktop.
fast_download.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from asyncio import run | |
from contextlib import suppress | |
from multiprocessing import freeze_support | |
from os import mkdir | |
from os.path import exists | |
from re import findall | |
from typing import Optional, Union | |
from aiofile import async_open # type: ignore | |
from aiohttp import ClientSession | |
from aiomultiprocess import Pool # type: ignore | |
def solve_shuffle_image_url(shuffled_image_url: str) -> Optional[str]:
    """Rebuild a real image URL from its underscore-joined ("shuffled") form.

    The input is expected to look like
    ``<prefix>_<type>_<pximg_net|hitomi_la>_<path parts>_<image parts>``,
    where the underscores stand in for the ``.`` and ``/`` separators of the
    original URL.

    Args:
        shuffled_image_url: The underscore-joined URL string.

    Returns:
        The reconstructed ``https://`` URL, or ``None`` when the input is not
        a string or does not match the expected pattern.
    """
    # These two guards replace a blanket ``suppress(Exception)``: they cover
    # the only failures the body can actually produce (wrong input type, no
    # regex match) without hiding unrelated bugs.
    if not isinstance(shuffled_image_url, str):
        return None

    matches = findall(
        r"(.+)_(.+)_(pximg_net|hitomi_la)_(.+)_(.+_.+)", shuffled_image_url
    )
    if not matches:
        return None

    # With five groups, findall yields tuples of five strings.
    prefix, type_, host, path, image_name = matches[0]

    main_url = host.replace("_", ".")
    img_date_or_hitomi_url_etc = path.replace("_", "/")
    image = f"/{image_name}"

    # For non-pximg hosts the underscores in the image part are path
    # separators; for pximg they are kept as part of the file name.
    if "pximg" not in main_url:
        image = image.replace("_", "/")

    # When the image part begins with "p0", the regex split the file name in
    # two; turning the slashes back into underscores re-attaches it to the
    # last path component.
    if image.startswith("/p0"):
        image = image.replace("/", "_")

    return f"https://{prefix}.{main_url}/{type_}/{img_date_or_hitomi_url_etc}{image}"
async def get_image_link(index: int) -> list[dict[str, Union[int, str]]]:
    """Fetch the file list of gallery *index* and resolve its image URLs.

    Requests ``https://heliotrope.me/v5/api/hitomi/images/{index}`` and, for
    every entry of the response's ``"files"`` list, passes each non-``"name"``
    value through ``solve_shuffle_image_url``.

    Args:
        index: Gallery id used in the request URL and copied into each result.

    Returns:
        One dict per resolved value, holding ``"index"``, ``"name"`` and the
        resolved URL stored under the key it had in the response entry.

    Raises:
        AssertionError: If an entry has no usable ``"name"`` or one of its
            values cannot be resolved to a URL.
    """
    async with ClientSession() as cs:
        async with cs.get(f"https://heliotrope.me/v5/api/hitomi/images/{index}") as r:
            d = await r.json()

    solved: list[dict[str, Union[int, str]]] = []
    for file in d["files"]:
        # Look the name up per entry rather than relying on "name" being the
        # first key: otherwise a stale name from the previous entry could be
        # attached to this entry's URL.
        name = file.get("name")
        for k, v in file.items():
            if k == "name":
                continue
            # Raised explicitly (not via ``assert``) so the checks survive
            # ``python -O``; the exception type is kept for compatibility.
            if not name:
                raise AssertionError(f"file entry without a name: {file!r}")
            solved_url = solve_shuffle_image_url(v)
            if not solved_url:
                raise AssertionError(f"could not resolve image url: {v!r}")
            solved.append({"index": index, "name": name, k: solved_url})
    return solved
async def get_image_chunk_and_write(file: dict[str, Union[int, str]]) -> None:
    """Download one image and stream it to ``<index>/<name>`` on disk.

    Args:
        file: Mapping with ``"index"`` (used as the target directory name),
            ``"name"`` (target file name) and ``"url"`` (address to fetch).

    Raises:
        KeyError: If ``file`` lacks one of the keys read here.
    """
    # Several pool workers process files of the same gallery concurrently, so
    # an exists()-then-mkdir() check can race and crash the loser with
    # FileExistsError. Attempt the creation and tolerate "already there".
    with suppress(FileExistsError):
        mkdir(str(file["index"]))

    async with ClientSession(headers={"referer": "https://hitomi.la"}) as cs:
        url = file["url"]
        # Narrows Union[int, str] to str for type checkers.
        assert isinstance(url, str)
        async with cs.get(url) as r:
            async with async_open(f"{file['index']}/{file['name']}", "wb+") as afp:
                # Write chunk by chunk so the whole image is never held in
                # memory at once.
                async for data, _ in r.content.iter_chunks():
                    await afp.write(data)
async def main(index: int):
    """Resolve the image list of gallery *index* and download every entry.

    The downloads are distributed over an ``aiomultiprocess`` pool; the
    per-file results are discarded.
    """
    targets = await get_image_link(index)
    async with Pool() as workers:
        # Iterating the mapped results only serves to wait for each download.
        async for _result in workers.map(get_image_chunk_and_write, targets):
            pass
if __name__ == "__main__":
    import sys
    from time import perf_counter

    # Needed for multiprocessing when the script is frozen into a Windows
    # executable; a no-op otherwise.
    freeze_support()

    # Gallery to download: first command-line argument when given, otherwise
    # the gallery id that was originally hard-coded here.
    gallery_index = int(sys.argv[1]) if len(sys.argv) > 1 else 1496588

    started = perf_counter()
    run(main(gallery_index))
    # Elapsed wall-clock seconds for the whole download.
    print(perf_counter() - started)
    # 49.3389782
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment