Created
September 23, 2022 20:15
-
-
Save 0187773933/50a6b2ad8659234bf88ca0b3a9771ef1 to your computer and use it in GitHub Desktop.
Spotify Playlist Miner - Most Used Songs Across Playlist Search
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import time | |
import json | |
from pprint import pprint | |
from box import Box # pip install python-box | |
from tqdm import tqdm | |
from concurrent.futures import ThreadPoolExecutor | |
from slugify import slugify # pip install python-slugify==5.0.2 | |
import spotipy # pip install spotipy | |
import spotipy.util as spotipy_util | |
def write_json(file_path, python_object):
    """Write *python_object* to *file_path* as indented JSON.

    The file is opened for writing as UTF-8 (replacing any existing
    content) and non-ASCII characters are written as-is instead of being
    escaped.
    """
    with open(file_path, mode="w", encoding="utf-8") as handle:
        serialized = json.dumps(python_object, ensure_ascii=False, indent=4)
        handle.write(serialized)
def read_json(file_path):
    """Load and return the JSON document stored at *file_path*.

    The file is decoded explicitly as UTF-8 so that it round-trips with
    ``write_json``, which writes UTF-8 with non-ASCII characters unescaped.
    Relying on the platform default encoding could mis-decode such files.

    Raises whatever ``open`` or ``json.load`` raise (for example
    ``FileNotFoundError`` or ``json.JSONDecodeError``).
    """
    with open(file_path, encoding="utf-8") as f:
        return json.load(f)
def batch_process(options):
    """Apply a function to every item of a list using a thread pool.

    ``options`` is a mapping with the keys:

    - ``"batch_list"``: the items to process.
    - ``"function_reference"``: callable invoked once with each item.
    - ``"max_workers"`` (optional): size of the thread pool. When the key
      is absent or ``None`` the ``ThreadPoolExecutor`` default is used.

    Progress is displayed with ``tqdm``. Returns a list of the callable's
    results in the same order as ``"batch_list"``. An exception raised by
    the callable propagates while the results are being collected.
    """
    batch_list = options["batch_list"]
    worker = options["function_reference"]
    # Callers already pass "max_workers"; honour it instead of ignoring it.
    max_workers = options.get("max_workers")
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = executor.map(worker, batch_list)
        return list(tqdm(results, total=len(batch_list)))
class SpotifyPlaylistMiner:
    """Rank tracks by how often they appear in playlists matching a search query.

    ``config`` must contain either a ready-to-use ``"token"``, or the keys
    ``"username"``, ``"client_id"``, ``"client_secret"`` and
    ``"redirect_url"`` so that a token can be requested.
    """

    def __init__(self, config=None):
        """Store the configuration, obtain a token if needed, build the client."""
        # ``None`` instead of a mutable ``{}`` default; an omitted config
        # still behaves like an empty one.
        self.config = Box({} if config is None else config)
        if "token" not in self.config:
            self.login_and_get_new_token()
        self.sp = spotipy.Spotify(auth=self.config.token)

    def login_and_get_new_token(self):
        """Request a user token, store it in ``self.config.token`` and return it.

        The client credentials from the config are exported through the
        ``SPOTIPY_*`` environment variables before the token is requested
        with the ``user-library-read`` scope.
        """
        os.environ["SPOTIPY_CLIENT_ID"] = self.config.client_id
        os.environ["SPOTIPY_CLIENT_SECRET"] = self.config.client_secret
        os.environ["SPOTIPY_REDIRECT_URI"] = self.config.redirect_url
        token = spotipy_util.prompt_for_user_token(self.config.username, "user-library-read")
        self.config.token = token
        return token

    # https://github.com/plamere/playlistminer/blob/master/scripts/crawl.py
    # idk if we can get past 1,000 playlists ? it breaks after offset=1000
    def search_playlists(self, query):
        """Page through playlist search results for *query*.

        Returns a list of ``Box`` objects with the keys ``playlist_id`` and
        ``playlist_owner_id``. Paging stops when the reported total has been
        collected, when a page comes back empty, or when a request/parse
        error occurs. In the error case the playlists gathered so far are
        still returned (best effort) and the error is printed.
        """
        limit = 50
        offset = 0
        playlists = []
        while True:
            try:
                result = self.sp.search(query, limit=limit, offset=offset, type="playlist")
                page = result["playlists"]
                total_playlists = page["total"]
                items = page["items"]
                page_limit = page["limit"]
                # A null entry would otherwise raise here and silently end
                # the whole search, so skip such entries.
                new_playlists = [
                    Box({"playlist_id": x["id"], "playlist_owner_id": x["owner"]["id"]})
                    for x in items
                    if x is not None
                ]
            except Exception as e:
                # Best effort: keep what was gathered, but say why we stopped.
                print(f"Playlist search stopped at offset {offset}: {e!r}")
                break
            playlists.extend(new_playlists)
            print(f"Gathering [{len(playlists)}] of {total_playlists} Playlists , Offset == {offset} , New == {len(new_playlists)}")
            # ">=" so a total that shrinks between pages cannot keep the loop going.
            if len(playlists) >= total_playlists or not items:
                break
            offset += page_limit
            time.sleep(0.1)
        return playlists

    def get_playlist_tracks(self, options):
        """Fetch a playlist's tracks and attach them to *options* as ``"tracks"``.

        *options* must provide ``"playlist_owner_id"`` and ``"playlist_id"``.
        Only the items returned by the single ``user_playlist_tracks`` call
        are processed; no follow-up pages are requested. Each kept track is a
        dict with ``track_id``, ``title``, ``album`` and ``artist`` (the
        first listed artist, or ``None`` when there is none).

        Returns the same *options* object, mutated in place.
        """
        response = self.sp.user_playlist_tracks(options["playlist_owner_id"], options["playlist_id"])
        cleaned_tracks = []
        for item in response["items"]:
            track = item.get("track") if item else None
            # Entries without a usable id cannot be counted per track; if
            # kept they would all merge under a single ``None`` key later.
            if not track or track.get("id") is None:
                continue
            artists = track.get("artists") or []
            album = track.get("album") or {}
            cleaned_tracks.append({
                "track_id": track["id"],
                "title": track.get("name"),
                "album": album.get("name"),
                "artist": artists[0]["name"] if artists else None,
            })
        options["tracks"] = cleaned_tracks
        return options

    def get_top_tracks(self, query):
        """Return tracks from playlists matching *query*, most frequent first.

        The result is a list of ``(track_id, track)`` pairs sorted by the
        track's ``total_appearances`` in descending order. Every occurrence
        of a track counts, including repeats within the same playlist.

        Side effects: writes ``<slug>-playlists.json`` and
        ``<slug>-tracks.json`` to the current directory, where ``<slug>`` is
        the slugified query.
        """
        slug = slugify(query)
        playlists = self.search_playlists(query)
        write_json(f"{slug}-playlists.json", playlists)
        print("Downloading Playlist Track Names")
        playlists = batch_process({
            "max_workers": 10,
            "batch_list": playlists,
            "function_reference": self.get_playlist_tracks,
        })
        tracks_db = {}
        for playlist in playlists:
            for track in playlist["tracks"]:
                track_id = track["track_id"]
                if track_id not in tracks_db:
                    # First sighting: the track record itself carries the counter.
                    track["total_appearances"] = 1
                    tracks_db[track_id] = track
                else:
                    tracks_db[track_id]["total_appearances"] += 1
        tracks = sorted(
            tracks_db.items(),
            key=lambda pair: pair[1]["total_appearances"],
            reverse=True,
        )
        write_json(f"{slug}-tracks.json", tracks)
        return tracks
# Spotify developer dashboard: https://developer.spotify.com/dashboard
if __name__ == "__main__":
    # Placeholder credentials; replace them with real values before running.
    miner = SpotifyPlaylistMiner({
        "username": "asdf",
        "client_id": "asdf",
        "client_secret": "asdf",
        "redirect_url": "http://localhost:6379",
    })
    top_tracks = miner.get_top_tracks("psychedelic classic rock")
    # Slice rather than index 0..99 so that a result with fewer than 100
    # tracks prints what exists instead of raising IndexError.
    for rank, (_track_id, track) in enumerate(top_tracks[:100], start=1):
        print(f"{rank} === {track['total_appearances']} === {track['artist']} === {track['title']} === https://open.spotify.com/track/{track['track_id']}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment