Created
December 6, 2016 16:19
-
-
Save dannymichel/a4f1abfde9d7f7748dc2732607d83356 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
from __future__ import print_function | |
__doc__ = '''This script is designed to make uploads to Gazelle private trackers | |
easier and faster. | |
It will: | |
0. create a torrent file if necessary | |
1. inspect audio files | |
2. extract the AcoustID | |
3. lookup the files on Musicbrainz to extract metadata | |
4. look for duplicates on a Gazelle instance | |
5. prompt for missing information | |
6. upload given torrent file on upload.php | |
You can either pass an existing torrent file (``--torrent``) or ask | |
the script to generate one for you (``--mktorrent``). If both are | |
provided, the torrent will be overwritten with the new settings. This | |
is to make sure you reupload existing what.cd torrents if you have | |
them. Use the --announce argument to point to your personal announce | |
URL as specified in upload.php. This replaces the announce URL in | |
existing torrent files, so you can reuse your what.cd torrents. | |
This processes a single Album at a time. The rationale is that file | |
layout varies and it is difficult to find the right folder hierarchy | |
that should be shipped in a torrent file. | |
Only music is supported for now. | |
Known issues: | |
* user interface is very rough: unicode-encoded strings may be | |
unreadable, but should still be submitted correctly. similarly, you | |
will need to convert MB media formats into something Gazelle knows about | |
* parsing a large number of files for AcoustID is slow | |
* can't upload pre-1982 CDs: we need to support multiple releases and | |
this is currently broken | |
* duplication detector sometimes matches too much, and doesn't know | |
trumping rules | |
In general, do things the way upload.php does it. | |
''' | |
# Copyright 2011, Adrian Sampson. | |
# Copyright 2016, Fuda Fafa | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining | |
# a copy of this software and associated documentation files (the | |
# "Software"), to deal in the Software without restriction, including | |
# without limitation the rights to use, copy, modify, merge, publish, | |
# distribute, sublicense, and/or sell copies of the Software, and to | |
# permit persons to whom the Software is furnished to do so, subject to | |
# the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be | |
# included in all copies or substantial portions of the Software. | |
# relevant documentation: | |
# https://github.com/WhatCD/Gazelle/wiki/JSON-API-Documentation#torrent | |
# http://click.pocoo.org/ | |
# http://docs.python-requests.org/ | |
# https://python-musicbrainzngs.readthedocs.io/ | |
# http://musicbrainz.org/development/mmd | |
# https://musicbrainz.org/doc/Release#Status | |
# changelog: | |
# 1.0: first release | |
# 1.1: automatically modify the announce URL when provided with --announce | |
# 1.2: torrent generation --mktorrent, --version support | |
# 1.2.1: fixed 1.1 and 1.2 features so they actually work. | |
# 1.2.2: fixed tags so they are correctly comma-separated | |
# 1.2.3: fixed rare artist parsing problem (with "feat." artists) | |
# 1.2.4: report exception when acoustid fails, thanks frizkie | |
# 1.3.0: format / bitrate detection | |
# 1.3.1: fix issues with unicode filenames and mktorrent | |
# 1.3.2: fix handling of errors in exiftool | |
# 1.3.3: fix error handling, try to handle failures better | |
# 1.3.4: handle missing exiftool | |
# 1.3.5: small bugfixes | |
# 1.4.0: retry metadata on upload failures, remove duplicate / in | |
# URLs, deal better with garbage all over | |
# 1.5.0: source support for PTH, better error handling again | |
# 1.6.0: refactoring, no overwrite by default, --mktorrent and | |
# --torrent supported together | |
# 1.6.1: deal with more corner cases, put version and name in release_desc | |
__version__ = '1.6.1' | |
import errno | |
import json | |
import logging | |
import operator | |
import os | |
import os.path | |
import re | |
import subprocess | |
import sys | |
__prog__ = os.path.basename(sys.argv and sys.argv[0] or __file__) | |
import acoustid | |
import click | |
from musicbrainzngs import get_recording_by_id, get_image_list, set_useragent, get_release_by_id | |
from musicbrainzngs.musicbrainz import ResponseError | |
import requests | |
import requests.utils | |
# API key for this demo script only. Get your own API key at the | |
# Acoustid Web for your application. | |
# http://acoustid.org/ | |
API_KEY = 'cSpUJKpD' | |
def dump(*kargs, **kwargs):
    '''Serialize the arguments to a pretty-printed JSON string.

    Positional and keyword arguments are handed to ``json.dumps``
    unchanged; the output always uses a two-space indent.
    '''
    pretty = json.dumps(*kargs, indent=2, **kwargs)
    return pretty
def dir_size(path='.'):
    '''Return the cumulative size, in bytes, of all files below ``path``.

    The tree is traversed with ``os.walk`` and every file name found
    is measured with ``os.path.getsize``.
    '''
    return sum(os.path.getsize(os.path.join(parent, name))
               for parent, _subdirs, names in os.walk(path)
               for name in names)
def make_torrent(directory, announce):
    '''Create ``<directory>.torrent`` for the given directory.

    :param directory: path of the directory to put in the torrent
    :param announce: announce URL to embed in the torrent
    :returns: the path of the generated torrent file, or ``None`` when
        the ``BitTorrent`` module cannot be imported (the failure is
        logged as an error)
    '''
    try:
        from BitTorrent.btmakemetafile import make_meta_file
    except ImportError as e:
        logging.error("can't generate torrent: %s", e)
        return None
    size = dir_size(directory)
    torrent = directory + '.torrent'
    # logging.warn is a deprecated alias; the rest of the file uses
    # logging.warning
    logging.warning('found %s bytes in %s, torrent %s',
                    size, directory, torrent)
    # the progress bar is sized in bytes; make_meta_file reports its
    # progress through bar.update
    with click.progressbar(label='creating torrent',
                           length=size) as bar:
        make_meta_file(directory.encode('utf-8'),
                       announce.encode('utf-8'),
                       target=torrent,
                       progress=bar.update, progress_percent=False)
    return torrent
def rewrite_torrent_data(torrent, announce=False, source=False):
    '''Load a torrent file and return its re-encoded, adjusted content.

    :param torrent: path of the torrent file to read
    :param announce: if truthy, replaces the ``announce`` URL
    :param source: if truthy, stored as ``info.source``
    :returns: the bencoded torrent data, with the private flag set and
        the encoding declared as UTF-8
    '''
    # NOTE(review): the nesting of the 'private' and 'encoding'
    # assignments is reconstructed as unconditional -- confirm
    try:
        from BitTorrent import bencode
    except ImportError:
        import bencode
    with open(torrent, 'rb') as handle:
        raw = handle.read()
    meta = bencode.bdecode(raw)
    if announce:
        meta['announce'] = str(announce)
    info = meta['info']
    if source:
        info['source'] = str(source)
    info['private'] = 1  # for good measure
    meta['encoding'] = 'UTF-8'
    return bencode.bencode(meta)
def tracker_login(tracker, username, password):
    '''Log into a Gazelle tracker and return the authenticated session.

    :param tracker: base URL of the tracker, with or without a
        trailing slash
    :param username: account name posted to ``login.php``
    :param password: password posted to ``login.php``
    :returns: a ``requests.Session`` holding the login cookies
    :raises requests.HTTPError: when the login request fails or does
        not answer with a redirect
    '''
    # make sure we have a trailing slash: URLs below are built by
    # plain concatenation
    tracker = tracker.rstrip('/') + '/'
    session = requests.Session()
    ua = '%s (%s/%s)' % (requests.utils.default_user_agent(),
                         __prog__, __version__)
    session.headers['User-Agent'] = ua
    # fetch the login page first; a failure here is only logged and
    # the login attempt goes ahead anyway
    r = session.get(tracker + 'login.php')
    logging.debug('headers: %s', r.headers)
    if r.status_code != requests.codes.ok:
        logging.error('could not access site %s', tracker)
    creds = {'username': username, 'password': password}
    r = session.post(tracker + 'login.php',
                     data=creds, allow_redirects=False)
    logging.debug('headers: %s', r.headers)
    logging.debug('body: %s', r.text)
    r.raise_for_status()
    # anything other than a redirect is treated as a failed login
    if r.status_code != requests.codes.found:
        raise requests.HTTPError('wrong password?')
    logging.info('logged into tracker %s', tracker)
    return session
def identify_format(paths):
    '''Yield a ``(format, bitrate)`` pair for each recognized file.

    Runs ``exiftool`` once over all the given paths and interprets its
    JSON output. Files without a ``FileType``, and files of type M2TS,
    JPEG, HTML or PDF, are skipped.

    :param paths: paths of the files to inspect
    :returns: a generator of ``(FileType, bitrate)`` tuples; nothing is
        yielded when ``paths`` is empty, when exiftool is missing or
        when it produces no output
    :raises OSError: when exiftool cannot be run for a reason other
        than being absent
    '''
    paths = list(paths)
    if not paths:
        return
    command = ['exiftool', '-json', '-FileType', '-LameVBRQuality', '-AudioBitrate']
    command += paths
    try:
        content = subprocess.check_output(command)
    except subprocess.CalledProcessError as e:
        logging.info('exiftool complained: %s', e)
        # this will happen on .cue files and so on
        content = e.output
    except OSError as e:
        if e.errno == errno.ENOENT:
            logging.warning('exiftool not found, cannot identify bit rate')
            return
        raise
    # exiftool may fail without printing anything; json.loads would
    # raise ValueError on empty input
    if not content or not content.strip():
        return
    if isinstance(content, bytes):
        content = content.decode('utf-8')
    metas = json.loads(content)
    for meta in metas:
        fmt = meta.get('FileType')
        if fmt is None or fmt in ('M2TS', 'JPEG', 'HTML', 'PDF'):
            continue
        # covered: 192, 256, 320, Vx (VBR), FLAC (Lossless)
        # not covered: APS (VBR), APX (VBR), q8.x (VBR), 24bit Lossless, Other
        # reset for every file, so that an unknown format does not
        # inherit the bitrate of the previous file
        bitrate = 'Other'
        if fmt == 'MP3':
            quality = meta.get('LameVBRQuality')
            if quality is not None:
                bitrate = 'V%s (VBR)' % quality
            else:
                # str() guards against the bitrate being emitted as a
                # JSON number
                bitrate = str(meta.get('AudioBitrate', 'Other'))
                bitrate = bitrate.replace(' kbps', '')
        elif fmt == 'FLAC':
            bitrate = 'Lossless'
        yield fmt, bitrate
def process_album(files, tracker, session):
    '''Identify the release that the given audio files belong to.

    Each file is matched through ``match_recording``; every release
    listed for a matched recording accumulates the match score. The
    collected releases are handed to ``check_releases``, and when both
    ``tracker`` and ``session`` are set, possible duplicates found on
    the tracker are logged as warnings.

    :param files: paths of the files to analyze
    :param tracker: base URL of the tracker, or a false value
    :param session: logged-in session, or a false value
    :returns: the metadata dict built by ``check_releases``, or an
        empty dict when no release was found
    '''
    logging.debug('checking files %s', files)
    releases_meta = {}  # copy of the release metadata returned by recordings
    releases_scores = {}
    with click.progressbar(files, label='analyzing files') as bar:
        for path in bar:
            logging.debug('checking %s', path)
            for score, recid, title, artist in match_recording(path.encode('utf-8')):
                logging.debug('matched with recording %s %s %s %s',
                              score, recid, title, artist)
                includes = ['artists', 'releases']
                try:
                    recording = get_recording_by_id(recid,
                                                    includes=includes)
                except ResponseError as e:
                    if '404' not in str(e):
                        raise
                    logging.warning('recording id %s not found', recid)
                    continue
                logging.debug('releases: %s', dump(recording))
                rec = recording['recording']
                # tolerate a recording that lists no release at all
                for release in rec.get('release-list', []):
                    rid = release['id']
                    if rid not in releases_meta:
                        releases_scores[rid] = 0.0
                        # the release entry is annotated with the
                        # artist credit of the recording
                        release['artist-credit'] = rec['artist-credit']
                        releases_meta[rid] = release
                    releases_scores[rid] += score
    if not releases_meta:
        logging.warning('could not find release on Musicbrainz!')
        return {}
    output = check_releases(releases_meta, releases_scores)
    if tracker and session:
        try:
            dupes = find_duplicates(tracker, session, output)
        except (ValueError, requests.HTTPError):
            logging.warning("warning: invalid response, couldn't check for duplicates")
            dupes = False
        if dupes:
            logging.warning('warning: duplicates found')
            results = [(result['groupId'], result['torrents'])
                       for result in dupes]
            logging.info('results: %s %s', results, dupes)
            # the query parameter is "torrentid"; it was previously
            # misspelled "torrendid"
            template = (
                '{media} {format} {encoding} log: {hasLog} {logScore} '
                'remastered: {remastered} {remasterYear} '
                '{remasterCatalogueNumber} {remasterTitle}\n'
                'scene: {scene} files: {fileCount} size: {size} '
                'D/S/L: {snatches} {seeders} {leechers}\n'
                '{tracker}torrents.php?id={groupId}&torrentid={torrentId}')
            for group, torrents in results:
                for torrent in torrents:
                    torrent['tracker'] = tracker
                    torrent['groupId'] = group
                    logging.warning(template.format(**torrent))
    return output
def match_recording(filename):
    '''Yield the AcoustID matches found for an audio file.

    :param filename: path of the file to fingerprint
    :returns: a generator of ``(score, recording id, title, artist)``
        tuples; nothing is yielded when the fingerprint cannot be
        computed or the web service request fails (both are logged as
        warnings)
    '''
    try:
        results = acoustid.match(API_KEY, filename)
    except (acoustid.FingerprintGenerationError, EOFError) as e:
        logging.warning("fingerprint could not be calculated on %s: %s", filename, e)
        return
    except acoustid.WebServiceError as exc:
        logging.warning("web service request failed: %s", exc.message)
        return
    for score, recid, title, artist in results:
        logging.info('%s (%s - %s, %f%%)',
                     recid, artist, title, score * 100)
        yield score, recid, title, artist
def cover_url(rid):
    '''Return the large thumbnail URL of the first approved front cover.

    The image list of release ``rid`` is fetched with
    ``get_image_list``; ``None`` is returned when no image is both
    approved and of type "Front".
    '''
    images = get_image_list(rid)["images"]
    fronts = (img["thumbnails"]["large"]
              for img in images
              if "Front" in img["types"] and img["approved"])
    return next(fronts, None)
def find_duplicates(tracker, session, output):
    '''Search the tracker for torrent groups matching the release.

    Queries ``ajax.php`` with the ``browse`` action, using the first
    artist (if any) and the title found in ``output``.

    :param tracker: base URL of the tracker, ending with a slash
    :param session: session object used to perform the GET request
    :param output: release metadata; reads ``artists[]`` and ``title``
    :returns: the list of results from the response, or ``False`` when
        the request did not succeed or returned no result
    :raises requests.HTTPError: on an HTTP error status
    :raises ValueError: when the response is not valid JSON
    '''
    artists = output.get('artists[]') or []
    if len(artists) > 1:
        logging.warning('more than one artists found, duplicate search may fail')
    params = {'action': 'browse',
              'groupname': output['title']}
    if artists:
        params['artistname'] = artists[0]
    else:
        # an empty artist list used to raise IndexError; fall back to
        # a search on the title only
        logging.warning('no artist found, searching duplicates by title only')
    r = session.get(tracker + 'ajax.php', params=params)
    logging.debug('headers: %s', r.headers)
    logging.debug('content: %s', r.text)
    logging.debug('status: %s', r.status_code)
    r.raise_for_status()
    answer = r.json()
    if answer['status'] == 'success' and answer['response']['results']:
        return answer['response']['results']
    return False
def check_releases(releases_meta, releases_scores):
    '''Build upload metadata for the best-scoring release.

    The release with the highest accumulated score is selected, its
    details are fetched again with ``get_release_by_id`` and flattened
    into the fields later used for the upload.

    :param releases_meta: release metadata, keyed by release id
    :param releases_scores: accumulated score, keyed by release id
    :returns: a dict of metadata (title, year, artists, media, label,
        catalog number, tags, album description, cover image...)
    :raises ResponseError: when the cover art lookup fails with
        anything other than a 404
    '''
    release_id = max(releases_scores, key=releases_scores.get)
    s = sorted(releases_scores.items(), key=operator.itemgetter(1))
    logging.info('releases_scores: %s', dump(s))
    release = releases_meta[release_id]
    logging.debug('full release metadata: %s', dump(release))
    # delete useless metadata from output:
    # (.items() works on both Python 2 and 3, unlike .iteritems())
    output = {k: v for k, v in release.items()
              if k in ('date', 'id', 'status', 'title')}
    output['year'] = output.get('date', '').split('-')[0]
    output['score'] = '%f' % releases_scores[release_id]
    output['release-url'] = 'https://musicbrainz.org/release/%s' % release_id
    # only dict entries of the artist credit are used; the other
    # entries are presumably join phrases such as "feat." -- TODO confirm
    output['artists[]'] = [a['artist']['name']
                           for a in release['artist-credit']
                           if isinstance(a, dict)]
    includes = ['labels', 'discids', 'tags', 'media',
                'release-groups', 'recordings']
    more_meta = get_release_by_id(release_id, includes=includes)['release']
    logging.debug('more release metadata: %s', dump(more_meta))
    media_list = more_meta.get('medium-list', [])
    label_infos = more_meta.get('label-info-list', [])
    # this should be converted between MB and Gazelle formats:
    # https://musicbrainz.org/doc/Release/Format
    output['media'] = " ".join(m.get('format', 'CD') for m in media_list)
    output['record_label'] = " ".join(
        info.get('label', {}).get('name', '') for info in label_infos)
    output['catalog_number'] = " ".join(
        info.get('catalog-number', '') for info in label_infos)
    for field in ['barcode', 'asin', 'country']:
        output[field] = more_meta.get(field, '')
    release_group = more_meta['release-group']
    output['release_group_id'] = release_group['id']
    output['releasetype'] = release_group.get('type')
    output['tags'] = ", ".join(tag['name'].replace(' ', '.')
                               for tag in release_group.get('tag-list', []))
    tracks = [t for m in media_list
              for t in m.get('track-list', [])]
    output['tracknum'] = len(tracks)
    output['album_desc'] = (
        '\n'
        '[url=https://musicbrainz.org/release-group/{release_group_id}]MusicBrainz[/url]\n'
        '[url=http://www.amazon.com/exec/obidos/ASIN/{asin}]Amazon[/url]\n'
        'Country: {country}\n'
        'Barcode: {barcode}\n'
        'Tracks: {tracknum}\n'
        'Track list:\n'
    ).format(**output)
    output['album_desc'] += ''.join('[#]' + track['recording']['title'] + "\n"
                                    for track in tracks)
    if 'release_desc' not in output:
        marker = 'uploaded using %s %s' % (__prog__, __version__)
        output['release_desc'] = marker
    # should be taken from the above release group info, probably
    try:
        image = cover_url(release_id)
    except ResponseError as e:
        # a missing cover is not an error
        if '404' not in str(e):
            raise
    else:
        if image is not None:
            # str.replace returns a new string: the result has to be
            # stored for the https upgrade to take effect
            output['image'] = image.replace('http://', 'https://', 1)
    return output
def confirm_data(output):
    '''Interactively confirm or complete the upload form fields.

    Every field is prompted for, with the value found in ``output`` as
    the default when there is one. Answering "none" (in any case)
    blanks a field that had a default.

    :param output: metadata dict; note that its ``releasetype`` entry
        may be rewritten in place
    :returns: a new dict with the confirmed fields, where
        ``releasetype`` is the numeric code of the release type
    '''
    data = {'type': 'Music',
            'importance[]': '1',  # Main, hardcoded
            }
    artists = click.prompt('confirm artists', output.get('artists[]', []))
    if not isinstance(artists, list):
        # typed in as a comma-separated list; drop surrounding
        # whitespace and empty entries
        artists = [name.strip() for name in artists.split(',')
                   if name.strip()]
    data['artists[]'] = artists
    field_list = ['title',
                  'year',
                  'record_label',
                  'catalog_number',
                  # remaster, remaster_year,
                  # remaster_record_label,
                  # remaster_catalog_number...
                  # 'scene',
                  'media',
                  'format',  # missing
                  'bitrate',  # missing
                  # vbr ("other bitrates"?)
                  # 'logfiles[]'
                  'tags',
                  'album_desc',
                  'release_desc',  # missing
                  'image']
    logging.info('formats: MP3, FLAC, Ogg Vorbis, AAC, AC3, DTS')
    logging.info('bitrate: 192, APS (VBR), V2 (VBR), V1 (VBR), 256, APX (VBR), V0 (VBR), q8.x (VBR), 320, Lossless, 24bit Lossless, Other')
    logging.warning('confirm metadata, use "none" to avoid using the default value if not checked')
    for field in field_list:
        if field in output:
            data[field] = output[field]
    for field in field_list:
        logging.debug('old value: %s', repr(data.get(field)))
        if field in data:
            data[field] = click.prompt('confirm %s' % field, data[field])
            if data[field].lower() == 'none':
                data[field] = ''
        else:
            data[field] = click.prompt('enter %s' % field, '')
        logging.debug('new value: %s', repr(data.get(field)))
    types = {'Album': 1,
             'Soundtrack': 3,
             'EP': 5,
             'Anthology': 6,
             'Compilation': 7,
             'Single': 9,
             'Live album': 11,
             'Remix': 13,
             'Bootleg': 14,
             'Interview': 15,
             'Mixtape': 16,
             'Unknown': 21}
    # reverse mapping, code -> name (.items() works on Python 2 and 3)
    rtypes = {v: k for k, v in types.items()}
    click.echo('release types: %s' % list(types))
    if 'releasetype' in output:
        if output['releasetype'] not in types:
            # not a known name: map a numeric code back to its name;
            # anything else becomes None, i.e. no default in the prompt
            output['releasetype'] = rtypes.get(output['releasetype'])
        releasetype = click.prompt('confirm release type',
                                   output['releasetype'])
    else:
        releasetype = click.prompt('enter release type')
    if releasetype not in types:
        logging.warning('unknown release type: %s, defaulting to Album',
                        releasetype)
    data['releasetype'] = types.get(releasetype, 1)
    return data
@click.command(epilog=__doc__)
@click.version_option(version=__version__)
@click.argument('directory')
@click.option('--loglevel', 'loglevel',
              help='show only warning messages',
              type=click.Choice(['WARNING', 'INFO', 'DEBUG']),
              flag_value='WARNING', default=True)
@click.option('-v', '--verbose', 'loglevel', help='be more verbose',
              flag_value='INFO')
@click.option('-d', '--debug', 'loglevel', help='even more verbose',
              flag_value='DEBUG')
@click.option('--tracker', help='use Gazelle instance at URL')
@click.option('--username', prompt=True,
              help='username to login with, default: prompted')
@click.password_option(confirmation_prompt=False, help='default: prompted')
@click.option('--torrent', help='torrent file to upload',
              type=click.Path(exists=True, readable=True))
@click.option('--announce', help='announce URL to use in torrent')
@click.option('--source', help='source tag to add to the torrent, e.g. "PTH"')
@click.option('--mktorrent', show_default=True, is_flag=True,
              help='create a torrent file with the given directory '
              'or rewrite provided torrent')
def identify(directory, loglevel, tracker, username, password,
             torrent, announce, source, mktorrent):
    '''upload given directory and torrent to Gazelle'''
    # NOTE: the docstring above is shown verbatim as the command help
    # by click, so details are documented in comments instead.
    #
    # Steps: optionally create the torrent, log into the tracker and
    # fetch the upload form token, rewrite or load the torrent data,
    # identify the files, let the user confirm the metadata, then
    # post to upload.php until the upload is accepted.
    logging.basicConfig(format='%(message)s', level=loglevel)
    # required by btmakemetafile
    directory = os.path.abspath(directory.rstrip('/'))
    if mktorrent:
        if not tracker:
            raise click.UsageError('--mktorrent needs --tracker')
        if not announce:
            raise click.UsageError('--mktorrent needs --announce')
        # with --torrent, the provided file is rewritten below instead
        # of being generated from scratch
        if not torrent:
            torrent = make_torrent(directory, announce)
            if not torrent:
                raise click.ClickException('could not create torrent file')
    auth_token = None
    session = None
    torrent_data = None
    if tracker:
        tracker = tracker.rstrip('/') + '/'
        try:
            session = tracker_login(tracker, username, password)
        except requests.HTTPError as e:
            session = None
            if torrent or mktorrent:
                # the message must be formatted here: the second
                # argument of UsageError is the click context
                raise click.UsageError('login failed: %s' % e)
            logging.warning('login failed: %s', e)
        else:
            r = session.get(tracker + 'upload.php')
            # <input type="hidden" name="auth" value="<32 char hex string>">
            m = re.search(r'<input\s+type="hidden"\s+name="auth"\s+value="(\w+)"\s*/\s*>',
                          r.text)
            if m:
                auth_token = m.group(1)
                logging.info('found authentication token')
            elif torrent or mktorrent:
                raise click.UsageError('could not parse upload form, are you logged in?')
            else:
                logging.warning('no form token found, upload impossible')
    if auth_token and torrent:
        if announce or source:
            torrent_data = rewrite_torrent_data(torrent,
                                                announce=announce,
                                                source=source)
            if mktorrent:
                with open(torrent, 'wb') as torrentfile:
                    torrentfile.write(torrent_data)
        else:
            with open(torrent, 'rb') as torrentfile:
                torrent_data = torrentfile.read()
    set_useragent(__prog__, __version__)
    files = [os.path.join(dirpath, f)
             for dirpath, _, filenames in os.walk(directory)
             for f in filenames]
    if not files:
        logging.warning('no files provided, aborting')
        return
    # identify_format is a generator and therefore always truthy: it
    # has to be consumed to know whether anything was detected
    formats = set(identify_format(files))
    output = process_album(files, tracker, session)
    if formats:
        # XXX: just take an arbitrary file
        fmt, bitrate = formats.pop()
        output['format'] = fmt
        output['bitrate'] = bitrate
    if not torrent:
        logging.warning('no torrent provided, nothing to upload')
        return
    elif not auth_token:
        logging.warning('invalid auth token, aborting')
        return
    confirmed = False
    data = output
    click.echo('found release: %s' % output.get('release-url'))
    while not confirmed:
        data = confirm_data(data)
        click.echo(dump(data))
        confirmed = click.confirm('metadata ok?', default=True)
    if not click.confirm('upload torrent %s' % (torrent), default=True):
        return
    uploaded = False
    while not uploaded:
        data['auth'] = auth_token
        data['submit'] = 'Upload torrent'
        # hardcode torrent name because requests crashes on unicode filenames
        upload_files = {'file_input': ('torrent.torrent', torrent_data)}
        logging.debug('data: %s', data)
        logging.debug('files: %s', upload_files)
        click.echo('uploading...')
        r = session.post(tracker + 'upload.php', data=data,
                         files=upload_files, allow_redirects=False)
        logging.debug('headers: %s', r.headers)
        logging.debug('body: %s', r.text)
        r.raise_for_status()
        if r.status_code == requests.codes.found:
            # a redirect means the upload was accepted
            uploaded = True
            click.echo('uploaded: %s%s' % (tracker,
                                           r.headers['Location']))
            continue
        m = re.search(r'<h1>Warning</h1>.*?<strong>(Your torrent has been uploaded;.*?)</strong>',
                      r.text, re.DOTALL)
        if m:
            uploaded = True
            click.echo('uploaded, but warning: %s' % m.group(1))
            continue
        m = re.search(r'<p style="color: red; text-align: center;">(.*?)</p>',
                      r.text, re.DOTALL)
        if m:
            logging.warning('upload failed: %s' % m.group(1))
        else:
            logging.warning('upload failed!')
        # let the user fix the metadata before trying again; the form
        # token and submit fields are re-added on the next iteration
        del data['auth']
        del data['submit']
        data = confirm_data(data)
# Script entry point: run the click command and report the failures
# that are known to escape it, exiting with a non-zero status so that
# callers can detect them.
if __name__ == '__main__':
    try:
        identify()
    except acoustid.NoBackendError:
        logging.error("chromaprint library/tool not found")
        sys.exit(1)
    except requests.HTTPError as e:
        logging.error('error talking with tracker: %s', e)
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment