Created
March 20, 2013 00:51
-
-
Save bitmorse/5201491 to your computer and use it in GitHub Desktop.
lastfm scrobble exporter (from https://gitorious.org/fmthings/lasttolibre/blobs/master/lastexport.py // changed the script to try more often on failure )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
#-*- coding: utf-8 -*- | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with this program. If not, see <http://www.gnu.org/licenses/>. | |
# | |
""" | |
Script for exporting tracks through audioscrobbler API. | |
Usage: lastexport.py -u USER [-o OUTFILE] [-p STARTPAGE] [-s SERVER] | |
""" | |
import urllib2, urllib, sys, time, re | |
import xml.etree.ElementTree as ET | |
from optparse import OptionParser | |
__version__ = '0.0.4' | |
def get_options(parser): | |
""" Define command line options.""" | |
parser.add_option("-u", "--user", dest="username", default=None, | |
help="User name.") | |
parser.add_option("-o", "--outfile", dest="outfile", default="exported_tracks.txt", | |
help="Output file, default is exported_tracks.txt") | |
parser.add_option("-p", "--page", dest="startpage", type="int", default="1", | |
help="Page to start fetching tracks from, default is 1") | |
parser.add_option("-s", "--server", dest="server", default="last.fm", | |
help="Server to fetch track info from, default is last.fm") | |
parser.add_option("-t", "--type", dest="infotype", default="scrobbles", | |
help="Type of information to export, scrobbles|loved|banned, default is scrobbles") | |
options, args = parser.parse_args() | |
if not options.username: | |
sys.exit("User name not specified, see --help") | |
if options.infotype == "loved": | |
infotype = "lovedtracks" | |
elif options.infotype == "banned": | |
infotype = "bannedtracks" | |
else: | |
infotype = "recenttracks" | |
return options.username, options.outfile, options.startpage, options.server, infotype | |
def connect_server(server, username, startpage, sleep_func=time.sleep, tracktype='recenttracks'): | |
""" Connect to server and get a XML page.""" | |
if server == "libre.fm": | |
baseurl = 'http://alpha.libre.fm/2.0/?' | |
urlvars = dict(method='user.get%s' % tracktype, | |
api_key=('lastexport.py-%s' % __version__).ljust(32, '-'), | |
user=username, | |
page=startpage, | |
limit=200) | |
elif server == "last.fm": | |
baseurl = 'http://ws.audioscrobbler.com/2.0/?' | |
urlvars = dict(method='user.get%s' % tracktype, | |
api_key='e38cc7822bd7476fe4083e36ee69748e', | |
user=username, | |
page=startpage, | |
limit=50) | |
else: | |
if server[:7] != 'http://': | |
server = 'http://%s' % server | |
baseurl = server + '/2.0/?' | |
urlvars = dict(method='user.get%s' % tracktype, | |
api_key=('lastexport.py-%s' % __version__).ljust(32, '-'), | |
user=username, | |
page=startpage, | |
limit=200) | |
url = baseurl + urllib.urlencode(urlvars) | |
for interval in (1, 5, 10, 62, 240): | |
try: | |
f = urllib2.urlopen(url) | |
break | |
except Exception, e: | |
last_exc = e | |
print "Exception occured, retrying in %ds: %s" % (interval, e) | |
sleep_func(interval) | |
else: | |
print "Failed to open page %s" % urlvars['page'] | |
raise last_exc | |
response = f.read() | |
f.close() | |
#bad hack to fix bad xml | |
response = re.sub('\xef\xbf\xbe', '', response) | |
return response | |
def get_pageinfo(response, tracktype='recenttracks'): | |
"""Check how many pages of tracks the user have.""" | |
xmlpage = ET.fromstring(response) | |
totalpages = xmlpage.find(tracktype).attrib.get('totalPages') | |
return int(totalpages) | |
def get_tracklist(response): | |
"""Read XML page and get a list of tracks and their info.""" | |
xmlpage = ET.fromstring(response) | |
tracklist = xmlpage.getiterator('track') | |
return tracklist | |
def parse_track(trackelement): | |
"""Extract info from every track entry and output to list.""" | |
if trackelement.find('artist').getchildren(): | |
#artist info is nested in loved/banned tracks xml | |
artistname = trackelement.find('artist').find('name').text | |
artistmbid = trackelement.find('artist').find('mbid').text | |
else: | |
artistname = trackelement.find('artist').text | |
artistmbid = trackelement.find('artist').get('mbid') | |
if trackelement.find('album') is None: | |
#no album info for loved/banned tracks | |
albumname = '' | |
albummbid = '' | |
else: | |
albumname = trackelement.find('album').text | |
albummbid = trackelement.find('album').get('mbid') | |
trackname = trackelement.find('name').text | |
trackmbid = trackelement.find('mbid').text | |
date = trackelement.find('date').get('uts') | |
output = [date, trackname, artistname, albumname, trackmbid, artistmbid, albummbid] | |
for i, v in enumerate(output): | |
if v is None: | |
output[i] = '' | |
return output | |
def write_tracks(tracks, outfileobj): | |
"""Write tracks to an open file""" | |
for fields in tracks: | |
outfileobj.write(("\t".join(fields) + "\n").encode('utf-8')) | |
def get_tracks(server, username, startpage=1, sleep_func=time.sleep, tracktype='recenttracks'): | |
page = startpage | |
response = connect_server(server, username, page, sleep_func, tracktype) | |
totalpages = get_pageinfo(response, tracktype) | |
if startpage > totalpages: | |
raise ValueError("First page (%s) is higher than total pages (%s)." % (startpage, totalpages)) | |
while page <= totalpages: | |
#Skip connect if on first page, already have that one stored. | |
if page > startpage: | |
response = connect_server(server, username, page, sleep_func, tracktype) | |
tracklist = get_tracklist(response) | |
tracks = [] | |
for trackelement in tracklist: | |
# do not export the currently playing track. | |
if not trackelement.attrib.has_key("nowplaying") or not trackelement.attrib["nowplaying"]: | |
tracks.append(parse_track(trackelement)) | |
yield page, totalpages, tracks | |
page += 1 | |
sleep_func(.5) | |
def main(server, username, startpage, outfile, infotype='recenttracks'): | |
trackdict = dict() | |
page = startpage # for case of exception | |
totalpages = -1 # ditto | |
n = 0 | |
try: | |
for page, totalpages, tracks in get_tracks(server, username, startpage, tracktype=infotype): | |
print "Got page %s of %s.." % (page, totalpages) | |
for track in tracks: | |
if infotype == 'recenttracks': | |
trackdict.setdefault(track[0], track) | |
else: | |
#Can not use timestamp as key for loved/banned tracks as it's not unique | |
n += 1 | |
trackdict.setdefault(n, track) | |
except ValueError, e: | |
exit(e) | |
except Exception: | |
raise | |
finally: | |
with open(outfile, 'a') as outfileobj: | |
tracks = sorted(trackdict.values(), reverse=True) | |
write_tracks(tracks, outfileobj) | |
print "Wrote page %s-%s of %s to file %s" % (startpage, page, totalpages, outfile) | |
if __name__ == "__main__": | |
parser = OptionParser() | |
username, outfile, startpage, server, infotype = get_options(parser) | |
main(server, username, startpage, outfile, infotype) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Following up on @cuppajoeman - with python2 installed, change the first line of the script to
#!/usr/bin/env python2
from#!/usr/bin/env python