Skip to content

Instantly share code, notes, and snippets.

@RitamDey
Last active March 30, 2025 20:53
Show Gist options
  • Save RitamDey/33d4b62238d2ab47945442f311a33ef4 to your computer and use it in GitHub Desktop.
Save RitamDey/33d4b62238d2ab47945442f311a33ef4 to your computer and use it in GitHub Desktop.
Automation script that groups liked tracks by the year they were added to the Liked Songs playlist and creates one playlist per year.
import spotipy
from spotipy.oauth2 import SpotifyOAuth
from datetime import datetime, timezone
import logging
from logging.handlers import TimedRotatingFileHandler
from collections import defaultdict
import json
import re
from sys import exit
from os import path
def save_last_run_state(start_time, playlist_data, state_path="lastrun.json"):
    """Persist the state of the current execution as a JSON document.

    Args:
        start_time: ``datetime`` marking when the current run started. Its
            POSIX timestamp is stored under the ``"last-run"`` key.
        playlist_data: Mapping stored unchanged under the ``"playlists"``
            key. It must be JSON-serialisable.
        state_path: File to (over)write. Defaults to ``"lastrun.json"``
            relative to the current working directory.
    """
    state_data = {
        "last-run": start_time.timestamp(),
        "playlists": playlist_data,
    }
    with open(state_path, "w", encoding="utf-8") as save_state:
        json.dump(state_data, save_state)
        # Keep the trailing newline so the file stays friendly to text tools.
        save_state.write("\n")
    logging.info("Saved the state for the current execution")
def load_last_run_state(state_path="lastrun.json"):
    """Load the state saved by the previous execution, if there is one.

    Args:
        state_path: File to read. Defaults to ``"lastrun.json"`` relative to
            the current working directory.

    Returns:
        A ``(last_run, playlists)`` tuple. ``last_run`` is a timezone-aware
        UTC ``datetime`` built from the stored ``"last-run"`` timestamp, and
        ``playlists`` is the stored ``"playlists"`` mapping (empty if the key
        is absent). When no regular file exists at ``state_path`` the result
        is ``(None, {})``.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
        KeyError: If the file has no ``"last-run"`` entry.
    """
    # ``isfile`` alone covers both "missing" and "exists but is not a regular
    # file" (e.g. a directory), which must not be handed to ``open``.
    if not path.isfile(state_path):
        logging.info("Last execution state doesn't exists.")
        return None, {}

    with open(state_path, "r", encoding="utf-8") as state_data_file:
        state_data = json.load(state_data_file)

    last_run = datetime.fromtimestamp(state_data["last-run"], tz=timezone.utc)
    return last_run, state_data.get("playlists", {})
def _parse_added_at(raw_timestamp):
    """Parse an ISO 8601 ``added_at`` string into a timezone-aware datetime."""
    # ``datetime.fromisoformat`` only understands the "Z" UTC designator from
    # Python 3.11 onwards, so spell it out as an explicit offset.
    if raw_timestamp.endswith("Z"):
        raw_timestamp = raw_timestamp[:-1] + "+00:00"
    parsed = datetime.fromisoformat(raw_timestamp)
    if parsed.tzinfo is None:
        # NOTE(review): assumes offset-less timestamps are UTC - confirm.
        # Without this, comparing against the aware ``last_run`` would raise.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def collect_and_sort_tracks(spotify, last_run, market="IN"):
    """Group liked tracks saved after ``last_run`` by the year they were liked.

    Args:
        spotify: Client exposing ``current_user_saved_tracks`` and ``next``
            (a ``spotipy.Spotify`` instance in this script).
        last_run: Timezone-aware ``datetime`` of the previous execution, or
            ``None`` if the script has never run; ``None`` is replaced by the
            UNIX epoch so that every saved track qualifies.
        market: Market code forwarded to ``current_user_saved_tracks``.

    Returns:
        A ``defaultdict(list)`` mapping the year (as a string) to the list of
        track URIs liked in that year.
    """
    songs_year_mapping = defaultdict(list)
    saved_tracks = spotify.current_user_saved_tracks(limit=50, market=market)

    if last_run is None:
        # Sentinel: with no previous run, treat the epoch as the cut-off.
        logging.info("No last run data found. Setting timestamp to epoch")
        last_run = datetime.fromtimestamp(0, tz=timezone.utc)

    while saved_tracks is not None and saved_tracks["total"] > 0:
        for item in saved_tracks["items"]:
            track = item["track"]
            name = track["name"]
            spotify_id = track["id"]
            added_at = _parse_added_at(item["added_at"])

            # Stop at the first track that is not newer than the last run.
            # NOTE(review): this relies on the API listing the most recently
            # saved tracks first - confirm against the Spotify documentation.
            if added_at <= last_run:
                return songs_year_mapping

            # Years are kept as strings so they line up with the year keys
            # produced elsewhere from playlist names and with JSON object keys.
            year = str(added_at.year)
            # Only the Spotify URI is needed to add a track to a playlist.
            songs_year_mapping[year].append(track["uri"])
            logging.info(f"Sorted {name} (Spotify ID: {spotify_id}) in {year}")

        # Fetch the next page of saved tracks; the loop ends once this is None.
        saved_tracks = spotify.next(saved_tracks)

    return songs_year_mapping
def search_playlists(spotify):
    """Find the current user's existing "<year> Discoveries" playlists.

    Args:
        spotify: Client exposing ``me``, ``current_user_playlists`` and
            ``next`` (a ``spotipy.Spotify`` instance in this script).

    Returns:
        A dict mapping the four-digit year (as a string) to the playlist ID.
        If several playlists match the same year, the last one listed wins.
    """
    user_id = spotify.me()["id"]
    playlist_data = {}
    playlists = spotify.current_user_playlists(limit=50)
    # Matches the titles of the yearly discovery playlists, capturing the year.
    playlist_re = re.compile(r'^(\d{4}) Discoveries$')
    logging.info(f"Searching for the yearly playlists for user ID {user_id}.")

    while playlists is not None and playlists["total"] > 0:
        for playlist in playlists["items"]:
            playlist_name = playlist["name"]
            match = playlist_re.match(playlist_name)
            if not match:
                continue

            # The listing is not filtered by owner here, so skip a matching
            # playlist that reports a different owner: it is not one of the
            # user's own yearly playlists. Entries without owner information
            # are kept, as before.
            owner_id = (playlist.get("owner") or {}).get("id")
            if owner_id is not None and owner_id != user_id:
                logging.info(f"Ignoring playlist {playlist_name} owned by another user ({owner_id})")
                continue

            logging.info(f"Found a discovery playlist -> {playlist_name}")
            playlist_data[match.group(1)] = playlist["id"]

        # Fetch the next page of playlists; the loop ends once this is None.
        playlists = spotify.next(playlists)

    return playlist_data
def create_playlists(spotify, years, playlists_present):
    """Make sure a private "<year> Discoveries" playlist exists for each year.

    Args:
        spotify: Client exposing ``me`` and ``user_playlist_create``.
        years: Iterable of year keys that need a playlist.
        playlists_present: Mapping of year -> ID for playlists that already
            exist; those years are reused rather than created again.

    Returns:
        A dict mapping every year in ``years`` to its playlist ID, whether
        the playlist was reused or newly created.
    """
    # NOTE: existing playlists are only recognised through
    # ``playlists_present``; nothing here looks playlists up by name.
    owner_id = spotify.me()["id"]

    # Templates used for the names and descriptions of new playlists.
    title_template = "{} Discoveries"
    description_template = "Songs that I liked in {}"

    logging.info(f"Creating playlits for user ID {owner_id}.")

    result = {}
    for year in years:
        if year not in playlists_present:
            title = title_template.format(year)
            response = spotify.user_playlist_create(
                user=owner_id,
                name=title,
                description=description_template.format(year),
                public=False,
            )
            new_id = response["id"]
            result[year] = new_id
            logging.info(f"Created playlist {title} (Spotify ID {new_id})")
            continue

        known_id = playlists_present[year]
        result[year] = known_id
        logging.info(f"Skipping creation of playlist for year {year}. Playlist already present (Spotify ID {known_id})")

    return result
def add_tracks_to_playlist(spotify, playlist_data, tracks_data, years):
    """Add each year's tracks to that year's playlist, in batches.

    Args:
        spotify: Client exposing ``playlist_add_items``.
        playlist_data: Mapping of year -> playlist ID.
        tracks_data: Mapping of year -> list of track URIs to add.
        years: Iterable of year keys to process.

    Raises:
        KeyError: If a year has no entry in ``playlist_data``.
    """
    # Tracks are sent at most 100 per request (the per-request limit noted by
    # the original author), which also keeps the number of API calls low.
    track_chunk_size = 100

    for year in years:
        playlist_id = playlist_data[year]
        logging.info(f"Adding tracks for {year} playlist")

        tracks = tracks_data[year]
        for start in range(0, len(tracks), track_chunk_size):
            chunk = tracks[start:start + track_chunk_size]
            spotify.playlist_add_items(playlist_id, chunk)
            # Log how many tracks were sent, not the whole list of URIs.
            logging.info(f"Added {len(chunk)} tracks to playlist {playlist_id}")
def main():
    """Sort newly liked tracks into per-year playlists and record the run.

    Steps: configure logging, authenticate with Spotify, load the previous
    run's state, collect the tracks liked since then, make sure a playlist
    exists for every affected year, add the tracks, and save the new state.
    """
    # Log to the root logger's default stream. A TimedRotatingFileHandler
    # could be attached to the root logger here to log to a rotating file.
    logging.basicConfig(
        format="%(asctime)s - %(filename)s - %(levelname)s: %(message)s",
        datefmt="%d/%m/%Y %I:%M:%S %p",
        level=logging.INFO,
    )

    scope_array = [
        # For reading and modifying the user's playlists
        "playlist-read-private",
        "playlist-modify-private",
        # For the user's library (liked tracks)
        "user-library-modify",
        "user-library-read",
        # For the user's profile
        "user-read-private",
        "user-read-email",
    ]
    scope = ",".join(scope_array)

    auth = SpotifyOAuth(scope=scope)
    spotify = spotipy.Spotify(auth_manager=auth)
    # NOTE(review): this is logged right after the client objects are built;
    # confirm whether any request has actually been authenticated by now.
    logging.info("Authentication successful with Spotify APIs")

    # Refresh the cached token up front, if one is cached.
    tokens = auth.get_cached_token()
    if tokens:
        auth.refresh_access_token(tokens['refresh_token'])
        logging.info("Authentication token refreshed with Spotify APIs")

    # Use this instead of datetime.utcnow, which caused wrong timestamps to be
    # saved. The start time is taken before fetching so that tracks liked
    # while the script runs are picked up by the next execution.
    start_time = datetime.now(timezone.utc)

    last_run, saved_playlists = load_last_run_state()
    logging.info(f"Last run @ {last_run}")

    sorted_tracks = collect_and_sort_tracks(spotify, last_run)
    if not sorted_tracks:
        logging.info("No new tracks found. Exiting")
        save_last_run_state(start_time, saved_playlists)
        return

    # Snapshot the years so later steps iterate a plain list rather than a
    # live view of the mapping.
    years = list(sorted_tracks)

    playlists = search_playlists(spotify)
    playlists = create_playlists(spotify, years, playlists)
    add_tracks_to_playlist(spotify, playlists, sorted_tracks, years)

    # ``playlists`` only covers the years touched by this run; merge it over
    # the previously saved mapping so playlist IDs from earlier runs survive.
    save_last_run_state(start_time, {**saved_playlists, **playlists})


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment