|
import os |
|
import json |
|
import spotipy |
|
from tqdm import tqdm |
|
|
|
from spotipy.oauth2 import SpotifyOAuth |
|
|
|
|
|
def get_client():
    """Build a Spotify client authorised through OAuth.

    The client id and secret are read from the ``SPOTIFY_CLIENT_ID`` and
    ``SPOTIFY_CLIENT_SECRET`` environment variables; a missing variable
    raises ``KeyError``.

    Returns:
        A ``spotipy.Spotify`` instance whose auth manager is a
        ``SpotifyOAuth`` requesting the library read/modify and playlist
        modify scopes listed below.
    """
    # The scopes are handed to SpotifyOAuth as one space-separated string.
    requested_scopes = " ".join(
        (
            "user-library-read",
            "user-library-modify",
            "playlist-modify-public",
            "playlist-modify-private",
        )
    )

    auth_manager = SpotifyOAuth(
        client_id=os.environ["SPOTIFY_CLIENT_ID"],
        client_secret=os.environ["SPOTIFY_CLIENT_SECRET"],
        redirect_uri="http://localhost:8080/",
        scope=requested_scopes,
    )
    return spotipy.Spotify(auth_manager=auth_manager)
|
|
|
|
|
def get_all_tracks(playlist=None, market="IN"):
    """Get all tracks belonging to a specific playlist.

    Requests pages of 50 items and keeps following each page's ``next``
    link until there is none, reporting progress on a ``tqdm`` bar.

    The Spotify client is read from the module-level name ``sp``, which
    must be bound before this function is called.

    Args:
        playlist: Playlist dict providing at least the ``"name"`` and
            ``"id"`` keys. When ``None``, the current user's saved tracks
            are fetched instead and the progress bar is labelled
            "Liked Songs".
        market: Value forwarded as the ``market`` argument of the Spotify
            calls. Defaults to ``"IN"``, the value that was previously
            hard-coded.

    Returns:
        A list with every item dict of every page, in page order. Each
        item gains a ``"position"`` key: the page's ``offset`` plus the
        item's index within that page.
    """
    if playlist is None:
        playlist_name = "Liked Songs"
        result = sp.current_user_saved_tracks(market=market, limit=50)
    else:
        playlist_name = playlist["name"]
        result = sp.playlist_items(playlist["id"], market=market, limit=50)

    tracks = []
    with tqdm(desc=f"{playlist_name:25}", total=result["total"]) as pbar:
        while True:
            page = result["items"]
            offset = result["offset"]
            for idx, item in enumerate(page):
                # Remember where the item sits in the playlist so that a
                # specific occurrence can be addressed later on.
                item["position"] = offset + idx
                tracks.append(item)
            pbar.update(len(page))

            if not result["next"]:
                break
            result = sp.next(result)

    return tracks
|
|
|
|
|
def filter_duplicates(tracks):
    """Find duplicates from a list of tracks.

    The list is sorted in place by ``added_at`` (oldest first), so the
    earliest-added copy of a track is the one that is kept and every
    later item with the same track id is reported as a duplicate. Each
    duplicate is also printed on a ``dupe:`` line.

    Items that cannot be identified (``track`` missing or ``None``, or a
    track whose ``id`` is ``None``) are never reported. Without this
    guard they would either raise or all collide on the same ``None``
    key and be flagged as duplicates of one another.

    Args:
        tracks: List of item dicts, each expected to carry ``"added_at"``
            and ``"track"`` keys. The list is re-ordered in place and
            every item gains an ``"idx"`` key holding its index in the
            sorted order.

    Returns:
        The list of duplicate item dicts, in sorted order.
    """
    # A missing/None ``added_at`` sorts first instead of raising TypeError
    # when compared against the timestamp strings of the other items.
    tracks.sort(key=lambda entry: entry.get("added_at") or "")

    seen_ids = set()
    duplicates = []

    for idx, item in enumerate(tracks):
        item["idx"] = idx

        track = item.get("track")
        track_id = track.get("id") if track else None
        if track_id is None:
            # No usable identity: never treat the item as a duplicate.
            continue

        if track_id in seen_ids:
            duplicates.append(item)
            print("dupe: {idx:04d} {added_at} {track[id]} {track[name]}".format(**item))
        else:
            seen_ids.add(track_id)

    return duplicates
|
|
|
|
|
def remove_duplicates(duplicates, playlist=None):
    """Remove duplicates from a playlist.

    Removal requests are sent through the module-level client ``sp`` in
    batches of at most 50 entries.

    Args:
        duplicates: Item dicts to remove. Each needs ``item["track"]["uri"]``;
            for playlist removal each also needs ``item["position"]``.
            Nothing is done when this is empty.
        playlist: Playlist dict providing the ``"id"`` key. When ``None``,
            the tracks are deleted from the current user's saved tracks
            by URI instead.

    Returns:
        None.
    """
    if not duplicates:
        return

    batch_size = 50

    if playlist is None:
        uris = [item["track"]["uri"] for item in duplicates]
        for start in range(0, len(uris), batch_size):
            sp.current_user_saved_tracks_delete(uris[start : start + batch_size])
        return

    # The positions were captured before any removal. Removing items
    # shifts everything that comes after them, so go from the highest
    # position downwards: each batch then only touches positions above
    # the ones still pending, which therefore remain valid.
    ordered = sorted(duplicates, key=lambda item: item["position"], reverse=True)
    items = [
        {"uri": item["track"]["uri"], "positions": [item["position"]]}
        for item in ordered
    ]
    for start in range(0, len(items), batch_size):
        sp.playlist_remove_specific_occurrences_of_items(
            playlist["id"], items[start : start + batch_size]
        )
|
|
|
|
|
if __name__ == "__main__":
    # The functions in this file read the client from the module-level
    # name ``sp``, so it has to be bound here before any of them run.
    sp = get_client()

    # De-duplicate the saved tracks first.
    tracks = get_all_tracks()
    dupes = filter_duplicates(tracks)
    remove_duplicates(dupes)

    # Then de-duplicate each of the user's playlists. The playlist listing
    # is paged, so keep following the "next" link; reading only the first
    # response would silently skip every playlist beyond the first page.
    result = sp.current_user_playlists()
    while result:
        for playlist in result["items"]:
            tracks = get_all_tracks(playlist)
            dupes = filter_duplicates(tracks)
            remove_duplicates(dupes, playlist)
        result = sp.next(result) if result["next"] else None