Our own custom script to download stuff from asmr.one, since their downloader keeps OOM-ing for me.
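The script has no packaging around it, so you need the third-party packages it imports (aiofiles, aiohttp, orjson, tqdm) installed, plus Python 3.10+ for the match statements and the `X | None` annotations. Something along these lines should work:

pip install aiofiles aiohttp orjson tqdm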
import asyncio
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Literal

import aiofiles
import aiohttp
import orjson
import tqdm

HOSTNAME = "https://api.asmr-200.com"
RJ_RE = re.compile(r"(?:RJ)?(?P<id>[\d]+)")
# My internet speed is 12MiB/s, use 2/3 of that for download speed
CHUNK_SIZE = 8 * 1024

try:
    work_raw = sys.argv[1]
except IndexError:
    print("Usage: python aone-grab.py <work_id>")
    sys.exit(1)

CURRENT_DIR = Path(__file__).resolve().parent
WORK_MATCH = RJ_RE.search(work_raw)
if WORK_MATCH is None:
    print("Invalid work ID")
    sys.exit(1)
WORK_ID = WORK_MATCH.group("id")


@dataclass
class WorkTrack:
    filename: str
    url: str
    type: Literal["folder", "text", "image", "audio"]
    save_path: Path
    size: int | None = None

    def is_hq(self) -> bool | None:
        # Only meaningful for audio tracks; lossless formats count as HQ
        if self.type != "audio":
            return None
        return self.filename.endswith(".flac") or self.filename.endswith(".wav")


def create_session() -> aiohttp.ClientSession:
    return aiohttp.ClientSession(
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0",
            "Origin": "https://asmr.one",
            "Referer": "https://asmr.one/",
        }
    )


def transform_work_data(
    data: list[dict],
    base_folder: Path,
) -> list[WorkTrack]:
    if not data:
        return []
    current_data = []
    for item in data:
        match item["type"]:
            case "folder":
                folder_path = base_folder / item["title"]
                # Call recursively on "children" key
                current_data.extend(transform_work_data(item["children"], folder_path))
            case "text" | "image" | "audio":
                # Leaf items (text/image/audio) all carry the same fields
                current_data.append(WorkTrack(
                    filename=item["title"],
                    url=item["mediaDownloadUrl"],
                    type=item["type"],
                    save_path=base_folder / item["title"],
                    size=item["size"],
                ))
    return current_data


async def fetch_work_tracks(work_id: str) -> list[WorkTrack]:
    async with create_session() as session:
        async with session.get(f"{HOSTNAME}/api/tracks/{work_id}?v=2") as response:
            # Data response is list of tracks
            data = await response.json()
            return transform_work_data(data, CURRENT_DIR / f"RJ{work_id}")


async def fetch_work_metadata(work_id: str) -> dict:
    async with create_session() as session:
        async with session.get(f"{HOSTNAME}/api/workInfo/{work_id}") as response:
            response.raise_for_status()
            return await response.json()


async def download_track(track: WorkTrack) -> None:
    print(f"Downloading {track.filename}...")
    # Make progress bar
    pbar = tqdm.tqdm(total=track.size, unit="B", unit_scale=True, desc="Downloading")
    async with create_session() as session:
        async with session.get(track.url) as response:
            response.raise_for_status()
            track.save_path.parent.mkdir(parents=True, exist_ok=True)
            async with aiofiles.open(track.save_path, "wb") as fp:
                # Stream in chunks so the whole file never sits in memory
                async for chunk in response.content.iter_chunked(CHUNK_SIZE):
                    pbar.update(len(chunk))
                    await fp.write(chunk)
    pbar.close()


async def check_file_integrity(track: WorkTrack) -> bool:
    # Treat a file as complete if it exists and matches the reported size
    if track.save_path.exists():
        return track.size == track.save_path.stat().st_size
    return False


async def main():
    DOWNLOAD_DIR = CURRENT_DIR / f"RJ{WORK_ID}"

    print(f"Fetching metadata for RJ{WORK_ID}")
    work_metadata = await fetch_work_metadata(WORK_ID)
    print(f" Title: {work_metadata['title']}")

    print("Downloading tracks...")
    work_tracks = await fetch_work_tracks(WORK_ID)
    DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)

    metadata_file = DOWNLOAD_DIR / "metadata.json"
    async with aiofiles.open(metadata_file, "wb") as fp:
        await fp.write(orjson.dumps(work_metadata, option=orjson.OPT_INDENT_2))

    print(f" Total tracks/items: {len(work_tracks)}")
    for track in work_tracks:
        if await check_file_integrity(track):
            print(f"Skipping {track.filename} as it already exists")
            continue
        await download_track(track)


if __name__ == "__main__":
    asyncio.run(main())
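For reference, a typical run looks like this (the RJ number below is just a placeholder, and a bare numeric ID also works since RJ_RE treats the RJ prefix as optional):

python aone-grab.py RJ01234567

Files land in an RJ&lt;id&gt; folder next to the script, along with a metadata.json dump of the work info; re-running the script skips any file whose size already matches the one reported by the API.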