Skip to content

Instantly share code, notes, and snippets.

@Robpol86
Created January 30, 2013 05:55
Show Gist options
  • Save Robpol86/4671038 to your computer and use it in GitHub Desktop.
Save Robpol86/4671038 to your computer and use it in GitHub Desktop.
The script I've been using (which I wrote) to convert all of my FLAC files to MP3 when I add new music to my collection. I wrote the first version in November 2008; this version was written in December 2012.
#!/home/robpol86/python27/bin/python2.7 -u
"""Converts all FLAC files in a directory to mp3 files. Avoids converting old files by using an SQLite database.
Requirements
------------
Python >= 2.7.3 (Linux)
mutagen >= 1.20 : http://code.google.com/p/mutagen/
psutil >= 0.6.1 : http://code.google.com/p/psutil/
robutils >= 0.1.0 : https://github.com/Robpol86/robutils/
/usr/bin/flac
/usr/bin/lame
"""
__author__ = 'Robpol86 (http://robpol86.com)'
__copyright__ = 'Copyright 2012, Robpol86'
__license__ = 'MIT'
__version__ = '2.1.4'
import os, sys, signal, sqlite3, tempfile, mutagen, psutil, argparse, re, time, threading, atexit
from robutils.Message import Message
from robutils.ExternalCmd import ExternalCmd
from robutils.Progress import Progress
# DEBUG - remove when done
#import rpdb2
#rpdb2.start_embedded_debugger('robpol86', fAllowRemote=True)
# DEBUG
class Settings:
    """
    Holds this application's settings. Includes command line options, data obtained by prompting users, configuration
    files and anything else.

    Instantiating the class parses the command line, creates the Message object used for output and exit codes,
    normalizes the source/destination paths and finally validates every path the script depends on.
    """
    no_lyrics = False  # True when --no-lyrics was given on the command line.
    source_dir = None  # Source (FLAC) directory; absolute after expand().
    dest_dir = None  # Destination (mp3) directory; absolute after expand().
    db_file = '/var/tmp/convertMusic.db'  # SQLite database recording finished conversions.
    flac = '/usr/bin/flac'  # Path to the flac binary.
    lame = '/usr/bin/lame'  # Path to the lame binary.
    message = None  # robutils Message instance, created in __init__.

    def __init__(self):
        self._cliargs()
        self.message = Message()
        # Exit codes passed to message.quit() by validate(), with their descriptions.
        self.message.retcodes[1] = 'An error occurred, check log.'
        self.message.retcodes[10] = 'Source directory does not exist.'
        self.message.retcodes[11] = 'Source directory is not an actual directory.'
        self.message.retcodes[12] = 'No read access for source directory.'
        self.message.retcodes[13] = 'Destination directory does not exist.'
        self.message.retcodes[14] = 'Destination directory is not an actual directory.'
        self.message.retcodes[15] = 'No write access for destination directory.'
        self.message.retcodes[16] = 'Cannot create db file.'
        self.message.retcodes[17] = 'DB file is not an actual file.'
        self.message.retcodes[18] = 'No write access for db file.'
        self.message.retcodes[19] = '/usr/bin/flac does not exist.'
        self.message.retcodes[20] = '/usr/bin/lame does not exist.'
        # Expand "~" and relative paths BEFORE validating. Validating first made the existence checks run
        # against the unexpanded string, so a path such as "~/music" was rejected even though it exists.
        self.expand()
        self.validate()

    def _cliargs(self):
        """Parses the command line and stores the results in no_lyrics, source_dir and dest_dir."""
        module_doc = globals().get('__doc__')
        # The first line of the module docstring doubles as the --help description.
        script_docstring = module_doc.splitlines()[0] if module_doc else ''
        parser = argparse.ArgumentParser(description=script_docstring,
                                         formatter_class=argparse.ArgumentDefaultsHelpFormatter)
        parser.add_argument('-v', '--version', action='version', version='%(prog)s '+__version__)
        parser.add_argument('--no-lyrics', dest='no_lyrics', action='store_true', help='Ignore checking for lyrics.')
        parser.add_argument('source', metavar='SOURCEDIR', type=str, help='Source (flac) directory.')
        parser.add_argument('dest', metavar='DESTDIR', type=str, help='Destination (mp3) directory.')
        args = parser.parse_args()
        self.no_lyrics = args.no_lyrics
        self.source_dir = args.source
        self.dest_dir = args.dest

    def validate(self):
        """
        Checks the directories, database file and external binaries, calling message.quit() with the matching
        code from message.retcodes for every check that fails.
        """
        # Source directory must exist, be a directory and be readable.
        if not os.path.exists(self.source_dir):
            self.message.quit(10)
        if not os.path.isdir(self.source_dir):
            self.message.quit(11)
        if not os.access(self.source_dir, os.R_OK):
            self.message.quit(12)
        # Destination directory must exist, be a directory and be writable.
        if not os.path.exists(self.dest_dir):
            self.message.quit(13)
        if not os.path.isdir(self.dest_dir):
            self.message.quit(14)
        if not os.access(self.dest_dir, os.W_OK):
            self.message.quit(15)
        # The database is either creatable (writable parent directory) or an existing writable file.
        if not os.path.exists(self.db_file):
            if not os.access(os.path.dirname(self.db_file), os.W_OK):
                self.message.quit(16)
        else:
            if not os.path.isfile(self.db_file):
                self.message.quit(17)
            if not os.access(self.db_file, os.W_OK):
                self.message.quit(18)
        # External encoder/decoder binaries.
        if not os.path.isfile(self.flac):
            self.message.quit(19)
        if not os.path.isfile(self.lame):
            self.message.quit(20)

    def expand(self):
        """Replaces source_dir and dest_dir with their user-expanded, absolute equivalents."""
        self.source_dir = os.path.abspath(os.path.expanduser(self.source_dir))
        self.dest_dir = os.path.abspath(os.path.expanduser(self.dest_dir))
class Convert:
    """Coordinates converting FLAC files to mp3."""
    flac_source = None  # Absolute filepath to the source FLAC file.
    mp3_dest = None  # Absolute filepath to the destination mp3 file.
    mp3_dest_temp = None  # Temporary filename during conversion.
    command = None  # Holds the ExternalCmd object that does the file conversion.
    filename_meta = {}  # Dictionary of all the tags obtained from the source filename.
    flac_meta = {}  # Mutagen object containing tags for the source FLAC file.

    def __init__(self, flac_source, full_dest_dir, flac, lame):
        """
        Parameters
        ----------
        flac_source : str
            Path to the source FLAC file. Its basename (without extension) must consist of exactly five fields
            separated by " - ": artist, year, album, track number and title. Anything else raises ValueError
            when the fields are unpacked.
        full_dest_dir : str
            Directory the mp3 file is written to.
        flac : str
            Path to the flac binary.
        lame : str
            Path to the lame binary.
        """
        (artist, year, album, trackno, title) = os.path.splitext(os.path.basename(flac_source))[0].split(" - ")
        self.flac_source = flac_source
        self.mp3_dest = os.path.join(full_dest_dir, '{0} - {1}.mp3'.format(artist, title))
        self.mp3_dest_temp = os.path.join(full_dest_dir, '{0} - {1}.mp3.convert'.format(artist, title))
        self.filename_meta = dict(ARTIST=artist,DATE=year,ALBUM=album,TRACKNUMBER=trackno,TITLE=title,DISCNUMBER=0)
        # An album named "Foo (Disc 2)" yields ALBUM "Foo" and DISCNUMBER 2.
        disc = re.findall(r' \(Disc (\d+)\)', album)
        if disc:
            self.filename_meta['DISCNUMBER'] = int(disc[0])
            self.filename_meta['ALBUM'] = re.sub(r' \(Disc \d+\)', '', album)
        # Decode the FLAC to stdout and pipe it into lame, which writes the temporary mp3 file.
        # NOTE(review): the paths are only wrapped in double quotes. If ExternalCmd hands this string to a shell,
        # filenames containing '"', '$' or backticks will break or alter the command -- confirm and escape.
        command = '"{0}" -sdc "{1}" |"{2}" -ShV0 - "{3}"'.format(flac, flac_source, lame, self.mp3_dest_temp)
        self.command = ExternalCmd(command)
        self.flac_meta = mutagen.File(flac_source)

    @staticmethod
    def _to_text(value):
        """Returns byte strings decoded with the filesystem encoding; any other value is returned unchanged."""
        if isinstance(value, bytes):
            return value.decode(sys.getfilesystemencoding() or 'utf-8', 'replace')
        return value

    def inconsistencies(self, no_lyrics=False):
        """
        Validate FLAC "id3" tags. Build a list of reported inconsistencies and return the list.

        Parameters
        ----------
        no_lyrics : bool, default False
            If True, does not report missing lyrics from FLAC tags.

        Returns
        -------
        list : Python list containing strings (messages to be printed about found inconsistencies).
        """
        results = []
        # Find inconsistencies between metadata obtained from the filename and metadata in the FLAC "id3" tags.
        for tag, value_file in self.filename_meta.items():
            if tag not in self.flac_meta:
                # A file without "(Disc N)" in its name is not required to carry a DISCNUMBER tag.
                if not (tag == 'DISCNUMBER' and not value_file):
                    results.append('missing {0} tag'.format(tag))
                continue
            value_meta = self.flac_meta[tag][0]
            if tag == 'ALBUM' and re.findall(r' \(Disc (\d+)\)', value_meta):
                results.append('disc number in album tag')
                continue
            if tag == 'DISCNUMBER':
                if not value_meta.isdigit():
                    results.append('{0} not a digit'.format(tag))
                    continue
                # No disc number in the filename is treated as disc 1.
                if not value_file:
                    value_file = 1
                value_meta = int(value_meta)
            else:
                # Filenames cannot contain these characters, so strip them from the tag before comparing.
                # Both sides are brought to text first; a byte-string filename never equals a unicode tag
                # once non-ASCII characters are involved.
                value_file = self._to_text(value_file)
                value_meta = re.sub(r'[<>:"/\\|?*]', '', self._to_text(value_meta))
            # The original compared value_file with itself whenever the tag was not a byte string (always the
            # case for unicode tags and for the integer DISCNUMBER), so mismatches were never reported.
            if value_file != value_meta:
                results.append("{0} doesn't match".format(tag))
        # Album art.
        if not self.flac_meta.pictures:
            results.append('missing album art')
        # Lyrics.
        if 'LYRICS' in self.flac_meta:
            results.append('old/wrong LYRICS tag found')
        if not no_lyrics:
            # Look at the lyrics tag itself. The original indexed with the leftover loop variable `tag` and,
            # because of and/or precedence, did so even when no_lyrics was set.
            lyrics = self.flac_meta['UNSYNCEDLYRICS'] if 'UNSYNCEDLYRICS' in self.flac_meta else None
            if not lyrics or not lyrics[0]:
                results.append('missing lyrics')
        # Done.
        return results

    def write_tags(self):
        """
        Writes FLAC tags to the new mp3 file.

        Text frames are written with ID3 encoding 3 (UTF-8); encoding 0 (Latin-1) cannot represent titles,
        artists or lyrics that contain characters outside Latin-1.

        Returns
        -------
        bool : always True.
        """
        utf8 = 3  # ID3v2.4 text-encoding byte for UTF-8.
        mp3_meta = mutagen.id3.ID3()
        # Vorbis comment name -> ID3 frame name.
        pairs = dict(TITLE='TIT2', ARTIST='TPE1', ALBUM='TALB', DATE='TDRC', TRACKNUMBER='TRCK', GENRE='TCON')
        # Regular tags.
        for k, v in pairs.items():
            text = self.flac_meta[k][0] if k in self.flac_meta else ''
            mp3_meta[v] = getattr(mutagen.id3, v)(encoding=utf8, text=[unicode(text),])
        # Lyrics.
        text = self.flac_meta['UNSYNCEDLYRICS'][0] if 'UNSYNCEDLYRICS' in self.flac_meta else ''
        mp3_meta.add(mutagen.id3.USLT(encoding=utf8, lang='eng', text=[unicode(text),]))
        # Album art. Every picture is stored under the same key, so the last one in the FLAC file wins.
        for p in self.flac_meta.pictures:
            mp3_meta[u'APIC:'] = mutagen.id3.APIC(encoding=utf8, mime=p.mime, type=int(p.type), data=p.data)
        # Save to mp3 file.
        mp3_meta.save(self.mp3_dest_temp)
        return True

    def rename(self):
        """Renames the .mp3.convert file to .mp3 as the final step."""
        os.rename(self.mp3_dest_temp, self.mp3_dest)
def main():
    """
    Runs the whole conversion workflow:

    1. Opens (or creates) the SQLite database of finished conversions and purges rows whose source or
       destination file is missing or has a different size/modification time.
    2. Offers to delete destination files that are not in the database, then empty destination directories.
    3. Collects source files that are not in the database yet and whose names have the expected five
       " - " separated fields.
    4. Reports inconsistent FLAC tags.
    5. Runs the conversions concurrently, tagging, renaming and recording each one as it finishes.
    6. Lists the files that failed to convert.
    """
    settings = Settings()
    message = settings.message

    # Create database if new.
    message('[yellow]Initializing database...[/all]')
    if not os.path.isfile(settings.db_file):
        message('[higreen]creating {0}[/all]'.format(settings.db_file))
    else:
        message('opening {0}'.format(settings.db_file))
    db_con = sqlite3.connect(settings.db_file, isolation_level=None)
    atexit.register(lambda db_con: db_con.close(), db_con)
    db_con.execute("""CREATE TABLE IF NOT EXISTS converted (
        flac_source text,
        source_size int,
        source_time int,
        mp3_dest text,
        dest_size int,
        dest_time int
        )""")

    # Purge old or inconsistent entries from the database.
    for row in db_con.execute('SELECT * FROM converted').fetchall():
        if os.path.isfile(row[0]) and os.path.isfile(row[3]):
            if os.path.getsize(row[0]) == row[1] and os.path.getsize(row[3]) == row[4]:
                if int(os.path.getmtime(row[0])) == row[2] and int(os.path.getmtime(row[3])) == row[5]:
                    # Everything is consistent!
                    continue
        # Something is inconsistent. Delete the row.
        message('db purge: {0}'.format(os.path.basename(row[3])))
        db_con.execute('DELETE FROM converted WHERE flac_source = ? OR mp3_dest = ?', (row[0], row[3]))
    print

    # Delete inconsistent mp3 files and empty folders from the destination directory.
    message('[yellow]Searching for inconsistent files in the destination...[/all]')
    files_to_delete = []
    for root, dirs, files in os.walk(settings.dest_dir):
        for dest_file in [os.path.join(root, f) for f in files]:
            if not db_con.execute('SELECT rowid FROM converted WHERE mp3_dest = ?', (dest_file,)).fetchall():
                # This file is not in the database.
                files_to_delete.append(dest_file)
                message(os.path.basename(dest_file))
    if files_to_delete:
        raw_input(message._convert('[b]Press Enter to delete these files (ctrl+c to cancel)...[/b]'))
        for dest_file in files_to_delete:
            os.remove(dest_file)
    dirs_to_delete = []
    for root, dirs, files in os.walk(settings.dest_dir):
        for dest_dir in [os.path.join(root, d) for d in dirs]:
            if not os.listdir(dest_dir):
                # This directory is empty.
                dirs_to_delete.append(dest_dir)
                message('{0}/'.format(dest_dir))
    if dirs_to_delete:
        raw_input(message._convert('[b]Press Enter to delete these empty directories...[/b]'))
        for dest_dir in dirs_to_delete:
            os.rmdir(dest_dir)
    print

    # Find new FLAC files and verify filenames.
    message('[yellow]Searching for new FLAC files...[/all]')
    files_to_convert = []  # List of Convert object instances.
    warning = False  # Set to true if an id3 tag or filename is inconsistent.
    for root, dirs, files in os.walk(settings.source_dir):
        for source_file in [os.path.join(root, f) for f in files]:
            if db_con.execute('SELECT rowid FROM converted WHERE flac_source = ?', (source_file,)).fetchall():
                continue  # Already converted.
            if os.path.splitext(os.path.basename(source_file))[0].count(" - ") != 4:
                warning = True
                message(os.path.basename(source_file))
            else:
                # Swap only the leading source_dir prefix for dest_dir. str.replace() would also rewrite any
                # later occurrence of the source path text inside a sub-directory name.
                full_dest_dir = settings.dest_dir + root[len(settings.source_dir):]
                files_to_convert.append(Convert(source_file, full_dest_dir, settings.flac, settings.lame))
    if warning:
        message('[hired]The above files cannot be converted.[/all]')
        raw_input(message._convert('[b]Press Enter to continue with the remaining files...[/b]'))
    print
    if not files_to_convert:
        message('[higreen]No new FLAC files found to convert.[/all]').quit(0)

    # Find inconsistent FLAC tags and warn the user.
    message('[yellow]Looking for inconsistent FLAC tags...[/all]')
    inconsistencies = []  # List of inconsistencies, including filenames.
    for convert in files_to_convert:
        results = convert.inconsistencies(settings.no_lyrics)
        if not results:
            continue
        basename = os.path.splitext(os.path.basename(convert.mp3_dest))[0]
        inconsistencies.append('{0}: [hired]{1}[/all]'.format(basename, '; '.join(results)))
    if inconsistencies:
        inconsistencies.sort()
        for result in inconsistencies:
            message(result)
        raw_input(message._convert('[b]Press Enter to continue with the conversion...[/b]'))
    print

    # Convert the files.
    message('Converting...')
    failed_to_convert = []
    progress = Progress(len(files_to_convert))
    progress.threaded_summary(message, max_width=125)  # Starts progress-bar thread.
    last_start_time = None  # Last conversion's start time.
    while not progress.summary_finished:  # Loops until progress bar displayed to user shows 100%.
        # Find completed conversions and update progress. Remove from list.
        # Iterate over a snapshot: removing from the list being iterated skips the element that follows
        # each removal.
        for convert in files_to_convert[:]:
            if convert.command.code is None:
                continue  # This is still running.
            if convert.command.code == 0:
                convert.write_tags()
                convert.rename()
                progress.inc_pass()
                insert_params = [
                    convert.flac_source,  # flac_source
                    os.path.getsize(convert.flac_source),  # source_size
                    int(os.path.getmtime(convert.flac_source)),  # source_time
                    convert.mp3_dest,  # mp3_dest
                    os.path.getsize(convert.mp3_dest),  # dest_size
                    int(os.path.getmtime(convert.mp3_dest)),  # dest_time
                ]
                db_con.execute('INSERT INTO converted VALUES (?,?,?,?,?,?)', insert_params)
            else:
                progress.inc_fail()
                failed_to_convert.append(convert)
                # Do not leave a partial .mp3.convert file behind.
                if os.path.isfile(convert.mp3_dest_temp):
                    os.remove(convert.mp3_dest_temp)
            files_to_convert.remove(convert)
        # Start more conversions.
        running = len(psutil.Process(os.getpid()).get_children())  # Number of conversions happening concurrently.
        running_min = max(2, os.sysconf('SC_NPROCESSORS_ONLN'))  # Minimum of one conversion per logical CPU if >2 CPUs
        overload = os.getloadavg()[0] >= 1.5  # True if the system is overloaded.
        if files_to_convert and (running < running_min or not overload):
            for convert in files_to_convert:
                if convert.command.start_time:
                    continue  # This is already running.
                if last_start_time and time.time() - last_start_time < 5:
                    break  # Wait at least 5s between starts.
                dirname = os.path.dirname(convert.mp3_dest)
                if not os.path.exists(dirname):
                    os.makedirs(dirname)
                convert.command.run_local()
                last_start_time = convert.command.start_time
                break  # Only start one conversion at a time.
        time.sleep(1)
    print

    # If any failed, print list.
    if failed_to_convert:
        message('[yellow]These files failed to convert:[/all]')
        for convert in failed_to_convert:
            message('[hired]{0}[/all]'.format(os.path.basename(convert.flac_source)))
if __name__ == "__main__":
    # Control+C (SIGINT) ends the script through sys.exit(0).
    signal.signal(signal.SIGINT, lambda signum, frame: sys.exit(0))
    main()
@Robpol86
Copy link
Author

Robpol86 commented Oct 6, 2014

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment