Created
July 15, 2025 00:15
-
-
Save kbidlack/eaed6a330d77ddc354374e29938ddf1c to your computer and use it in GitHub Desktop.
draft async soundcloud downloader
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from pathlib import Path | |
import string | |
import time | |
from ffmpeg import Progress | |
from ffmpeg.asyncio import FFmpeg | |
from typing import Union, get_args, get_origin | |
from soundcloud.resource.base import BaseData | |
import aiohttp | |
from soundcloud import AlbumPlaylist, BasicTrack, MiniTrack, SoundCloud | |
from tqdm import tqdm | |
playlist_url = "https://soundcloud.com/soundcloud/sets/top-tracks-of-2024" | |
from typing import Type | |
# ------------------------------------------------- | |
# re-implement some soundcloud client methods as standalone coroutines on top of aiohttp
def _convert_dict[T: BaseData](d, return_type: Type[T]) -> T: | |
union = get_origin(return_type) is Union | |
if union: | |
for t in get_args(return_type): | |
try: | |
return t.from_dict(d) | |
except Exception: | |
pass | |
else: | |
return return_type.from_dict(d) | |
raise ValueError(f"Could not convert {d} to type {return_type}") | |
async def get_track(
    client: SoundCloud,
    track_id: int,
    use_auth: bool = True,
    body: dict | None = None,
    **kwargs,
) -> BasicTrack | None:
    """Fetch one track from the SoundCloud v2 API using aiohttp.

    Issues ``GET /tracks/{track_id}``. Any extra keyword arguments are sent as
    query parameters alongside the client's ``client_id``. When ``use_auth``
    is true and the client holds an authorization value, it is sent in the
    ``Authorization`` header. ``body`` is passed through as the JSON payload.

    Returns:
        The response converted to ``BasicTrack``, or ``None`` when the API
        answers with status 400, 404 or 500.

    Raises:
        Whatever ``raise_for_status`` raises for any other error status.
    """
    # ``track_id`` is the route's only placeholder and is a named parameter,
    # so it can never show up in ``kwargs``: every kwarg is a query parameter.
    endpoint = f"https://api-v2.soundcloud.com/tracks/{track_id}"

    query = kwargs
    query["client_id"] = client.client_id

    request_headers = client._get_default_headers()
    if use_auth and client._authorization is not None:
        request_headers["Authorization"] = client._authorization

    async with aiohttp.ClientSession() as session, session.request(
        "GET", endpoint, json=body, headers=request_headers, params=query
    ) as response:
        if response.status in (400, 404, 500):
            return None
        response.raise_for_status()
        payload = await response.json()
        return _convert_dict(payload, BasicTrack)
# ------------------------------------------------- | |
def close_pbar(pbar: tqdm, msg: str) -> None:
    """Show ``msg`` on the bar, advance it to its total, and close it."""
    pbar.set_description(msg)
    # Jump the counter by whatever is still missing so the bar ends up full.
    remaining = pbar.total - pbar.n
    pbar.update(remaining)
    pbar.close()
class Downloader:
    """Download every track of the configured SoundCloud playlist via ffmpeg.

    Tracks are fetched concurrently; each one gets its own tqdm progress bar.
    """

    # Path separators plus characters reserved on common filesystems; each is
    # replaced with "_" before a track title is used as a file name.
    _FILENAME_TABLE = str.maketrans(dict.fromkeys('<>:"/\\|?*', "_"))

    @staticmethod
    def _safe_filename(title: str) -> str:
        """Return ``title`` made safe to use as a single path component."""
        cleaned = title.translate(Downloader._FILENAME_TABLE).strip(". ")
        return cleaned or "untitled"

    @staticmethod
    def _retry_delay(header_value: str | None) -> int:
        """Parse a ``Retry-After`` header as whole seconds.

        Falls back to one second when the header is missing or is not an
        integer (the header may also carry an HTTP date).
        """
        try:
            return int(header_value)
        except (TypeError, ValueError):
            return 1

    async def _resolve_stream_url(
        self, transcoding_url: str, session: aiohttp.ClientSession, sc: SoundCloud
    ) -> str | None:
        """Exchange a transcoding URL for the actual stream URL.

        Retries for as long as the server answers 429, sleeping for the
        advertised delay between attempts. Returns ``None`` for any other
        non-200 status.
        """
        headers = sc._get_default_headers()
        params = {"client_id": sc.client_id}
        delay = 0
        while True:
            if delay > 0:
                await asyncio.sleep(delay)
            # The context manager releases the connection on every attempt,
            # including the rate-limited ones that are retried.
            async with session.get(transcoding_url, headers=headers, params=params) as response:
                if response.status == 429:
                    delay = self._retry_delay(response.headers.get("Retry-After"))
                    continue
                if response.status != 200:
                    return None
                return (await response.json())["url"]

    async def _download_with_progress(
        self,
        _track: MiniTrack | BasicTrack,
        session: aiohttp.ClientSession,
        sc: SoundCloud,
        pbar: tqdm,
    ) -> bool:
        """Run the download steps for one track, reporting progress on ``pbar``."""
        if isinstance(_track, MiniTrack):
            # Mini tracks must be expanded to full tracks first.
            track = await get_track(sc, _track.id)
        else:
            track = _track
        if track is None:
            close_pbar(pbar, "❌ Failed to fetch track info")
            return False

        pbar.update(1)
        pbar.set_description(f"Fetching {track.title}")

        transcoding = next(
            (t for t in track.media.transcodings if t.format.protocol == "hls"), None
        )
        if transcoding is None:
            close_pbar(pbar, f"❌ No HLS transcodings available for {track.title}")
            return False
        if transcoding.url is None:
            close_pbar(pbar, f"❌ No transcoding URL available for {track.title}")
            return False

        stream_url = await self._resolve_stream_url(transcoding.url, session, sc)
        if stream_url is None:
            close_pbar(pbar, f"❌ Error fetching {track.title} transcoding")
            return False

        pbar.update(1)
        pbar.set_description(f"Downloading {track.title}")

        # Rough size estimate used only to scale the bar: 256 kbit/s for AAC
        # presets, 128 kbit/s otherwise.
        # NOTE(review): assumes transcoding.duration is in milliseconds - confirm.
        bytes_per_ms = 256 / 8 if "aac" in transcoding.preset else 128 / 8
        estimated_bytes = bytes_per_ms * transcoding.duration
        pbar.reset(total=estimated_bytes)
        pbar.unit = "B"
        pbar.unit_scale = True
        pbar.n = 0
        pbar.refresh()

        file_path = Path("./output") / f"{self._safe_filename(track.title)}.mp3"
        file_path.parent.mkdir(parents=True, exist_ok=True)
        ffmpeg = FFmpeg().option("y").input(stream_url).output(file_path)

        @ffmpeg.on("progress")
        def on_progress(progress: Progress) -> None:
            # ffmpeg reports the cumulative size; tqdm wants the increment.
            pbar.update(progress.size - pbar.n)

        await ffmpeg.execute()

        # Replace the estimate with the real size so the bar finishes at 100%.
        pbar.total = file_path.stat().st_size
        close_pbar(pbar, f"✅ Downloaded {track.title}")
        return True

    async def download_track(
        self, _track: MiniTrack | BasicTrack, session: aiohttp.ClientSession, sc: SoundCloud
    ) -> bool:
        """Download one track into ``./output`` as an ``.mp3`` file.

        Args:
            _track: the track to download; a ``MiniTrack`` is first expanded
                through ``get_track``.
            session: shared aiohttp session used to resolve the stream URL.
            sc: SoundCloud client providing the client id and default headers.

        Returns:
            ``True`` when the file was written, ``False`` when the track was
            skipped with a ❌ message on its progress bar.

        Raises:
            Any exception raised while fetching or transcoding; the progress
            bar is closed before it propagates.
        """
        pbar = tqdm(total=2, desc="Fetching track info", unit="task", leave=True)
        try:
            return await self._download_with_progress(_track, session, sc, pbar)
        except Exception as exc:
            # Do not leave a dangling bar behind; the caller still sees the error.
            close_pbar(pbar, f"❌ Download aborted ({type(exc).__name__})")
            raise

    async def main(self):
        """Resolve the playlist, download all of its tracks, print a summary.

        Raises:
            ValueError: if ``playlist_url`` does not resolve to a playlist.
        """
        start_time = time.perf_counter()
        sc = SoundCloud()
        album = sc.resolve(playlist_url)
        if not isinstance(album, AlbumPlaylist):
            raise ValueError(f"{playlist_url} did not resolve to a playlist")

        async with aiohttp.ClientSession() as session:
            results = await asyncio.gather(
                *(self.download_track(track, session, sc) for track in album.tracks),
                return_exceptions=True,
            )
        elapsed = time.perf_counter() - start_time

        # Only an explicit True is a finished download; False is a skipped
        # track and an exception instance is a crashed task.
        downloaded = sum(1 for result in results if result is True)
        print(f"Downloaded {downloaded} tracks in {elapsed:.2f} seconds.")
        failed = len(results) - downloaded
        if failed:
            print(f"{failed} tracks could not be downloaded.")
if __name__ == "__main__":
    # Script entry point: run the playlist download to completion.
    asyncio.run(Downloader().main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment