Last active
August 14, 2017 08:24
-
-
Save otykhonruk/3969732 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import argparse | |
import json | |
import os.path | |
import re | |
import sys | |
try: | |
# python 2 | |
from urllib import urlencode, urlretrieve | |
from urllib2 import urlopen | |
except ImportError: | |
# python 3 | |
from urllib.request import urlopen, urlretrieve | |
from urllib.parse import urlencode | |
FILENAME = "{track_num:02d}_{title_link}.mp3" | |
STREAM_FORMAT = 'mp3' | |
STREAM_MOUNT = '/bandcamp' | |
def _discover(args): | |
params = { | |
'f': 'null', | |
'g': args.g, | |
'p': 1, | |
'r': args.r, | |
's': args.s, | |
'w': 0 | |
} | |
res = urlopen('http://bandcamp.com/discover_cb', urlencode(params)) | |
data = json.loads(res.read()) | |
if data: | |
items = data['discover_results']['items'] | |
for item in items: | |
url_hints = item['url_hints'] | |
domain = url_hints['custom_domain'] | |
if not domain: | |
domain = url_hints['subdomain'] + '.bandcamp.com' | |
album = url_hints['slug'] | |
yield 'http://{}/album/{}'.format(domain, album) | |
def _init_shout(password='hackme'): | |
from shout import Shout | |
s = Shout() | |
s.password = password | |
s.format = STREAM_FORMAT | |
s.mount = STREAM_MOUNT | |
s.open() | |
return s | |
def _get_field(name, page): | |
exp = "^\s*{0}\s*:\s*(.*?),?$".format(name) | |
pattern = re.compile(exp, re.MULTILINE) | |
result = pattern.search(page) | |
return result.group(1) | |
def _get_info(url): | |
page = None | |
try: | |
page = urlopen(url).read().decode('utf-8') | |
except IOError as e: | |
print('Cannot load album from {url}: {err}'.format(url=url, err=e)) | |
return None | |
# artist = json.loads(_get_field('artist', page)) | |
# album = json.loads(_get_field('current', page))['title'] | |
trackinfo = json.loads(_get_field('trackinfo', page)) | |
return trackinfo | |
def _process_album(trackinfo, fn): | |
for track in trackinfo: | |
file = track['file'] | |
if file: | |
if(isinstance(file, dict)): | |
url = next(iter(file.values())) | |
if url.startswith('//'): | |
url = 'https:' + url | |
track['file'] = url | |
fn(track) | |
else: | |
print("Free preview of the track {track_num:d} - '{title}' is not available.".format(**track)) | |
def _download_track(track): | |
track['title_link'] = os.path.basename(track['title_link']) | |
fname = FILENAME.format(**track) | |
if os.path.exists(fname): | |
print("Track {track_num:d} - '{title}' is already downloaded".format(**track)) | |
else: | |
print("Retrieving track {track_num:d} - '{title}'".format(**track)) | |
try: | |
urlretrieve(track['file'], fname) | |
except IOError as e: | |
print('Cannot retrieve track {track_num:d} - {title}'.format(**track)) | |
print(e) | |
def _stream_track(client): | |
def _i_stream_track(track): | |
client.set_metadata({'song': str(track['title'])}) | |
f = urlopen(track['file']) | |
nbuf = f.read(4096) | |
while 1: | |
buf = nbuf | |
nbuf = f.read(4096) | |
if len(buf) == 0: | |
break | |
client.send(buf) | |
client.sync() | |
f.close() | |
return _i_stream_track | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(description='bandcamp.com client') | |
parser.add_argument('--download', help='download tracks to current directory (default)', | |
dest='download_mode', action='store_true', default=True) | |
parser.add_argument('--stream', help='stream album(s) to icecast server', | |
dest='download_mode', action='store_false') | |
parser.add_argument('--discover', help='discover albums', | |
action='store_true', default=False) | |
parser.add_argument('url', nargs='*') | |
# discover params | |
parser.add_argument('-g', help='genre', action='store') | |
parser.add_argument('-r', help='recommended', action='store', choices=['most', 'latest']) | |
parser.add_argument('-s', help='category', action='store', choices=['top', 'pic', 'new', 'rec']) | |
args = parser.parse_args() | |
urls = _discover(args) if args.discover else args.url | |
shout_client = None | |
if not args.download_mode: | |
shout_client = _init_shout() | |
for url in urls: | |
info = _get_info(url) | |
if info: | |
if args.download_mode: | |
_process_album(info, _download_track) | |
else: | |
_process_album(info, _stream_track(shout_client)) | |
if shout_client: | |
shout_client.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment