#!/usr/bin/env python3
|
import argparse |
|
from itertools import islice |
|
import os.path |
|
import re |
|
import shlex |
|
import shutil |
|
import subprocess |
|
import sys |
|
import tempfile |
|
from urllib.parse import quote |
|
from xml.etree import ElementTree as ET |
|
|
|
try: |
|
import magic |
|
HAVE_MAGIC = True |
|
except ImportError: |
|
HAVE_MAGIC = False |
|
try: |
|
import requests |
|
HAVE_REQUESTS = True |
|
except ImportError: |
|
HAVE_REQUESTS = False |
|
|
|
|
|
# XML namespace of the MusicBrainz web-service documents parsed below; every
# ElementTree path in this file is qualified with it.
NS = 'http://musicbrainz.org/ns/mmd-2.0#'
# Leading track number of a file name: digits, optional whitespace, then one
# non-word separator (e.g. "07 - Title.flac" or "7. Title.mp3").
TRACK_NUMBER = re.compile(r'([0-9]+)\s*\W')
# Lucene query syntax special characters and operators:
#     + - && || ! ( ) { } [ ] ^ " ~ * ? : \ /
# '~' is spelled out here.  The class used to contain '"-*', which regex reads
# as the character range 0x22-0x2A: it escaped # $ % & ' by accident and left
# '~' (the fuzzy/proximity operator) unescaped.
LUCENE_SPECIAL = re.compile(r'([+\-!(){}\[\]^"~*?:\\/]|&&|\|\|)')


def lucene_escape(phrase):
    """Return *phrase* with Lucene special characters backslash-escaped.

    Each single special character, and each ``&&`` / ``||`` operator, gets one
    backslash in front of it so the text is searched for literally.
    """
    return LUCENE_SPECIAL.sub(r'\\\1', phrase)
|
|
|
|
|
class ReleaseNotFound(RuntimeError):
    """No MusicBrainz release could be determined.

    ``main`` reports this as "Could not find a release." and exits with
    status 8.
    """
|
|
|
|
|
class CouldNotFetchReleaseData(RuntimeError):
    """A request to the MusicBrainz web service failed.

    ``search_releases`` and ``get_release_data`` raise it with the underlying
    error attached as ``__cause__``.
    """
|
|
|
|
|
def get_argument_parser():
    """Build the command-line parser for this script.

    Arguments:
      folder                  album folder to process (required)
      release_id              optional MusicBrainz release ID
      -i, --ignore-existing   sets ``ignore_existing`` (default False)
      --api-url               MusicBrainz API root URL

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('folder')
    parser.add_argument('release_id', nargs='?', default=None,
                        help='MusicBrainz release ID')
    # The short and long spellings must be two separate option strings; the
    # single string '-i,--ignore-existing' made the long form unusable.
    parser.add_argument('-i', '--ignore-existing', action='store_true',
                        default=False, dest='ignore_existing')
    parser.add_argument('--api-url', default='http://musicbrainz.org/ws/2/',
                        help='MusicBrainz API root')
    return parser
|
|
|
|
|
def get_flac_track_number(file_path):
    """Read the track number from a FLAC file's TRACKNUMBER tag.

    Runs ``metaflac --show-tag=TRACKNUMBER`` on *file_path* and parses the
    first ``TRACKNUMBER=`` line (matched case-insensitively).

    Returns:
        The track number as an int, or None when the tool prints no such tag.

    Raises:
        RuntimeError: metaflac exited non-zero; the message is its stderr.
        ValueError: the tag value does not start with an integer.
    """
    proc = subprocess.Popen(['metaflac', '--show-tag=TRACKNUMBER', file_path],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            universal_newlines=True)
    stdout, stderr = proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(stderr)
    prefix = 'TRACKNUMBER='
    for line in stdout.splitlines():
        if line.upper().startswith(prefix):
            # Accept "3/12" as well as "3", like the other tag readers in
            # this file do; only the part before the slash is the number.
            return int(line[len(prefix):].strip().split('/', 1)[0])
    return None
|
|
|
|
|
def get_other_track_number(file_path):
    """Read the track number reported by ``avprobe -show_format``.

    The first output line starting with ``track=`` is used; a trailing
    ``/total`` part is ignored.

    Returns:
        The track number as an int, or None when no ``track=`` line exists.

    Raises:
        RuntimeError: avprobe exited non-zero; the message is its stderr.
    """
    probe = subprocess.Popen(
        ['avprobe', '-show_format', file_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    report, errors = probe.communicate()
    if probe.returncode:
        raise RuntimeError(errors)

    track_lines = (entry for entry in report.splitlines()
                   if entry.startswith('track='))
    first = next(track_lines, None)
    if first is None:
        return None
    number, _, _ = first[len('track='):].strip().partition('/')
    return int(number)
|
|
|
|
|
def get_mp3_track_number(file_path):
    """Read the track number from the output of ``id3v2 --list-rfc822``.

    The first line starting with ``TRCK: `` or ``TRK: `` is used; a trailing
    ``/total`` part is ignored.

    Returns:
        The track number as an int, or None when neither frame is listed.

    Raises:
        RuntimeError: id3v2 exited non-zero; the message is its stderr.
    """
    lister = subprocess.Popen(
        ['id3v2', '--list-rfc822', file_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    listing, errors = lister.communicate()
    if lister.returncode:
        raise RuntimeError(errors)

    for entry in listing.splitlines():
        for frame_prefix in ('TRCK: ', 'TRK: '):
            if entry.startswith(frame_prefix):
                value = entry[len(frame_prefix):].strip()
                return int(value.split('/', 1)[0])
    return None
|
|
|
|
|
# Tag readers keyed by the MIME type detected for a file.  A type without an
# entry here gets its track number from the file name only (see
# get_track_number).
TRACK_NUMBER_HANDLERS = {
    # 'audio/flac' and 'audio/mp3' are listed next to their older spellings so
    # that every FLAC/MP3 type METADATA_HANDLERS can write is also read.
    'audio/flac': get_flac_track_number,
    'audio/x-flac': get_flac_track_number,
    'audio/mp3': get_mp3_track_number,
    'audio/mpeg': get_mp3_track_number,
    'audio/mp4': get_mp3_track_number,
    'audio/x-musepack': get_other_track_number,
    'audio/x-wav': get_other_track_number,
    'video/x-ms-asf': get_other_track_number,
    # Unidentified data is probed for ID3 tags.
    'application/octet-stream': get_mp3_track_number,
}
|
|
|
|
|
def get_track_number(file_path, mime_type):
    """Determine a file's track number from its tags or, failing that, its name.

    The reader registered for *mime_type* in TRACK_NUMBER_HANDLERS is tried
    first.  When there is none, or it returns None, the leading digits of the
    file's base name (as matched by TRACK_NUMBER) are used.

    Returns:
        The track number as an int, or None when neither source provides one.
    """
    reader = TRACK_NUMBER_HANDLERS.get(mime_type)
    from_tags = reader(file_path) if reader is not None else None
    if from_tags is not None:
        return from_tags

    leading = TRACK_NUMBER.match(os.path.basename(file_path))
    return int(leading.group(1)) if leading else None
|
|
|
|
|
def release_search_result_iterator(release_list, api_url, query, page, limit):
    """Yield a summary dict for every release of a search, across pages.

    *release_list* is the release-list element of one result page.  After its
    releases have been yielded, and if its ``count`` attribute exceeds
    ``page * limit``, the next page is requested through ``search_releases``
    and its results are yielded as well.

    Each dict has the keys ``id``, ``title``, ``artist``, ``types`` (primary
    followed by secondary release-group types), ``date``, ``country``,
    ``label`` (each '' when the element is missing) and ``discs`` (the track
    count of every medium).
    """
    def optional_text(parent, path):
        # Text of an optional child element; '' when it is absent.
        found = parent.find(path)
        return found.text if found is not None else ''

    for release in release_list:
        type_nodes = (
            release.findall('./{{{ns}}}release-group/{{{ns}}}primary-type'.format(ns=NS))
            + release.findall('./{{{ns}}}release-group/{{{ns}}}secondary-type-list/{{{ns}}}secondary-type'.format(ns=NS))
        )
        media = release.findall('./{{{ns}}}medium-list/{{{ns}}}medium'.format(ns=NS))
        yield {
            'id': release.get('id'),
            'title': release.find('./{{{}}}title'.format(NS)).text,
            'artist': parse_artist(release),
            'types': [type_node.text for type_node in type_nodes],
            'date': optional_text(release, './{{{}}}date'.format(NS)),
            'country': optional_text(release, './{{{}}}country'.format(NS)),
            'label': optional_text(release, './{{{ns}}}label-info-list/{{{ns}}}label-info/{{{ns}}}label/{{{ns}}}name'.format(ns=NS)),
            'discs': [
                int(medium.find('./{{{}}}track-list'.format(NS)).get('count'))
                for medium in media
            ],
        }

    # Everything reported by the server has been covered: stop here.
    if int(release_list.get('count')) <= page * limit:
        return
    _, following = search_releases(api_url, query, page + 1, limit)
    yield from following
|
|
|
|
|
def search_releases(api_url, query, page=1, limit=25):
    """Run a release search against the MusicBrainz API.

    Args:
        api_url: API root URL; ``release/`` is appended to it directly.
        query: search query, URL-quoted here.
        page: 1-based page number, converted to an offset of
            ``(page - 1) * limit``.
        limit: page size.

    Returns:
        A ``(count, iterator)`` tuple: the ``count`` attribute of the returned
        release list, and a ``release_search_result_iterator`` over its
        releases and those of the following pages.

    Raises:
        CouldNotFetchReleaseData: the request failed, timed out, or returned
            an HTTP error status.
    """
    url = '{}release/?query={}&offset={}&limit={}'.format(
        api_url, quote(query), (page - 1) * limit, limit
    )
    try:
        # Without a timeout a stalled connection would block forever.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
    except Exception as e:
        # SystemExit does not derive from Exception, so it still propagates.
        raise CouldNotFetchReleaseData() from e
    # Parse the raw bytes so the XML parser picks the encoding itself instead
    # of relying on the charset guessed for ``response.text``.
    root = ET.fromstring(response.content)
    release_list = root[0]
    return (
        int(release_list.get('count')),
        release_search_result_iterator(release_list, api_url, query, page, limit),
    )
|
|
|
|
|
def get_exact_release_id_from_metadata(folder_path):
    """Look for a MUSICBRAINZ_ALBUMID tag in the FLAC files of a folder.

    Every regular file whose name ends in ``.flac`` (any case) is queried with
    ``metaflac``; sub-folders whose names start with ``CD`` (any case) are
    searched recursively.  The first ID found is returned.

    Returns:
        The tag value with surrounding whitespace removed, or None when no
        file carries the tag.

    Raises:
        RuntimeError: metaflac exited non-zero; the message is its stderr.
    """
    tag_prefix = 'MUSICBRAINZ_ALBUMID='
    for entry in os.listdir(folder_path):
        entry_path = os.path.join(folder_path, entry)

        if os.path.isfile(entry_path) and entry.lower().endswith('.flac'):
            reader = subprocess.Popen(
                ['metaflac', '--show-tag=MUSICBRAINZ_ALBUMID', entry_path],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
            )
            output, errors = reader.communicate()
            if reader.returncode:
                raise RuntimeError(errors)
            for tag_line in output.splitlines():
                if tag_line.upper().startswith(tag_prefix):
                    return tag_line[len(tag_prefix):].strip()
            continue

        if os.path.isdir(entry_path) and entry.upper().startswith('CD'):
            nested = get_exact_release_id_from_metadata(entry_path)
            if nested is not None:
                return nested
    return None
|
|
|
|
|
def get_release_id(api_url, folder_path, ignore_existing=False):
    """Work out the MusicBrainz release ID for an album folder.

    Unless *ignore_existing* is true, an ID already stored in the folder's
    FLAC tags is returned straight away.  Otherwise the folder's base name is
    searched for as a quoted phrase; while a search finds nothing the user is
    asked on stdin for another search term.  The results are then listed
    eight per page and the user picks one: 1-8 selects a listed release,
    9 shows the next page and 0 the previous one.

    Returns:
        The ``id`` of the chosen (or tagged) release.
    """
    if not ignore_existing:
        tagged_id = get_exact_release_id_from_metadata(folder_path)
        if tagged_id is not None:
            return tagged_id

    # Search until something is found, prompting for a new term in between.
    search_term = os.path.basename(folder_path)
    total, candidates = search_releases(
        api_url, '"{}"'.format(lucene_escape(search_term)), limit=8)
    while not total:
        print('Found no results for {!r}. What should I search for? '.format(search_term), end='')
        search_term = input()
        total, candidates = search_releases(
            api_url, '"{}"'.format(lucene_escape(search_term)), limit=8)

    # Pages are pulled from the result iterator lazily and kept, so going
    # back to an earlier page does not consume further results.
    pages = []
    current = 0
    while True:
        if current >= len(pages):
            pages.append(list(islice(candidates, 8)))
        shown = pages[current]
        for position, candidate in enumerate(shown, 1):
            print('{}. {} - {} ({}, {}) [{}]'.format(position, candidate['title'], candidate['artist'], candidate['date'], candidate['country'], ', '.join(map(str, candidate['discs']))))
        print()
        print('9. Next page')
        print('0. Previous page')

        while True:
            print('Choose a release: ', end='')
            answer = input()
            try:
                choice = int(answer)
            except (TypeError, ValueError):
                choice = -1
            if not 0 <= choice <= 9:
                print('Try again.')
                continue
            if choice == 0:
                current = max(0, current - 1)
            elif choice <= len(shown):
                return shown[choice - 1]['id']
            elif choice == 9:
                current += 1
            # Any other in-range number simply redraws the current page.
            break
|
|
|
|
|
def parse_artist(node):
    """Build a display string from the artist credits under *node*.

    Within one artist-credit element the artist names are concatenated with
    each name-credit's ``joinphrase`` attribute (' & ' when absent); the join
    phrase after the last name is dropped.  Several artist-credit elements
    are joined with ' & '.
    """
    credit_path = './{{{}}}artist-credit'.format(NS)
    name_credit_path = './{{{}}}name-credit'.format(NS)
    artist_name_path = './{{{ns}}}artist/{{{ns}}}name'.format(ns=NS)

    credited = []
    for credit in node.iterfind(credit_path):
        pieces = []
        for name_credit in credit.iterfind(name_credit_path):
            pieces += [
                name_credit.find(artist_name_path).text,
                name_credit.get('joinphrase', ' & '),
            ]
        # The last piece is the join phrase trailing the final name.
        credited.append(''.join(pieces[:-1]))
    return ' & '.join(credited)
|
|
|
|
|
def _get_track_position(track):
    """Return a track's integer position on its medium.

    ``<position>`` is preferred because ``<number>`` is free text on some
    releases (e.g. "A1" on vinyl) and cannot always be converted; ``<number>``
    is only used when no position is present.
    """
    position_node = track.find('./{{{}}}position'.format(NS))
    if position_node is not None and position_node.text:
        return int(position_node.text)
    return int(track.find('./{{{}}}number'.format(NS)).text)


def get_release_data(api_url, release_id):
    """Fetch one release from the MusicBrainz API and flatten it to dicts.

    Args:
        api_url: API root URL; ``release/<id>`` is appended to it directly.
        release_id: MusicBrainz release ID, URL-quoted here.

    Returns:
        A dict with the keys ``title``, ``date`` ('' when missing),
        ``artist``, ``album_id``, ``compilation`` (True when the album artist
        is "Various Artists", compared case-insensitively) and ``discs``.
        Each disc is a dict with ``number``, ``title`` (or None), ``disc_id``
        (or None) and ``tracks``; each track is a dict with ``number``,
        ``artist``, ``title`` and ``track_id``.

    Raises:
        CouldNotFetchReleaseData: the request failed, timed out, or returned
            an HTTP error status.
    """
    url = '{}release/{}?inc=media+discids+artist-credits+recordings'.format(
        api_url, quote(release_id)
    )
    try:
        # Without a timeout a stalled connection would block forever.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
    except Exception as e:
        # SystemExit does not derive from Exception, so it still propagates.
        raise CouldNotFetchReleaseData() from e
    # Parse the raw bytes so the XML parser picks the encoding itself instead
    # of relying on the charset guessed for ``response.text``.
    root = ET.fromstring(response.content)
    release = root[0]

    discs = []
    for disc in release.findall('./{{{ns}}}medium-list/{{{ns}}}medium'.format(ns=NS)):
        tracks = [
            {
                'number': _get_track_position(track),
                'artist': parse_artist(track.find('./{{{}}}recording'.format(NS))),
                'title': track.find('./{{{ns}}}recording/{{{ns}}}title'.format(ns=NS)).text,
                'track_id': track.get('id'),
            }
            for track in disc.findall('./{{{ns}}}track-list/{{{ns}}}track'.format(ns=NS))
        ]
        disc_title_node = disc.find('./{{{}}}title'.format(NS))
        disc_id_node = disc.find('./{{{ns}}}disc-list/{{{ns}}}disc'.format(ns=NS))
        discs.append({
            'tracks': tracks,
            'number': int(disc.find('./{{{}}}position'.format(NS)).text),
            'title': disc_title_node.text if disc_title_node is not None else None,
            'disc_id': disc_id_node.get('id') if disc_id_node is not None else None,
        })

    album_title = release.find('./{{{}}}title'.format(NS)).text
    album_date_node = release.find('./{{{}}}date'.format(NS))
    album_artist = parse_artist(release)
    return {
        'title': album_title,
        'date': album_date_node.text if album_date_node is not None else '',
        'artist': album_artist,
        'album_id': release.get('id'),
        'discs': discs,
        'compilation': album_artist.lower() == 'various artists',
    }
|
|
|
|
|
def correct_mp3_metadata(file_path, release_data, disc_data, track_data):
    """Rewrite the ID3 tags of *file_path* with the ``id3v2`` tool.

    ID3v1 tags are deleted, then album, artist, year, album artist (TPE2),
    date (TDAT), title, track "n/total" and disc "n/total" (TPOS) are set,
    one ``id3v2`` invocation per tag.

    Raises:
        RuntimeError: an invocation exited non-zero; the message is the
            shell-quoted command line.
    """
    cmds = [
        ['--delete-v1'],
        ['--album', release_data['title']],
        ['--artist', track_data['artist']],
        # Release dates are written year first (YYYY[-MM[-DD]]), so the year
        # is the first four characters; the last four would be e.g. "3-21".
        ['--year', release_data['date'][:4]],
        ['--TPE2', release_data['artist']],
        # NOTE(review): ID3v2.3 defines TDAT as DDMM; the full date string is
        # still written here unchanged - confirm what players expect.
        ['--TDAT', release_data['date']],
        ['--song', track_data['title']],
        ['--track', '{}/{}'.format(track_data['number'], len(disc_data['tracks']))],
        ['--TPOS', '{}/{}'.format(disc_data['number'], len(release_data['discs']))],
    ]
    for cmd_args in cmds:
        cmd = ['id3v2'] + cmd_args + [file_path]
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() drains both pipes; wait() alone can deadlock once the
        # child fills a pipe buffer.
        proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError(' '.join(map(shlex.quote, cmd)))
|
|
|
|
|
def correct_other_metadata(file_path, release_data, disc_data, track_data):
    """Rewrite a file's metadata by remuxing it with ``avconv``.

    The streams are copied (``-c copy``) into a temporary file with the same
    extension while album, artist, album_artist, date, title, track
    "n/total" and disc "n/total" are set; on success the temporary file
    replaces *file_path*.

    Raises:
        RuntimeError: avconv exited non-zero; the message is the shell-quoted
            command line.  *file_path* is left untouched in that case.
    """
    fd, temp_filename = tempfile.mkstemp(suffix=os.path.splitext(file_path)[-1])
    os.close(fd)
    cmd = [
        'avconv',
        '-y',
        '-i', file_path,
        '-c', 'copy',
        '-metadata', 'album={}'.format(release_data['title']),
        '-metadata', 'artist={}'.format(track_data['artist']),
        '-metadata', 'album_artist={}'.format(release_data['artist']),
        '-metadata', 'date={}'.format(release_data['date']),
        '-metadata', 'title={}'.format(track_data['title']),
        '-metadata', 'track={}/{}'.format(track_data['number'], len(disc_data['tracks'])),
        '-metadata', 'disc={}/{}'.format(disc_data['number'], len(release_data['discs'])),
        temp_filename,
    ]
    try:
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() drains both pipes; wait() alone can deadlock once the
        # child fills a pipe buffer.
        proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError(' '.join(map(shlex.quote, cmd)))
        shutil.move(temp_filename, file_path)
    finally:
        # Best-effort cleanup: after a successful move the temporary file is
        # already gone.  Only filesystem errors are ignored, so interrupts
        # and programming errors are no longer swallowed.
        try:
            os.unlink(temp_filename)
        except OSError:
            pass
|
|
|
def correct_vorbis_metadata(file_path, release_data, disc_data, track_data):
    """Write the comment tags of *file_path* with ``vorbiscomment -w``.

    Sets ALBUM, ALBUMARTIST, ALBUM_ARTIST, ARTIST, DATE, TITLE, TRACKNUMBER,
    TRACKTOTAL, COMPILATION (0 or 1), DISCNUMBER and DISCTOTAL in a single
    invocation.

    Raises:
        RuntimeError: vorbiscomment exited non-zero; the message is the
            shell-quoted command line.
    """
    tags = [
        ('ALBUM', release_data['title']),
        # The album artist is stored under both common spellings.
        ('ALBUMARTIST', release_data['artist']),
        ('ALBUM_ARTIST', release_data['artist']),
        ('ARTIST', track_data['artist']),
        ('DATE', release_data['date']),
        ('TITLE', track_data['title']),
        ('TRACKNUMBER', track_data['number']),
        ('TRACKTOTAL', len(disc_data['tracks'])),
        ('COMPILATION', int(release_data['compilation'])),
        ('DISCNUMBER', disc_data['number']),
        ('DISCTOTAL', len(release_data['discs'])),
    ]
    cmd = ['vorbiscomment']
    for name, value in tags:
        cmd += ['-t', '{}={}'.format(name, value)]
    cmd += ['-w', file_path]

    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # communicate() drains both pipes; wait() alone can deadlock once the
    # child fills a pipe buffer.
    proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(' '.join(map(shlex.quote, cmd)))
|
|
|
|
|
def correct_flac_metadata(file_path, release_data, disc_data, track_data):
    """Replace the tags of a FLAC file with ``metaflac``.

    For each of MUSICBRAINZ_ALBUMID, ALBUM, ALBUM_ARTIST, ALBUMARTIST, ARTIST,
    DATE, TITLE, TRACKNUMBER, TRACKTOTAL, COMPILATION (0 or 1), DISCNUMBER and
    DISCTOTAL the existing tag is removed and the new value set, all in one
    invocation.

    Raises:
        RuntimeError: metaflac exited non-zero; the message is the
            shell-quoted command line.
    """
    tags = [
        ('MUSICBRAINZ_ALBUMID', release_data['album_id']),
        ('ALBUM', release_data['title']),
        # The album artist is stored under both common spellings.
        ('ALBUM_ARTIST', release_data['artist']),
        ('ALBUMARTIST', release_data['artist']),
        ('ARTIST', track_data['artist']),
        ('DATE', release_data['date']),
        ('TITLE', track_data['title']),
        ('TRACKNUMBER', track_data['number']),
        ('TRACKTOTAL', len(disc_data['tracks'])),
        ('COMPILATION', int(release_data['compilation'])),
        ('DISCNUMBER', disc_data['number']),
        ('DISCTOTAL', len(release_data['discs'])),
    ]
    cmd = ['metaflac']
    for name, value in tags:
        # Remove first so no stale duplicate of the tag survives.
        cmd += ['--remove-tag', name, '--set-tag', '{}={}'.format(name, value)]
    cmd.append(file_path)

    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # communicate() drains both pipes; wait() alone can deadlock once the
    # child fills a pipe buffer.
    proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(' '.join(map(shlex.quote, cmd)))
|
|
|
|
|
# Tag writers keyed by the MIME type detected for a file, grouped by the tool
# that does the writing.  Files of any other type are skipped by correct_file.
METADATA_HANDLERS = {
    # metaflac
    'audio/flac': correct_flac_metadata,
    'audio/x-flac': correct_flac_metadata,
    # id3v2
    'audio/mp3': correct_mp3_metadata,
    'audio/mpeg': correct_mp3_metadata,
    'audio/mp4': correct_mp3_metadata,
    'application/octet-stream': correct_mp3_metadata,
    # avconv
    'audio/x-musepack': correct_other_metadata,
    'audio/x-wav': correct_other_metadata,
    'video/x-ms-asf': correct_other_metadata,
    # vorbiscomment
    'application/ogg': correct_vorbis_metadata,
}
|
|
|
|
|
def correct_file(file_path, mime_type, disc_number, track_number, release_data):
    """Rename one audio file after its track and rewrite its tags.

    The file is renamed to ``NN - Artist - Title.ext`` inside its own folder
    (path separators in the new name become ``_``) and then handed to the
    writer registered for *mime_type* in METADATA_HANDLERS.

    Args:
        file_path: path of the file to fix.
        mime_type: MIME type used to pick the tag writer.
        disc_number: ``number`` of the disc the file belongs to.
        track_number: ``number`` of the track on that disc.
        release_data: release dict as built by ``get_release_data``.

    Returns:
        The new file path, or None when the MIME type has no writer (the file
        is then left alone and a "Skipped" line is printed).

    Raises:
        RuntimeError: the release has no such disc, or the disc no such track.
    """
    if mime_type not in METADATA_HANDLERS:
        print('Skipped {} ({})'.format(file_path, mime_type))
        return None

    # A missing disc used to escape as a bare StopIteration; it is now
    # reported the same way as a missing track.
    disc_data = next(
        (disc for disc in release_data['discs'] if disc['number'] == disc_number),
        None,
    )
    if disc_data is None:
        raise RuntimeError('could not get data for disc {}'.format(disc_number))
    track_data = next(
        (track for track in disc_data['tracks'] if track['number'] == track_number),
        None,
    )
    if track_data is None:
        raise RuntimeError('could not get data for track {} on disc {}'.format(track_number, disc_number))

    file_dir = os.path.dirname(file_path)
    _, file_ext = os.path.splitext(file_path)
    new_file_path = os.path.join(file_dir,
                                 '{:02d} - {} - {}{}'.format(
                                     track_number, track_data['artist'], track_data['title'],
                                     file_ext
                                 ).replace(os.path.sep, '_'))
    if file_path != new_file_path:
        os.rename(file_path, new_file_path)
    METADATA_HANDLERS[mime_type](new_file_path, release_data, disc_data, track_data)
    print('{} -> {}'.format(os.path.basename(file_path), os.path.basename(new_file_path)))
    return new_file_path
|
|
|
|
|
def correct_folder(folder_path, release_data, disc_number=1):
    """Fix every audio file in *folder_path*, descending into disc folders.

    Entries are processed in sorted order.  A sub-folder whose name starts
    with ``CD`` (any case) is processed recursively, with the digits in its
    name taken as the disc number.  Files are typed with ``magic``; those
    for which no track number can be determined are skipped with a message.

    Args:
        folder_path: folder to process.
        release_data: release dict as built by ``get_release_data``.
        disc_number: disc that files directly inside this folder belong to.

    Raises:
        RuntimeError: a sub-folder is not a ``CD<n>`` folder.
    """
    for filename in sorted(os.listdir(folder_path)):
        file_path = os.path.join(folder_path, filename)
        if os.path.isdir(file_path):
            # Same case-insensitive test as the release-ID scan of the tags.
            if not filename.upper().startswith('CD'):
                raise RuntimeError('What is this directory? {}'.format(repr(file_path)))
            digits = ''.join(c for c in filename if c.isdigit())
            if not digits:
                raise RuntimeError('Cannot tell the disc number of {}'.format(repr(file_path)))
            # Passed down without touching disc_number, so files of this
            # folder that sort after the sub-folder keep their own disc.
            correct_folder(file_path, release_data, int(digits))
            continue

        mime_type = magic.from_file(file_path, mime=True)
        # Depending on the installed bindings the result is bytes or str.
        if isinstance(mime_type, bytes):
            mime_type = mime_type.decode('utf-8')
        mime_type = mime_type.split(';')[0].strip()
        track_number = get_track_number(file_path, mime_type)
        if track_number is None:
            print('Skipping {} ({})'.format(file_path, mime_type))
            continue
        correct_file(file_path, mime_type, disc_number, track_number, release_data)
|
|
|
|
|
def main(argv=None):
    """Command-line entry point: retag an album folder and rename it.

    Args:
        argv: argument list; ``sys.argv[1:]`` when None.

    Returns:
        2 when ``requests`` is unavailable, 4 when ``magic`` is unavailable,
        8 when no release could be found, and None (exit status 0) after the
        folder has been processed and renamed to the release title.
    """
    requirements = (
        (HAVE_REQUESTS, 'python3-requests not installed. Cannot proceed.', 2),
        (HAVE_MAGIC, 'python3-magic not installed. Cannot proceed.', 4),
    )
    for available, complaint, status in requirements:
        if not available:
            print(complaint, file=sys.stderr)
            return status

    options = get_argument_parser().parse_args(
        sys.argv[1:] if argv is None else argv)
    folder_path = os.path.abspath(options.folder)

    release_id = options.release_id
    if not release_id:
        try:
            release_id = get_release_id(options.api_url, folder_path, options.ignore_existing)
        except ReleaseNotFound:
            print('Could not find a release.', file=sys.stderr)
            return 8

    release_data = get_release_data(options.api_url, release_id)
    correct_folder(folder_path, release_data)

    # Finally name the folder itself after the release, next to where it was.
    renamed_folder = os.path.join(
        os.path.dirname(folder_path),
        release_data['title'].replace(os.path.sep, '_'),
    )
    os.rename(folder_path, renamed_folder)


if __name__ == '__main__':
    sys.exit(main())