Created
November 26, 2024 23:03
-
-
Save alairock/67e3086daf688dbf0dbda0d52eccccec to your computer and use it in GitHub Desktop.
LastFM Fun
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import aiohttp | |
import asyncio | |
import json | |
import argparse | |
from pathlib import Path | |
import hashlib | |
import time | |
# Configuration | |
API_KEY = os.getenv("LASTFM_API_KEY") | |
API_SECRET = os.getenv("LASTFM_API_SECRET") | |
SESSION_KEY = os.getenv("LASTFM_SESSION_KEY") | |
BASE_URL = "http://ws.audioscrobbler.com/2.0/" | |
USERNAME = os.getenv("LASTFM_USERNAME") # Your Last.fm username | |
if not all([API_KEY, API_SECRET, SESSION_KEY, USERNAME]): | |
raise EnvironmentError( | |
"Missing one or more required environment variables: LASTFM_API_KEY, LASTFM_API_SECRET, LASTFM_SESSION_KEY, LASTFM_USERNAME" | |
) | |
CACHE_FILE = Path(f"{USERNAME}_scrobbles.json") | |
def get_signature(params): | |
"""Generate an API signature.""" | |
sorted_params = "".join(f"{key}{params[key]}" for key in sorted(params)) | |
return hashlib.md5((sorted_params + API_SECRET).encode("utf-8")).hexdigest() | |
async def fetch_with_retries(session, url, params, retries=3, delay=2): | |
"""Fetch a URL with retries.""" | |
for attempt in range(retries): | |
try: | |
async with session.get(url, params=params) as response: | |
response.raise_for_status() | |
return await response.json() | |
except aiohttp.ClientError as e: | |
if attempt < retries - 1: | |
await asyncio.sleep(delay * (2 ** attempt)) | |
else: | |
raise e | |
async def scan_scrobbles(limit=200): | |
"""Scan all scrobbles and cache them locally.""" | |
all_scrobbles = [] | |
async with aiohttp.ClientSession() as session: | |
params = { | |
"method": "user.getRecentTracks", | |
"user": USERNAME, | |
"api_key": API_KEY, | |
"format": "json", | |
"limit": limit, | |
} | |
# Fetch the first page to get total page count | |
initial_data = await fetch_with_retries(session, BASE_URL, {**params, "page": 1}) | |
total_pages = int(initial_data["recenttracks"]["@attr"]["totalPages"]) | |
print(f"Total pages to scan: {total_pages}") | |
# Fetch all pages | |
for page in range(1, total_pages + 1): | |
print(f"Scanning page {page} of {total_pages}...") | |
data = await fetch_with_retries(session, BASE_URL, {**params, "page": page}) | |
tracks = data["recenttracks"]["track"] | |
scrobbles = [ | |
{ | |
"artist": track["artist"]["#text"], | |
"track": track["name"], | |
"timestamp": track.get("date", {}).get("uts"), | |
} | |
for track in tracks | |
if "date" in track | |
] | |
all_scrobbles.extend(scrobbles) | |
# Save scrobbles to cache | |
with open(CACHE_FILE, "w") as f: | |
json.dump(all_scrobbles, f, indent=4) | |
print(f"Scanned and cached {len(all_scrobbles)} scrobbles.") | |
def search_scrobbles_by_artist(artist): | |
"""Search scrobbles by artist and group by track.""" | |
if not CACHE_FILE.exists(): | |
print("No cached scrobbles found. Run the scan command first.") | |
return | |
with open(CACHE_FILE, "r") as f: | |
scrobbles = json.load(f) | |
# for scrobble in scrobbles: | |
# if scrobble["artist"].lower() == "Alex Baker": | |
# print(f"{scrobble['artist']} - {scrobble['track']} (timestamp: {scrobble['timestamp']})") | |
# if scrobble["artist"].lower() == artist.lower(): | |
# print(f"{scrobble['artist']} - {scrobble['track']} (timestamp: {scrobble['timestamp']})") | |
filtered = [s for s in scrobbles if s["artist"].lower() == artist.lower()] | |
grouped = {} | |
for scrobble in filtered: | |
track = scrobble["track"] | |
grouped[track] = grouped.get(track, 0) + 1 | |
print(f"Found {len(filtered)} scrobbles for artist {artist}:") | |
for track, count in grouped.items(): | |
print(f"{track}: {count}") | |
def search_scrobbles_by_track(track): | |
"""Search scrobbles by track.""" | |
if not CACHE_FILE.exists(): | |
print("No cached scrobbles found. Run the scan command first.") | |
return | |
with open(CACHE_FILE, "r") as f: | |
scrobbles = json.load(f) | |
filtered = [s for s in scrobbles if s["track"].lower() == track.lower()] | |
print(f"Found {len(filtered)} scrobbles for track {track}:") | |
for scrobble in filtered: | |
print(f"{scrobble['artist']} - {scrobble['track']} (timestamp: {scrobble['timestamp']})") | |
async def delete_scrobble(session, artist, track, timestamp): | |
""" | |
Delete a specific scrobble using the track and timestamp. | |
""" | |
params = { | |
"method": "track.removeScrobble", | |
"artist": artist, | |
"track": track, | |
"timestamp": timestamp, | |
"api_key": API_KEY, | |
"sk": SESSION_KEY, | |
} | |
params["api_sig"] = get_signature(params) # Generate the API signature | |
params["format"] = "json" | |
async with session.post(BASE_URL, data=params) as response: | |
response.raise_for_status() | |
return await response.json() | |
async def delete_scrobbles(filter_fn): | |
"""Delete scrobbles matching a filter function.""" | |
if not CACHE_FILE.exists(): | |
print("No cached scrobbles found. Run the scan command first.") | |
return | |
with open(CACHE_FILE, "r") as f: | |
scrobbles = json.load(f) | |
filtered = [s for s in scrobbles if filter_fn(s)] | |
async with aiohttp.ClientSession() as session: | |
delete_tasks = [ | |
delete_scrobble(session, s["artist"], s["track"], s["timestamp"]) | |
for s in filtered | |
] | |
results = await asyncio.gather(*delete_tasks, return_exceptions=True) | |
successes = 0 | |
for scrobble, result in zip(filtered, results): | |
if isinstance(result, Exception): | |
print(f"Failed to delete scrobble: {scrobble['track']} - {result}") | |
else: | |
successes += 1 | |
print(f"Deleted scrobble: {scrobble['track']}") | |
print(f"Deleted {successes} scrobbles.") | |
async def delete_by_artist(artist): | |
"""Delete all scrobbles for a specific artist.""" | |
await delete_scrobbles(lambda s: s["artist"].lower() == artist.lower()) | |
async def delete_by_track(track): | |
"""Delete all scrobbles for a specific track.""" | |
await delete_scrobbles(lambda s: s["track"].lower() == track.lower()) | |
def main(): | |
parser = argparse.ArgumentParser(description="Manage Last.fm scrobbles.") | |
subparsers = parser.add_subparsers(dest="command", required=True) | |
subparsers.add_parser("scan", help="Scan and cache all scrobbles.") | |
search_parser = subparsers.add_parser("search", help="Search for scrobbles.") | |
search_parser.add_argument("artist", help="Artist to search for.") | |
del_artist_parser = subparsers.add_parser("delete-artist", help="Delete scrobbles by artist.") | |
del_artist_parser.add_argument("artist", help="Artist to delete scrobbles for.") | |
del_track_parser = subparsers.add_parser("delete-track", help="Delete scrobbles by track.") | |
del_track_parser.add_argument("track", help="Track to delete scrobbles for.") | |
args = parser.parse_args() | |
print("Args", args) | |
if args.command == "scan": | |
asyncio.run(scan_scrobbles()) | |
elif args.command == "search": | |
search_scrobbles_by_artist(args.artist) | |
elif args.command == "delete-artist": | |
asyncio.run(delete_by_artist(args.artist)) | |
elif args.command == "delete-track": | |
asyncio.run(delete_by_track(args.track)) | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import hashlib | |
import os | |
# Your API credentials | |
API_KEY = os.getenv("LASTFM_API_KEY") | |
API_SECRET = os.getenv("LASTFM_API_SECRET") | |
BASE_URL = "http://ws.audioscrobbler.com/2.0/" | |
def get_signature(params): | |
"""Generate an API signature.""" | |
sorted_params = "".join(f"{key}{params[key]}" for key in sorted(params)) | |
return hashlib.md5((sorted_params + API_SECRET).encode('utf-8')).hexdigest() | |
def get_token(): | |
"""Request a token from Last.fm.""" | |
params = { | |
"method": "auth.getToken", | |
"api_key": API_KEY, | |
"format": "json", | |
} | |
response = requests.get(BASE_URL, params=params) | |
response.raise_for_status() | |
token = response.json().get("token") | |
print(f"Your token: {token}") | |
return token | |
def authenticate_token(token): | |
"""Direct user to authenticate the token.""" | |
print("Go to this URL to authenticate your token:") | |
print(f"https://www.last.fm/api/auth/?api_key={API_KEY}&token={token}") | |
def get_session_key(token): | |
"""Exchange token for a session key.""" | |
params = { | |
"method": "auth.getSession", | |
"api_key": API_KEY, | |
"token": token, | |
} | |
params["api_sig"] = get_signature(params) | |
params["format"] = "json" | |
response = requests.get(BASE_URL, params=params) | |
response.raise_for_status() | |
session = response.json().get("session", {}) | |
session_key = session.get("key") | |
return session_key | |
def main(): | |
# Step 1: Get a token | |
token = get_token() | |
# Step 2: Authenticate the token | |
authenticate_token(token) | |
input("Press Enter after you've authenticated the token...") | |
# Step 3: Get the session key | |
session_key = get_session_key(token) | |
if session_key: | |
print(f"Your session key is: {session_key}") | |
else: | |
print("Failed to retrieve session key.") | |
if __name__ == "__main__": | |
main() | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import os | |
import base64 | |
# Configuration | |
CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID") | |
CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET") | |
TOKEN_URL = "https://accounts.spotify.com/api/token" | |
import webbrowser | |
from urllib.parse import urlencode, urlparse, parse_qs | |
# Configuration | |
REDIRECT_URI = "https://localhost:5000/spot" # Ensure this is registered in your Spotify app | |
SCOPES = "user-read-recently-played,playlist-modify-private,playlist-modify-public" # Add more scopes as needed | |
AUTH_URL = "https://accounts.spotify.com/authorize" | |
def get_authorization_url(): | |
"""Generate the Spotify authorization URL.""" | |
params = { | |
"client_id": CLIENT_ID, | |
"response_type": "token", # Use "token" for implicit grant | |
"redirect_uri": REDIRECT_URI, | |
"scope": SCOPES, | |
} | |
url = f"{AUTH_URL}?{urlencode(params)}" | |
return url | |
def extract_access_token(redirect_url): | |
"""Extract the access token from the redirect URL fragment.""" | |
parsed_url = urlparse(redirect_url) | |
fragment = parsed_url.fragment # Token comes in the URL fragment | |
if not fragment: | |
print("Error: No fragment found in the URL.") | |
return None | |
fragment_params = dict(item.split("=") for item in fragment.split("&")) | |
return fragment_params.get("access_token") | |
def main(): | |
print("Step 1: Open the following URL to authorize the application:") | |
auth_url = get_authorization_url() | |
print(auth_url) | |
# Automatically open the URL in the default browser | |
webbrowser.open(auth_url) | |
print("\nStep 2: After authorizing, you'll be redirected to a URL.") | |
print("Copy the entire URL and paste it here.") | |
redirect_url = input("Paste the redirect URL here: ").strip() | |
# Extract the access token | |
access_token = extract_access_token(redirect_url) | |
if access_token: | |
print("\nAccess token received:") | |
print(access_token) | |
else: | |
print("\nFailed to extract access token.") | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import aiohttp | |
import asyncio | |
import json | |
from pathlib import Path | |
from collections import Counter | |
# Configuration | |
LASTFM_API_KEY = os.getenv("LASTFM_API_KEY") | |
LASTFM_USERNAME = os.getenv("LASTFM_USERNAME") | |
SPOTIFY_ACCESS_TOKEN = os.getenv("SPOTIFY_ACCESS_TOKEN") # Spotify OAuth Bearer Token | |
SPOTIFY_BASE_URL = "https://api.spotify.com/v1" | |
OUTPUT_DIR = Path("output") | |
OUTPUT_DIR.mkdir(exist_ok=True) | |
LASTFM_CACHE = OUTPUT_DIR / "lastfm_scrobbles.json" | |
async def fetch_with_retries(session, url, params=None, headers=None, retries=3, delay=2): | |
"""Fetch a URL with retries.""" | |
for attempt in range(retries): | |
try: | |
async with session.get(url, params=params, headers=headers) as response: | |
response.raise_for_status() | |
return await response.json() | |
except aiohttp.ClientError as e: | |
if attempt < retries - 1: | |
await asyncio.sleep(delay * (2 ** attempt)) | |
else: | |
raise e | |
async def get_lastfm_scrobbles(limit=200): | |
"""Fetch scrobbles from Last.fm.""" | |
if LASTFM_CACHE.exists(): | |
print(f"Using cached Last.fm scrobbles from {LASTFM_CACHE}") | |
with open(LASTFM_CACHE, "r") as f: | |
return json.load(f) | |
print("Fetching Last.fm scrobbles...") | |
all_scrobbles = [] | |
async with aiohttp.ClientSession() as session: | |
params = { | |
"method": "user.getRecentTracks", | |
"user": LASTFM_USERNAME, | |
"api_key": LASTFM_API_KEY, | |
"format": "json", | |
"limit": limit, | |
} | |
# Fetch the first page to get total pages | |
first_page = await fetch_with_retries(session, "http://ws.audioscrobbler.com/2.0/", params=params) | |
total_pages = int(first_page["recenttracks"]["@attr"]["totalPages"]) | |
print(f"Total Last.fm pages: {total_pages}") | |
# Fetch all pages | |
for page in range(1, total_pages + 1): | |
print(f"Fetching Last.fm page {page}...") | |
params["page"] = page | |
data = await fetch_with_retries(session, "http://ws.audioscrobbler.com/2.0/", params=params) | |
tracks = data["recenttracks"]["track"] | |
all_scrobbles.extend( | |
{ | |
"artist": track["artist"]["#text"], | |
"track": track["name"], | |
} | |
for track in tracks | |
if "date" in track # Ignore currently playing tracks | |
) | |
# Save scrobbles to cache | |
with open(LASTFM_CACHE, "w") as f: | |
json.dump(all_scrobbles, f, indent=4) | |
print(f"Fetched and cached {len(all_scrobbles)} scrobbles from Last.fm.") | |
return all_scrobbles | |
def find_most_listened_songs(lastfm_scrobbles, min_plays=5): | |
"""Find songs with more than min_plays.""" | |
counter = Counter((item["artist"], item["track"]) for item in lastfm_scrobbles) | |
return [{"artist": artist, "track": track} for (artist, track), count in counter.items() if count > min_plays] | |
async def create_or_replace_playlist(session, playlist_name, tracks): | |
"""Create or replace a Spotify playlist with the given tracks, caching URIs immediately.""" | |
headers = {"Authorization": f"Bearer {SPOTIFY_ACCESS_TOKEN}"} | |
cache_file = OUTPUT_DIR / "track_uris.json" | |
# Load cached URIs if available | |
if cache_file.exists(): | |
with open(cache_file, "r") as f: | |
cached_uris = json.load(f) | |
else: | |
cached_uris = {} | |
# Get current user ID | |
user_data = await fetch_with_retries(session, f"{SPOTIFY_BASE_URL}/me", headers=headers) | |
user_id = user_data["id"] | |
# Check if playlist already exists | |
playlists = await fetch_with_retries(session, f"{SPOTIFY_BASE_URL}/me/playlists", headers=headers) | |
existing_playlist = next( | |
(playlist for playlist in playlists["items"] if playlist["name"] == playlist_name), None | |
) | |
# Delete existing playlist if it exists | |
if existing_playlist: | |
playlist_id = existing_playlist["id"] | |
print(f"Deleting existing playlist '{playlist_name}'...") | |
await session.delete(f"{SPOTIFY_BASE_URL}/playlists/{playlist_id}/followers", headers=headers) | |
# Create new playlist | |
print(f"Creating new playlist '{playlist_name}'...") | |
payload = {"name": playlist_name, "description": "Songs with most listens from Last.fm", "public": False} | |
response = await session.post( | |
f"{SPOTIFY_BASE_URL}/users/{user_id}/playlists", headers=headers, json=payload | |
) | |
response.raise_for_status() | |
playlist = await response.json() | |
playlist_id = playlist["id"] | |
# Search Spotify for each track and add to playlist immediately | |
print(f"Searching for tracks and adding to playlist '{playlist_name}'...") | |
for song in tracks: | |
# Check if the URI is already cached | |
cache_key = f"{song['artist']} - {song['track']}" | |
if cache_key in cached_uris: | |
track_uri = cached_uris[cache_key] | |
else: | |
# Search for the track on Spotify | |
params = { | |
"q": f"track:\"{song['track']}\" artist:\"{song['artist']}\"", | |
"type": "track", | |
"limit": 1 | |
} | |
search_results = await fetch_with_retries(session, f"{SPOTIFY_BASE_URL}/search", params=params, headers=headers) | |
items = search_results.get("tracks", {}).get("items", []) | |
if items: | |
track_uri = items[0]["uri"] | |
cached_uris[cache_key] = track_uri # Cache the URI immediately | |
# Update the cache file | |
with open(cache_file, "w") as f: | |
json.dump(cached_uris, f, indent=4) | |
else: | |
print(f"Could not find track: {song['artist']} - {song['track']}") | |
continue | |
# Add the track to the playlist immediately | |
await session.post( | |
f"{SPOTIFY_BASE_URL}/playlists/{playlist_id}/tracks", | |
headers=headers, | |
json={"uris": [track_uri]}, | |
) | |
print(f"Added track: {song['artist']} - {song['track']}") | |
print(f"Finished adding tracks to playlist '{playlist_name}'.") | |
async def main(): | |
# Fetch Last.fm data | |
lastfm_scrobbles = await get_lastfm_scrobbles() | |
# Find songs with more than 5 plays | |
min_plays = 20 | |
most_listened_songs = find_most_listened_songs(lastfm_scrobbles, min_plays=min_plays) | |
print(f"Found {len(most_listened_songs)} songs with more than {min_plays} plays.") | |
# Create or replace Spotify playlist | |
async with aiohttp.ClientSession() as session: | |
await create_or_replace_playlist(session, "XXMostListensXX", most_listened_songs) | |
if __name__ == "__main__": | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment