Skip to content

Instantly share code, notes, and snippets.

@mitchellrj
Last active November 14, 2015 11:44
Show Gist options
  • Save mitchellrj/30a78b6c5820dacccb7b to your computer and use it in GitHub Desktop.
Save mitchellrj/30a78b6c5820dacccb7b to your computer and use it in GitHub Desktop.
Correct naming and tags of music files using MusicBrainz

Corrects the filenames and metadata of music files.

Assumptions

  • You want your music files named:

    ALBUM/TRACK-NUMBER - ARTIST - TRACK-TITLE.EXT
    

or for multi-disc albums:

ALBUM/CD DISC-NUMBER/TRACK-NUMBER - ARTIST - TRACK-TITLE.EXT
  • The tracks you want to rename either already carry track-number metadata or have filenames that begin with a track number.
  • For multi-disc albums the tracks are already divided between folders named "CD DISC-NUMBER"

Requirements

  • python-requests
  • python-magic
  • id3v2
  • vorbis-tools
  • flac
  • libav
#!/usr/bin/env python3
import argparse
from itertools import islice
import os.path
import re
import shlex
import shutil
import subprocess
import sys
import tempfile
from urllib.parse import quote
from xml.etree import ElementTree as ET
# Optional third-party packages: remember whether each one could be imported
# rather than failing at import time.
try:
    import magic
except ImportError:
    HAVE_MAGIC = False
else:
    HAVE_MAGIC = True

try:
    import requests
except ImportError:
    HAVE_REQUESTS = False
else:
    HAVE_REQUESTS = True
# XML namespace that qualifies the element names looked up in API responses.
NS = 'http://musicbrainz.org/ns/mmd-2.0#'
# Leading digits of a filename, followed by optional whitespace and one
# non-word separator character (e.g. "03 - Title.flac", "3.Title.mp3").
TRACK_NUMBER = re.compile(r'([0-9]+)\s*\W')
# Characters/operators that get backslash-escaped before a phrase is placed in
# a Lucene query.  '"', '~' and '*' are listed individually: written as `"-*`
# they form a character range that also matches # $ % & ' and leaves '~'
# unescaped.
LUCENE_SPECIAL = re.compile(r'([+\-!(){}\[\]^"~*?:\\/]|&&|\|\|)')
def lucene_escape(phrase):
    """Return *phrase* with every ``LUCENE_SPECIAL`` match backslash-escaped."""
    escaped = LUCENE_SPECIAL.sub(r'\\\1', phrase)
    return escaped
class ReleaseNotFound(RuntimeError):
    """Signals that no release could be determined for a folder."""
class CouldNotFetchReleaseData(RuntimeError):
    """Raised when a request for release data from the API fails."""
def get_argument_parser():
    """Build the command-line parser for the script.

    Positional arguments are the album ``folder`` and an optional MusicBrainz
    ``release_id``.  ``-i`` / ``--ignore-existing`` stores ``True`` in
    ``ignore_existing``; ``--api-url`` overrides the web-service root.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('folder')
    parser.add_argument('release_id', nargs='?', default=None,
                        help='MusicBrainz release ID')
    # The short and long spellings must be passed as separate option strings.
    # A single '-i,--ignore-existing' string registers one oddly named flag,
    # so '--ignore-existing' was never accepted.
    parser.add_argument('-i', '--ignore-existing', action='store_true',
                        default=False, dest='ignore_existing',
                        help='Do not reuse a release ID found in file metadata')
    parser.add_argument('--api-url', default='http://musicbrainz.org/ws/2/',
                        help='MusicBrainz API root')
    return parser
def get_flac_track_number(file_path):
    """Read a track number from the TRACKNUMBER tag printed by ``metaflac``.

    Returns the number as an ``int``, or ``None`` when no TRACKNUMBER line
    with a numeric value is printed.  Raises ``RuntimeError`` carrying the
    command's stderr when ``metaflac`` exits with a non-zero status.
    """
    proc = subprocess.Popen(['metaflac', '--show-tag=TRACKNUMBER', file_path],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            universal_newlines=True)
    stdout, stderr = proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(stderr)
    prefix = 'TRACKNUMBER='
    for line in stdout.splitlines():
        if not line.upper().startswith(prefix):
            continue
        # Accept "N/TOTAL" as the other tag readers in this file do.
        value = line[len(prefix):].split('/', 1)[0].strip()
        try:
            return int(value)
        except ValueError:
            # Unusable value: keep looking, and ultimately return None so a
            # caller can try another source for the number.
            continue
    return None
def get_other_track_number(file_path):
    """Read a track number from the ``track=`` line printed by ``avprobe``.

    Returns the number as an ``int`` (the part before any ``/``), or ``None``
    when no ``track=`` line with a numeric value is printed.  Raises
    ``RuntimeError`` carrying the command's stderr when ``avprobe`` exits
    with a non-zero status.
    """
    proc = subprocess.Popen(['avprobe', '-show_format', file_path],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            universal_newlines=True)
    stdout, stderr = proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(stderr)
    prefix = 'track='
    for line in stdout.splitlines():
        if not line.startswith(prefix):
            continue
        value = line[len(prefix):].strip().split('/', 1)[0]
        try:
            return int(value)
        except ValueError:
            # A non-numeric value must not abort the whole run; report
            # "unknown" instead so another source can be tried.
            continue
    return None
def get_mp3_track_number(file_path):
    """Read a track number from the TRCK/TRK line printed by ``id3v2``.

    Returns the number as an ``int`` (the part before any ``/``), or ``None``
    when no ``TRCK: `` or ``TRK: `` line with a numeric value is printed.
    Raises ``RuntimeError`` carrying the command's stderr when ``id3v2``
    exits with a non-zero status.
    """
    proc = subprocess.Popen(['id3v2', '--list-rfc822', file_path],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            universal_newlines=True)
    stdout, stderr = proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(stderr)
    for line in stdout.splitlines():
        for prefix in ('TRCK: ', 'TRK: '):
            if line.startswith(prefix):
                value = line[len(prefix):].strip().split('/', 1)[0]
                try:
                    return int(value)
                except ValueError:
                    # Empty or non-numeric frame: ignore it rather than
                    # aborting the whole run.
                    break
    return None
# Maps a detected MIME type to the function that reads the track number from
# the file's own metadata.  'audio/flac' and 'audio/mp3' are listed alongside
# their 'x-'/'mpeg' spellings so that every FLAC/MP3 type accepted by
# METADATA_HANDLERS also gets its tags read here.
TRACK_NUMBER_HANDLERS = {
    'audio/flac': get_flac_track_number,
    'audio/x-flac': get_flac_track_number,
    'audio/mp3': get_mp3_track_number,
    'audio/mpeg': get_mp3_track_number,
    'audio/mp4': get_mp3_track_number,
    'audio/x-musepack': get_other_track_number,
    'audio/x-wav': get_other_track_number,
    'video/x-ms-asf': get_other_track_number,
    'application/octet-stream': get_mp3_track_number,
}
def get_track_number(file_path, mime_type):
    """Work out a file's track number.

    The reader registered for ``mime_type`` in ``TRACK_NUMBER_HANDLERS`` is
    tried first.  If there is none, or it returns ``None``, the number is
    taken from the start of the file's base name via ``TRACK_NUMBER``.
    Returns ``None`` when neither source yields a number.
    """
    number = None
    if mime_type in TRACK_NUMBER_HANDLERS:
        read_from_tags = TRACK_NUMBER_HANDLERS[mime_type]
        number = read_from_tags(file_path)
    if number is not None:
        return number
    leading_digits = TRACK_NUMBER.match(os.path.basename(file_path))
    return int(leading_digits.group(1)) if leading_digits else None
def release_search_result_iterator(release_list, api_url, query, page, limit):
    """Yield one summary dict per release in a search result.

    ``release_list`` is iterated for result page ``page``.  After its
    releases are exhausted, if its ``count`` attribute exceeds
    ``page * limit``, the following page is requested through
    ``search_releases`` and its results are yielded as well.

    Every dict has the keys ``id``, ``title``, ``artist``, ``types``,
    ``date``, ``country``, ``label`` and ``discs`` (one track count per
    medium).  A missing date, country or label becomes an empty string.
    """
    def path(*names):
        # ElementTree path of nested, namespace-qualified child elements.
        return './' + '/'.join('{{{}}}{}'.format(NS, name) for name in names)

    def text_or_blank(element):
        return '' if element is None else element.text

    for release in release_list:
        date_node = release.find(path('date'))
        country_node = release.find(path('country'))
        label_node = release.find(
            path('label-info-list', 'label-info', 'label', 'name'))
        yield {
            'id': release.get('id'),
            'title': release.find(path('title')).text,
            'artist': parse_artist(release),
            'types': [
                node.text
                for node in (
                    release.findall(path('release-group', 'primary-type'))
                    + release.findall(path('release-group',
                                           'secondary-type-list',
                                           'secondary-type'))
                )
            ],
            'date': text_or_blank(date_node),
            'country': text_or_blank(country_node),
            'label': text_or_blank(label_node),
            'discs': [
                int(medium.find(path('track-list')).get('count'))
                for medium in release.findall(path('medium-list', 'medium'))
            ],
        }

    total = int(release_list.get('count'))
    if total > page * limit:
        _, following = search_releases(api_url, query, page + 1, limit)
        yield from following
def search_releases(api_url, query, page=1, limit=25):
    """Run a release search and return ``(count, results)``.

    ``count`` is the integer ``count`` attribute of the first child of the
    response root; ``results`` is a ``release_search_result_iterator`` over
    that element.  Any ``Exception`` raised while requesting the URL or
    checking its status is re-raised as ``CouldNotFetchReleaseData``.
    """
    offset = (page - 1) * limit
    url = '{}release/?query={}&offset={}&limit={}'.format(
        api_url, quote(query), offset, limit
    )
    try:
        response = requests.get(url)
        response.raise_for_status()
    except Exception as error:
        # SystemExit does not derive from Exception, so it still propagates
        # untouched without a dedicated clause.
        raise CouldNotFetchReleaseData() from error
    release_list = ET.fromstring(response.text)[0]
    total = int(release_list.get('count'))
    results = release_search_result_iterator(
        release_list, api_url, query, page, limit)
    return total, results
def get_exact_release_id_from_metadata(folder_path):
    """Look for a MUSICBRAINZ_ALBUMID tag in the FLAC files of a folder.

    Entries are visited in sorted order so the result is deterministic.
    ``*.flac`` files are queried with ``metaflac``; sub-directories whose
    name starts with "CD" (case-insensitive) are searched recursively.

    Returns the first non-empty tag value found, or ``None``.  Raises
    ``RuntimeError`` carrying stderr when ``metaflac`` exits non-zero.
    """
    prefix = 'MUSICBRAINZ_ALBUMID='
    for entry in sorted(os.listdir(folder_path)):
        entry_path = os.path.join(folder_path, entry)
        if os.path.isfile(entry_path) and entry.lower().endswith('.flac'):
            proc = subprocess.Popen(
                ['metaflac', '--show-tag=MUSICBRAINZ_ALBUMID', entry_path],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                universal_newlines=True)
            stdout, stderr = proc.communicate()
            if proc.returncode != 0:
                raise RuntimeError(stderr)
            for line in stdout.splitlines():
                if line.upper().startswith(prefix):
                    value = line[len(prefix):].strip()
                    # An empty tag is not a usable ID; returning '' would be
                    # mistaken for a real ID by an ``is not None`` check.
                    if value:
                        return value
        elif os.path.isdir(entry_path) and entry.upper().startswith('CD'):
            nested = get_exact_release_id_from_metadata(entry_path)
            if nested is not None:
                return nested
    return None
def get_release_id(api_url, folder_path, ignore_existing=False):
    """Determine the release ID for an album folder, asking the user if needed.

    Unless ``ignore_existing`` is true, an ID found by
    ``get_exact_release_id_from_metadata`` is returned immediately.
    Otherwise the folder's base name is searched for, and the user is asked
    for another search term until a search returns results.  Results are
    then listed eight per page: 1-8 select a release, 9 shows the next page
    and 0 the previous one.

    Returns the ``id`` of the chosen release.  Raises ``ReleaseNotFound``
    when input ends (EOF) or a blank search term is entered.
    """
    page_size = 8  # menu entries 1-8; 9 and 0 are the navigation keys

    def ask(prompt):
        # Turn end-of-input into the "no release" outcome instead of letting
        # EOFError escape as a traceback.
        print(prompt, end='', flush=True)
        try:
            return input()
        except EOFError:
            raise ReleaseNotFound() from None

    if not ignore_existing:
        release_id = get_exact_release_id_from_metadata(folder_path)
        if release_id is not None:
            return release_id

    search_term = os.path.basename(folder_path)
    while True:
        query = '"{}"'.format(lucene_escape(search_term))
        count, potential_releases = search_releases(
            api_url, query, limit=page_size)
        if count:
            break
        search_term = ask(
            'Found no results for {!r}. What should I search for? '.format(
                search_term)
        ).strip()
        if not search_term:
            # A blank answer is the user's way out of the search loop.
            raise ReleaseNotFound()

    # Results are pulled from the iterator lazily, one menu page at a time.
    pages = [list(islice(potential_releases, page_size))]
    page = 0
    while True:
        for i, release_data in enumerate(pages[page], 1):
            print('{}. {} - {} ({}, {}) [{}]'.format(
                i, release_data['title'], release_data['artist'],
                release_data['date'], release_data['country'],
                ', '.join(map(str, release_data['discs']))))
        print()
        print('9. Next page')
        print('0. Previous page')
        while True:
            answer = ask('Choose a release: ')
            try:
                choice = int(answer)
            except ValueError:
                choice = -1
            if choice < 0 or choice > 9:
                print('Try again.')
                continue
            if choice == 0:
                page = max(0, page - 1)
            elif choice <= len(pages[page]):
                return pages[page][choice - 1]['id']
            elif choice == 9:
                if page + 1 >= len(pages):
                    pages.append(list(islice(potential_releases, page_size)))
                if pages[page + 1]:
                    page += 1
                else:
                    # Do not page into an endless run of empty screens.
                    print('No more results.')
            # Any other in-range number simply redisplays the current page.
            break
def parse_artist(node):
    """Render the artist credit(s) found directly under ``node`` as a string.

    Within one ``artist-credit`` the artist names are concatenated using each
    ``name-credit``'s ``joinphrase`` attribute (``' & '`` when absent); the
    join phrase after the final name is dropped.  Several ``artist-credit``
    elements are joined with ``' & '``.
    """
    credit_path = './{{{}}}artist-credit'.format(NS)
    name_credit_path = './{{{}}}name-credit'.format(NS)
    artist_name_path = './{{{ns}}}artist/{{{ns}}}name'.format(ns=NS)

    rendered = []
    for credit in node.iterfind(credit_path):
        pieces = []
        for name_credit in credit.iterfind(name_credit_path):
            pieces += [
                name_credit.find(artist_name_path).text,
                name_credit.get('joinphrase', ' & '),
            ]
        # Slice off the trailing join phrase.
        rendered.append(''.join(pieces[:-1]))
    return ' & '.join(rendered)
def _track_number(track):
    """Return a track element's number as an ``int``.

    The ``number`` element is used when its text is numeric; otherwise the
    ``position`` element is used.
    """
    number_node = track.find('./{{{}}}number'.format(NS))
    try:
        return int(number_node.text)
    except (AttributeError, TypeError, ValueError):
        # NOTE(review): assumes non-numeric numbering (e.g. "A1") always
        # comes with an integer <position>; confirm against the API schema.
        return int(track.find('./{{{}}}position'.format(NS)).text)


def get_release_data(api_url, release_id):
    """Fetch one release and return its album, disc and track details.

    Returns a dict with the keys ``title``, ``date`` (``''`` when absent),
    ``artist``, ``album_id``, ``compilation`` (true when the album artist is
    "various artists", case-insensitively) and ``discs``.  Each disc is a
    dict with ``number``, ``title``, ``disc_id`` and ``tracks``; each track
    is a dict with ``number``, ``artist``, ``title`` and ``track_id``.

    Raises ``CouldNotFetchReleaseData`` when the request fails.
    """
    url = '{}release/{}?inc=media+discids+artist-credits+recordings'.format(
        api_url, quote(release_id)
    )
    try:
        response = requests.get(url)
        response.raise_for_status()
    except Exception as e:
        raise CouldNotFetchReleaseData() from e
    root = ET.fromstring(response.text)
    release = root[0]
    discs = []
    for disc in release.findall('./{{{ns}}}medium-list/{{{ns}}}medium'.format(ns=NS)):
        tracks = []
        for track in disc.findall('./{{{ns}}}track-list/{{{ns}}}track'.format(ns=NS)):
            tracks.append({
                'number': _track_number(track),
                'artist': parse_artist(track.find('./{{{}}}recording'.format(NS))),
                'title': track.find('./{{{ns}}}recording/{{{ns}}}title'.format(ns=NS)).text,
                'track_id': track.get('id'),
            })
        disc_title_node = disc.find('./{{{}}}title'.format(NS))
        disc_id_node = disc.find('./{{{ns}}}disc-list/{{{ns}}}disc'.format(ns=NS))
        discs.append({
            'tracks': tracks,
            'number': int(disc.find('./{{{}}}position'.format(NS)).text),
            'title': disc_title_node.text if disc_title_node is not None else None,
            'disc_id': disc_id_node.get('id') if disc_id_node is not None else None,
        })
    album_title = release.find('./{{{}}}title'.format(NS)).text
    album_date_node = release.find('./{{{}}}date'.format(NS))
    album_artist = parse_artist(release)
    return {
        'title': album_title,
        'date': album_date_node.text if album_date_node is not None else '',
        'artist': album_artist,
        'album_id': release.get('id'),
        'discs': discs,
        'compilation': album_artist.lower() == 'various artists',
    }
def correct_mp3_metadata(file_path, release_data, disc_data, track_data):
    """Rewrite a file's ID3 tags with ``id3v2``, one invocation per option.

    Removes the ID3v1 tag, then sets album, artist, year, album artist
    (TPE2), TDAT, title, track "N/TOTAL" and disc position (TPOS).

    Raises ``RuntimeError`` naming the failed command (and its stderr) when
    any invocation exits with a non-zero status.
    """
    cmds = [
        ['--delete-v1'],
        ['--album', release_data['title']],
        ['--artist', track_data['artist']],
        # The year is the leading four characters of a "YYYY[-MM[-DD]]"
        # date; the trailing four would be e.g. "1-14".
        ['--year', release_data['date'][:4]],
        ['--TPE2', release_data['artist']],
        ['--TDAT', release_data['date']],
        ['--song', track_data['title']],
        ['--track', '{}/{}'.format(track_data['number'], len(disc_data['tracks']))],
        ['--TPOS', '{}/{}'.format(disc_data['number'], len(release_data['discs']))],
    ]
    for cmd_args in cmds:
        cmd = ['id3v2'] + cmd_args + [file_path]
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() drains both pipes; wait() alone can block forever
        # once the child fills a pipe buffer.
        _, stderr = proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError('{}: {}'.format(
                ' '.join(map(shlex.quote, cmd)),
                stderr.decode('utf-8', 'replace').strip()))
def correct_other_metadata(file_path, release_data, disc_data, track_data):
    """Rewrite a file's metadata by stream-copying it through ``avconv``.

    ``avconv`` writes a copy with the new album, artist, album_artist, date,
    title, track and disc metadata to a temporary file with the same
    extension; on success that file is moved over ``file_path``.

    Raises ``RuntimeError`` naming the failed command (and its stderr) when
    ``avconv`` exits with a non-zero status; ``file_path`` is then left
    untouched.
    """
    fd, temp_filename = tempfile.mkstemp(suffix=os.path.splitext(file_path)[-1])
    os.close(fd)
    cmd = [
        'avconv',
        '-y',
        '-i', file_path,
        '-c', 'copy',
        '-metadata', 'album={}'.format(release_data['title']),
        '-metadata', 'artist={}'.format(track_data['artist']),
        '-metadata', 'album_artist={}'.format(release_data['artist']),
        '-metadata', 'date={}'.format(release_data['date']),
        '-metadata', 'title={}'.format(track_data['title']),
        '-metadata', 'track={}/{}'.format(track_data['number'], len(disc_data['tracks'])),
        '-metadata', 'disc={}/{}'.format(disc_data['number'], len(release_data['discs'])),
        temp_filename,
    ]
    try:
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() drains both pipes; wait() alone can block forever
        # once the child fills a pipe buffer.
        _, stderr = proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError('{}: {}'.format(
                ' '.join(map(shlex.quote, cmd)),
                stderr.decode('utf-8', 'replace').strip()))
        shutil.move(temp_filename, file_path)
    finally:
        # Best-effort cleanup: after a successful move the temporary file is
        # already gone, which is the expected OSError here.
        try:
            os.unlink(temp_filename)
        except OSError:
            pass
def correct_vorbis_metadata(file_path, release_data, disc_data, track_data):
    """Write the release's tags to a file with ``vorbiscomment -w``.

    Passes ALBUM, ALBUMARTIST, ALBUM_ARTIST, ARTIST, DATE, TITLE,
    TRACKNUMBER, TRACKTOTAL, COMPILATION (0 or 1), DISCNUMBER and DISCTOTAL
    as ``-t`` arguments.

    Raises ``RuntimeError`` naming the failed command (and its stderr) when
    ``vorbiscomment`` exits with a non-zero status.
    """
    tags = [
        ('ALBUM', release_data['title']),
        ('ALBUMARTIST', release_data['artist']),
        ('ALBUM_ARTIST', release_data['artist']),
        ('ARTIST', track_data['artist']),
        ('DATE', release_data['date']),
        ('TITLE', track_data['title']),
        ('TRACKNUMBER', track_data['number']),
        ('TRACKTOTAL', len(disc_data['tracks'])),
        ('COMPILATION', int(release_data['compilation'])),
        ('DISCNUMBER', disc_data['number']),
        ('DISCTOTAL', len(release_data['discs'])),
    ]
    cmd = ['vorbiscomment']
    for name, value in tags:
        cmd += ['-t', '{}={}'.format(name, value)]
    cmd += ['-w', file_path]
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # communicate() drains both pipes; wait() alone can block forever once
    # the child fills a pipe buffer.
    _, stderr = proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError('{}: {}'.format(
            ' '.join(map(shlex.quote, cmd)),
            stderr.decode('utf-8', 'replace').strip()))
def correct_flac_metadata(file_path, release_data, disc_data, track_data):
    """Replace a FLAC file's tags with the release's data using ``metaflac``.

    For each of MUSICBRAINZ_ALBUMID, ALBUM, ALBUM_ARTIST, ALBUMARTIST,
    ARTIST, DATE, TITLE, TRACKNUMBER, TRACKTOTAL, COMPILATION (0 or 1),
    DISCNUMBER and DISCTOTAL the command gets a ``--remove-tag`` followed by
    a ``--set-tag``, all in a single invocation.

    Raises ``RuntimeError`` naming the failed command (and its stderr) when
    ``metaflac`` exits with a non-zero status.
    """
    tags = [
        ('MUSICBRAINZ_ALBUMID', release_data['album_id']),
        ('ALBUM', release_data['title']),
        ('ALBUM_ARTIST', release_data['artist']),
        ('ALBUMARTIST', release_data['artist']),
        ('ARTIST', track_data['artist']),
        ('DATE', release_data['date']),
        ('TITLE', track_data['title']),
        ('TRACKNUMBER', track_data['number']),
        ('TRACKTOTAL', len(disc_data['tracks'])),
        ('COMPILATION', int(release_data['compilation'])),
        ('DISCNUMBER', disc_data['number']),
        ('DISCTOTAL', len(release_data['discs'])),
    ]
    cmd = ['metaflac']
    for name, value in tags:
        # Remove any existing value first so the tag is not duplicated.
        cmd += ['--remove-tag', name, '--set-tag', '{}={}'.format(name, value)]
    cmd.append(file_path)
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # communicate() drains both pipes; wait() alone can block forever once
    # the child fills a pipe buffer.
    _, stderr = proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError('{}: {}'.format(
            ' '.join(map(shlex.quote, cmd)),
            stderr.decode('utf-8', 'replace').strip()))
# Maps a detected MIME type to the function that rewrites that file's tags.
# Entries are grouped by the tool that does the writing.
METADATA_HANDLERS = {
    # metaflac
    'audio/flac': correct_flac_metadata,
    'audio/x-flac': correct_flac_metadata,
    # id3v2
    'audio/mp3': correct_mp3_metadata,
    'audio/mpeg': correct_mp3_metadata,
    'audio/mp4': correct_mp3_metadata,
    'application/octet-stream': correct_mp3_metadata,
    # avconv
    'audio/x-musepack': correct_other_metadata,
    'audio/x-wav': correct_other_metadata,
    'video/x-ms-asf': correct_other_metadata,
    # vorbiscomment
    'application/ogg': correct_vorbis_metadata,
}
def correct_file(file_path, mime_type, disc_number, track_number, release_data):
    """Rename one track file and rewrite its tags from ``release_data``.

    The file is renamed to ``NN - ARTIST - TITLE.EXT`` in its own directory
    (path separators in the name become ``_``) and then handed to the
    ``METADATA_HANDLERS`` entry for ``mime_type``.

    Returns the new path, or ``None`` (after printing a notice) when the
    MIME type has no handler.  Raises ``RuntimeError`` when the release has
    no disc ``disc_number`` or that disc has no track ``track_number``.
    """
    if mime_type not in METADATA_HANDLERS:
        print('Skipped {} ({})'.format(file_path, mime_type))
        return None

    disc_data = next(
        (disc for disc in release_data['discs'] if disc['number'] == disc_number),
        None)
    if disc_data is None:
        # Report an unknown disc the same way as an unknown track instead of
        # leaking a bare StopIteration.
        raise RuntimeError('could not get data for disc {}'.format(disc_number))

    track_data = next(
        (track for track in disc_data['tracks'] if track['number'] == track_number),
        None)
    if track_data is None:
        raise RuntimeError('could not get data for track {} on disc {}'.format(track_number, disc_number))

    file_dir = os.path.dirname(file_path)
    _, file_ext = os.path.splitext(file_path)
    new_name = '{:02d} - {} - {}{}'.format(
        track_number, track_data['artist'], track_data['title'], file_ext
    ).replace(os.path.sep, '_')
    new_file_path = os.path.join(file_dir, new_name)
    if file_path != new_file_path:
        os.rename(file_path, new_file_path)
    METADATA_HANDLERS[mime_type](new_file_path, release_data, disc_data, track_data)
    print('{} -> {}'.format(os.path.basename(file_path), os.path.basename(new_file_path)))
    return new_file_path
def correct_folder(folder_path, release_data, disc_number=1):
    """Correct every file in a folder, descending into "CD n" sub-folders.

    Entries are handled in sorted order.  A sub-directory whose name starts
    with "CD" is processed recursively, with the digits in its name as the
    disc number.  Files whose track number cannot be determined are skipped
    with a printed notice; the others are passed to ``correct_file`` with
    ``disc_number``.

    Raises ``RuntimeError`` for any other sub-directory, or for a "CD"
    directory whose name contains no digits.
    """
    for filename in sorted(os.listdir(folder_path)):
        file_path = os.path.join(folder_path, filename)
        if os.path.isdir(file_path):
            if not filename.startswith('CD'):
                raise RuntimeError('What is this directory? {}'.format(repr(file_path)))
            digits = ''.join(c for c in filename if c.isdigit())
            if not digits:
                raise RuntimeError(
                    'Cannot determine disc number from directory name {!r}'.format(file_path))
            # Use a separate value so files listed after this directory still
            # get the disc number this call was given.
            correct_folder(file_path, release_data, int(digits))
            continue
        mime_type = magic.from_file(file_path, mime=True)
        if isinstance(mime_type, bytes):
            # Some python-magic versions return bytes, others str.
            mime_type = mime_type.decode('utf-8')
        mime_type = mime_type.split(';')[0].strip()
        track_number = get_track_number(file_path, mime_type)
        if track_number is None:
            print('Skipping {} ({})'.format(file_path, mime_type))
            continue
        correct_file(file_path, mime_type, disc_number, track_number, release_data)
def main(argv=None):
    """Command-line entry point; returns the process exit status.

    Exit codes: 0 success, 2 ``requests`` missing, 4 ``magic`` missing,
    8 no release could be found, 16 release data could not be fetched.

    After the files are corrected, the folder itself is renamed to the
    release title (path separators replaced by ``_``).
    """
    if not HAVE_REQUESTS:
        print('python3-requests not installed. Cannot proceed.', file=sys.stderr)
        return 2
    if not HAVE_MAGIC:
        print('python3-magic not installed. Cannot proceed.', file=sys.stderr)
        return 4
    if argv is None:
        argv = sys.argv[1:]
    parser = get_argument_parser()
    args = parser.parse_args(argv)
    folder_path = os.path.abspath(args.folder)
    release_id = args.release_id
    try:
        if not release_id:
            release_id = get_release_id(args.api_url, folder_path, args.ignore_existing)
        release_data = get_release_data(args.api_url, release_id)
    except ReleaseNotFound:
        print('Could not find a release.', file=sys.stderr)
        return 8
    except CouldNotFetchReleaseData as e:
        # Report a failed request like the other failures instead of
        # letting it escape as a traceback.
        print('Could not fetch release data: {}'.format(e.__cause__ or e), file=sys.stderr)
        return 16
    correct_folder(folder_path, release_data)
    new_folder_path = os.path.join(
        os.path.dirname(folder_path),
        release_data['title'].replace(os.path.sep, '_'))
    if new_folder_path != folder_path:
        os.rename(folder_path, new_folder_path)
    return 0
# Run only when executed as a script; main()'s return value becomes the
# process exit status.
if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment