Last active
March 30, 2025 20:53
-
-
Save RitamDey/33d4b62238d2ab47945442f311a33ef4 to your computer and use it in GitHub Desktop.
Automation script to sort liked tracks into years and create playlists based upon when they were added to liked playlist
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import spotipy | |
from spotipy.oauth2 import SpotifyOAuth | |
from datetime import datetime, timezone | |
import logging | |
from logging.handlers import TimedRotatingFileHandler | |
from collections import defaultdict | |
import json | |
import re | |
from sys import exit | |
from os import path | |
def save_last_run_state(start_time, playlist_data, state_file="lastrun.json"):
    """Persist the state of the current run as JSON.

    Args:
        start_time: ``datetime`` at which the current run started. Its POSIX
            timestamp is stored under the ``"last-run"`` key.
        playlist_data: Mapping of year to Spotify playlist ID, stored as-is
            under the ``"playlists"`` key. Must be JSON serializable.
        state_file: Path of the state file to (over)write. Defaults to
            ``lastrun.json`` in the current working directory.
    """
    state_data = {
        "last-run": start_time.timestamp(),
        "playlists": playlist_data,
    }

    with open(state_file, "w", encoding="utf-8") as save_state:
        json.dump(state_data, save_state)
        # Terminate the single JSON line with a newline so the file stays
        # friendly to line-oriented tools.
        save_state.write("\n")

    logging.info("Saved the state for the current execution")
def load_last_run_state(state_file="lastrun.json"):
    """Load the state written by a previous run, if there is one.

    Args:
        state_file: Path of the JSON state file. Defaults to ``lastrun.json``
            in the current working directory.

    Returns:
        A ``(last_run, playlists)`` tuple. ``last_run`` is a timezone-aware
        UTC ``datetime`` built from the stored ``"last-run"`` timestamp and
        ``playlists`` is the stored ``"playlists"`` value. When there is no
        state file, ``(None, {})`` is returned.

    Raises:
        json.JSONDecodeError: If the state file is not valid JSON.
        KeyError: If the state file lacks the ``"last-run"`` or
            ``"playlists"`` key.
    """
    # isfile() is False both for a missing path and for a path that exists
    # but is not a regular file (e.g. a directory), so neither case reaches
    # open() below.
    if not path.isfile(state_file):
        logging.info("Last execution state doesn't exists.")
        return None, {}

    with open(state_file, "r", encoding="utf-8") as state_data_file:
        state_data = json.load(state_data_file)

    last_run = datetime.fromtimestamp(state_data["last-run"], tz=timezone.utc)
    return last_run, state_data["playlists"]
def _parse_added_at(value):
    """Parse a Spotify ``added_at`` ISO 8601 string into an aware datetime."""
    # datetime.fromisoformat() only accepts the trailing "Z" designator from
    # Python 3.11 onwards; rewrite it as an explicit offset so older
    # interpreters can parse the timestamp too.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        # NOTE(review): assumes offset-less timestamps are UTC, which is what
        # the Spotify Web API documents - confirm. Needed so the value can be
        # compared with the timezone-aware last-run timestamp.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def collect_and_sort_tracks(spotify, last_run):
    """Group the liked tracks added after ``last_run`` by the year they were added.

    Args:
        spotify: Client exposing ``current_user_saved_tracks(limit=, market=)``
            and ``next(page)``.
        last_run: Timezone-aware ``datetime`` of the previous run, or ``None``
            if the script has never run; ``None`` is replaced by the UNIX
            epoch so that every liked track is collected.

    Returns:
        A ``defaultdict(list)`` mapping the year (as a string) to the list of
        Spotify track URIs liked in that year, in the order they were
        returned by the API.
    """
    # defaultdict so a year seen for the first time needs no special casing
    songs_year_mapping = defaultdict(list)
    user_playlists = spotify.current_user_saved_tracks(limit=50, market="IN")

    if last_run is None:
        # No previous run: use the UNIX epoch as a sentinel older than any track
        logging.info("No last run data found. Setting timestamp to epoch")
        last_run = datetime.fromtimestamp(0, tz=timezone.utc)

    while user_playlists is not None and user_playlists["total"] > 0:
        for track in user_playlists["items"]:
            name = track["track"]["name"]
            spotify_id = track["track"]["id"]
            added_at = _parse_added_at(track["added_at"])

            # Stop at the first track that was already handled by the previous
            # run. NOTE(review): this relies on the API returning liked tracks
            # newest first - confirm.
            if added_at <= last_run:
                return songs_year_mapping

            # Years are kept as strings so they match the keys produced by the
            # playlist-title regex and the keys read back from the JSON state.
            year = str(added_at.year)
            # Only the Spotify URI is needed for adding tracks to playlists
            songs_year_mapping[year].append(track["track"]["uri"])
            logging.info(f"Sorted {name} (Spotify ID: {spotify_id}) in {year}")

        # Get the next page of saved tracks; next() yields None after the last page
        user_playlists = spotify.next(user_playlists)

    return songs_year_mapping
def search_playlists(spotify):
    """Find the current user's existing "<year> Discoveries" playlists.

    Args:
        spotify: Client exposing ``me()``, ``current_user_playlists(limit=)``
            and ``next(page)``.

    Returns:
        A dict mapping the year (four-digit string taken from the playlist
        title) to the Spotify ID of that playlist. If several playlists match
        the same year, the last one listed wins.
    """
    user_id = spotify.me()["id"]

    # Matches the titles of the yearly discovery playlists, capturing the year
    playlist_re = re.compile(r'^(\d{4}) Discoveries$')

    logging.info(f"Searching for the yearly playlists for user ID {user_id}.")

    playlist_data = {}
    playlists = spotify.current_user_playlists(limit=50)

    while playlists is not None and playlists["total"] > 0:
        for playlist in playlists["items"]:
            # The listing also contains playlists the user merely follows.
            # Skip those: tracks can only be added to the user's own
            # playlists. Entries without owner information are kept.
            owner_id = (playlist.get("owner") or {}).get("id")
            if owner_id is not None and owner_id != user_id:
                continue

            playlist_name = playlist["name"]
            match = playlist_re.match(playlist_name)
            if match:
                logging.info(f"Found a discovery playlist -> {playlist_name}")
                playlist_data[match.group(1)] = playlist["id"]

        # next() yields None once the last page has been consumed
        playlists = spotify.next(playlists)

    return playlist_data
def create_playlists(spotify, years, playlists_present):
    """Make sure a private "<year> Discoveries" playlist exists for each year.

    Args:
        spotify: Client exposing ``me()`` and ``user_playlist_create(...)``.
        years: Iterable of years to ensure playlists for.
        playlists_present: Mapping of year to the Spotify ID of a playlist
            that already exists; those years are reused, not re-created.

    Returns:
        A dict mapping every year in ``years`` to its playlist's Spotify ID,
        whether the playlist was reused or newly created.
    """
    # Formats for the title and description of newly created playlists
    title_format = "{} Discoveries"
    blurb_format = "Songs that I liked in {}"

    # Playlists are created on behalf of the current user
    user_id = spotify.me()["id"]
    logging.info(f"Creating playlits for user ID {user_id}.")

    year_to_playlist = {}
    for year in years:
        if year in playlists_present:
            # Reuse the playlist that was already found for this year
            spotify_id = playlists_present[year]
            year_to_playlist[year] = spotify_id
            logging.info(f"Skipping creation of playlist for year {year}. Playlist already present (Spotify ID {spotify_id})")
            continue

        playlist = title_format.format(year)
        response = spotify.user_playlist_create(
            user=user_id,
            name=playlist,
            public=False,
            description=blurb_format.format(year),
        )
        spotify_id = response["id"]
        year_to_playlist[year] = spotify_id
        logging.info(f"Created playlist {playlist} (Spotify ID {spotify_id})")

    return year_to_playlist
def add_tracks_to_playlist(spotify, playlist_data, tracks_data, years):
    """Add each year's tracks to that year's playlist, in batches.

    Args:
        spotify: Client exposing ``playlist_add_items(playlist_id, items)``.
        playlist_data: Mapping of year to Spotify playlist ID.
        tracks_data: Mapping of year to the list of track URIs to add.
        years: Iterable of years to process; each must be a key of
            ``playlist_data``.

    Raises:
        KeyError: If a year has no entry in ``playlist_data``.
    """
    # Batch size for a single add request; batching keeps the number of API
    # calls down (the original author notes Spotify's limit is 100 per call).
    track_chunk_size = 100

    for year in years:
        playlist_id = playlist_data[year]
        logging.info(f"Adding tracks for {year} playlist")

        tracks = tracks_data[year]
        for start in range(0, len(tracks), track_chunk_size):
            chunk = tracks[start:start + track_chunk_size]
            spotify.playlist_add_items(playlist_id, chunk)
            # Log how many tracks were added, not the whole list of URIs
            logging.info(f"Added {len(chunk)} tracks to playlist {playlist_id}")
if __name__ == "__main__":
    # Console logging for the whole script. A daily-rotating file handler
    # (TimedRotatingFileHandler on "execution.log", 10 backups) using the
    # same format was sketched here earlier but is currently disabled.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(filename)s - %(levelname)s: %(message)s",
        datefmt="%d/%m/%Y %I:%M:%S %p",
    )

    # OAuth scopes requested from Spotify, joined into one comma-separated string
    scope = ",".join([
        "playlist-read-private",
        "playlist-modify-private",
        "user-library-modify",
        "user-library-read",
        "user-read-private",
        "user-read-email",
    ])

    auth = SpotifyOAuth(scope=scope)
    spotify = spotipy.Spotify(auth_manager=auth)
    logging.info("Authentication successful with Spotify APIs")

    # If a token is already cached, refresh it up front
    tokens = auth.get_cached_token()
    if tokens:
        auth.refresh_access_token(tokens['refresh_token'])
        logging.info("Authentication token refreshed with Spotify APIs")

    # Timezone-aware "now"; the original author avoids datetime.utcnow
    # because it led to wrong timestamps being saved.
    start_time = datetime.now(timezone.utc)

    last_run, saved_playlists = load_last_run_state()
    logging.info(f"Last run @ {last_run}")

    sorted_tracks = collect_and_sort_tracks(spotify, last_run)
    if not sorted_tracks:
        # Nothing new was liked: just move the last-run marker forward
        logging.info("No new tracks found. Exiting")
        save_last_run_state(start_time, saved_playlists)
        exit(0)

    years = sorted_tracks.keys()

    # Reuse the yearly playlists that already exist, create the missing ones,
    # then fill them and record the state of this run.
    playlists = create_playlists(spotify, years, search_playlists(spotify))
    add_tracks_to_playlist(spotify, playlists, sorted_tracks, years)
    save_last_run_state(start_time, playlists)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment