Skip to content

Instantly share code, notes, and snippets.

@SaidBySolo
Created October 21, 2021 07:43
Show Gist options
  • Save SaidBySolo/6da83b466f8c05504c53964cac9726da to your computer and use it in GitHub Desktop.
Save SaidBySolo/6da83b466f8c05504c53964cac9726da to your computer and use it in GitHub Desktop.
fast_download.py
from asyncio import run
from contextlib import suppress
from multiprocessing import freeze_support
from os import mkdir
from os.path import exists
from re import findall
from typing import Optional, Union
from aiofile import async_open # type: ignore
from aiohttp import ClientSession
from aiomultiprocess import Pool # type: ignore
def solve_shuffle_image_url(shuffled_image_url: str) -> Optional[str]:
    """Rebuild a direct image URL from its underscore-shuffled form.

    The input is expected to look like
    ``<prefix>_<type>_<pximg_net|hitomi_la>_<path>_<image>`` where ``<image>``
    itself contains at least one underscore. The result has the shape
    ``https://<prefix>.<host>/<type>/<path><image>``.

    Args:
        shuffled_image_url: The shuffled URL string.

    Returns:
        The rebuilt URL, or ``None`` when the input is not a string or does
        not match the expected layout.
    """
    # Non-string input can never match; return None explicitly instead of
    # relying on a blanket exception suppressor that would also hide real bugs.
    if not isinstance(shuffled_image_url, str):
        return None

    matches = findall(
        r"(.+)_(.+)_(pximg_net|hitomi_la)_(.+)_(.+_.+)", shuffled_image_url
    )
    if not matches:
        return None

    # With several groups, findall yields one tuple of group strings per match.
    prefix, type_, raw_host, raw_path, raw_image = matches[0]
    main_url = raw_host.replace("_", ".")
    img_date_or_hitomi_url_etc = raw_path.replace("_", "/")
    image = f"/{raw_image}"

    # For non-pximg hosts the underscores in the image part are path separators.
    if "pximg" not in main_url:
        image = image.replace("_", "/")

    # An image part beginning with "p0" is glued to the preceding path with an
    # underscore rather than a slash (e.g. ".../12345" + "_p0_master1200.jpg").
    if image.startswith("/p0"):
        image = image.replace("/", "_")

    return (
        f"https://{prefix}.{main_url}/{type_}/{img_date_or_hitomi_url_etc}{image}"
    )
async def get_image_link(index: int) -> list[dict[str, Union[int, str]]]:
    """Fetch a gallery's file list and resolve each shuffled image URL.

    Requests ``https://heliotrope.me/v5/api/hitomi/images/{index}`` and, for
    every entry under ``"files"``, resolves each non-``"name"`` value with
    ``solve_shuffle_image_url``.

    Args:
        index: Gallery identifier interpolated into the request URL.

    Returns:
        One dict per resolved value, containing ``"index"``, ``"name"`` and
        the original key mapped to the resolved URL.

    Raises:
        aiohttp.ClientResponseError: If the API answers with an error status.
        ValueError: If a file entry has no name or a URL cannot be resolved.
    """
    async with ClientSession() as cs:
        async with cs.get(f"https://heliotrope.me/v5/api/hitomi/images/{index}") as r:
            # Surface HTTP errors directly instead of failing later on the body.
            r.raise_for_status()
            d = await r.json()

    solved: list[dict[str, Union[int, str]]] = []
    for file in d["files"]:
        # Look the name up per file so it never depends on key order and can
        # never leak over from the previous entry.
        name = file.get("name")
        if not name:
            raise ValueError(f"file entry without a name: {file!r}")

        for k, v in file.items():
            if k == "name":
                continue
            solved_url = solve_shuffle_image_url(v)
            if not solved_url:
                raise ValueError(f"could not resolve image url: {v!r}")
            solved.append({"index": index, "name": name, k: solved_url})
    return solved
async def get_image_chunk_and_write(file: dict[str, Union[int, str]]) -> None:
    """Download one image and stream it to ``<index>/<name>`` on disk.

    Args:
        file: Mapping with ``"index"`` (used as the directory name),
            ``"name"`` (file name inside that directory) and ``"url"``
            (the address to download).

    Raises:
        KeyError: If ``"url"``, ``"index"`` or ``"name"`` is missing.
        TypeError: If ``file["url"]`` is not a string.
        aiohttp.ClientResponseError: If the server answers with an error status.
    """
    directory = str(file["index"])
    # main() maps this function over a process pool, so several workers can
    # reach this point for the same directory at once; a check-then-create
    # would race, so just tolerate the directory already existing.
    with suppress(FileExistsError):
        mkdir(directory)

    url = file["url"]
    if not isinstance(url, str):
        raise TypeError(f"file['url'] must be a str, got {type(url).__name__}")

    async with ClientSession(headers={"referer": "https://hitomi.la"}) as cs:
        async with cs.get(url) as r:
            # Check the status before opening the file so an error response
            # body is never written to disk as if it were the image.
            r.raise_for_status()
            async with async_open(f"{file['index']}/{file['name']}", "wb+") as afp:
                async for data, _ in r.content.iter_chunks():
                    await afp.write(data)
async def main(index: int) -> None:
    """Resolve every image of gallery *index* and download them via a pool.

    The file list comes from ``get_image_link``; each entry is then handed to
    ``get_image_chunk_and_write`` through ``Pool.map``.
    """
    image_files = await get_image_link(index)
    async with Pool() as worker_pool:
        downloads = worker_pool.map(get_image_chunk_and_write, image_files)
        # The per-task results are not used; iterating only drains the map.
        async for _ in downloads:
            pass
if __name__ == "__main__":
    from time import perf_counter

    # No-op unless the program is a frozen Windows executable; must be called
    # right after the __main__ guard when multiprocessing is in use.
    freeze_support()

    # Time one full download of a hard-coded gallery and print the seconds.
    started_at = perf_counter()
    run(main(1496588))
    elapsed_seconds = perf_counter() - started_at
    print(elapsed_seconds)
    # 49.3389782  (presumably the value printed on the author's run)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment