Last active
November 10, 2017 00:13
-
-
Save divadsn/15eb8adcd59de8afcc67ff2f2d189c46 to your computer and use it in GitHub Desktop.
Some hacky SoundCloud grabber for my radio station
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os | |
import eyed3 | |
import soundcloud | |
import sqlite3 | |
import sys | |
import re | |
import socket | |
import urllib | |
def remove_tags(rgx_list, text):
    """Remove bracketed promo tags such as "(Free Download)" from text.

    For every entry of rgx_list, each span that starts with "(" or "[",
    contains the entry (matched case-insensitively) before any further
    opening bracket, and runs up to the next ")" or "]" is deleted.

    Entries are inserted into the pattern as regular-expression fragments,
    so any metacharacters they contain keep their regex meaning.

    Args:
        rgx_list: iterable of regex fragments to look for inside brackets.
        text: the string to clean.

    Returns:
        text with all matching bracketed spans removed. Whitespace around a
        removed span is left in place.
    """
    new_text = text
    for r in rgx_list:
        # Both halves are raw strings so "\)" and "\]" reach the regex engine
        # as written instead of being treated as (invalid) string escapes.
        rgx = re.compile(r"[\(\[][^\(\[]*" + r + r".*?[\)\]]", re.IGNORECASE)
        new_text = rgx.sub('', new_text)
    return new_text
def parse_meta(trackname):
    """Split a track name of the form "Artist - Title" into its parts.

    The name is split at the first "-" if it contains one, otherwise at the
    first "|". Promo tags in brackets (see the tag list below) are removed
    from the title via remove_tags().

    Args:
        trackname: the raw track name.

    Returns:
        A dict with the keys "artist" and "title" (both stripped of
        surrounding whitespace), or False when the name contains neither
        separator.
    """
    # Check if it's valid for meta split
    if '-' in trackname:
        separator = '-'
    elif '|' in trackname:
        separator = '|'
    else:
        return False

    # Split at the first separator only. Slicing by hand previously skipped
    # one extra character after the separator, which ate the first letter of
    # the title whenever no space followed it ("Artist-Title").
    artist, _, title = trackname.partition(separator)
    artist = artist.strip()
    title = title.strip()

    # Tags to remove from title
    tags = [
        "download",
        "free",
        "release",
        "monstercat",
        "out now",
        "available",
        "new artist week"
    ]

    # Remove tags and strip left-over whitespace
    new_title = remove_tags(tags, title).strip()
    return {"artist": artist, "title": new_title}
# Enforce utf8 encoding everywhere.
# NOTE(review): reload(sys) + sys.setdefaultencoding() only exists on
# Python 2; the implicit str/unicode conversions further down rely on it.
reload(sys)
sys.setdefaultencoding('utf8')

# Set default timeout to 10 seconds for every socket opened from here on
socket.setdefaulttimeout(10)

# SoundCloud client id for API access
# NOTE(review): credential is hard-coded in the source file.
sc_key = 'c6CU49JDMapyrQo06UxU9xouB9ZVzqCn'

# Establish database connection (path is relative to the current directory)
db = sqlite3.connect('tagindex.db')

# Create table if not exists
db.execute("""CREATE TABLE IF NOT EXISTS tracks (
    id INT NOT NULL PRIMARY KEY,
    artist TEXT NOT NULL,
    title TEXT NOT NULL,
    file TEXT NOT NULL
)""")
db.commit()
# Working directory: the directory this script lives in
rootdir = os.path.dirname(os.path.realpath(__file__))

# Output directory for new tracks
outdir = os.path.join(rootdir, "monstercat")

# List of directories to not check for music
exclude = [
    '.temp'
]

# Start rebuilding tag database
print("Rebuild music database...")

# Clear table before rebuilding. The DELETE is committed first so that
# VACUUM never runs inside an open transaction (SQLite refuses that).
db.execute("DELETE FROM tracks")
db.commit()
db.execute("VACUUM")
db.commit()

# Collect every .mp3 below rootdir, pruning excluded directories in place so
# os.walk does not descend into them.
filelist = []
for subdir, dirs, files in os.walk(rootdir, topdown=True):
    dirs[:] = [d for d in dirs if d not in exclude]
    for name in files:
        if name.endswith(".mp3"):
            filelist.append(os.path.join(subdir, name))

# Check count of listed mp3 files
print("Found " + str(len(filelist)) + " tracks to update.")

# Add all tracks to database for later use
for path in filelist:
    # Load ID3 tag info
    id3 = eyed3.load(path)

    # eyed3.load() can return None and a file may carry no (or an incomplete)
    # tag; skip such files instead of aborting the whole rebuild.
    tag = id3.tag if id3 is not None else None
    if tag is None or tag.artist is None or tag.title is None:
        print("Skipping " + path + ": no usable ID3 tag.")
        continue

    # Extract track meta; the track number field holds the track id
    artist = tag.artist.decode("utf-8")
    title = tag.title.decode("utf-8")
    track_id = tag.track_num[0]

    # Store the path as unicode as well
    path = path.decode("utf-8")

    try:
        # Add track to the table
        db.execute("INSERT INTO tracks (id, artist, title, file) VALUES (?, ?, ?, ?)", (track_id, artist, title, path))
    except Exception as err:
        # Keep going; one bad row should not stop the rebuild
        print("Failed to add track " + title + " (" + str(track_id) + ")")
        print(err)

# Save changes to database
db.commit()
# Prepare SoundCloud API client
client = soundcloud.Client(client_id=sc_key)

# List of playlists to fetch from SoundCloud
playlists = [
    "https://soundcloud.com/monster-playlists/sets/monstercat",
    "https://soundcloud.com/monster-playlists/sets/monstercat2",
    "https://soundcloud.com/gamer-nation/sets/every-monstercat-song",
    "https://soundcloud.com/gamer-nation/sets/every-monstercat-song-pt-2",
    "https://soundcloud.com/gamer-nation/sets/every-monstercat-song-pt-3"
]

# List of tracks to download
queue = []

# One cursor is enough for all lookups below
cursor = db.cursor()

# Fetch tracks from SoundCloud
print("\nDone! Fetching playlists from SoundCloud...")
for playlist in playlists:
    print("Fetching " + playlist + "...")

    # Resolve also returns the playlist with its contents, pretty neat
    try:
        info = client.get('/resolve', url=playlist)
    except Exception as err:
        # A single unreachable playlist should not abort the whole run
        print("Failed to fetch playlist " + playlist)
        print(err)
        continue

    if info.tracks:
        print(str(len(info.tracks)) + " tracks found, checking for new tracks...")
        temp = []

        # Search for every track and check if track is in our database
        for track in info.tracks:
            # Check if we can retrieve track info from title for advanced search
            meta = parse_meta(track['title'])
            if meta:
                # Execute advanced search (search by id or by title and artist)
                cursor.execute("SELECT * FROM tracks WHERE id = ? OR (artist LIKE ? AND title LIKE ?)", (str(track['id']), meta['artist'] + "%", meta['title'] + "%"))
            else:
                # Execute basic search (search by id)
                cursor.execute("SELECT * FROM tracks WHERE id = ?", (str(track['id']),))

            # No matching row means the track is not in the database yet
            if cursor.fetchone() is None:
                # Add track to download queue if its stream url is usable
                stream_url = track['stream_url']
                if stream_url.startswith("http"):
                    temp.append(track)
                else:
                    print("Unsupported stream url: " + stream_url)

        print("Added " + str(len(temp)) + " new tracks to download queue.")
        queue.extend(temp)
    else:
        print("No tracks found, is it a valid playlist url?")
# It's time to download tracks!
print("\nA total " + str(len(queue)) + " tracks needs to be downloaded, preparing...")

# Prepare temporary download folder
tempdir = os.path.join(rootdir, ".temp")
if not os.path.exists(tempdir):
    os.makedirs(tempdir)

# List of failed tracks (names that could not be split into artist/title)
failed = []

# Amount of tracks before
cursor = db.cursor()
cursor.execute("SELECT Count(*) FROM tracks")
prev_amount = cursor.fetchone()[0]

# Start downloading missing tracks
for track in queue:
    # Prepare metadata for track
    trackname = track['title']

    # Check if it's valid title to parse meta
    meta = parse_meta(trackname)
    if not meta:
        print(trackname + " is not a valid track name, skipping.")
        failed.append(track)
        continue

    # Retrieve our parsed meta
    artist = meta['artist']
    title = meta['title']

    # Stream url with our client id, and the track id for later use
    url = track['stream_url'] + "?client_id=" + sc_key
    track_id = track['id']

    # Final filename. Path separators coming from the artist or title would
    # make the name point into a non-existent subdirectory, so replace them.
    filename = artist + " - " + title + "-" + str(track_id) + ".mp3"
    for sep in (os.sep, os.altsep):
        if sep:
            filename = filename.replace(sep, "_")

    # Check if file already exists and skip
    temp_path = os.path.join(tempdir, filename)
    if os.path.exists(temp_path):
        print("Track " + title + " (" + str(track_id) + ") already exists, skipping.")
        continue

    # Download track and save to tempdir
    print("Downloading " + title + " (" + str(track_id) + ")...")
    try:
        urllib.urlretrieve(url, temp_path)
    except Exception as err:
        print("Failed to download track " + title + " (" + str(track_id) + ")")
        print(err)

        # Delete the partial file if one was created
        if os.path.exists(temp_path):
            os.remove(temp_path)

        # We don't want to stop here...
        continue

    # Check if file is valid mp3
    id3 = eyed3.load(temp_path)
    if id3 is None:
        print("This track seems to be not downloadable, skipping.")
        os.remove(temp_path)
        continue

    print("Done! Adding ID3 tag info...")
    print("- Artist: " + artist)
    print("- Title: " + title)
    print("- Track num: " + str(track_id))

    # Add metadata to track; the track id is stored in the track number
    # field, which is where the database rebuild reads it back from.
    id3.initTag()
    id3.tag.artist = artist
    id3.tag.title = title
    id3.tag.track_num = int(track_id)

    # Save ID3 tag info, and only then report it as saved
    id3.tag.save()
    print("Saved! Adding track to music database...")

    # Path of the file inside the output dir.
    # NOTE(review): the move itself is disabled, so the file stays in tempdir
    # while the database row below points at the output dir.
    final_path = os.path.join(outdir, filename)
    #os.rename(os.path.join(tempdir, filename), final_path)

    try:
        # Add track to the table
        db.execute("INSERT INTO tracks (id, artist, title, file) VALUES (?, ?, ?, ?)", (track_id, artist, title, final_path))
    except Exception as err:
        print("Failed to add track " + title + " (" + str(track_id) + ")")
        print(err)

# Save changes to database
db.commit()
# Amount of tracks now
cursor = db.cursor()
cursor.execute("SELECT Count(*) FROM tracks")
new_amount = cursor.fetchone()[0]

# Print some statistics (difference against the count taken before downloading)
print("\nFinished! New tracks added: " + str(new_amount - prev_amount))
print("Total amount of tracks now: " + str(new_amount))

# Output failed tracks, i.e. those whose name could not be parsed
if failed:
    print("\nFound invalid tracks: " + str(len(failed)))
    print("Please check if there are named correctly and try again!")
    for track in failed:
        print(" - " + track['title'])
        print(" " + track['permalink_url'])

# Finishing stuff...
db.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment