Created
May 20, 2012 11:33
-
-
Save justinvw/2757713 to your computer and use it in GitHub Desktop.
Transcode FLAC audio files into MP4/AAC files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Requirements:
# - beets (https://github.com/sampsyo/beets)
# - flac (http://flac.sourceforge.net/)
# - faac (http://www.audiocoding.com/faac.html)
# TODOS:
# - Use multiple threads (multiprocessing)
# - Replaygain data is copied from src to dest file, but is this fair?
#   Or is recalculation necessary on transcoded file?
# - Does albumart embedding in the dest_file work correctly?
# - Currently only supports FLAC to AAC/MP4 conversion, add other formats
# - Integrate with beets?
import os | |
import shutil | |
import logging | |
import subprocess | |
import multiprocessing | |
import fnmatch | |
from beets.mediafile import MediaFile | |
# Configure the root logger at import time so DEBUG and above are emitted.
logging.basicConfig(level=logging.DEBUG)

# Extensions (leading dot included) of source files that are transcoded
# rather than copied verbatim.
TRANSCODE_SRC_FORMATS = [
    '.flac',
]

# Extension given to every transcoded destination file.
TRANSCODE_DEST_FORMAT = '.mp4'

# Files we don't want to copy under any condition
SKIP_FILES = [
    '.DS_Store',
    '._.DS_Store',
]

# Number of worker processes used by the multiprocessing pool.
MAX_PROCESSES = 7
class File(object):
    """ Base class for a single source -> destination file task.

    Creating an instance stores both paths and makes sure the directory that
    will contain ``dest_file`` exists. Subclasses do the actual work.
    """

    def __init__(self, src_file, dest_file):
        """ Store the paths and create the destination directory.

        :param src_file: path of the file to read from.
        :param dest_file: path of the file that is going to be written.
        """
        self.src_file = src_file
        self.dest_file = dest_file

        # Create directory if it does not exist
        self.create_dir(os.path.dirname(self.dest_file))

    def __repr__(self):
        return '%s.%s(src_file=%s, dest_file=%s)' % (
            self.__class__.__module__, self.__class__.__name__,
            self.src_file, self.dest_file)

    def create_dir(self, directory):
        """ Create directories for a given path (if it does not exist).

        :param directory: directory to create, including missing parents.
            An empty string (destination in the current directory) is a
            no-op.
        :raises OSError: if the directory cannot be created.
        """
        if not directory or os.path.isdir(directory):
            return

        logging.info("Creating directory: %s", directory)
        try:
            os.makedirs(directory)
        except OSError:
            # Instances are created concurrently in several worker processes,
            # so another process may have created the directory between the
            # isdir() check above and makedirs(). Only a still-missing
            # directory is a real error.
            if not os.path.isdir(directory):
                raise
class AudioFile(File):
    """ Transcodes the audio file and copies the metadata.

    All work happens in the constructor: the source file is decoded with
    ``flac``, encoded with ``faac`` and, when that succeeded, the metadata
    fields listed in ``METADATA_FIELDS`` are copied with beets' MediaFile.
    """

    # The attributes we want to copy if they exist in the src_file
    METADATA_FIELDS = (
        'acoustid_fingerprint', 'acoustid_id', 'album',
        'albumartist', 'albumartist_sort', 'albumdisambig',
        'albumstatus', 'albumtype', 'art', 'artist',
        'artist_sort', 'asin', 'bpm', 'catalognum',
        'comments', 'comp', 'composer', 'country', 'date',
        'day', 'disc', 'disctitle', 'disctotal',
        'genre', 'grouping', 'label', 'language', 'lyrics',
        'mb_albumartistid', 'mb_albumid', 'mb_artistid',
        'mb_releasegroupid', 'mb_trackid', 'month',
        'rg_album_gain', 'rg_album_peak', 'rg_track_gain',
        'rg_track_peak', 'title', 'track', 'tracktotal',
        'year',
    )

    def __init__(self, src_file, dest_file):
        """ Transcode ``src_file`` to ``dest_file`` and copy its metadata.

        A failed transcode is logged as an error; no exception is raised for
        it and the metadata copy is skipped.
        """
        super(AudioFile, self).__init__(src_file, dest_file)

        # Transcode the audio file
        transcode_returncode = self.transcode(src_file, dest_file)
        if transcode_returncode != 0:
            logging.error("Failed transcodeing %s to %s", src_file, dest_file)
            return
        logging.info("Transcoded %s to %s", src_file, dest_file)

        # Copy metadata to transcoded file
        self.copy_metadata(src_file, dest_file)
        logging.info("Copied metadata from %s to %s", src_file, dest_file)

    def transcode(self, src_file, dest_file):
        """ Decode ``src_file`` to a temporary WAV and encode it to
        ``dest_file``.

        Equivalent command lines:
            flac -d <src_file> -o <dest_file>.wav.temp
            faac -q 300 -w -o <dest_file> <dest_file>.wav.temp

        :returns: 0 on success, 1 if ``flac`` failed, otherwise the exit
            status of ``faac``.
        """
        logging.debug("Transcoding %s to %s", src_file, dest_file)

        # Decoding goes through a temporary WAV file next to the destination
        # instead of piping flac's output straight into faac.
        wav_file = '%s.wav.temp' % (dest_file,)
        try:
            # Every option and its value is a separate argv element; fusing
            # them ('-o<path>', '-q 300') only works when the tool's option
            # parser happens to tolerate it.
            if subprocess.call(['flac', '-d', src_file, '-o', wav_file]) != 0:
                return 1
            return subprocess.call(['faac', '-q', '300', '-w', '-o',
                                    dest_file, wav_file])
        finally:
            # Never leave the (large) intermediate file behind, also not when
            # one of the tools failed or could not be started.
            if os.path.exists(wav_file):
                os.remove(wav_file)

    def copy_metadata(self, src_file, dest_file):
        """ Copy every non-empty field in ``METADATA_FIELDS`` from
        ``src_file`` to ``dest_file`` and save the destination file.

        Fields the source MediaFile does not expose are skipped (and logged
        at debug level).
        """
        src = MediaFile(src_file)
        dest = MediaFile(dest_file)

        for attribute in self.METADATA_FIELDS:
            # Get the value of the attribute from the src_file if it exists
            try:
                value = getattr(src, attribute)
            except AttributeError:
                logging.debug('Metadata field \'%s\' for %s does not exist',
                              attribute, src_file)
                continue

            logging.debug(u'Metadata field \'%s\' for %s has values \'%s\'',
                          attribute, src_file, value)
            # Do nothing if the value is empty
            if value:
                setattr(dest, attribute, value)

        dest.save()
class OtherFile(File):
    """ Task for everything that is not audio (covers, nfo's, etc.): the
    file is copied to its destination unchanged. """

    def __init__(self, src_file, dest_file):
        """ Prepare the destination directory, then copy the file. """
        super(OtherFile, self).__init__(src_file, dest_file)

        # The copy is performed right away, as part of construction.
        self.copy_file(src=self.src_file, dest=self.dest_file)

    def copy_file(self, src, dest):
        """ Copy the contents of ``src`` to ``dest``. """
        logging.info("Copying file %s to %s", src, dest)
        shutil.copyfile(src, dest)
class TranscodeFinder(object):
    """ Finds the files that need to be transcoded or copied.

    After construction, ``tasks`` holds one dict per file that needs work:
    ``{'handler': <AudioFile or OtherFile>, 'args': {'src_file': ...,
    'dest_file': ...}}``.
    """

    def __init__(self, src_dir, src_exts, skip_files, dest_dir, dest_ext):
        """ Store the configuration and compute ``self.tasks``.

        :param src_dir: directory that is scanned recursively.
        :param src_exts: extensions (with leading dot) that are transcoded.
        :param skip_files: base names that are ignored completely.
        :param dest_dir: root directory of the destination tree.
        :param dest_ext: extension (with leading dot) of transcoded files.
        """
        self.src_dir = src_dir
        self.src_exts = src_exts
        self.skip_files = skip_files
        self.dest_dir = dest_dir
        self.dest_ext = dest_ext

        self.tasks = self.get_tasks(src_dir, skip_files, dest_dir)

    def get_tasks(self, src_dir, skip_files, dest_dir):
        """ Determine which files need to be copied and/or transcoded.

        The directory layout below ``src_dir`` is mirrored below
        ``dest_dir``; files whose extension is in ``self.src_exts`` get
        ``self.dest_ext`` instead.

        :returns: list of task dicts (see the class docstring).
        """
        files = []

        for src_file in self.walk_directories(src_dir):
            # If this file appears in skip_files stop processing it
            if os.path.basename(src_file) in skip_files:
                continue

            # Use the directories that were passed in, so the relative path
            # always matches the tree that is actually being walked.
            src_file_rel = os.path.relpath(src_file, src_dir)
            src_base, src_ext = os.path.splitext(src_file_rel)

            is_audio = src_ext in self.src_exts
            if is_audio:
                dest_relpath = src_base + self.dest_ext
            else:
                dest_relpath = src_file_rel
            dest_file = os.path.join(dest_dir, dest_relpath)

            if not self.requires_processing(src_file, dest_file):
                continue

            files.append({
                'handler': AudioFile if is_audio else OtherFile,
                'args': {'src_file': src_file, 'dest_file': dest_file},
            })

        logging.info('Found %s files in src_dir that need to be processed.',
                     len(files))

        return files

    def walk_directories(self, directory):
        """ Recursively walk a directory, yielding the path of every file
        (each path is ``directory`` joined with the relative location). """
        for root, dirs, files in os.walk(directory):
            for basename in files:
                yield os.path.join(root, basename)

    def requires_processing(self, src_file, dest_file):
        """ Returns True if dest_file does not exist or is older than
        src_file, else returns False. """
        logging.debug('Checking if \'%s\' needs to be processed.', dest_file)

        if not os.path.isfile(dest_file):
            # dest_file does not exist and thus requires processing
            logging.debug('\'%s\' does not yet exist.', dest_file)
            return True

        # dest_file exists, check if it is newer than src_file
        if os.path.getmtime(src_file) > os.path.getmtime(dest_file):
            logging.debug('\'%s\' does exist but source (\'%s\') is more '
                          'recent.', dest_file, src_file)
            return True

        logging.debug('\'%s\' already exists.', dest_file)
        return False
if __name__ == '__main__':
    # Collect every file below the lossless library that still has to be
    # transcoded or copied into the MP4 tree.
    transcode = TranscodeFinder('/data/audio/music_lossless/',
                                TRANSCODE_SRC_FORMATS, SKIP_FILES,
                                '/home/justin/tools/transfercoder_test/mp4/',
                                TRANSCODE_DEST_FORMAT)

    # Each task is executed by instantiating its handler class in a worker
    # process; the sequential equivalent is task['handler'](**task['args']).
    pool = multiprocessing.Pool(processes=MAX_PROCESSES)
    pending = []
    for task in transcode.tasks:
        # print() with a single argument behaves the same on Python 2 and 3
        print(task)
        pending.append((task, pool.apply_async(func=task['handler'],
                                               kwds=task['args'])))
    pool.close()

    # apply_async() stores any exception raised in the worker on its result
    # object; fetch every result so failures are reported instead of being
    # dropped silently. One failed task does not stop the others.
    for task, result in pending:
        try:
            result.get()
        except Exception:
            logging.exception('Task failed: %r', task)

    pool.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment