Skip to content

Instantly share code, notes, and snippets.

@kbidlack
Created July 15, 2025 00:15
Show Gist options
  • Save kbidlack/eaed6a330d77ddc354374e29938ddf1c to your computer and use it in GitHub Desktop.
Save kbidlack/eaed6a330d77ddc354374e29938ddf1c to your computer and use it in GitHub Desktop.
draft async soundcloud downloader
import asyncio
from pathlib import Path
import string
import time
from ffmpeg import Progress
from ffmpeg.asyncio import FFmpeg
from typing import Union, get_args, get_origin
from soundcloud.resource.base import BaseData
import aiohttp
from soundcloud import AlbumPlaylist, BasicTrack, MiniTrack, SoundCloud
from tqdm import tqdm
# URL handed to `sc.resolve` in `Downloader.main`; every track of the resolved
# playlist is then passed to `Downloader.download_track`.
playlist_url = "https://soundcloud.com/soundcloud/sets/top-tracks-of-2024"
from typing import Type
# -------------------------------------------------
# re-implement some soundcloud methods on top of aiohttp (async)
def _convert_dict[T: BaseData](d, return_type: Type[T]) -> T:
union = get_origin(return_type) is Union
if union:
for t in get_args(return_type):
try:
return t.from_dict(d)
except Exception:
pass
else:
return return_type.from_dict(d)
raise ValueError(f"Could not convert {d} to type {return_type}")
async def get_track(
    client: SoundCloud,
    track_id: int,
    use_auth: bool = True,
    body: dict | None = None,
    **kwargs,
) -> BasicTrack | None:
    """
    Fetch a single track from the SoundCloud v2 API with aiohttp.

    Issues ``GET https://api-v2.soundcloud.com/tracks/{track_id}``. Extra
    keyword arguments are sent as query parameters, together with the
    client's ``client_id``; ``body`` is sent as the JSON payload. When
    ``use_auth`` is true and the client carries an authorization value, it is
    added as the ``Authorization`` header.

    Returns the response converted to ``BasicTrack``, or ``None`` when the
    server answers 400, 404 or 500. Any other error status is raised by
    ``raise_for_status``. A new ``aiohttp.ClientSession`` is opened for
    every call.
    """
    api_root = "https://api-v2.soundcloud.com"
    path_template = "/tracks/{track_id}"

    # Keyword arguments whose names match a placeholder of the path template
    # are used for formatting the URL; everything else becomes a query
    # parameter.
    placeholders = {
        field
        for _, field, _, _ in string.Formatter().parse(path_template)
        if field is not None
    }
    path_args = {key: value for key, value in kwargs.items() if key in placeholders}
    query = {key: value for key, value in kwargs.items() if key not in placeholders}
    query["client_id"] = client.client_id
    resource_url = api_root + path_template.format(track_id=track_id, **path_args)

    request_headers = client._get_default_headers()
    if use_auth and client._authorization is not None:
        request_headers["Authorization"] = client._authorization

    async with (
        aiohttp.ClientSession() as session,
        session.request(
            "GET", resource_url, json=body, headers=request_headers, params=query
        ) as response,
    ):
        if response.status in (400, 404, 500):
            return None
        response.raise_for_status()
        payload = await response.json()
        return _convert_dict(payload, BasicTrack)
# -------------------------------------------------
def close_pbar(pbar: tqdm, msg: str) -> None:
    """Finish ``pbar`` showing ``msg`` as its final description.

    The bar is advanced by whatever is left between its current count and its
    total, and is then closed. A bar without a total (``pbar.total is None``)
    has nothing to fill up, so it is only relabelled and closed instead of
    failing on the subtraction.

    Args:
        pbar: The progress bar to finish.
        msg: Description to display on the finished bar.
    """
    pbar.set_description(msg)
    if pbar.total is not None:
        pbar.update(pbar.total - pbar.n)
    pbar.close()
class Downloader:
    """Downloads SoundCloud tracks through ffmpeg, one tqdm bar per track."""

    @staticmethod
    def _safe_filename(title: str) -> str:
        """Return ``title`` with path separators replaced by ``_``."""
        return title.replace("/", "_").replace("\\", "_")

    @staticmethod
    def _retry_delay(headers) -> int:
        """Return the ``Retry-After`` wait in seconds, defaulting to 1."""
        try:
            return int(headers.get("Retry-After", 1))
        except (TypeError, ValueError):
            # Retry-After may also be an HTTP date; fall back to the default.
            return 1

    async def _resolve_stream_url(
        self, session: aiohttp.ClientSession, sc: SoundCloud, transcoding_url: str
    ) -> str | None:
        """Ask the transcoding endpoint for the stream URL, retrying on 429.

        Returns the ``"url"`` field of the JSON answer, or ``None`` when the
        final status is not 200. Retries are unbounded while the server keeps
        answering 429.
        """
        headers = sc._get_default_headers()
        params = {"client_id": sc.client_id}
        delay = 0
        while True:
            if delay > 0:
                await asyncio.sleep(delay)
            # The context manager releases the connection on every path,
            # including each rate-limited attempt.
            async with session.get(
                transcoding_url, headers=headers, params=params
            ) as response:
                if response.status == 429:
                    delay = self._retry_delay(response.headers)
                    continue
                if response.status != 200:
                    return None
                return (await response.json())["url"]

    async def _run_download(
        self,
        _track: MiniTrack | BasicTrack,
        session: aiohttp.ClientSession,
        sc: SoundCloud,
        pbar: tqdm,
    ) -> bool:
        """Do the actual work of ``download_track``, reporting on ``pbar``."""
        # A MiniTrack only carries an id here; fetch the full track for it.
        if isinstance(_track, MiniTrack):
            track = await get_track(sc, _track.id)
        else:
            track = _track
        if track is None:
            close_pbar(pbar, "❌ Failed to fetch track info")
            return False
        pbar.update(1)
        pbar.set_description(f"Fetching {track.title}")

        transcoding = next(
            (t for t in track.media.transcodings if t.format.protocol == "hls"),
            None,
        )
        if transcoding is None:
            close_pbar(pbar, f"❌ No HLS transcodings available for {track.title}")
            return False
        if transcoding.url is None:
            close_pbar(pbar, f"❌ No transcoding URL for {track.title}")
            return False

        # Size estimate for the bar: 256 for "aac" presets, 128 otherwise,
        # divided by 8 and scaled by the transcoding duration.
        # NOTE(review): presumably kbit/s and a duration in milliseconds,
        # which would make this a byte count - confirm.
        rate = 256 / 8 if "aac" in transcoding.preset else 128 / 8
        total_bytes = rate * transcoding.duration

        url = await self._resolve_stream_url(session, sc, transcoding.url)
        if url is None:
            close_pbar(pbar, f"❌ Error fetching {track.title} transcoding")
            return False
        pbar.update(1)

        # Re-purpose the two-step task bar as a byte-progress bar.
        pbar.set_description(f"Downloading {track.title}")
        pbar.reset(total=total_bytes)
        pbar.unit = "B"
        pbar.unit_scale = True
        pbar.n = 0
        pbar.refresh()

        file_path = Path(f"./output/{self._safe_filename(track.title)}.mp3")
        file_path.parent.mkdir(parents=True, exist_ok=True)
        ffmpeg = FFmpeg().option("y").input(url).output(file_path)

        @ffmpeg.on("progress")
        def on_progress(progress: Progress) -> None:
            # ffmpeg reports the cumulative size; tqdm wants the increment.
            pbar.update(progress.size - pbar.n)

        await ffmpeg.execute()
        # Replace the estimate with the real size so the bar ends at 100 %.
        pbar.total = file_path.stat().st_size
        close_pbar(pbar, f"✅ Downloaded {track.title}")
        return True

    async def download_track(
        self, _track: MiniTrack | BasicTrack, session: aiohttp.ClientSession, sc: SoundCloud
    ) -> bool:
        """Download one track to ``./output/<title>.mp3``.

        Args:
            _track: The track to download; a ``MiniTrack`` is first expanded
                via ``get_track``.
            session: Shared aiohttp session used for the transcoding request.
            sc: SoundCloud client providing default headers and ``client_id``.

        Returns:
            ``True`` when the file was written, ``False`` when the track was
            skipped (no track info, no HLS transcoding, no transcoding URL,
            or a non-200 answer from the transcoding endpoint).

        Raises:
            Exception: Any unexpected error is re-raised after the progress
                bar has been closed.
        """
        pbar = tqdm(total=2, desc="Fetching track info", unit="task", leave=True)
        try:
            return await self._run_download(_track, session, sc, pbar)
        except Exception as exc:
            # Do not leave a half-finished bar behind; the caller still gets
            # the original exception.
            close_pbar(pbar, f"❌ Download failed: {exc}")
            raise

    async def main(self):
        """Resolve ``playlist_url`` and download all its tracks concurrently.

        Prints how many tracks were actually downloaded and the elapsed time.
        Tracks that were skipped or raised are not counted.
        """
        start_time = time.perf_counter()
        sc = SoundCloud()
        album: AlbumPlaylist = sc.resolve(playlist_url)  # type: ignore[reportAssignmentType]
        async with aiohttp.ClientSession() as session:
            results = await asyncio.gather(
                *(self.download_track(track, session, sc) for track in album.tracks),
                return_exceptions=True,
            )
        end_time = time.perf_counter()
        # ``results`` mixes True/False outcomes with exception objects; only
        # an explicit True is a finished download.
        tracks_downloaded = [result for result in results if result is True]
        print(f"Downloaded {len(tracks_downloaded)} tracks in {end_time - start_time:.2f} seconds.")
if __name__ == "__main__":
    # Script entry point: run Downloader.main() to completion on a new
    # asyncio event loop.
    downloader = Downloader()
    entry_coroutine = downloader.main()
    asyncio.run(entry_coroutine)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment