Skip to content

Instantly share code, notes, and snippets.

@0187773933
Created September 23, 2022 20:15
Show Gist options
  • Save 0187773933/50a6b2ad8659234bf88ca0b3a9771ef1 to your computer and use it in GitHub Desktop.
Save 0187773933/50a6b2ad8659234bf88ca0b3a9771ef1 to your computer and use it in GitHub Desktop.
Spotify Playlist Miner - Most Used Songs Across Playlist Search
#!/usr/bin/env python3
import os
import time
import json
from pprint import pprint
from box import Box # pip install python-box
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
from slugify import slugify # pip install python-slugify==5.0.2
import spotipy # pip install spotipy
import spotipy.util as spotipy_util
def write_json(file_path, python_object):
    """Serialize *python_object* as pretty-printed JSON into *file_path*.

    The file is created or overwritten and is always encoded as UTF-8.
    Non-ASCII characters are written verbatim rather than as ``\\uXXXX``
    escapes, and nesting is indented by four spaces.
    """
    with open(file_path, mode="w", encoding="utf-8") as json_file:
        json.dump(
            python_object,
            json_file,
            indent=4,
            ensure_ascii=False,
        )
def read_json(file_path):
    """Load and return the JSON document stored at *file_path*.

    The file is decoded as UTF-8 explicitly. Relying on the platform default
    encoding breaks on systems whose locale is not UTF-8 when the file holds
    non-ASCII characters written unescaped (``ensure_ascii=False``).
    """
    with open(file_path, encoding="utf-8") as f:
        return json.load(f)
def batch_process(options):
    """Run a function over every item of a list on a thread pool.

    Args:
        options: Mapping with the keys
            ``"batch_list"`` - sized collection of inputs,
            ``"function_reference"`` - callable applied to each input,
            ``"max_workers"`` (optional) - thread pool size; when absent or
            ``None`` the ``ThreadPoolExecutor`` default is used.

    Returns:
        A list with one result per input, in input order. A tqdm progress bar
        is displayed while results are collected.

    Raises:
        Whatever the callable raises; ``executor.map`` re-raises it when the
        failing result is consumed.
    """
    batch_list = options["batch_list"]
    # Honour the caller-supplied pool size instead of silently ignoring it.
    with ThreadPoolExecutor(max_workers=options.get("max_workers")) as executor:
        results = executor.map(options["function_reference"], batch_list)
        # Consume inside the ``with`` so the progress bar tracks live work.
        return list(tqdm(results, total=len(batch_list)))
class SpotifyPlaylistMiner:
    """Rank tracks by how often they appear in playlists matching a search query.

    The config mapping is wrapped in a ``Box``. If it has no ``token`` entry, a
    token is requested interactively, which requires the ``username``,
    ``client_id``, ``client_secret`` and ``redirect_url`` entries.
    """

    def __init__(self, config=None):
        """Store the config, obtain a token if needed and build the client.

        Args:
            config: Optional mapping of settings. ``None`` is treated as an
                empty config (avoids a shared mutable default argument).
        """
        self.config = Box(config or {})
        if "token" not in self.config:
            self.login_and_get_new_token()
        self.sp = spotipy.Spotify(auth=self.config.token)

    def login_and_get_new_token(self):
        """Prompt for a user token, store it on ``self.config`` and return it.

        The client credentials are exported through the ``SPOTIPY_*``
        environment variables before ``prompt_for_user_token`` is called with
        the ``user-library-read`` scope.
        """
        os.environ["SPOTIPY_CLIENT_ID"] = self.config.client_id
        os.environ["SPOTIPY_CLIENT_SECRET"] = self.config.client_secret
        os.environ["SPOTIPY_REDIRECT_URI"] = self.config.redirect_url
        token = spotipy_util.prompt_for_user_token(self.config.username, "user-library-read")
        self.config.token = token
        return token

    # https://github.com/plamere/playlistminer/blob/master/scripts/crawl.py
    # Unclear whether more than 1,000 playlists can be fetched: the search
    # call breaks after offset=1000.
    def search_playlists(self, query):
        """Collect playlist and owner ids for every playlist matching *query*.

        Pages through the search results 50 at a time until the reported total
        is reached, an empty page comes back, or the API call fails. A failure
        ends the search and the playlists gathered so far are returned
        (best-effort), but the reason is printed instead of being discarded.

        Returns:
            A list of ``Box`` objects with ``playlist_id`` and
            ``playlist_owner_id`` keys.
        """
        limit = 50
        offset = 0
        playlists = []
        while True:
            try:
                result = self.sp.search(query, limit=limit, offset=offset, type="playlist")
            except Exception as error:
                # Deliberate best-effort: keep what was gathered, but say why we stopped.
                print(f"Stopping playlist search at offset {offset}: {error}")
                break
            page = result["playlists"]
            items = page["items"]
            result_total_playlists = page["total"]
            # Skip null entries; indexing into one would raise a TypeError.
            x_playlists = [
                Box({"playlist_id": x["id"], "playlist_owner_id": x["owner"]["id"]})
                for x in items
                if x is not None
            ]
            playlists.extend(x_playlists)
            print( f"Gathering [{len(playlists)}] of {result_total_playlists} Playlists , Offset == {offset} , New == {len(x_playlists)}" )
            offset += page.get("limit") or limit
            # Offset-based stop still terminates when entries were skipped.
            if not items or offset >= result_total_playlists:
                break
            time.sleep(0.1)
        return playlists

    def get_playlist_tracks(self, options):
        """Fetch a playlist's tracks and attach them to *options*.

        Args:
            options: Mapping with ``playlist_owner_id`` and ``playlist_id``.

        Returns:
            The same *options* object with a ``"tracks"`` key holding a list of
            dicts (``track_id``, ``title``, ``album``, ``artist``). Every
            result page is followed, so long playlists are not truncated.
            Entries with no track object or no track id are skipped.
        """
        page = self.sp.user_playlist_tracks(options["playlist_owner_id"], options["playlist_id"])
        cleaned_tracks = []
        while page:
            for item in page["items"]:
                track = item.get("track") if item else None
                # A missing id would merge unrelated tracks under one ``None`` key.
                if not track or track.get("id") is None:
                    continue
                artists = track.get("artists") or [{}]
                album = track.get("album") or {}
                cleaned_tracks.append({
                    "track_id": track["id"],
                    "title": track.get("name"),
                    "album": album.get("name"),
                    "artist": artists[0].get("name"),
                })
            # Follow the paging link until there is none left.
            page = self.sp.next(page) if page.get("next") else None
        options["tracks"] = cleaned_tracks
        return options

    def get_top_tracks(self, query):
        """Return tracks from playlists matching *query*, most frequent first.

        Side effects: writes ``<slug>-playlists.json`` and
        ``<slug>-tracks.json`` (slug = slugified query) using relative paths.

        Returns:
            A list of ``(track_id, track_dict)`` pairs sorted by the
            ``total_appearances`` value in descending order.
        """
        slug = slugify(query)
        playlists = self.search_playlists(query)
        write_json(f"{slug}-playlists.json", playlists)
        print("Downloading Playlist Track Names")
        playlists = batch_process({
            "max_workers": 10,
            "batch_list": playlists,
            "function_reference": self.get_playlist_tracks,
        })
        tracks_db = {}
        for playlist in playlists:
            for track in playlist["tracks"]:
                track_id = track["track_id"]
                if track_id not in tracks_db:
                    track["total_appearances"] = 1
                    tracks_db[track_id] = track
                else:
                    tracks_db[track_id]["total_appearances"] += 1
        tracks = sorted(
            tracks_db.items(),
            key=lambda pair: pair[1]["total_appearances"],
            reverse=True,
        )
        write_json(f"{slug}-tracks.json", tracks)
        return tracks
# https://developer.spotify.com/dashboard
if __name__ == "__main__":
    # Placeholder credentials: replace with the values of your own Spotify app.
    x = SpotifyPlaylistMiner({
        "username": "asdf" ,
        "client_id": "asdf" ,
        "client_secret": "asdf" ,
        "redirect_url": "http://localhost:6379" ,
    })
    top_tracks = x.get_top_tracks( "psychedelic classic rock" )
    # Slice rather than index 0..99 so a result set with fewer than 100
    # tracks prints what exists instead of raising IndexError.
    for rank , ( _track_id , track ) in enumerate( top_tracks[ :100 ] , start=1 ):
        print( f"{rank} === {track['total_appearances']} === {track['artist']} === {track['title']} === https://open.spotify.com/track/{track['track_id']}" )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment