Skip to content

Instantly share code, notes, and snippets.

@slinkp
Created October 27, 2011 16:41
Show Gist options
  • Save slinkp/1320090 to your computer and use it in GitHub Desktop.
Save slinkp/1320090 to your computer and use it in GitHub Desktop.
Script that rips a CD, and encodes to both ogg and flac, attempting to set metadata too.
#!/usr/bin/env python
"""
Rip a CD and encode each track to both Ogg Vorbis and FLAC.
Also try to tag the ogg files with the metadata looked up via CDDB.
"""
# Stdlib imports
import glob
import os
import popen2
import Queue
import string
import subprocess
import sys
import threading
import time

# 3rd-party imports
import CDDB
import DiscID
#import ID3
#import pyid3lib
#import tageditor
import ogg.vorbis
def _rstrip(input, stripped):
"strip a whole trailing string, not just any of its characters"
if input.endswith(stripped):
return input[:len(stripped)]
return input
def _lstrip(input, stripped):
"strip a whole leading string, not just any of its characters"
if input.startswith(stripped):
return input[len(stripped):]
return input
def _encode_flac(in_path, logfunc):
flacfile = os.path.splitext(in_path)[0] + '.flac'
if os.path.exists(flacfile):
logfunc("Already have %s, skipping..." % flacfile)
return
cmd = 'flac --delete-input-file -o %r %r' % (flacfile, in_path)
ret = os.system(cmd)
if ret != 0:
raise RuntimeError("Command exited nonzero status %d: %r"
% (ret, cmd))
class Main(object):
    """Rip an audio CD, encode every track to ogg and flac, then tag.

    Ripping and encoding run in two threads connected by a queue of file
    names; ``None`` on the queue marks the end of the rip.
    """

    compressed_extension = '.ogg'

    def __init__(self, artist, title, quality=None,
                 do_cddb_query=True, base_dir='/home/pw/morestuff/'):
        """Look up the disc (optionally) and create the output directories.

        artist, title -- used for directory names and as tag values;
            runs of whitespace are collapsed to single underscores.
        quality -- value passed to the encoder's ``-q`` option; ``None``
            selects 6.
        do_cddb_query -- when false, no disc lookup is attempted and the
            CDDB info is an empty dict.
        base_dir -- directory under which ``ripped_wavs`` and ``Music``
            are created.
        """
        self.base_dir = base_dir
        self.track_prefix = 'track_'
        self.base_wavdir = '%s/ripped_wavs' % self.base_dir
        self.base_oggdir = '%s/Music' % self.base_dir
        # Compare against None so that an explicit quality of 0 survives.
        self.quality = 6 if quality is None else quality
        self.artist = '_'.join(artist.strip().split())
        self.title = '_'.join(title.strip().split())
        if do_cddb_query:
            self.query_cddb()
        else:
            self.cddb_info = {}
        self.log("Making directories...", 0)
        self.wavdir = self.prepdirs(self.base_wavdir)
        self.oggdir = self.prepdirs(self.base_oggdir)
        self.log("done.")
        self.fileQueue = Queue.Queue()

    def run(self):
        """Rip and encode concurrently, then tag the encoded files."""
        self.log("Ripping...")
        ripper = threading.Thread(target=self.rip)
        ripper.start()
        self.log("Encoding...", 0)
        encoder = threading.Thread(target=self.encode_all)
        encoder.start()
        ripper.join()
        self.log("done with rip stage.")
        encoder.join()
        self.log("done with encoder stage.")
        self.tag_all()
        self.log("done.")

    def log(self, msg, newline=1):
        """Write *msg* to stdout and flush.

        The message is followed by a newline, or by a single space when
        *newline* is false so that a later message continues the line.
        """
        sys.stdout.write('%s%s' % (msg, '\n' if newline else ' '))
        sys.stdout.flush()

    def query_cddb(self):
        """Read the disc id from /dev/cdrom and fetch its CDDB entry.

        Always leaves ``self.cddb_info`` set to a dict; it is empty when
        the disc cannot be read or no usable entry comes back.
        """
        # Get the CD info early, while we have the disk still in there...
        # might be running this script in multiple terminals :-)
        self.log("Examining...", 0)
        device = DiscID.open('/dev/cdrom')
        try:
            self.disc_info = DiscID.disc_id(device)
        except DiscID.cdrom.error:
            self.log("Could not read disc info. No disc?")
            self.cddb_info = {}
            return
        self.log("Querying freedb.org...", 0)
        cddb_result = CDDB.query(self.disc_info)
        # Would be nice if we could feed in artist & title and do a search.
        matches = cddb_result[1]
        if isinstance(matches, list):
            self.log("Got %d albums" % len(matches))
            album = matches[0] if matches else None
        else:
            # NOTE(review): presumably a single match object (or None)
            # rather than a list -- confirm against the CDDB module.
            album = matches
        info = None
        if album is not None:
            reply = CDDB.read(category=album['category'],
                              disc_id=album['disc_id'])
            if reply is not None:
                info = reply[1]
        # Everything downstream calls .get() on cddb_info, so anything
        # that is not a dict is replaced by an empty one.
        if not isinstance(info, dict):
            info = {}
        self.log(info)
        self.cddb_info = info

    def prepdirs(self, basepath):
        """Create ``basepath/<artist>/<title>`` and return its path.

        An already existing directory is fine; any other failure to
        create it is raised as OSError.
        """
        artist = self._name_from_title(self.artist)
        title = self._name_from_title(self.title)
        path = os.path.normpath(os.path.join(basepath, artist, title))
        try:
            os.makedirs(path)
        except OSError:
            # Only "it already exists" is harmless; permission problems
            # and the like must not be hidden.
            if not os.path.isdir(path):
                raise
        return path

    def rip(self):
        """Rip the CD track by track into wav files in ``self.wavdir``.

        Every finished (or previously existing) file name is put on
        ``self.fileQueue``; ``None`` is always put last, even on error,
        so that the encoder can terminate.

        Raises OSError when cdparanoia fails for any reason other than
        running past the last track.
        """
        try:
            existing = glob.glob('%s%s*wav' % (self.wavdir, os.path.sep))
            existing += glob.glob('%s%s*flac' % (self.wavdir, os.path.sep))
            existing.sort()
            # Ignore track 00 - this is a short, silent track
            # that some CDs have for who knows what evil reason.
            track_no = 1
            if existing:
                self.log("We already have some wav files.")
                # Assume we were interrupted during the last one.
                # XXX This should be defeatable.
                # Also it should be smarter - sometimes the last
                # one is really the last track and we're all done!
                # Rip again starting from the last one.
                track_no = self._getTrackNoFromFname(existing[-1])
                for fname in existing:
                    self.log(
                        "Already have %s, putting it in the encoder queue" %
                        fname)
                    self.fileQueue.put(fname)
            # We want to rip tracks one at a time and put each on the
            # encoder queue as soon as it's done, so cdparanoia is
            # started once per track until it reports a missing track.
            while True:
                fname = os.path.join(
                    self.wavdir,
                    '%s%02d.wav' % (self.track_prefix, track_no))
                argv = ['cdparanoia', str(track_no), fname]
                cmd = ' '.join(argv)
                self.log("Ripping track %d to %s..." % (track_no, fname))
                proc = subprocess.Popen(argv, stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE)
                # communicate() drains both pipes while waiting; waiting
                # first and reading afterwards can deadlock once the
                # child fills a pipe buffer.
                errors = proc.communicate()[1]
                status = proc.returncode
                if status < 0:
                    # Negative return code: the child was killed by a signal.
                    self.log("********* PROBLEM **************")
                    self.log(errors)
                    raise OSError("Command '%s' exited abnormally" % cmd)
                if status != 0:
                    if errors.count("Track #%d does not exist" % track_no):
                        self.log("Track %d was the last track."
                                 % (track_no - 1))
                        break
                    self.log("***** EXITED WITH ERROR ************")
                    self.log(errors)
                    raise OSError("Command '%s' exited with status %d"
                                  % (cmd, status))
                self.log("ok")
                self.log("Putting %s on queue for encoding..." % fname, 0)
                self.fileQueue.put(fname)
                self.log("ok")
                track_no += 1
        finally:
            # Signal that we're done by putting None in the queue.
            self.log("All files are in the queue.")
            self.fileQueue.put(None)

    def encode_all(self):
        """Encode queued files until the ``None`` end marker arrives."""
        while True:
            # Blocks until rip() supplies the next name or the end marker.
            fname = self.fileQueue.get()
            if fname is None:
                self.log("Queue exhausted, done encoding!")
                return
            self.log("Pulled %s from queue" % fname)
            fname = os.path.basename(fname)
            new_name = self._makeNewCompressedFileName(fname)
            self.encode(fname, new_name)
            self.log("encoded %s" % new_name)

    def encode(self, in_file, out_file):
        """Encode one file from wavdir to oggdir, then pack it with flac.

        *in_file* and *out_file* are base names.  Nothing at all is done
        (flac included) when the output file already exists.

        Raises RuntimeError if the encoder finishes with nonzero status.
        """
        in_path = os.path.join(self.wavdir, in_file)
        out_path = os.path.join(self.oggdir, out_file)
        if os.path.exists(out_path):
            self.log("Already have %s, skipping..." % out_path)
            return
        binary = 'oggenc'  # 'oggenc-aotuv'
        # An argument list needs no shell quoting, whatever characters
        # the track title put into out_path.
        argv = [binary, '-Q', '-q', '%d' % self.quality,
                '-o', out_path, in_path]
        ret = subprocess.call(argv)
        if ret != 0:
            raise RuntimeError("Command exited nonzero status %d: %r"
                               % (ret, ' '.join(argv)))
        _encode_flac(in_path, self.log)

    def tag_all(self):
        """Tag every ogg file in ``self.oggdir`` with the CDDB info.

        Files are matched to track titles by sorted position.  Nothing is
        tagged when there are fewer TTITLE entries than files.
        """
        artist, title = self._getArtistAndTitle()
        files = glob.glob(self.oggdir + '/*ogg')
        self.log("Tagging %d files." % len(files))
        files.sort()
        year = self.cddb_info.get('DYEAR', '')
        genre = self.cddb_info.get('DGENRE', '')
        titles = [k for k in self.cddb_info if k.startswith('TTITLE')]
        if len(titles) < len(files):
            self.log("%d titles but I have %d files, cowardly refusing to tag."
                     % (len(titles), len(files)))
            return
        for i, f in enumerate(files):
            # cddb counts from 0, but our human-readable numbers count
            # from 1 (same fallback text as _makeNewCompressedFileName).
            track = self.cddb_info.get('TTITLE%d' % i,
                                       '%s, track %02d' % (title, i + 1))
            tag = {'genre': genre,
                   'date': str(year),
                   'album': title,
                   'title': track,
                   'tracknumber': '%d/%d' % (i + 1, len(files)),
                   'artist': artist,
                   }
            # XXX VorbisComment appears to be flaky!
            comment = ogg.vorbis.VorbisComment()
            for k, v in tag.items():
                comment[k] = v
            comment.write_to(f)
            del comment

    def _name_from_title(self, s, number=None):
        """Clean up a title, make it suitable for a filename.

        Whitespace runs become underscores, an optional two-digit
        *number* is prefixed, and punctuation is dropped.
        """
        s = '_'.join(s.strip().split())
        if number is not None:
            s = '%02d_%s' % (number, s)
        # Get rid of a lot of nasty characters.
        nasty = '\\/!@#$%^&*()[]{};:"\'<>,./?+`~'
        return ''.join([c for c in s if c not in nasty])

    def _getTrackNoFromFname(self, fname):
        """Return the track number embedded in the file name *fname*.

        Raises ValueError when no number can be found.
        """
        fname = os.path.basename(fname)
        name = os.path.splitext(fname)[0]
        # Maybe it's the default output from cdparanoia -B.
        name = _rstrip(name, '.cdda')
        name = _lstrip(name, 'track')
        pieces = name.split('_')
        # First piece covers 01_Name_Of_Song.foo, last covers track_01.foo
        for candidate in (pieces[0], pieces[-1]):
            try:
                return int(candidate)
            except ValueError:
                pass
        raise ValueError("Couldn't get track number from name %s" % fname)

    def _makeNewCompressedFileName(self, fname):
        """Return the ogg base name for *fname*, using the CDDB title."""
        name = os.path.splitext(fname)[0] + self.compressed_extension
        i = self._getTrackNoFromFname(name)
        artist, title = self._getArtistAndTitle()
        # cddb counts from 0, but our human-readable numbers count from 1.
        track = self.cddb_info.get('TTITLE%d' % (i - 1),
                                   '%s, track %02d' % (title, i))
        return self._name_from_title(track, number=i) + self.compressed_extension

    def _getArtistAndTitle(self):
        """Return ``(artist, title)`` with underscores turned into spaces.

        The values given to the constructor win; the DTITLE entry of the
        CDDB info only fills in whatever was left empty.
        """
        dtitle = self.cddb_info.get('DTITLE', '')
        if '/' in dtitle:
            # DTITLE reads "artist / title", in that order.
            disc_artist, disc_title = [part.strip()
                                       for part in dtitle.split('/', 1)]
        else:
            disc_artist, disc_title = '', dtitle.strip()
        artist = self.artist or disc_artist
        title = self.title or disc_title
        return artist.replace('_', ' '), title.replace('_', ' ')
if __name__ == '__main__':
    # Command line: cd2ogg 'artist' 'title' (quality)
    usage = "Usage: \ncd2ogg 'artist' 'title' (quality)\n\n"
    # Both artist and title are required, i.e. at least three argv entries.
    if len(sys.argv) < 3:
        sys.stderr.write(usage)
        sys.exit(1)
    artist = sys.argv[1]
    title = sys.argv[2]
    quality = None
    if len(sys.argv) > 3:
        try:
            quality = int(sys.argv[3])
        except ValueError:
            # A non-numeric quality gets the usage text, not a traceback.
            sys.stderr.write("quality must be an integer, got %r\n"
                             % sys.argv[3])
            sys.stderr.write(usage)
            sys.exit(1)
    main = Main(artist, title, quality)
    main.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment