Last active
January 2, 2016 13:39
-
-
Save ameliaikeda/8311458 to your computer and use it in GitHub Desktop.
R/a/dio database dumper (for songs).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
from mutagen.easyid3 import EasyID3 | |
from mutagen.id3 import ID3 | |
import MySQLdb | |
import MySQLdb.cursors | |
import shutil | |
import os | |
# trailing slashes because im too lazy to use os.path.join and fuck windows | |
music_folder = "/radio/www/music/" | |
backup_folder = "/radio/backups/music/files/" | |
# R/a/dio's MySQLCursor | |
class MySQLCursor: | |
"""Return a connected MySQLdb cursor object""" | |
counter = 0 | |
cache = {} | |
def __init__(self, cursortype=MySQLdb.cursors.DictCursor, lock=None): | |
threadid = "fake" | |
if (threadid in self.cache): | |
self.conn = self.cache[threadid] | |
self.conn.ping(True) | |
else: | |
self.conn = MySQLdb.connect(host="127.0.0.1", | |
user="hanyuu", | |
passwd="", | |
db="radio_main", | |
charset='utf8', | |
use_unicode=True) | |
self.cache[threadid] = self.conn | |
self.curtype = cursortype | |
self.lock = lock | |
def __enter__(self): | |
if (self.lock is not None): | |
self.lock.acquire() | |
self.cur = self.conn.cursor(self.curtype) | |
return self.cur | |
def __exit__(self, type, value, traceback): | |
self.cur.close() | |
self.conn.commit() | |
if (self.lock is not None): | |
self.lock.release() | |
return | |
# R/a/dio mass song dumper. THIS WILL RAPE I/O AND I DONT CARE. | |
def copy_music_file(filename): | |
global music_folder, backup_folder | |
shutil.copy("{0}{1}".format(music_folder, filename), backup_folder) | |
def clean_metadata(folder): | |
files = list([f for f in os.listdir(folder)]) | |
total = len(files) | |
counter = 1 | |
for file in files: | |
if counter % 100 == 0: | |
print u"Cleaning {0} of {1} files...".format(counter, total) | |
counter += 1 | |
try: | |
song = ID3(os.path.join(folder, file)) | |
song.delete() | |
song.save() | |
except: | |
continue | |
def fetch_metadata(): | |
with MySQLCursor() as cur: | |
cur.execute("select * from `tracks` where usable = 1") | |
songs = {} | |
for row in cur: | |
songs[row['path']] = { | |
"title": row['track'], | |
"artist": row['artist'], | |
"album": row['album'], | |
"id": row["id"], | |
} | |
return songs | |
def update_metadata(file, metadata): | |
global backup_folder | |
try: | |
audio = EasyID3(os.path.join(backup_folder, file)) | |
audio["title"] = metadata["title"] | |
audio["artist"] = metadata["artist"] | |
audio["album"] = metadata["album"] | |
audio.save() | |
except: | |
audio = EasyID3() | |
audio["title"] = metadata["title"] | |
audio["artist"] = metadata["artist"] | |
audio["album"] = metadata["album"] | |
audio.save(os.path.join(backup_folder, file)) | |
def rename_file(file, metadata): | |
global backup_folder | |
# naive assumption that album means artist is present. | |
if metadata["album"]: | |
filename = u"{0} - {1} [{2}].mp3".format(metadata["artist"].replace("/", ""), metadata["title"].replace("/", ""), metadata["album"].replace("/", "")) | |
else: | |
if metadata["artist"]: | |
filename = u"{0} [{1}].mp3".format(metadata["title"].replace("/", ""), metadata["id"]) | |
else: | |
filename = u"{0} - {1}.mp3".format(metadata["artist"].replace("/", ""), metadata["title"].replace("/", "")) | |
try: | |
# do the actual renaming once filename is known | |
os.rename(os.path.join(backup_folder, file), os.path.join(backup_folder, filename)) | |
except: | |
# FUCK THOSE 5-liner ARTIST FIELDS | |
filename = u"{0} [{1}L].mp3".format(metadata["title"].replace("/", "")[:100], metadata["id"]) | |
os.rename(os.path.join(backup_folder, file), os.path.join(backup_folder, filename)) | |
def run(): | |
global backup_folder | |
songs = fetch_metadata() | |
total = len(songs) | |
counter = 1 | |
for path, metadata in songs.iteritems(): | |
if counter % 100 == 0: | |
print u"Copying {0} of {1} files...".format(counter, total) | |
counter += 1 | |
copy_music_file(path) | |
# Remove extra metadata because why the heck not (Also this needs to be clean) | |
clean_metadata(backup_folder) | |
counter = 1 | |
for path, metadata in songs.iteritems(): | |
if counter % 100 == 0: | |
print u"Updating {0} of {1} files...".format(counter, total) | |
counter += 1 | |
if os.path.exists(os.path.join(backup_folder, path)): | |
# update ID3 metadata | |
update_metadata(path, metadata) | |
# update filename | |
rename_file(path, metadata) | |
if __name__ == "__main__": | |
run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment