@noaione
Created July 14, 2024 06:37
Use my own custom script to download stuff from asmr.one, since their built-in downloader keeps OOM-ing for me.
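
Usage note (requires aiofiles, aiohttp, orjson, and tqdm; the script name below is taken from the script's own usage message, and the RJ code is just a placeholder):

    python aone-grab.py RJ01017008
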
import asyncio
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Literal

import aiofiles
import aiohttp
import orjson
import tqdm

HOSTNAME = "https://api.asmr-200.com"
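# Accept either an "RJ"-prefixed code (e.g. RJ123456) or a bare numeric work ID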
RJ_RE = re.compile(r"(?:RJ)?(?P<id>[\d]+)")
# How much we read per iteration while streaming (8 KiB); this is a read
# size, not a bandwidth cap
CHUNK_SIZE = 8 * 1024

try:
    work_raw = sys.argv[1]
except IndexError:
    print("Usage: python aone-grab.py <work_id>")
    sys.exit(1)

CURRENT_DIR = Path(__file__).resolve().parent
WORK_MATCH = RJ_RE.search(work_raw)
if WORK_MATCH is None:
    print("Invalid work ID")
    sys.exit(1)
WORK_ID = WORK_MATCH.group("id")
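

# A single downloadable entry from a work's track tree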
@dataclass
class WorkTrack:
    filename: str
    url: str
    type: Literal["folder", "text", "image", "audio"]
    save_path: Path
    size: int | None = None

    def is_hq(self) -> bool | None:
        if self.type != "audio":
            return None
        return self.filename.endswith(".flac") or self.filename.endswith(".wav")
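

# Build a fresh HTTP session with browser-like headers; the Origin/Referer
# pair mimics the asmr.one web player (presumably needed for the API to
# accept the requests)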
def create_session() -> aiohttp.ClientSession:
    return aiohttp.ClientSession(
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0",
            "Origin": "https://asmr.one",
            "Referer": "https://asmr.one/",
        }
    )
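

# The tracks API returns a nested folder tree; flatten it into a flat list
# of WorkTrack entries, building each file's on-disk path from its folder
# chain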
def transform_work_data(
    data: list[dict],
    base_folder: Path,
) -> list[WorkTrack]:
    if not data:
        return []
    current_data = []
    for item in data:
        match item["type"]:
            case "folder":
                folder_path = base_folder / item["title"]
                # Call recursively on "children" key
                current_data.extend(transform_work_data(item["children"], folder_path))
            case "text" | "image" | "audio":
                current_data.append(WorkTrack(
                    filename=item["title"],
                    url=item["mediaDownloadUrl"],
                    type=item["type"],
                    save_path=base_folder / item["title"],
                    size=item["size"],
                ))
    return current_data
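

# Fetch the track tree for a work and flatten it into download entries
# rooted at ./RJ<work_id>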
async def fetch_work_tracks(work_id: str) -> list[WorkTrack]:
    async with create_session() as session:
        async with session.get(f"{HOSTNAME}/api/tracks/{work_id}?v=2") as response:
            response.raise_for_status()
            # Data response is a list of tracks
            data = await response.json()
    return transform_work_data(data, CURRENT_DIR / f"RJ{work_id}")


async def fetch_work_metadata(work_id: str) -> dict:
    async with create_session() as session:
        async with session.get(f"{HOSTNAME}/api/workInfo/{work_id}") as response:
            response.raise_for_status()
            return await response.json()
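

# Stream a single file to disk in CHUNK_SIZE pieces, updating a byte-level
# progress bar as we go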
async def download_track(track: WorkTrack) -> None:
    print(f"Downloading {track.filename}...")
    # Make progress bar
    pbar = tqdm.tqdm(total=track.size, unit="B", unit_scale=True, desc="Downloading")
    async with create_session() as session:
        async with session.get(track.url) as response:
            response.raise_for_status()
            track.save_path.parent.mkdir(parents=True, exist_ok=True)
            async with aiofiles.open(track.save_path, "wb") as fp:
                async for chunk in response.content.iter_chunked(CHUNK_SIZE):
                    pbar.update(len(chunk))
                    await fp.write(chunk)
    pbar.close()
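

# A file counts as already downloaded if it exists and its size matches the
# size reported by the API (the track metadata only carries a size, so this
# is best-effort)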
async def check_file_integrity(track: WorkTrack) -> bool:
    if track.save_path.exists():
        return track.size == track.save_path.stat().st_size
    return False
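

# Entry point: fetch metadata, dump it to metadata.json, then download every
# track, skipping files that already pass the size check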
async def main():
    DOWNLOAD_DIR = CURRENT_DIR / f"RJ{WORK_ID}"
    print(f"Fetching metadata for RJ{WORK_ID}")
    work_metadata = await fetch_work_metadata(WORK_ID)
    print(f" Title: {work_metadata['title']}")
    print("Downloading tracks...")
    work_tracks = await fetch_work_tracks(WORK_ID)

    DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)
    metadata_file = DOWNLOAD_DIR / "metadata.json"
    async with aiofiles.open(metadata_file, "wb") as fp:
        await fp.write(orjson.dumps(work_metadata, option=orjson.OPT_INDENT_2))

    print(f" Total tracks/items: {len(work_tracks)}")
    for track in work_tracks:
        if await check_file_integrity(track):
            print(f"Skipping {track.filename} as it already exists")
            continue
        await download_track(track)


if __name__ == "__main__":
    asyncio.run(main())
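
Re-running the script effectively resumes a batch: files whose on-disk size already matches the size reported by the API are skipped, while missing or partially written files are downloaded again from scratch (the size comparison in check_file_integrity is the only integrity test; the API does not appear to expose checksums).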