Modified version of the original last-export script to also obtain album, artist and track info. Just run it and wait. It might take a while (hours) to complete. If we rush, we might get blocked from the API.
# lastfm-export.py -u <LASTFM_USERNAME>
Modified version of the original last-export script to also obtain album, artist and track info. Just run it and wait. It might take a while (hours) to complete. If we rush, we might get blocked from the API.
# lastfm-export.py -u <LASTFM_USERNAME>
#!/usr/bin/env python | |
#-*- coding: utf-8 -*- | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with this program. If not, see <http://www.gnu.org/licenses/>. | |
# | |
""" | |
Script for exporting tracks through audioscrobbler API.
Usage: lastexport.py -u USER [-o OUTFILE] [-p STARTPAGE] [-s SERVER] [-t scrobbles|loved|banned]
""" | |
import urllib2, urllib, sys, time, re | |
import xml.etree.ElementTree as ET | |
from optparse import OptionParser | |
import json | |
import pickle | |
import os | |
__version__ = '0.0.4'


def _load_cache(path):
    """Return the dict pickled at *path*, or a new empty dict if it is absent.

    NOTE: pickle.load() can run arbitrary code from the file it reads, so
    only cache files written by this script itself should be left in the
    working directory.
    """
    if not os.path.exists(path):
        return dict()
    # 'with' closes the handle even when unpickling fails.
    with open(path, 'rb') as cache_file:
        return pickle.load(cache_file)


# Lookup caches keyed by mbid. main() pickles them back to these same files
# after every page, so info fetched in an earlier run is not requested again.
artist_dict = _load_cache('artist.p')
track_dict = _load_cache('track.p')
album_dict = _load_cache('album.p')
def get_options(parser):
    """Register the command line options on *parser* and parse the arguments.

    Exits with a message when no user name was given.

    Returns the tuple (username, outfile, startpage, server, tracktype),
    where tracktype is the list name derived from --type: 'lovedtracks',
    'bannedtracks', or 'recenttracks' for anything else.
    """
    option_table = (
        (("-u", "--user"),
         dict(dest="username", default=None,
              help="User name.")),
        (("-o", "--outfile"),
         dict(dest="outfile", default="exported_tracks.txt",
              help="Output file, default is exported_tracks.txt")),
        (("-p", "--page"),
         dict(dest="startpage", type="int", default="1",
              help="Page to start fetching tracks from, default is 1")),
        (("-s", "--server"),
         dict(dest="server", default="last.fm",
              help="Server to fetch track info from, default is last.fm")),
        (("-t", "--type"),
         dict(dest="infotype", default="scrobbles",
              help="Type of information to export, scrobbles|loved|banned, default is scrobbles")),
    )
    for flags, settings in option_table:
        parser.add_option(*flags, **settings)

    options, _unused_args = parser.parse_args()

    if not options.username:
        sys.exit("User name not specified, see --help")

    # Every value other than loved/banned means plain scrobbles.
    list_names = {"loved": "lovedtracks", "banned": "bannedtracks"}
    infotype = list_names.get(options.infotype, "recenttracks")

    return (options.username, options.outfile, options.startpage,
            options.server, infotype)
def connect_server(server, username, startpage, sleep_func=time.sleep, tracktype='recenttracks'):
    """Fetch one page of a user's track list and return the raw XML response.

    server     -- "last.fm", "libre.fm", or the host/URL of another server
                  that is queried under <server>/2.0/.
    username   -- user whose tracks are requested.
    startpage  -- number of the page to fetch.
    sleep_func -- callable used to wait between retries.
    tracktype  -- list to query; the request method is 'user.get' + tracktype.

    The request is retried with growing delays. When every attempt fails,
    the last exception is re-raised.
    """
    # Key sent to every server except last.fm: script name and version,
    # padded with '-' to 32 characters.
    generic_key = ('lastexport.py-%s' % __version__).ljust(32, '-')

    if server == "libre.fm":
        baseurl = 'http://alpha.libre.fm/2.0/?'
        api_key, limit = generic_key, 200
    elif server == "last.fm":
        baseurl = 'http://ws.audioscrobbler.com/2.0/?'
        api_key, limit = 'e38cc7822bd7476fe4083e36ee69748e', 50
    else:
        # Keep an explicit scheme as given. Checking only for 'http://'
        # would turn 'https://host' into 'http://https://host'.
        if not server.startswith(('http://', 'https://')):
            server = 'http://%s' % server
        baseurl = server + '/2.0/?'
        api_key, limit = generic_key, 200

    urlvars = dict(method='user.get%s' % tracktype,
                   api_key=api_key,
                   user=username,
                   page=startpage,
                   limit=limit)
    url = baseurl + urllib.urlencode(urlvars)

    for interval in (1, 5, 10, 62, 240, 480, 1200, 2400):
        try:
            f = urllib2.urlopen(url)
            break
        except Exception as e:
            last_exc = e
            print("Exception occured, retrying in %ds: %s" % (interval, e))
            sleep_func(interval)
    else:
        # Only reached when no attempt succeeded (the loop never hit 'break').
        print("Failed to open page %s" % urlvars['page'])
        raise last_exc

    # Close the connection even if reading the body fails.
    try:
        response = f.read()
    finally:
        f.close()

    # Bad hack to fix bad xml: drop the byte sequence EF BF BE (U+FFFE
    # encoded as UTF-8) before the response is handed to the XML parser.
    return response.replace('\xef\xbf\xbe', '')
def _utf8(text):
    """Return *text* encoded as UTF-8 bytes, treating None as an empty string."""
    return (text or u'').encode('utf-8')


def _fetch_lastfm_json(server, urlvars, sleep_func):
    """Send one JSON info request to last.fm and return the raw response body.

    urlvars holds the method-specific query parameters (it must contain
    'method'); the API key and format=json are added here.

    Raises ValueError for any server other than "last.fm", because no
    endpoint is defined for other servers. When every retry fails, the last
    exception is re-raised.
    """
    if server != "last.fm":
        raise ValueError("%s lookups are only supported for server last.fm, not %s"
                         % (urlvars['method'], server))

    query = dict(urlvars,
                 api_key='e38cc7822bd7476fe4083e36ee69748e',
                 format='json')
    url = 'http://ws.audioscrobbler.com/2.0/?' + urllib.urlencode(query)
    print(url)

    for interval in (1, 5, 10, 62, 240, 480, 1200, 2400):
        try:
            f = urllib2.urlopen(url)
            break
        except Exception as e:
            last_exc = e
            print("Exception occured, retrying in %ds: %s" % (interval, e))
            sleep_func(interval)
    else:
        # Info requests have no 'page' parameter, so report the URL instead.
        print("Failed to open %s" % url)
        raise last_exc

    # Close the connection even if reading the body fails.
    try:
        return f.read()
    finally:
        f.close()


def connect_server_artist(server, mbid, artist, sleep_func=time.sleep):
    """Request artist.getinfo for one artist and return the raw JSON response.

    server     -- must be "last.fm"; anything else raises ValueError.
    mbid       -- mbid of the artist.
    artist     -- artist name (text; encoded as UTF-8 for the query).
    sleep_func -- callable used to wait between retries.
    """
    urlvars = dict(method='artist.getinfo',
                   artist=_utf8(artist),
                   mbid=mbid)
    return _fetch_lastfm_json(server, urlvars, sleep_func)


def connect_server_track(server, mbid, track, artist, sleep_func=time.sleep):
    """Request track.getInfo for one track and return the raw JSON response.

    server     -- must be "last.fm"; anything else raises ValueError.
    mbid       -- mbid of the track.
    track      -- track name (text; encoded as UTF-8 for the query).
    artist     -- artist name (text; encoded as UTF-8 for the query).
    sleep_func -- callable used to wait between retries.
    """
    urlvars = dict(method='track.getInfo',
                   track=_utf8(track),
                   artist=_utf8(artist),
                   mbid=mbid)
    return _fetch_lastfm_json(server, urlvars, sleep_func)


def connect_server_album(server, mbid, album, artist, sleep_func=time.sleep):
    """Request album.getinfo for one album and return the raw JSON response.

    server     -- must be "last.fm"; anything else raises ValueError.
    mbid       -- mbid of the album.
    album      -- album name (text; encoded as UTF-8 for the query).
    artist     -- artist name (text; encoded as UTF-8 for the query).
    sleep_func -- callable used to wait between retries.
    """
    urlvars = dict(method='album.getinfo',
                   mbid=mbid,
                   album=_utf8(album),
                   artist=_utf8(artist))
    return _fetch_lastfm_json(server, urlvars, sleep_func)
def get_pageinfo(response, tracktype='recenttracks'):
    """Return the total number of pages reported in an XML response.

    response  -- XML document text as returned by the server.
    tracktype -- tag name of the element carrying the 'totalPages' attribute.

    Raises ValueError when the element or its 'totalPages' attribute is
    missing, or when the attribute is not a number.
    """
    xmlpage = ET.fromstring(response)
    listing = xmlpage.find(tracktype)
    totalpages = None if listing is None else listing.attrib.get('totalPages')
    if totalpages is None:
        # Presumably an error reply; if it has an <error> element, surface
        # its text so the user sees what the server complained about.
        error = xmlpage.find('error')
        detail = error.text if error is not None and error.text else 'no page count in response'
        raise ValueError("Could not read total pages for %s: %s" % (tracktype, detail))
    return int(totalpages)
def get_tracklist(response):
    """Parse an XML page and return a list of all its <track> elements.

    response -- XML document text as returned by the server.

    Elements are returned in document order, at any depth.
    """
    xmlpage = ET.fromstring(response)
    # Element.iter() replaces the deprecated getiterator(); wrapping it in
    # list() keeps the list return type callers got before.
    return list(xmlpage.iter('track'))
def parse_artist(response_dict):
    """Condense a decoded artist info reply into one cache entry.

    response_dict -- decoded JSON reply; response_dict['artist']['mbid']
                     must exist (KeyError otherwise).

    Returns {mbid: info}, where info has the keys 'yearfrom', 'yearto',
    'placeformed', 'listeners', 'playcount' and 'tag0'..'tag4'. Anything
    missing from the reply is stored as ''.
    """
    def as_dict(value):
        # Sections may arrive as a whitespace string instead of an object
        # (the tags section was special-cased for that); treat every
        # non-dict value as "section absent".
        return value if isinstance(value, dict) else {}

    artist = response_dict['artist']
    mbid = artist['mbid']
    bio = as_dict(artist.get('bio'))
    stats = as_dict(artist.get('stats'))

    # 'formation' is a single object or a list of them: the start year comes
    # from the first entry, the end year from the last.
    formation = as_dict(bio.get('formationlist')).get('formation', {})
    if isinstance(formation, dict):
        first = last = formation
    elif isinstance(formation, list) and formation:
        first, last = as_dict(formation[0]), as_dict(formation[-1])
    else:
        first = last = {}

    # 'tag' is likewise a single object or a list; keep at most five names.
    tags = as_dict(artist.get('tags')).get('tag', [])
    if isinstance(tags, dict):
        tags = [tags]
    elif not isinstance(tags, list):
        tags = []
    names = [as_dict(tag).get('name', '') for tag in tags[:5]]
    names += [''] * (5 - len(names))

    info = {
        'yearfrom': first.get('yearfrom', ''),
        'yearto': last.get('yearto', ''),
        'placeformed': bio.get('placeformed', ''),
        'listeners': stats.get('listeners', ''),
        'playcount': stats.get('playcount', ''),
    }
    for n_tag, name in enumerate(names):
        info['tag%d' % n_tag] = name
    return {mbid: info}
def parse_trackinfo(response_dict):
    """Condense a decoded track info reply into one cache entry.

    response_dict -- decoded JSON reply; response_dict['track']['mbid']
                     must exist (KeyError otherwise).

    Returns (mbid, {mbid: info}), where info has the keys 'duration',
    'listeners', 'playcount' and 'tag0'..'tag4'. Anything missing from the
    reply is stored as ''.
    """
    track = response_dict['track']
    myid = track['mbid']

    # 'toptags' may be a whitespace string instead of an object, and the
    # object may lack 'tag'; both cases mean "no tags".
    toptags = track.get('toptags')
    tags = toptags.get('tag', []) if isinstance(toptags, dict) else []
    # 'tag' is a single object or a list of them; keep at most five names.
    if isinstance(tags, dict):
        tags = [tags]
    elif not isinstance(tags, list):
        tags = []
    names = [tag.get('name', '') if isinstance(tag, dict) else ''
             for tag in tags[:5]]
    names += [''] * (5 - len(names))

    info = {
        'duration': track.get('duration', ''),
        'listeners': track.get('listeners', ''),
        'playcount': track.get('playcount', ''),
    }
    for n_tag, name in enumerate(names):
        info['tag%d' % n_tag] = name
    return myid, {myid: info}
def parse_album(response_dict):
    """Condense a decoded album info reply into one cache entry.

    Returns (mbid, {mbid: info}), where info holds 'releasedate',
    'listeners' and 'playcount', each '' when absent from the reply.
    """
    album = response_dict['album']
    album_id = album['mbid']
    wanted = ('releasedate', 'listeners', 'playcount')
    info = dict((field, album.get(field, '')) for field in wanted)
    return album_id, {album_id: info}
def _decode_info(raw_response, root_key):
    """Decode a JSON info reply and print it, as the script always did.

    Returns the decoded dict, or None when the reply cannot be used: it is
    not an object, it carries a 'message' (error reply), or it lacks the
    *root_key* section.
    """
    parsed = json.loads(raw_response)
    print(parsed)
    if not isinstance(parsed, dict) or 'message' in parsed or root_key not in parsed:
        return None
    return parsed


def _cache_info(cache, requested_id, entries):
    """Merge *entries* into *cache* and make them reachable by *requested_id*.

    The caches are read with the mbid taken from the scrobble, so the info is
    also stored under that id in case the reply reports a different one.
    """
    cache.update(entries)
    for info in entries.values():
        cache.setdefault(requested_id, info)


def parse_track(trackelement, username):
    """Extract info from one <track> element and return it as a flat list.

    trackelement -- ElementTree element of the track.
    username     -- not used here; kept for the existing call signature.

    Artist, track and album details are fetched once per mbid and kept in
    the module-level caches artist_dict, track_dict and album_dict.
    NOTE: the lookups read the module-level name `server`, which is only
    assigned when this file is run as a script.

    Returns, in this order: date, track name, artist name, album name,
    track duration/listeners/playcount/tag0..tag4, track mbid,
    artist yearfrom/yearto/placeformed/listeners/playcount/tag0..tag4,
    artist mbid, album releasedate/playcount/listeners, album mbid.
    Values that are None are replaced by ''.
    """
    artist_el = trackelement.find('artist')
    if len(artist_el):
        # artist info is nested in loved/banned tracks xml
        artistname = artist_el.find('name').text
        artistmbid = artist_el.find('mbid').text
    else:
        artistname = artist_el.text
        artistmbid = artist_el.get('mbid')

    album_el = trackelement.find('album')
    if album_el is None:
        # no album info for loved/banned tracks
        albumname = ''
        albummbid = ''
    else:
        albumname = album_el.text
        albummbid = album_el.get('mbid')

    trackname = trackelement.find('name').text
    trackmbid = trackelement.find('mbid').text
    date = trackelement.find('date').get('uts')

    # A plain truthiness test covers both None and ''; unusable replies are
    # skipped so one failed lookup does not abort the whole export.
    if artistmbid and artistmbid not in artist_dict:
        parsed = _decode_info(
            connect_server_artist(server, artistmbid, artistname), 'artist')
        if parsed is not None:
            _cache_info(artist_dict, artistmbid, parse_artist(parsed))
    if trackmbid and trackmbid not in track_dict:
        parsed = _decode_info(
            connect_server_track(server, trackmbid, trackname, artistname), 'track')
        if parsed is not None:
            _cache_info(track_dict, trackmbid, parse_trackinfo(parsed)[1])
    if albummbid and albummbid not in album_dict:
        parsed = _decode_info(
            connect_server_album(server, albummbid, albumname, artistname), 'album')
        if parsed is not None:
            _cache_info(album_dict, albummbid, parse_album(parsed)[1])

    track = track_dict.get(trackmbid, {})
    artist = artist_dict.get(artistmbid, {})
    album = album_dict.get(albummbid, {})
    tag_keys = ('tag0', 'tag1', 'tag2', 'tag3', 'tag4')

    output = [date, trackname, artistname, albumname]
    output += [track.get(key, '')
               for key in ('duration', 'listeners', 'playcount') + tag_keys]
    output.append(trackmbid)
    output += [artist.get(key, '')
               for key in ('yearfrom', 'yearto', 'placeformed',
                           'listeners', 'playcount') + tag_keys]
    output.append(artistmbid)
    output += [album.get(key, '')
               for key in ('releasedate', 'playcount', 'listeners')]
    output.append(albummbid)

    return ['' if value is None else value for value in output]
def write_tracks(tracks, outfileobj):
    """Write each track to the open file as one tab-separated, UTF-8 encoded line."""
    for fields in tracks:
        line = "\t".join(fields) + "\n"
        outfileobj.write(line.encode('utf-8'))
def get_tracks(server, username, startpage=1, sleep_func=time.sleep, tracktype='recenttracks'):
    """Generate (page, totalpages, tracks) for every page from startpage on.

    tracks is the list of parse_track() results for that page; the track
    marked as currently playing is left out. Raises ValueError when
    startpage lies beyond the last page. sleep_func(.5) is called after
    each page the consumer has taken.
    """
    response = connect_server(server, username, startpage, sleep_func, tracktype)
    totalpages = get_pageinfo(response, tracktype)

    if startpage > totalpages:
        raise ValueError("First page (%s) is higher than total pages (%s)." % (startpage, totalpages))

    page = startpage
    while page <= totalpages:
        # The first page was already fetched to learn the page count.
        if page > startpage:
            response = connect_server(server, username, page, sleep_func, tracktype)

        # do not export the currently playing track.
        tracks = [parse_track(element, username)
                  for element in get_tracklist(response)
                  if not element.attrib.get("nowplaying")]

        yield page, totalpages, tracks

        page += 1
        sleep_func(.5)
def main(server, username, startpage, outfile, infotype='recenttracks'):
    """Export the user's tracks page by page and append them to *outfile*.

    server    -- server to query, passed on to get_tracks().
    username  -- user whose tracks are exported.
    startpage -- first page to fetch.
    outfile   -- path of the file the tracks are appended to.
    infotype  -- list to export ('recenttracks', 'lovedtracks', ...).

    After every page the artist/track/album caches are pickled to
    artist.p, track.p and album.p. Whatever was collected is written out
    even when the export stops early. A ValueError is reported through
    sys.exit(); any other exception propagates after the tracks are written.
    """
    trackdict = dict()
    page = startpage  # for case of exception
    totalpages = -1   # ditto
    n = 0
    caches = (('artist.p', artist_dict),
              ('track.p', track_dict),
              ('album.p', album_dict))
    try:
        for page, totalpages, tracks in get_tracks(server, username, startpage, tracktype=infotype):
            print("Got page %s of %s.." % (page, totalpages))
            # 'with' flushes and closes each cache file right away, so an
            # interrupted run does not leave a truncated pickle behind.
            for path, cache in caches:
                with open(path, 'wb') as cache_file:
                    pickle.dump(cache, cache_file)
            for track in tracks:
                if infotype == 'recenttracks':
                    # The timestamp is the key; setdefault keeps the first
                    # track seen for each timestamp.
                    trackdict.setdefault(track[0], track)
                else:
                    # Can not use timestamp as key for loved/banned tracks as it's not unique
                    n += 1
                    trackdict.setdefault(n, track)
    except ValueError as e:
        sys.exit(e)
    finally:
        with open(outfile, 'a') as outfileobj:
            tracks = sorted(trackdict.values(), reverse=True)
            write_tracks(tracks, outfileobj)
            print("Wrote page %s-%s of %s to file %s" % (startpage, page, totalpages, outfile))
if __name__ == "__main__":
    # These names are deliberately module-level: parse_track() reads the
    # global `server` assigned here.
    parser = OptionParser()
    (username, outfile, startpage, server, infotype) = get_options(parser)
    main(server=server,
         username=username,
         startpage=startpage,
         outfile=outfile,
         infotype=infotype)