Created
October 27, 2011 16:41
-
-
Save slinkp/1320090 to your computer and use it in GitHub Desktop.
Script that rips a CD and encodes the tracks to both Ogg Vorbis and FLAC, attempting to set the metadata too.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Rip a CD and encode the results to ogg. | |
Try to tag the ogg files, too. | |
And flac. | |
""" | |
# Stdlib imports | |
import glob | |
import os | |
import popen2 | |
import Queue | |
import string | |
import sys | |
import threading | |
import time | |
# 3rd-party imports | |
import CDDB | |
import DiscID | |
#import ID3 | |
#import pyid3lib | |
#import tageditor | |
import ogg.vorbis | |
def _rstrip(input, stripped): | |
"strip a whole trailing string, not just any of its characters" | |
if input.endswith(stripped): | |
return input[:len(stripped)] | |
return input | |
def _lstrip(input, stripped): | |
"strip a whole leading string, not just any of its characters" | |
if input.startswith(stripped): | |
return input[len(stripped):] | |
return input | |
def _encode_flac(in_path, logfunc): | |
flacfile = os.path.splitext(in_path)[0] + '.flac' | |
if os.path.exists(flacfile): | |
logfunc("Already have %s, skipping..." % flacfile) | |
return | |
cmd = 'flac --delete-input-file -o %r %r' % (flacfile, in_path) | |
ret = os.system(cmd) | |
if ret != 0: | |
raise RuntimeError("Command exited nonzero status %d: %r" | |
% (ret, cmd)) | |
class Main(object):
    """Rip an audio CD, encode each track to Ogg Vorbis and FLAC, and tag.

    Ripping and encoding run in two threads that communicate through
    ``self.fileQueue``: ``rip`` puts the path of each finished wav file
    on the queue and finally puts ``None`` as an end-of-work sentinel;
    ``encode_all`` consumes paths until it sees the sentinel.
    """

    compressed_extension = '.ogg'

    # Characters removed from anything that becomes a file/directory name.
    _unsafe_chars = '\\/!@#$%^&*()[]{};:"\'<>,./?+`~'

    def __init__(self, artist, title, quality=None,
                 do_cddb_query=True):
        """Set up paths, optionally query CDDB, and create directories.

        *artist* and *title* have whitespace runs collapsed to single
        underscores. *quality* is the value given to ``oggenc -q``
        (6 when None). When *do_cddb_query* is false no disc/network
        lookup is made and ``self.cddb_info`` is an empty dict.
        """
        # XXX Dirs should be configurable.
        # for now, hardcoded.
        self.base_dir = '/home/pw/morestuff/'
        self.track_prefix = 'track_'
        self.base_wavdir = os.path.join(self.base_dir, 'ripped_wavs')
        self.base_oggdir = os.path.join(self.base_dir, 'Music')
        # Only fall back when no quality was given; 0 is kept as-is.
        if quality is None:
            quality = 6
        self.quality = quality
        self.artist = '_'.join(artist.strip().split())
        self.title = '_'.join(title.strip().split())
        self.fileQueue = Queue.Queue()

        if do_cddb_query:
            self.query_cddb()
        else:
            self.cddb_info = {}

        self.log("Making directories...", 0)
        self.wavdir = self.prepdirs(self.base_wavdir)
        self.oggdir = self.prepdirs(self.base_oggdir)
        self.log("done.")

    def run(self):
        """Rip and encode concurrently, then tag the encoded files."""
        self.log("Ripping...")
        ripper = threading.Thread(target=self.rip)
        ripper.start()
        self.log("Encoding...", 0)
        encoder = threading.Thread(target=self.encode_all)
        encoder.start()
        ripper.join()
        self.log("done with rip stage.")
        encoder.join()
        self.log("done with encoder stage.")
        self.tag_all()
        self.log("done.")

    def log(self, msg, newline=1):
        """Write *msg* to stdout and flush.

        With a true *newline* the message ends the line; otherwise it is
        followed by a single space so the next message continues the line.
        """
        # One write() per message keeps the message and its terminator
        # together when the ripper and encoder threads log at once.
        if newline:
            terminator = '\n'
        else:
            terminator = ' '
        sys.stdout.write('%s%s' % (msg, terminator))
        sys.stdout.flush()

    def query_cddb(self):
        """Look the disc up on CDDB and store the result in ``self.cddb_info``.

        ``self.cddb_info`` is always a dict afterwards; it is empty when
        the disc cannot be read or no usable entry comes back.
        """
        # Get the CD info early, while we have the disk still in there...
        # might be running this script in multiple terminals :-)
        self.cddb_info = {}
        self.log("Examining...", 0)
        device = DiscID.open('/dev/cdrom')
        try:
            self.disc_info = DiscID.disc_id(device)
        except DiscID.cdrom.error:
            self.log("Could not read disc info. No disc?")
            return
        self.log("Querying freedb.org...", 0)
        cddb_result = CDDB.query(self.disc_info)
        # Would be nice if we could feed in artist & title and do a search.
        all_albums = cddb_result[1]
        if isinstance(all_albums, list):
            self.log("Got %d albums" % len(all_albums))
            # Take the first match; an empty list means no match at all.
            if all_albums:
                album = all_albums[0]
            else:
                album = None
        else:
            # XXX Not sure but i think sometimes it returns a string?
            album = all_albums

        cddb_info = None
        if album is not None:
            read_result = CDDB.read(category=album['category'],
                                    disc_id=album['disc_id'])
            if read_result is not None:
                cddb_info = read_result[1]
        # Everything downstream calls .get() on cddb_info, so anything
        # that is not a dict (None, or presumably a raw reply on a failed
        # read -- TODO confirm against CDDB.read) is treated as "no info".
        if not isinstance(cddb_info, dict):
            cddb_info = {}
        self.log(cddb_info)
        self.cddb_info = cddb_info

    def prepdirs(self, basepath):
        """Create ``basepath/<artist>/<title>`` and return its path.

        Artist and title are cleaned with ``_name_from_title``. An
        already existing directory is fine; any other failure to create
        it (permissions, a regular file in the way, ...) raises OSError.
        """
        artist = self._name_from_title(self.artist)
        title = self._name_from_title(self.title)
        path = os.path.normpath(os.path.join(basepath, artist, title))
        try:
            os.makedirs(path)
        except OSError:
            # Only "it already exists as a directory" may be ignored.
            if not os.path.isdir(path):
                raise
        return path

    def rip(self):
        """Rip tracks from the CD to wav files, queueing each for encoding.

        Files already present in the wav directory are queued first and
        ripping resumes at the track number of the last of them. Tracks
        are ripped one at a time with ``cdparanoia`` until it reports
        that the requested track does not exist. ``None`` is always put
        on the queue at the end, even on error, so the encoder thread
        terminates.

        Raises OSError when cdparanoia fails for any other reason.
        """
        # Imported locally so this method does not rely on a module-level
        # import being present.
        import subprocess

        try:
            existing = glob.glob(os.path.join(self.wavdir, '*wav'))
            existing += glob.glob(os.path.join(self.wavdir, '*flac'))
            existing.sort()
            # Ignore track 00 - this is a short, silent track
            # that some CDs have for who knows what evil reason.
            track_no = 1
            if existing:
                self.log("We already have some wav files.")
                # Assume we were interrupted during the last one.
                # XXX This should be defeatable.
                # Also it should be smarter - sometimes the last
                # one is really the last track and we're all done!
                # Rip again starting from the last one.
                track_no = self._getTrackNoFromFname(existing[-1])
                # Encode all of them (including the last one).
                for fname in existing:
                    self.log(
                        "Already have %s, putting it in the encoder queue" %
                        fname)
                    self.fileQueue.put(fname)

            # We want to rip tracks one at a time and put each on the
            # encoder queue as soon as it's done.
            # Easiest is to call cdparanoia multiple times,
            # but that has problems: overhead of restarting it each time,
            # and detecting when we're out of tracks.
            while True:
                fname = os.path.join(
                    self.wavdir,
                    '%s%02d.wav' % (self.track_prefix, track_no))
                cmd = ['cdparanoia', str(track_no), fname]
                cmd_text = ' '.join(cmd)
                self.log("Ripping track %d to %s..." % (track_no, fname))
                sub_process = subprocess.Popen(
                    cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                    universal_newlines=True)
                # communicate() drains both pipes while waiting; waiting
                # first and reading afterwards can deadlock once the
                # child fills a pipe buffer.
                _output, errors = sub_process.communicate()
                status = sub_process.returncode
                if status < 0:
                    # Negative return code: terminated by a signal.
                    self.log("********* PROBLEM **************")
                    self.log(errors)
                    raise OSError(
                        "Command '%s' exited abnormally" % cmd_text)
                if status != 0:
                    if ("Track #%d does not exist" % track_no) in errors:
                        self.log("Track %d was the last track." %
                                 (track_no - 1))
                        break
                    self.log("***** EXITED WITH ERROR ************")
                    self.log(errors)
                    raise OSError(
                        "Command '%s' exited with status %d" % (
                            cmd_text, status))
                self.log("ok")
                self.log("Putting %s on queue for encoding..." % fname, 0)
                self.fileQueue.put(fname)
                self.log("ok")
                track_no += 1
        finally:
            # Signal that we're done by putting None in the queue.
            self.log("All files are in the queue.")
            self.fileQueue.put(None)

    def encode_all(self):
        """Encode every file that arrives on the queue until ``None`` arrives."""
        # A blocking get() wakes up as soon as a file is queued; rip()
        # always enqueues the None sentinel, which ends the iteration.
        for fname in iter(self.fileQueue.get, None):
            self.log("Pulled %s from queue" % fname)
            fname = os.path.basename(fname)
            new_name = self._makeNewCompressedFileName(fname)
            self.encode(fname, new_name)
            self.log("encoded %s" % new_name)
        self.log("Queue exhausted, done encoding!")

    def encode(self, in_file, out_file):
        """Encode one file from wavdir to oggdir,
        given basenames for *in_file* and *out_file*.
        Also packs the wav using flac.

        Does nothing at all if the output file already exists.
        Raises RuntimeError if oggenc cannot be run or exits nonzero.
        """
        # Imported locally so this method does not rely on a module-level
        # import being present.
        import subprocess

        in_path = os.path.join(self.wavdir, in_file)
        out_path = os.path.join(self.oggdir, out_file)
        if os.path.exists(out_path):
            self.log("Already have %s, skipping..." % out_path)
            return
        binary = 'oggenc'  # 'oggenc-aotuv'
        # Argument list, no shell: paths are passed verbatim whatever
        # characters they contain.
        cmd = [binary, '-Q', '-q', '%d' % self.quality, '-o', out_path,
               in_path]
        try:
            ret = subprocess.call(cmd)
        except OSError as exc:
            # Typically: the encoder binary was not found.
            raise RuntimeError("Could not run command %r: %s" % (cmd, exc))
        if ret != 0:
            raise RuntimeError("Command exited nonzero status %d: %r"
                               % (ret, cmd))
        _encode_flac(in_path, self.log)

    def tag_all(self):
        """
        Tag all OGG files in the ogg directory
        with info from cddb.

        Files are matched to CDDB track titles by their sorted position.
        Nothing is tagged when CDDB has fewer titles than there are files.
        """
        artist, title = self._getArtistAndTitle()
        files = glob.glob(self.oggdir + '/*ogg')
        self.log("Tagging %d files." % len(files))
        files.sort()
        year = self.cddb_info.get('DYEAR', '')
        genre = self.cddb_info.get('DGENRE', '')
        titles = [k for k in self.cddb_info if k.startswith('TTITLE')]
        if len(titles) < len(files):
            self.log("%d titles but I have %d files, cowardly refusing to tag."
                     % (len(titles), len(files)))
            return
        total = len(files)
        for i, f in enumerate(files):
            # cddb counts from 0, but our human-readable numbers count
            # from 1 (same fallback text as _makeNewCompressedFileName).
            track = self.cddb_info.get('TTITLE%d' % i,
                                       '%s, track %02d' % (title, i + 1))
            tag = {'genre': genre,
                   'date': str(year),
                   'album': title,
                   'title': track,
                   'tracknumber': '%d/%d' % (i + 1, total),
                   'artist': artist,
                   }
            # XXX VorbisComment appears to be flaky!
            comment = ogg.vorbis.VorbisComment()
            for k, v in tag.items():
                comment[k] = v
            comment.write_to(f)
            # Kept from the original workaround for mysterious
            # ogg.vorbis errors: drop the comment object right away.
            del comment

    def _name_from_title(self, s, number=None):
        """Clean up a title, make it suitable for a filename.

        Whitespace runs become single underscores, an optional two-digit
        *number* prefix is added, and a set of troublesome punctuation
        characters is removed.
        """
        # Whitespaces to underscores (split() also trims both ends).
        s = '_'.join(s.split())
        if number is not None:
            s = '%02d_%s' % (number, s)
        # Get rid of a lot of nasty characters. Filtering character by
        # character works the same for byte and unicode strings.
        return ''.join([c for c in s if c not in self._unsafe_chars])

    def _getTrackNoFromFname(self, fname):
        """Return the track number encoded in the file name *fname*.

        Understands ``01_Name_Of_Song.foo``, ``track_01.foo`` and the
        ``trackNN.cdda.wav`` form. Raises ValueError for anything else.
        """
        fname = os.path.basename(fname)
        name = os.path.splitext(fname)[0]
        # Maybe it's the default output from cdparanoia -B.
        name = _rstrip(name, '.cdda')
        name = _lstrip(name, 'track')
        parts = name.split('_')
        # First try the form 01_Name_Of_Song, then the form track_01.
        for candidate in (parts[0], parts[-1]):
            try:
                return int(candidate)
            except ValueError:
                pass
        raise ValueError("Couldn't get track number from name %s" % fname)

    def _makeNewCompressedFileName(self, fname):
        """Return a ``NN_Track_Title.ogg`` style name for the file *fname*.

        The title comes from our CDDB info when available, else from the
        album title plus track number.
        """
        i = self._getTrackNoFromFname(fname)
        artist, title = self._getArtistAndTitle()
        # cddb counts from 0, but our human-readable numbers count from 1.
        track = self.cddb_info.get('TTITLE%d' % (i - 1),
                                   '%s, track %02d' % (title, i))
        return (self._name_from_title(track, number=i)
                + self.compressed_extension)

    def _getArtistAndTitle(self):
        """Return our best guess at ``(artist, album title)``.

        Values given on the command line win; otherwise the CDDB
        ``DTITLE`` field is used. Underscores are shown as spaces.
        """
        dtitle = self.cddb_info.get('DTITLE', '')
        if '/' in dtitle:
            # By convention DTITLE is "Artist / Title", in that order.
            disc_artist, disc_title = [
                part.strip() for part in dtitle.split('/', 1)]
        else:
            # No separator: all we have is (possibly) a title.
            disc_artist, disc_title = '', dtitle.strip()
        artist = self.artist or disc_artist
        title = self.title or disc_title
        return artist.replace('_', ' '), title.replace('_', ' ')
if __name__ == '__main__':
    # Command line: cd2ogg 'artist' 'title' (quality)
    # Artist and title are both required (argv[0] is the script name),
    # quality is optional.
    if len(sys.argv) < 3:
        sys.stderr.write(
            """Usage: \ncd2ogg 'artist' 'title' (quality)
\n"""
        )
        sys.exit(1)
    artist = sys.argv[1]
    title = sys.argv[2]
    try:
        quality = int(sys.argv[3])
    except IndexError:
        # No quality given: let Main pick its default.
        quality = None
    except ValueError:
        # Report a bad quality value instead of dying with a traceback.
        sys.stderr.write("quality must be an integer, got %r\n"
                         % sys.argv[3])
        sys.exit(1)
    main = Main(artist, title, quality)
    main.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment