Created
January 30, 2013 05:55
-
-
Save Robpol86/4671038 to your computer and use it in GitHub Desktop.
The script I've been using (which I wrote) to convert all of my FLAC files to MP3 when I add new music to my collection. Wrote the first version in November 2008, but this version was written on December 2012.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/home/robpol86/python27/bin/python2.7 -u | |
"""Converts all FLAC files in a directory to mp3 files. Avoids converting old files by using an SQLite database. | |
Requirements | |
------------ | |
Python >= 2.7.3 (Linux) | |
mutagen >= 1.20 : http://code.google.com/p/mutagen/ | |
psutil >= 0.6.1 : http://code.google.com/p/psutil/ | |
robutils >= 0.1.0 : https://github.com/Robpol86/robutils/ | |
/usr/bin/flac | |
/usr/bin/lame | |
""" | |
__author__ = 'Robpol86 (http://robpol86.com)' | |
__copyright__ = 'Copyright 2012, Robpol86' | |
__license__ = 'MIT' | |
__version__ = '2.1.4' | |
import os, sys, signal, sqlite3, tempfile, mutagen, psutil, argparse, re, time, threading, atexit | |
from robutils.Message import Message | |
from robutils.ExternalCmd import ExternalCmd | |
from robutils.Progress import Progress | |
# DEBUG - remove when done | |
#import rpdb2 | |
#rpdb2.start_embedded_debugger('robpol86', fAllowRemote=True) | |
# DEBUG | |
class Settings: | |
""" | |
Holds this application's settings. Includes command line options, data obtained by prompting users, configuration | |
files and anything else. | |
""" | |
no_lyrics = False | |
source_dir = None | |
dest_dir = None | |
db_file = '/var/tmp/convertMusic.db' | |
flac = '/usr/bin/flac' | |
lame = '/usr/bin/lame' | |
message = None | |
def __init__(self): | |
self._cliargs() | |
self.message = Message() | |
self.message.retcodes[1] = 'An error occurred, check log.' | |
self.message.retcodes[10] = 'Source directory does not exist.' | |
self.message.retcodes[11] = 'Source directory is not an actual directory.' | |
self.message.retcodes[12] = 'No read access for source directory.' | |
self.message.retcodes[13] = 'Destination directory does not exist.' | |
self.message.retcodes[14] = 'Destination directory is not an actual directory.' | |
self.message.retcodes[15] = 'No write access for destination directory.' | |
self.message.retcodes[16] = 'Cannot create db file.' | |
self.message.retcodes[17] = 'DB file is not an actual file.' | |
self.message.retcodes[18] = 'No write access for db file.' | |
self.message.retcodes[19] = '/usr/bin/flac does not exist.' | |
self.message.retcodes[20] = '/usr/bin/lame does not exist.' | |
self.validate() | |
self.expand() | |
return None | |
def _cliargs(self): | |
global __version__ | |
script_docstring = globals()['__doc__'].splitlines()[0] if globals()['__doc__'] else '' | |
parser = argparse.ArgumentParser(description=script_docstring, | |
formatter_class=argparse.ArgumentDefaultsHelpFormatter) | |
parser.add_argument('-v', '--version', action='version', version='%(prog)s '+__version__) | |
parser.add_argument('--no-lyrics', dest='no_lyrics', action='store_true', help='Ignore checking for lyrics.') | |
parser.add_argument('source', metavar='SOURCEDIR', type=str, help='Source (flac) directory.') | |
parser.add_argument('dest', metavar='DESTDIR', type=str, help='Destination (mp3) directory.') | |
args = parser.parse_args() | |
self.no_lyrics = args.no_lyrics | |
self.source_dir = args.source | |
self.dest_dir = args.dest | |
return None | |
def validate(self): | |
if not os.path.exists(self.source_dir): self.message.quit(10) | |
if not os.path.isdir(self.source_dir): self.message.quit(11) | |
if not os.access(self.source_dir, os.R_OK): self.message.quit(12) | |
if not os.path.exists(self.dest_dir): self.message.quit(13) | |
if not os.path.isdir(self.dest_dir): self.message.quit(14) | |
if not os.access(self.dest_dir, os.W_OK): self.message.quit(15) | |
if not os.path.exists(self.db_file): | |
if not os.access(os.path.dirname(self.db_file), os.W_OK): self.message.quit(16) | |
else: | |
if not os.path.isfile(self.db_file): self.message.quit(17) | |
if not os.access(self.db_file, os.W_OK): self.message.quit(18) | |
if not os.path.isfile(self.flac): self.message.quit(19) | |
if not os.path.isfile(self.lame): self.message.quit(20) | |
return None | |
def expand(self): | |
self.source_dir = os.path.abspath(os.path.expanduser(self.source_dir)) | |
self.dest_dir = os.path.abspath(os.path.expanduser(self.dest_dir)) | |
return None | |
class Convert: | |
"""Coordinates converting FLAC files to mp3.""" | |
flac_source = None # Absolute filepath to the source FLAC file. | |
mp3_dest = None # Absolute filepath to the destination mp3 file. | |
mp3_dest_temp = None # Temporary filename during conversion. | |
command = None # Holds the ExternalCmd object that does the file conversion. | |
filename_meta = {} # Dictionary of all the tags obtained from the source filename. | |
flac_meta = {} # Mutagen object containing tags for the source FLAC file. | |
def __init__(self, flac_source, full_dest_dir, flac, lame): | |
(artist, year, album, trackno, title) = os.path.splitext(os.path.basename(flac_source))[0].split(" - ") | |
self.flac_source = flac_source | |
self.mp3_dest = os.path.join(full_dest_dir, '{0} - {1}.mp3'.format(artist, title)) | |
self.mp3_dest_temp = os.path.join(full_dest_dir, '{0} - {1}.mp3.convert'.format(artist, title)) | |
self.filename_meta = dict(ARTIST=artist,DATE=year,ALBUM=album,TRACKNUMBER=trackno,TITLE=title,DISCNUMBER=0) | |
disc = re.findall(r' \(Disc (\d+)\)', album) | |
if disc: | |
self.filename_meta['DISCNUMBER'] = int(disc[0]) | |
self.filename_meta['ALBUM'] = re.sub(r' \(Disc \d+\)', '', album) | |
command = '"{0}" -sdc "{1}" |"{2}" -ShV0 - "{3}"'.format(flac, flac_source, lame, self.mp3_dest_temp) | |
self.command = ExternalCmd(command) | |
self.flac_meta = mutagen.File(flac_source) | |
return None | |
def inconsistencies(self, no_lyrics=False): | |
""" | |
Validate FLAC "id3" tags. Build a list of reported inconsistencies and return the list. | |
Parameters | |
---------- | |
no_lyrics : bool, default False | |
If True, does not report missing lyrics from FLAC tags. | |
Returns | |
------- | |
list : Python list containing strings (messages to be printed about found inconsistencies). | |
""" | |
results = [] | |
# Find inconsistencies between metadata obtained from the filename and metadata in the FLAC "id3" tags. | |
for tag, value_file in self.filename_meta.viewitems(): | |
if tag not in self.flac_meta: | |
if tag == 'DISCNUMBER' and not value_file: | |
continue # TODO optimize. | |
else: | |
results.append('missing {0} tag'.format(tag)) | |
continue | |
value_meta = self.flac_meta[tag][0] | |
if tag == 'ALBUM' and re.findall(r' \(Disc (\d+)\)', value_meta): | |
results.append('disc number in album tag') | |
continue | |
if tag == 'DISCNUMBER': | |
if not value_meta.isdigit(): | |
results.append('{0} not a digit'.format(tag)) | |
continue | |
if not value_file: value_file = 1 | |
value_meta = int(value_meta) | |
if (value_file) != (re.sub(r'[<>:"/\\|?*]','',value_meta) if isinstance(value_meta, str) else value_file): | |
results.append("{0} doesn't match".format(tag)) | |
# Album art. | |
if not self.flac_meta.pictures: results.append('missing album art') | |
# Lyrics. | |
if 'LYRICS' in self.flac_meta: results.append('old/wrong LYRICS tag found') | |
if not no_lyrics and 'UNSYNCEDLYRICS' not in self.flac_meta or not self.flac_meta[tag][0]: | |
results.append('missing lyrics') | |
# Done. | |
return results | |
def write_tags(self): | |
"""Writes FLAC tags to the new mp3 file.""" | |
mp3_meta = mutagen.id3.ID3() | |
pairs = dict(TITLE='TIT2', ARTIST='TPE1', ALBUM='TALB', DATE='TDRC', TRACKNUMBER='TRCK', GENRE='TCON') | |
# Regular tags. | |
for k, v in pairs.viewitems(): | |
text = self.flac_meta[k][0] if k in self.flac_meta else '' | |
mp3_meta[v] = getattr(mutagen.id3, v)(encoding=0, text=[unicode(text),]) | |
# Lyrics. | |
text = self.flac_meta['UNSYNCEDLYRICS'][0] if 'UNSYNCEDLYRICS' in self.flac_meta else '' | |
mp3_meta.add(mutagen.id3.USLT(encoding=0, lang='eng', text=[unicode(text),])) | |
# Album art. | |
for p in self.flac_meta.pictures: | |
mp3_meta[u'APIC:'] = mutagen.id3.APIC(encoding=0, mime=p.mime, type=int(p.type), data=p.data) | |
# Save to mp3 file. | |
mp3_meta.save(self.mp3_dest_temp) | |
return True | |
def rename(self): | |
"""Renames .mp3.conver file to .mp3 as the final step.""" | |
os.rename(self.mp3_dest_temp, self.mp3_dest) | |
return None | |
def main(): | |
settings = Settings() | |
message = settings.message | |
# Create database if new. | |
message('[yellow]Initializing database...[/all]') | |
if not os.path.isfile(settings.db_file): message('[higreen]creating {0}[/all]'.format(settings.db_file)) | |
else: message('opening {0}'.format(settings.db_file)) | |
db_con = sqlite3.connect(settings.db_file, isolation_level=None) | |
atexit.register(lambda db_con: db_con.close(), db_con) | |
db_con.execute("""CREATE TABLE IF NOT EXISTS converted ( | |
flac_source text, | |
source_size int, | |
source_time int, | |
mp3_dest text, | |
dest_size int, | |
dest_time int | |
)""") | |
# Purge old or inconsistent entries from the database. | |
for row in db_con.execute('SELECT * FROM converted').fetchall(): | |
if os.path.isfile(row[0]) and os.path.isfile(row[3]): | |
if os.path.getsize(row[0]) == row[1] and os.path.getsize(row[3]) == row[4]: | |
if int(os.path.getmtime(row[0])) == row[2] and int(os.path.getmtime(row[3])) == row[5]: | |
# Everything is consistent! | |
continue | |
# Something is inconsistent. Delete the row. | |
message('db purge: {0}'.format(os.path.basename(row[3]))) | |
db_con.execute('DELETE FROM converted WHERE flac_source = ? OR mp3_dest = ?', (row[0], row[3])) | |
# Delete inconsistent mp3 files and empty folders from the destination directory. | |
message('[yellow]Searching for inconsistent files in the destination...[/all]') | |
files_to_delete = [] | |
for root, dirs, files in os.walk(settings.dest_dir): | |
for dest_file in [os.path.join(root, f) for f in files]: | |
if not db_con.execute('SELECT rowid FROM converted WHERE mp3_dest = ?', (dest_file,)).fetchall(): | |
# This file is not in the database. | |
files_to_delete.append(dest_file) | |
message(os.path.basename(dest_file)) | |
if files_to_delete: | |
raw_input(message._convert('[b]Press Enter to delete these files (ctrl+c to cancel)...[/b]')) | |
for dest_file in files_to_delete: os.remove(dest_file) | |
dirs_to_delete = [] | |
for root, dirs, files in os.walk(settings.dest_dir): | |
for dest_dir in [os.path.join(root, d) for d in dirs]: | |
if not os.listdir(dest_dir): | |
# This directory is empty. | |
dirs_to_delete.append(dest_dir) | |
message('{0}/'.format(dest_dir)) | |
if dirs_to_delete: | |
raw_input(message._convert('[b]Press Enter to delete these empty directories...[/b]')) | |
for dest_dir in dirs_to_delete: os.rmdir(dest_dir) | |
# Find new files FLAC files and verify filenames. | |
message('[yellow]Searching for new FLAC files...[/all]') | |
files_to_convert = [] # List of Convert object instances. | |
warning = False # Set to true if an id3 tag or filename is inconsistent. | |
for root, dirs, files in os.walk(settings.source_dir): | |
for source_file in [os.path.join(root, f) for f in files]: | |
if db_con.execute('SELECT rowid FROM converted WHERE flac_source = ?', (source_file,)).fetchall(): continue | |
if os.path.splitext(os.path.basename(source_file))[0].count(" - ") != 4: | |
warning = True | |
message(os.path.basename(source_file)) | |
else: | |
full_dest_dir = root.replace(settings.source_dir, settings.dest_dir) | |
files_to_convert.append(Convert(source_file, full_dest_dir, settings.flac, settings.lame)) | |
if warning: | |
message('[hired]The above files cannot be converted.[/all]') | |
raw_input(message._convert('[b]Press Enter to continue with the remaining files...[/b]')) | |
if not files_to_convert: message('[higreen]No new FLAC files found to convert.[/all]').quit(0) | |
# Find inconsistent FLAC tags and warn the user. | |
message('[yellow]Looking for inconsistent FLAC tags...[/all]') | |
inconsistencies = [] # List of inconsistencies, including filenames. | |
for convert in files_to_convert: | |
results = convert.inconsistencies(settings.no_lyrics) | |
if not results: continue | |
basename = os.path.splitext(os.path.basename(convert.mp3_dest))[0] | |
inconsistencies.append('{0}: [hired]{1}[/all]'.format(basename, '; '.join(results))) | |
if inconsistencies: | |
inconsistencies.sort() | |
for result in inconsistencies: | |
message(result) | |
raw_input(message._convert('[b]Press Enter to continue with the conversion...[/b]')) | |
# Convert the files. | |
message('Converting...') | |
failed_to_convert = [] | |
progress = Progress(len(files_to_convert)) | |
progress.threaded_summary(message, max_width=125) # Starts progress-bar thread. | |
last_start_time = None # Last conversion's start time. | |
while not progress.summary_finished: # Loops until progress bar displayed to user shows 100%. | |
# Find completed conversions and update progress. Remove from list. | |
for convert in files_to_convert: | |
if convert.command.code == None: continue # This is still running. | |
if convert.command.code == 0: | |
convert.write_tags() | |
convert.rename() | |
progress.inc_pass() | |
insert_params = [ | |
convert.flac_source, # flac_source | |
os.path.getsize(convert.flac_source), # source_size | |
int(os.path.getmtime(convert.flac_source)), # source_time | |
convert.mp3_dest, # mp3_dest | |
os.path.getsize(convert.mp3_dest), # dest_size | |
int(os.path.getmtime(convert.mp3_dest)), # dest_time | |
] | |
db_con.execute('INSERT INTO converted VALUES (?,?,?,?,?,?)', insert_params) | |
else: | |
progress.inc_fail() | |
failed_to_convert.append(convert) | |
if os.path.isfile(convert.mp3_dest_temp): os.remove(convert.mp3_dest_temp) | |
files_to_convert.remove(convert) | |
# Start more conversions. | |
running = len(psutil.Process(os.getpid()).get_children()) # Number of conversions happening concurrently. | |
running_min = max(2, os.sysconf('SC_NPROCESSORS_ONLN')) # Minimum of one conversion per logical CPU if >2 CPUs | |
overload = os.getloadavg()[0] >= 1.5 # True if the system is overloaded. | |
if files_to_convert and (running < running_min or not overload): | |
for convert in files_to_convert: | |
if convert.command.start_time: continue # This is already running. | |
if last_start_time and time.time() - last_start_time < 5: break # Wait at least 3s between starts. | |
dirname = os.path.dirname(convert.mp3_dest) | |
if not os.path.exists(dirname): os.makedirs(dirname) | |
convert.command.run_local() | |
last_start_time = convert.command.start_time | |
break # Only start one conversion at a time. | |
time.sleep(1) | |
# If any failed, print list. | |
if failed_to_convert: | |
message('[yellow]These files failed to convert:[/all]') | |
for convert in failed_to_convert: | |
message('[hired]{0}[/all]'.format(os.path.basename(convert.flac_source))) | |
if __name__ == "__main__": | |
signal.signal(signal.SIGINT, lambda signal, frame: sys.exit(0)) # Properly handle Control+C | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This has been moved to https://github.com/Robpol86/general/blob/master/convert_music.py