Skip to content

Instantly share code, notes, and snippets.

@alairock
Created November 26, 2024 23:03
Show Gist options
  • Save alairock/67e3086daf688dbf0dbda0d52eccccec to your computer and use it in GitHub Desktop.
LastFM Fun
import os
import aiohttp
import asyncio
import json
import argparse
from pathlib import Path
import hashlib
import time
# Configuration -- all credentials come from the environment so that no
# secrets are committed to source control.
API_KEY = os.getenv("LASTFM_API_KEY")
API_SECRET = os.getenv("LASTFM_API_SECRET")
SESSION_KEY = os.getenv("LASTFM_SESSION_KEY")
BASE_URL = "http://ws.audioscrobbler.com/2.0/"
USERNAME = os.getenv("LASTFM_USERNAME")  # Your Last.fm username

# Fail fast, naming exactly which variables are missing instead of making
# the user guess from a list of all four.
_REQUIRED_VARS = {
    "LASTFM_API_KEY": API_KEY,
    "LASTFM_API_SECRET": API_SECRET,
    "LASTFM_SESSION_KEY": SESSION_KEY,
    "LASTFM_USERNAME": USERNAME,
}
_missing = [name for name, value in _REQUIRED_VARS.items() if not value]
if _missing:
    raise EnvironmentError(
        "Missing required environment variable(s): " + ", ".join(_missing)
    )

# Local scrobble cache so repeated searches/deletes don't re-hit the API.
CACHE_FILE = Path(f"{USERNAME}_scrobbles.json")
def get_signature(params):
    """Build the md5 request signature required for signed Last.fm calls.

    Parameters are concatenated as key+value in sorted-key order, the
    shared API secret is appended, and the md5 hex digest of the result
    is returned.
    """
    pieces = []
    for key in sorted(params):
        pieces.append(f"{key}{params[key]}")
    signature_base = "".join(pieces) + API_SECRET
    return hashlib.md5(signature_base.encode("utf-8")).hexdigest()
async def fetch_with_retries(session, url, params, retries=3, delay=2):
    """GET *url* with *params*, retrying aiohttp client errors.

    Retries up to *retries* attempts with exponential backoff starting at
    *delay* seconds; the final failure is re-raised to the caller.
    """
    attempt = 0
    while True:
        try:
            async with session.get(url, params=params) as response:
                response.raise_for_status()
                return await response.json()
        except aiohttp.ClientError:
            attempt += 1
            if attempt >= retries:
                raise
            # Backoff: delay, 2*delay, 4*delay, ...
            await asyncio.sleep(delay * (2 ** (attempt - 1)))
async def scan_scrobbles(limit=200):
    """Download the user's full scrobble history and cache it as JSON.

    Pages through user.getRecentTracks (*limit* tracks per page), keeps
    only completed plays (entries with a "date" field — the
    currently-playing track has none), and writes
    artist/track/timestamp records to CACHE_FILE.
    """
    collected = []
    base_params = {
        "method": "user.getRecentTracks",
        "user": USERNAME,
        "api_key": API_KEY,
        "format": "json",
        "limit": limit,
    }
    async with aiohttp.ClientSession() as session:
        # The first page tells us how many pages exist in total.
        first = await fetch_with_retries(session, BASE_URL, {**base_params, "page": 1})
        page_count = int(first["recenttracks"]["@attr"]["totalPages"])
        print(f"Total pages to scan: {page_count}")
        for page in range(1, page_count + 1):
            print(f"Scanning page {page} of {page_count}...")
            payload = await fetch_with_retries(session, BASE_URL, {**base_params, "page": page})
            for entry in payload["recenttracks"]["track"]:
                if "date" not in entry:
                    continue  # skip the now-playing placeholder entry
                collected.append(
                    {
                        "artist": entry["artist"]["#text"],
                        "track": entry["name"],
                        "timestamp": entry.get("date", {}).get("uts"),
                    }
                )
    # Persist the full history for the search/delete commands.
    with open(CACHE_FILE, "w") as f:
        json.dump(collected, f, indent=4)
    print(f"Scanned and cached {len(collected)} scrobbles.")
def search_scrobbles_by_artist(artist):
    """Search cached scrobbles by artist (case-insensitive) and print
    per-track play counts.

    Requires the cache produced by the `scan` command; prints a hint and
    returns early when it is missing.
    """
    if not CACHE_FILE.exists():
        print("No cached scrobbles found. Run the scan command first.")
        return
    with open(CACHE_FILE, "r") as f:
        scrobbles = json.load(f)
    filtered = [s for s in scrobbles if s["artist"].lower() == artist.lower()]
    # Group matches by track title and count plays per track.
    grouped = {}
    for scrobble in filtered:
        track = scrobble["track"]
        grouped[track] = grouped.get(track, 0) + 1
    print(f"Found {len(filtered)} scrobbles for artist {artist}:")
    for track, count in grouped.items():
        print(f"{track}: {count}")
def search_scrobbles_by_track(track):
    """Print every cached scrobble whose title matches *track*
    (case-insensitive). Requires the cache built by the `scan` command."""
    if not CACHE_FILE.exists():
        print("No cached scrobbles found. Run the scan command first.")
        return
    with open(CACHE_FILE, "r") as f:
        cached = json.load(f)
    wanted = track.lower()
    matches = [entry for entry in cached if entry["track"].lower() == wanted]
    print(f"Found {len(matches)} scrobbles for track {track}:")
    for entry in matches:
        print(f"{entry['artist']} - {entry['track']} (timestamp: {entry['timestamp']})")
async def delete_scrobble(session, artist, track, timestamp):
    """POST a signed track.removeScrobble request for one scrobble.

    The session key (sk) authorizes the write; the request is signed over
    every parameter except "format", which is added after signing.
    """
    payload = {
        "method": "track.removeScrobble",
        "artist": artist,
        "track": track,
        "timestamp": timestamp,
        "api_key": API_KEY,
        "sk": SESSION_KEY,
    }
    # Sign first, then append the response-format param — it must not be
    # part of the signature input.
    payload["api_sig"] = get_signature(payload)
    payload["format"] = "json"
    async with session.post(BASE_URL, data=payload) as response:
        response.raise_for_status()
        return await response.json()
async def delete_scrobbles(filter_fn):
    """Delete every cached scrobble for which *filter_fn* returns True.

    Reads the local cache, issues all delete requests concurrently, and
    reports per-scrobble failures without aborting the whole batch.
    """
    if not CACHE_FILE.exists():
        print("No cached scrobbles found. Run the scan command first.")
        return
    with open(CACHE_FILE, "r") as f:
        cached = json.load(f)
    targets = [entry for entry in cached if filter_fn(entry)]
    async with aiohttp.ClientSession() as session:
        # return_exceptions=True so one failed delete doesn't cancel the rest.
        outcomes = await asyncio.gather(
            *(
                delete_scrobble(session, entry["artist"], entry["track"], entry["timestamp"])
                for entry in targets
            ),
            return_exceptions=True,
        )
    deleted = 0
    for entry, outcome in zip(targets, outcomes):
        if isinstance(outcome, Exception):
            print(f"Failed to delete scrobble: {entry['track']} - {outcome}")
        else:
            deleted += 1
            print(f"Deleted scrobble: {entry['track']}")
    print(f"Deleted {deleted} scrobbles.")
async def delete_by_artist(artist):
    """Delete all scrobbles for a specific artist (case-insensitive)."""
    wanted = artist.lower()
    await delete_scrobbles(lambda s: s["artist"].lower() == wanted)
async def delete_by_track(track):
    """Delete all scrobbles for a specific track title (case-insensitive)."""
    wanted = track.lower()
    await delete_scrobbles(lambda s: s["track"].lower() == wanted)
def main():
    """Command-line entry point: scan, search, or delete Last.fm scrobbles."""
    parser = argparse.ArgumentParser(description="Manage Last.fm scrobbles.")
    subparsers = parser.add_subparsers(dest="command", required=True)

    subparsers.add_parser("scan", help="Scan and cache all scrobbles.")

    search_parser = subparsers.add_parser("search", help="Search for scrobbles.")
    search_parser.add_argument("artist", help="Artist to search for.")

    del_artist_parser = subparsers.add_parser("delete-artist", help="Delete scrobbles by artist.")
    del_artist_parser.add_argument("artist", help="Artist to delete scrobbles for.")

    del_track_parser = subparsers.add_parser("delete-track", help="Delete scrobbles by track.")
    del_track_parser.add_argument("track", help="Track to delete scrobbles for.")

    args = parser.parse_args()

    # Dispatch; async commands are driven through asyncio.run().
    if args.command == "scan":
        asyncio.run(scan_scrobbles())
    elif args.command == "search":
        search_scrobbles_by_artist(args.artist)
    elif args.command == "delete-artist":
        asyncio.run(delete_by_artist(args.artist))
    elif args.command == "delete-track":
        asyncio.run(delete_by_track(args.track))


if __name__ == "__main__":
    main()
import requests
import hashlib
import os
# Last.fm API credentials for the auth helper below, read from the
# environment (this flow needs no session key or username yet).
API_KEY = os.getenv("LASTFM_API_KEY")
API_SECRET = os.getenv("LASTFM_API_SECRET")
BASE_URL = "http://ws.audioscrobbler.com/2.0/"
def get_signature(params):
    """Return the md5 hex signature for a signed Last.fm API call.

    Keys are sorted, concatenated with their values, and the shared API
    secret is appended before hashing.
    """
    concatenated = "".join(f"{name}{params[name]}" for name in sorted(params))
    digest_input = (concatenated + API_SECRET).encode("utf-8")
    return hashlib.md5(digest_input).hexdigest()
def get_token():
    """Request an unauthorized auth token from Last.fm, print and return it."""
    response = requests.get(
        BASE_URL,
        params={
            "method": "auth.getToken",
            "api_key": API_KEY,
            "format": "json",
        },
    )
    response.raise_for_status()
    token = response.json().get("token")
    print(f"Your token: {token}")
    return token
def authenticate_token(token):
    """Show the URL the user must visit to authorize *token*."""
    auth_link = f"https://www.last.fm/api/auth/?api_key={API_KEY}&token={token}"
    print("Go to this URL to authenticate your token:")
    print(auth_link)
def get_session_key(token):
    """Exchange an authorized token for a long-lived session key.

    Returns the session key string, or None when the response carries no
    "session"/"key" fields.
    """
    payload = {
        "method": "auth.getSession",
        "api_key": API_KEY,
        "token": token,
    }
    # Sign before adding "format" — the format param is excluded from the
    # signature input.
    payload["api_sig"] = get_signature(payload)
    payload["format"] = "json"
    response = requests.get(BASE_URL, params=payload)
    response.raise_for_status()
    return response.json().get("session", {}).get("key")
def main():
    """Walk the user through the three-step Last.fm desktop auth flow."""
    # Step 1: request a fresh token.
    token = get_token()
    # Step 2: have the user authorize it in their browser.
    authenticate_token(token)
    input("Press Enter after you've authenticated the token...")
    # Step 3: trade the authorized token for a session key.
    session_key = get_session_key(token)
    if not session_key:
        print("Failed to retrieve session key.")
    else:
        print(f"Your session key is: {session_key}")


if __name__ == "__main__":
    main()
import requests
import os
import base64
# Configuration: Spotify app credentials from the environment.
CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID")
CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET")
TOKEN_URL = "https://accounts.spotify.com/api/token"
# NOTE(review): these imports belong at the top of the file.
import webbrowser
from urllib.parse import urlencode, urlparse, parse_qs
# OAuth settings for the implicit-grant flow.
REDIRECT_URI = "https://localhost:5000/spot"  # Ensure this is registered in your Spotify app
SCOPES = "user-read-recently-played,playlist-modify-private,playlist-modify-public"  # Add more scopes as needed
AUTH_URL = "https://accounts.spotify.com/authorize"
def get_authorization_url():
    """Build the Spotify implicit-grant authorization URL."""
    query = urlencode(
        {
            "client_id": CLIENT_ID,
            "response_type": "token",  # implicit grant: token arrives in the URL fragment
            "redirect_uri": REDIRECT_URI,
            "scope": SCOPES,
        }
    )
    return f"{AUTH_URL}?{query}"
def extract_access_token(redirect_url):
    """Extract the OAuth access token from an implicit-grant redirect URL.

    Spotify returns the token in the URL *fragment* (after '#'), not the
    query string. Returns the token string, or None (after printing an
    error for the no-fragment case) when no token is present.
    """
    parsed_url = urlparse(redirect_url)
    fragment = parsed_url.fragment  # token parameters live in the fragment
    if not fragment:
        print("Error: No fragment found in the URL.")
        return None
    # parse_qs handles percent-encoding and values containing '=' safely;
    # the previous dict(item.split("=") ...) crashed on such values.
    fragment_params = parse_qs(fragment)
    values = fragment_params.get("access_token")
    return values[0] if values else None
def main():
    """Interactive helper: authorize the app and print the access token."""
    print("Step 1: Open the following URL to authorize the application:")
    auth_url = get_authorization_url()
    print(auth_url)
    # Open the authorization page in the user's default browser.
    webbrowser.open(auth_url)

    print("\nStep 2: After authorizing, you'll be redirected to a URL.")
    print("Copy the entire URL and paste it here.")
    redirect_url = input("Paste the redirect URL here: ").strip()

    access_token = extract_access_token(redirect_url)
    if not access_token:
        print("\nFailed to extract access token.")
    else:
        print("\nAccess token received:")
        print(access_token)


if __name__ == "__main__":
    main()
import os
import aiohttp
import asyncio
import json
from pathlib import Path
from collections import Counter
# Configuration: credentials/tokens for both services, from the environment.
LASTFM_API_KEY = os.getenv("LASTFM_API_KEY")
LASTFM_USERNAME = os.getenv("LASTFM_USERNAME")
SPOTIFY_ACCESS_TOKEN = os.getenv("SPOTIFY_ACCESS_TOKEN")  # Spotify OAuth Bearer Token
SPOTIFY_BASE_URL = "https://api.spotify.com/v1"
# All cache files live under ./output (created at import time).
OUTPUT_DIR = Path("output")
OUTPUT_DIR.mkdir(exist_ok=True)
LASTFM_CACHE = OUTPUT_DIR / "lastfm_scrobbles.json"
async def fetch_with_retries(session, url, params=None, headers=None, retries=3, delay=2):
    """GET *url*, retrying aiohttp client errors with exponential backoff.

    Sleeps delay * 2**attempt between attempts; the final failure is
    re-raised to the caller.
    """
    last_attempt = retries - 1
    for attempt in range(retries):
        try:
            async with session.get(url, params=params, headers=headers) as response:
                response.raise_for_status()
                return await response.json()
        except aiohttp.ClientError:
            if attempt == last_attempt:
                raise
            await asyncio.sleep(delay * (2 ** attempt))
async def get_lastfm_scrobbles(limit=200):
    """Fetch the user's scrobbles from Last.fm.

    Returns a list of {"artist": ..., "track": ...} dicts, one per
    completed play. Results are cached in LASTFM_CACHE; when that file
    exists it is returned as-is without touching the network.

    Args:
        limit: tracks per API page (user.getRecentTracks "limit" param).
    """
    if LASTFM_CACHE.exists():
        print(f"Using cached Last.fm scrobbles from {LASTFM_CACHE}")
        with open(LASTFM_CACHE, "r") as f:
            return json.load(f)
    print("Fetching Last.fm scrobbles...")
    all_scrobbles = []
    async with aiohttp.ClientSession() as session:
        params = {
            "method": "user.getRecentTracks",
            "user": LASTFM_USERNAME,
            "api_key": LASTFM_API_KEY,
            "format": "json",
            "limit": limit,
        }
        # Fetch the first page just to learn the total page count.
        first_page = await fetch_with_retries(session, "http://ws.audioscrobbler.com/2.0/", params=params)
        total_pages = int(first_page["recenttracks"]["@attr"]["totalPages"])
        print(f"Total Last.fm pages: {total_pages}")
        # Fetch every page (page 1 is fetched a second time here).
        for page in range(1, total_pages + 1):
            print(f"Fetching Last.fm page {page}...")
            params["page"] = page
            data = await fetch_with_retries(session, "http://ws.audioscrobbler.com/2.0/", params=params)
            tracks = data["recenttracks"]["track"]
            all_scrobbles.extend(
                {
                    "artist": track["artist"]["#text"],
                    "track": track["name"],
                }
                for track in tracks
                if "date" in track  # Ignore currently playing tracks (no "date")
            )
    # Save scrobbles to the cache for subsequent runs.
    with open(LASTFM_CACHE, "w") as f:
        json.dump(all_scrobbles, f, indent=4)
    print(f"Fetched and cached {len(all_scrobbles)} scrobbles from Last.fm.")
    return all_scrobbles
def find_most_listened_songs(lastfm_scrobbles, min_plays=5):
    """Return the songs played strictly more than *min_plays* times.

    Each result is a {"artist": ..., "track": ...} dict; output order
    follows first appearance in the input (Counter preserves insertion
    order).
    """
    play_counts = Counter(
        (entry["artist"], entry["track"]) for entry in lastfm_scrobbles
    )
    favourites = []
    for (artist, track), plays in play_counts.items():
        if plays > min_plays:
            favourites.append({"artist": artist, "track": track})
    return favourites
async def create_or_replace_playlist(session, playlist_name, tracks):
    """Create or replace a Spotify playlist with the given tracks.

    Steps: remove any existing playlist with the same name (via the
    DELETE .../followers endpoint), create a fresh private playlist, then
    for each requested song look up a Spotify track URI (memoized in
    output/track_uris.json, rewritten after every new hit so progress
    survives a crash) and add it to the playlist one request at a time.

    Args:
        session: an open aiohttp.ClientSession.
        playlist_name: exact display name of the playlist to (re)create.
        tracks: iterable of {"artist": ..., "track": ...} dicts.
    """
    headers = {"Authorization": f"Bearer {SPOTIFY_ACCESS_TOKEN}"}
    cache_file = OUTPUT_DIR / "track_uris.json"
    # Load previously resolved track URIs if available.
    if cache_file.exists():
        with open(cache_file, "r") as f:
            cached_uris = json.load(f)
    else:
        cached_uris = {}
    # Get the current user's ID (owner of the new playlist).
    user_data = await fetch_with_retries(session, f"{SPOTIFY_BASE_URL}/me", headers=headers)
    user_id = user_data["id"]
    # Check if a playlist with this name already exists.
    # NOTE(review): /me/playlists is paginated; only the first page of
    # results is inspected here — confirm that is acceptable.
    playlists = await fetch_with_retries(session, f"{SPOTIFY_BASE_URL}/me/playlists", headers=headers)
    existing_playlist = next(
        (playlist for playlist in playlists["items"] if playlist["name"] == playlist_name), None
    )
    # Remove the existing playlist if present.
    if existing_playlist:
        playlist_id = existing_playlist["id"]
        print(f"Deleting existing playlist '{playlist_name}'...")
        # NOTE(review): the response status is not checked and the
        # response is not released via a context manager — a failed
        # delete would go unnoticed.
        await session.delete(f"{SPOTIFY_BASE_URL}/playlists/{playlist_id}/followers", headers=headers)
    # Create the replacement playlist (private).
    print(f"Creating new playlist '{playlist_name}'...")
    payload = {"name": playlist_name, "description": "Songs with most listens from Last.fm", "public": False}
    response = await session.post(
        f"{SPOTIFY_BASE_URL}/users/{user_id}/playlists", headers=headers, json=payload
    )
    response.raise_for_status()
    playlist = await response.json()
    playlist_id = playlist["id"]
    # Resolve each song to a Spotify URI and add it to the playlist.
    print(f"Searching for tracks and adding to playlist '{playlist_name}'...")
    for song in tracks:
        # Check if this song's URI is already cached.
        cache_key = f"{song['artist']} - {song['track']}"
        if cache_key in cached_uris:
            track_uri = cached_uris[cache_key]
        else:
            # Search for the track on Spotify (best single match).
            params = {
                "q": f"track:\"{song['track']}\" artist:\"{song['artist']}\"",
                "type": "track",
                "limit": 1
            }
            search_results = await fetch_with_retries(session, f"{SPOTIFY_BASE_URL}/search", params=params, headers=headers)
            items = search_results.get("tracks", {}).get("items", [])
            if items:
                track_uri = items[0]["uri"]
                cached_uris[cache_key] = track_uri  # Cache the URI immediately
                # Rewrite the whole cache file on every new hit so a
                # crash loses no lookups.
                with open(cache_file, "w") as f:
                    json.dump(cached_uris, f, indent=4)
            else:
                print(f"Could not find track: {song['artist']} - {song['track']}")
                continue
        # Add the track to the playlist immediately (one request per track).
        # NOTE(review): response status unchecked, same caveat as above.
        await session.post(
            f"{SPOTIFY_BASE_URL}/playlists/{playlist_id}/tracks",
            headers=headers,
            json={"uris": [track_uri]},
        )
        print(f"Added track: {song['artist']} - {song['track']}")
    print(f"Finished adding tracks to playlist '{playlist_name}'.")
async def main():
    """Build a Spotify playlist from the user's most-played Last.fm songs."""
    # Pull the full scrobble history (served from cache after first run).
    scrobbles = await get_lastfm_scrobbles()

    # Keep only songs played more than the threshold number of times.
    threshold = 20
    top_songs = find_most_listened_songs(scrobbles, min_plays=threshold)
    print(f"Found {len(top_songs)} songs with more than {threshold} plays.")

    # Rebuild the target playlist from scratch with those songs.
    async with aiohttp.ClientSession() as session:
        await create_or_replace_playlist(session, "XXMostListensXX", top_songs)


if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment