Modified version of the original lastexport script that also fetches album, artist and track info. Just run it and wait: a full export can take hours to complete, because sending requests too quickly might get us blocked from the API.
# lastfm-export.py -u <LASTFM_USERNAME>
Modified version of the original lastexport script that also fetches album, artist and track info. Just run it and wait: a full export can take hours to complete, because sending requests too quickly might get us blocked from the API.
# lastfm-export.py -u <LASTFM_USERNAME>
| #!/usr/bin/env python | |
| #-*- coding: utf-8 -*- | |
| # This program is free software: you can redistribute it and/or modify | |
| # it under the terms of the GNU General Public License as published by | |
| # the Free Software Foundation, either version 3 of the License, or | |
| # (at your option) any later version. | |
| # | |
| # This program is distributed in the hope that it will be useful, | |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
| # GNU General Public License for more details. | |
| # | |
| # You should have received a copy of the GNU General Public License | |
| # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
| # | |
| """ | |
| Script for exporting tracks through audioscrobbler API. | |
| Usage: lastexport.py -u USER [-o OUTFILE] [-p STARTPAGE] [-s SERVER] | |
| """ | |
| import urllib2, urllib, sys, time, re | |
| import xml.etree.ElementTree as ET | |
| from optparse import OptionParser | |
| import json | |
| import pickle | |
| import os | |
| __version__ = '0.0.4' | |
def _load_cache(path):
    """Return the dict pickled at *path*, or a new empty dict if the file is absent."""
    if not os.path.exists(path):
        return dict()
    # NOTE(security): unpickling can execute arbitrary code. Only point this at
    # cache files that were written by this script.
    with open(path, 'rb') as cache_file:
        return pickle.load(cache_file)


# Lookup caches keyed by mbid. They are filled while tracks are parsed and
# pickled to these files by main(), so ids fetched on a previous run are not
# requested again.
artist_dict = _load_cache('artist.p')
track_dict = _load_cache('track.p')
album_dict = _load_cache('album.p')
def get_options(parser):
    """Register the command line options on *parser* and parse the arguments.

    Exits through ``sys.exit`` when no user name was supplied.

    Returns a ``(username, outfile, startpage, server, infotype)`` tuple.
    ``infotype`` is the track-list name used in API calls: ``lovedtracks``
    for "loved", ``bannedtracks`` for "banned" and ``recenttracks`` for any
    other value of ``--type``.
    """
    option_specs = (
        (("-u", "--user"),
         dict(dest="username", default=None,
              help="User name.")),
        (("-o", "--outfile"),
         dict(dest="outfile", default="exported_tracks.txt",
              help="Output file, default is exported_tracks.txt")),
        (("-p", "--page"),
         dict(dest="startpage", type="int", default="1",
              help="Page to start fetching tracks from, default is 1")),
        (("-s", "--server"),
         dict(dest="server", default="last.fm",
              help="Server to fetch track info from, default is last.fm")),
        (("-t", "--type"),
         dict(dest="infotype", default="scrobbles",
              help="Type of information to export, scrobbles|loved|banned, default is scrobbles")),
    )
    for flags, settings in option_specs:
        parser.add_option(*flags, **settings)

    parsed, _positional = parser.parse_args()
    if not parsed.username:
        sys.exit("User name not specified, see --help")

    # Anything that is not "loved" or "banned" is exported as scrobbles.
    tracklist_names = {"loved": "lovedtracks", "banned": "bannedtracks"}
    infotype = tracklist_names.get(parsed.infotype, "recenttracks")
    return parsed.username, parsed.outfile, parsed.startpage, parsed.server, infotype
def connect_server(server, username, startpage, sleep_func=time.sleep, tracktype='recenttracks'):
    """Fetch one page of a user's track list and return the raw XML response.

    Args:
        server: "libre.fm", "last.fm", or the host/URL of another server that
            exposes the same ``/2.0/`` endpoint.
        username: user whose tracks are requested.
        startpage: page number to request.
        sleep_func: called with the number of seconds to wait before a retry.
        tracktype: track-list name appended to ``user.get`` to form the method.

    Returns:
        The response body with the byte sequence ``\\xef\\xbf\\xbe`` removed.

    Raises:
        Exception: the last error raised while opening the URL, once every
            retry interval has been used up.
    """
    generic_api_key = ('lastexport.py-%s' % __version__).ljust(32, '-')
    if server == "libre.fm":
        baseurl = 'http://alpha.libre.fm/2.0/?'
        api_key = generic_api_key
        limit = 200
    elif server == "last.fm":
        baseurl = 'http://ws.audioscrobbler.com/2.0/?'
        api_key = 'e38cc7822bd7476fe4083e36ee69748e'
        limit = 50
    else:
        # Only prepend a scheme when none was given, so that an explicit
        # https:// server is not turned into "http://https://...".
        if not server.startswith(('http://', 'https://')):
            server = 'http://%s' % server
        baseurl = server + '/2.0/?'
        api_key = generic_api_key
        limit = 200

    urlvars = dict(method='user.get%s' % tracktype,
                   api_key=api_key,
                   user=username,
                   page=startpage,
                   limit=limit)
    url = baseurl + urllib.urlencode(urlvars)

    last_exc = None
    for interval in (1, 5, 10, 62, 240, 480, 1200, 2400):
        try:
            f = urllib2.urlopen(url)
            break
        except Exception as e:
            last_exc = e
            print("Exception occured, retrying in %ds: %s" % (interval, e))
            sleep_func(interval)
    else:
        # Every attempt failed: report the page and re-raise the last error.
        print("Failed to open page %s" % urlvars['page'])
        raise last_exc

    try:
        response = f.read()
    finally:
        # Release the connection even when reading the body fails.
        f.close()

    # bad hack to fix bad xml
    response = re.sub('\xef\xbf\xbe', '', response)
    return response
def connect_server_artist(server, mbid, artist, sleep_func=time.sleep):
    """Request ``artist.getinfo`` for one artist and return the raw JSON body.

    Args:
        server: must be "last.fm"; no endpoint is defined here for any other
            server.
        mbid: artist mbid, sent as the ``mbid`` query parameter.
        artist: artist name; sent UTF-8 encoded as the ``artist`` parameter.
        sleep_func: called with the number of seconds to wait before a retry.

    Returns:
        The response body exactly as read from the server.

    Raises:
        ValueError: if ``server`` is not "last.fm".
        Exception: the last error raised while opening the URL, once every
            retry interval has been used up.
    """
    if server != "last.fm":
        # Without this check the URL variables below would be unbound.
        raise ValueError("Artist info can only be fetched from last.fm, not from %s" % server)

    baseurl = 'http://ws.audioscrobbler.com/2.0/?'
    urlvars = dict(method='artist.getinfo',
                   api_key='e38cc7822bd7476fe4083e36ee69748e',
                   format='json',
                   artist=artist.encode('utf-8'),
                   mbid=mbid)
    url = baseurl + urllib.urlencode(urlvars)
    print(url)

    last_exc = None
    for interval in (1, 5, 10, 62, 240, 480, 1200, 2400):
        try:
            f = urllib2.urlopen(url)
            break
        except Exception as e:
            last_exc = e
            print("Exception occured, retrying in %ds: %s" % (interval, e))
            sleep_func(interval)
    else:
        # This request has no 'page' variable, so report the mbid instead.
        print("Failed to open artist info for %s" % mbid)
        raise last_exc

    try:
        response = f.read()
    finally:
        f.close()
    return response
def connect_server_track(server, mbid, track, artist, sleep_func=time.sleep):
    """Request ``track.getInfo`` for one track and return the raw JSON body.

    Args:
        server: must be "last.fm"; no endpoint is defined here for any other
            server.
        mbid: track mbid, sent as the ``mbid`` query parameter.
        track: track name; sent UTF-8 encoded as the ``track`` parameter.
        artist: artist name; sent UTF-8 encoded as the ``artist`` parameter.
        sleep_func: called with the number of seconds to wait before a retry.

    Returns:
        The response body exactly as read from the server.

    Raises:
        ValueError: if ``server`` is not "last.fm".
        Exception: the last error raised while opening the URL, once every
            retry interval has been used up.
    """
    if server != "last.fm":
        # Without this check the URL variables below would be unbound.
        raise ValueError("Track info can only be fetched from last.fm, not from %s" % server)

    baseurl = 'http://ws.audioscrobbler.com/2.0/?'
    urlvars = dict(method='track.getInfo',
                   api_key='e38cc7822bd7476fe4083e36ee69748e',
                   format='json',
                   track=track.encode('utf-8'),
                   artist=artist.encode('utf-8'),
                   mbid=mbid)
    url = baseurl + urllib.urlencode(urlvars)
    print(url)

    last_exc = None
    for interval in (1, 5, 10, 62, 240, 480, 1200, 2400):
        try:
            f = urllib2.urlopen(url)
            break
        except Exception as e:
            last_exc = e
            print("Exception occured, retrying in %ds: %s" % (interval, e))
            sleep_func(interval)
    else:
        # This request has no 'page' variable, so report the mbid instead.
        print("Failed to open track info for %s" % mbid)
        raise last_exc

    try:
        response = f.read()
    finally:
        f.close()
    return response
def connect_server_album(server, mbid, album, artist, sleep_func=time.sleep):
    """Request ``album.getinfo`` for one album and return the raw JSON body.

    Args:
        server: must be "last.fm"; no endpoint is defined here for any other
            server.
        mbid: album mbid, sent as the ``mbid`` query parameter.
        album: album name; sent UTF-8 encoded as the ``album`` parameter.
        artist: artist name; sent UTF-8 encoded as the ``artist`` parameter.
        sleep_func: called with the number of seconds to wait before a retry.

    Returns:
        The response body exactly as read from the server.

    Raises:
        ValueError: if ``server`` is not "last.fm".
        Exception: the last error raised while opening the URL, once every
            retry interval has been used up.
    """
    if server != "last.fm":
        # Without this check the URL variables below would be unbound.
        raise ValueError("Album info can only be fetched from last.fm, not from %s" % server)

    baseurl = 'http://ws.audioscrobbler.com/2.0/?'
    urlvars = dict(method='album.getinfo',
                   api_key='e38cc7822bd7476fe4083e36ee69748e',
                   format='json',
                   mbid=mbid,
                   album=album.encode('utf-8'),
                   artist=artist.encode('utf-8'))
    url = baseurl + urllib.urlencode(urlvars)
    print(url)

    last_exc = None
    for interval in (1, 5, 10, 62, 240, 480, 1200, 2400):
        try:
            f = urllib2.urlopen(url)
            break
        except Exception as e:
            last_exc = e
            print("Exception occured, retrying in %ds: %s" % (interval, e))
            sleep_func(interval)
    else:
        # This request has no 'page' variable, so report the mbid instead.
        print("Failed to open album info for %s" % mbid)
        raise last_exc

    try:
        response = f.read()
    finally:
        f.close()
    return response
def get_pageinfo(response, tracktype='recenttracks'):
    """Check how many pages of tracks the user has.

    Args:
        response: XML document returned for a track-list request.
        tracktype: tag name of the element that carries ``totalPages``.

    Returns:
        The ``totalPages`` attribute of the ``tracktype`` element as an int.

    Raises:
        ValueError: if the response has no ``tracktype`` element, or that
            element has no ``totalPages`` attribute (or a non-numeric one).
    """
    xmlpage = ET.fromstring(response)
    listing = xmlpage.find(tracktype)
    if listing is None:
        # Prefer the text of an <error> element, if the response has one,
        # over a generic description of what is missing.
        error = xmlpage.find('error')
        if error is not None and error.text:
            detail = error.text
        else:
            detail = 'no <%s> element in response' % tracktype
        raise ValueError("Could not read page info: %s" % detail)

    totalpages = listing.attrib.get('totalPages')
    if totalpages is None:
        raise ValueError("Could not read page info: <%s> has no totalPages attribute" % tracktype)
    return int(totalpages)
def get_tracklist(response):
    """Read an XML page and return a list of its ``track`` elements.

    The elements are returned in document order. ``findall`` replaces the
    deprecated ``getiterator`` and, like it did on Python 2.7, yields a list.
    It searches below the root only, so a root element that is itself named
    ``track`` is not included.
    """
    xmlpage = ET.fromstring(response)
    return xmlpage.findall('.//track')
def _artist_section(container, key):
    """Return ``container[key]`` when it is a dict, otherwise an empty dict."""
    value = container.get(key) if isinstance(container, dict) else None
    return value if isinstance(value, dict) else {}


def _artist_tag_names(tags, count=5):
    """Return exactly *count* tag names from a ``tags`` value, padded with ''."""
    entries = tags.get('tag') if isinstance(tags, dict) else None
    if isinstance(entries, dict):
        # A single tag arrives as one dict instead of a list of dicts.
        entries = [entries]
    elif not isinstance(entries, list):
        # Covers a missing 'tag' and a 'tags' value that is a (whitespace) string.
        entries = []
    names = [entry.get('name', '') if isinstance(entry, dict) else ''
             for entry in entries[:count]]
    return names + [''] * (count - len(names))


def parse_artist(response_dict):
    """Extract the cached artist fields from a decoded ``artist.getinfo`` response.

    Args:
        response_dict: decoded JSON response.

    Returns:
        ``{mbid: info}`` where ``info`` has the keys ``yearfrom``, ``yearto``,
        ``placeformed``, ``listeners``, ``playcount`` and ``tag0``..``tag4``,
        each defaulting to ''. An empty dict is returned when the response
        has no ``artist`` entry or the artist has no mbid, so that nothing is
        cached under an empty key.
    """
    artist = _artist_section(response_dict, 'artist')
    mbid = artist.get('mbid')
    if not mbid:
        return {}

    bio = _artist_section(artist, 'bio')
    stats = _artist_section(artist, 'stats')

    # 'formation' is either one dict or a list of dicts; with a list, the
    # start year comes from the first entry and the end year from the last.
    formation = _artist_section(bio, 'formationlist').get('formation', {})
    if isinstance(formation, list) and formation:
        first, last = formation[0], formation[-1]
    elif isinstance(formation, dict):
        first = last = formation
    else:
        first = last = {}
    if not isinstance(first, dict):
        first = {}
    if not isinstance(last, dict):
        last = {}

    info = {
        'yearfrom': first.get('yearfrom', ''),
        'yearto': last.get('yearto', ''),
        'placeformed': bio.get('placeformed', ''),
        'listeners': stats.get('listeners', ''),
        'playcount': stats.get('playcount', ''),
    }
    for index, name in enumerate(_artist_tag_names(artist.get('tags'))):
        info['tag%d' % index] = name
    return {mbid: info}
def _track_tag_names(toptags, count=5):
    """Return exactly *count* tag names from a ``toptags`` value, padded with ''."""
    entries = toptags.get('tag') if isinstance(toptags, dict) else None
    if isinstance(entries, dict):
        # A single tag arrives as one dict instead of a list of dicts.
        entries = [entries]
    elif not isinstance(entries, list):
        # Covers a missing 'tag' (previously a TypeError from len(None)) and
        # a 'toptags' value that is a (whitespace) string.
        entries = []
    names = [entry.get('name', '') if isinstance(entry, dict) else ''
             for entry in entries[:count]]
    return names + [''] * (count - len(names))


def parse_trackinfo(response_dict):
    """Extract the cached track fields from a decoded ``track.getInfo`` response.

    Args:
        response_dict: decoded JSON response.

    Returns:
        ``(mbid, {mbid: info})`` where ``info`` has the keys ``duration``,
        ``listeners``, ``playcount`` and ``tag0``..``tag4``, each defaulting
        to ''. ``('', {})`` is returned when the response has no ``track``
        entry or the track has no mbid, so that nothing is cached under an
        empty key.
    """
    track = response_dict.get('track') if isinstance(response_dict, dict) else None
    if not isinstance(track, dict) or not track.get('mbid'):
        return '', {}

    myid = track['mbid']
    info = {
        'duration': track.get('duration', ''),
        'listeners': track.get('listeners', ''),
        'playcount': track.get('playcount', ''),
    }
    for index, name in enumerate(_track_tag_names(track.get('toptags'))):
        info['tag%d' % index] = name
    return myid, {myid: info}
def parse_album(response_dict):
    """Extract the cached album fields from a decoded ``album.getinfo`` response.

    Returns ``(mbid, {mbid: info})`` where ``info`` holds ``releasedate``,
    ``listeners`` and ``playcount``, each defaulting to '' when absent.
    Raises KeyError when the response has no ``album`` entry or the album
    has no ``mbid``.
    """
    album = response_dict['album']
    album_id = album['mbid']
    wanted = ('releasedate', 'listeners', 'playcount')
    details = dict((field, album.get(field, '')) for field in wanted)
    return album_id, {album_id: details}
def parse_track(trackelement, username, server=None):
    """Extract info from one track entry and return it as a flat list.

    Artist, track and album details are looked up by mbid: first in the
    module-level ``artist_dict`` / ``track_dict`` / ``album_dict`` caches and,
    for ids not cached yet, through one request each to the server. Responses
    that do not contain the expected ``artist`` / ``track`` / ``album`` entry
    are skipped, which leaves the related columns empty.

    Args:
        trackelement: ``track`` element from a track-list XML page.
        username: unused; kept for existing callers.
        server: server for the detail lookups. When omitted, the module-level
            ``server`` assigned by the script entry point is used, or
            "last.fm" if that is not defined.

    Returns:
        A list of 28 fields: date, track/artist/album name, the track fields
        followed by the track mbid, the artist fields followed by the artist
        mbid, and the album fields followed by the album mbid. ``None``
        values are replaced with ''.
    """
    if server is None:
        server = globals().get('server', 'last.fm')

    artist_el = trackelement.find('artist')
    if len(artist_el):
        # artist info is nested in loved/banned tracks xml
        artistname = artist_el.find('name').text
        artistmbid = artist_el.find('mbid').text
    else:
        artistname = artist_el.text
        artistmbid = artist_el.get('mbid')

    album_el = trackelement.find('album')
    if album_el is None:
        # no album info for loved/banned tracks
        albumname = ''
        albummbid = ''
    else:
        albumname = album_el.text
        albummbid = album_el.get('mbid')

    trackname = trackelement.find('name').text
    trackmbid = trackelement.find('mbid').text
    date = trackelement.find('date').get('uts')

    # An empty element has text None; the request helpers need a string.
    if artistmbid and artistmbid not in artist_dict:
        payload = json.loads(connect_server_artist(server, artistmbid, artistname or ''))
        print(payload)
        if isinstance(payload, dict) and 'artist' in payload:
            artist_dict.update(parse_artist(payload))

    if trackmbid and trackmbid not in track_dict:
        payload = json.loads(connect_server_track(server, trackmbid, trackname or '', artistname or ''))
        print(payload)
        if isinstance(payload, dict) and 'track' in payload:
            _parsed_id, track_info = parse_trackinfo(payload)
            track_dict.update(track_info)

    if albummbid and albummbid not in album_dict:
        payload = json.loads(connect_server_album(server, albummbid, albumname or '', artistname or ''))
        print(payload)
        # Payloads carrying a 'message' (error responses) and non-dict
        # payloads are ignored, as before.
        if isinstance(payload, dict) and 'message' not in payload and 'album' in payload:
            _parsed_id, album_info = parse_album(payload)
            album_dict.update(album_info)

    track_info = track_dict.get(trackmbid, {})
    artist_info = artist_dict.get(artistmbid, {})
    album_info = album_dict.get(albummbid, {})
    tag_keys = ['tag%d' % index for index in range(5)]

    output = [date, trackname, artistname, albumname]
    output.extend(track_info.get(key, '')
                  for key in ['duration', 'listeners', 'playcount'] + tag_keys)
    output.append(trackmbid)
    output.extend(artist_info.get(key, '')
                  for key in ['yearfrom', 'yearto', 'placeformed', 'listeners', 'playcount'] + tag_keys)
    output.append(artistmbid)
    output.extend(album_info.get(key, '')
                  for key in ['releasedate', 'playcount', 'listeners'])
    output.append(albummbid)

    return ['' if value is None else value for value in output]
def write_tracks(tracks, outfileobj):
    """Write tracks to an open file as UTF-8 encoded, tab-separated lines.

    Args:
        tracks: iterable of field sequences, one per track.
        outfileobj: object whose ``write`` accepts the encoded line.

    Fields are converted to text before joining, so a numeric field no longer
    makes ``join`` fail; ``None`` is written as an empty field.
    """
    for fields in tracks:
        cells = [u'' if field is None else u'%s' % (field,) for field in fields]
        outfileobj.write((u"\t".join(cells) + u"\n").encode('utf-8'))
def get_tracks(server, username, startpage=1, sleep_func=time.sleep, tracktype='recenttracks'):
    """Generate the parsed tracks of every page from ``startpage`` onwards.

    Args:
        server: server passed to ``connect_server``.
        username: user whose tracks are fetched.
        startpage: first page to fetch.
        sleep_func: used for retry waits and called with 0.5 after each page.
        tracktype: track-list name passed to the page requests.

    Yields:
        ``(page, totalpages, tracks)`` where ``tracks`` is the list of
        ``parse_track`` results for that page.

    Raises:
        ValueError: on first iteration, if ``startpage`` is beyond the number
            of pages reported by the server.
    """
    page = startpage
    response = connect_server(server, username, page, sleep_func, tracktype)
    totalpages = get_pageinfo(response, tracktype)

    if startpage > totalpages:
        raise ValueError("First page (%s) is higher than total pages (%s)." % (startpage, totalpages))

    while page <= totalpages:
        # Skip connect if on first page, already have that one stored.
        if page > startpage:
            response = connect_server(server, username, page, sleep_func, tracktype)

        # Do not export the currently playing track: skip any element with a
        # non-empty "nowplaying" attribute.
        # NOTE(review): parse_track is not handed `server` here; it reads a
        # module-level `server` for its detail lookups.
        tracks = [parse_track(trackelement, username)
                  for trackelement in get_tracklist(response)
                  if not trackelement.attrib.get("nowplaying")]

        yield page, totalpages, tracks

        page += 1
        sleep_func(.5)
def _save_lookup_caches():
    """Pickle the artist, track and album lookup caches to their files."""
    for cache, path in ((artist_dict, 'artist.p'),
                        (track_dict, 'track.p'),
                        (album_dict, 'album.p')):
        with open(path, 'wb') as cache_file:
            pickle.dump(cache, cache_file)


def main(server, username, startpage, outfile, infotype='recenttracks'):
    """Fetch all pages from ``startpage`` on and append the tracks to ``outfile``.

    The lookup caches are pickled after every page. Whatever was collected is
    written in the ``finally`` clause, so an interrupted run still produces
    output, and the caches are saved there as well so lookups made on a page
    that failed midway are kept.

    Args:
        server: server to fetch from.
        username: user whose tracks are exported.
        startpage: first page to fetch.
        outfile: path of the file the tracks are appended to.
        infotype: track-list name; 'recenttracks' rows are keyed by their
            timestamp, other types by a running counter.

    Raises:
        SystemExit: when a ValueError occurs while fetching or parsing.
    """
    trackdict = dict()
    page = startpage  # for case of exception
    totalpages = -1  # ditto
    n = 0
    try:
        for page, totalpages, tracks in get_tracks(server, username, startpage, tracktype=infotype):
            print("Got page %s of %s.." % (page, totalpages))
            _save_lookup_caches()
            for track in tracks:
                if infotype == 'recenttracks':
                    trackdict.setdefault(track[0], track)
                else:
                    # Can not use timestamp as key for loved/banned tracks as it's not unique
                    n += 1
                    trackdict.setdefault(n, track)
    except ValueError as e:
        sys.exit(e)
    finally:
        with open(outfile, 'a') as outfileobj:
            tracks = sorted(trackdict.values(), reverse=True)
            write_tracks(tracks, outfileobj)
            print("Wrote page %s-%s of %s to file %s" % (startpage, page, totalpages, outfile))
        _save_lookup_caches()
if __name__ == "__main__":
    # Script entry point: read the command line, then run the export.
    # `server` has to stay a module-level name because parse_track reads it.
    username, outfile, startpage, server, infotype = get_options(OptionParser())
    main(server, username, startpage, outfile, infotype)