import argparse
import asyncio
import hashlib
import json
import os
from pathlib import Path

import aiohttp

# Configuration
API_KEY = os.getenv("LASTFM_API_KEY")
API_SECRET = os.getenv("LASTFM_API_SECRET")
SESSION_KEY = os.getenv("LASTFM_SESSION_KEY")
BASE_URL = "http://ws.audioscrobbler.com/2.0/"
USERNAME = os.getenv("LASTFM_USERNAME")  # Your Last.fm username
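
# The credentials above must be set in the environment before running, e.g.
# (hypothetical values):
#   export LASTFM_API_KEY="your-api-key"
#   export LASTFM_API_SECRET="your-api-secret"
#   export LASTFM_SESSION_KEY="your-session-key"
#   export LASTFM_USERNAME="your-username"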

if not all([API_KEY, API_SECRET, SESSION_KEY, USERNAME]):
    raise EnvironmentError(
        "Missing one or more required environment variables: "
        "LASTFM_API_KEY, LASTFM_API_SECRET, LASTFM_SESSION_KEY, LASTFM_USERNAME"
    )

CACHE_FILE = Path(f"{USERNAME}_scrobbles.json")


def get_signature(params):
    """Generate an API signature."""
    sorted_params = "".join(f"{key}{params[key]}" for key in sorted(params))
    return hashlib.md5((sorted_params + API_SECRET).encode("utf-8")).hexdigest()
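
# Per the Last.fm API docs, the signature is the MD5 hex digest of the request
# parameters sorted by name and concatenated as <name><value>, with the shared
# secret appended. "format" is excluded from the signature, which is why it is
# only added after signing in delete_scrobble() below.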


async def fetch_with_retries(session, url, params, retries=3, delay=2):
    """Fetch a URL with retries and exponential backoff."""
    for attempt in range(retries):
        try:
            async with session.get(url, params=params) as response:
                response.raise_for_status()
                return await response.json()
        except aiohttp.ClientError:
            if attempt < retries - 1:
                # Back off exponentially: delay, 2*delay, 4*delay, ...
                await asyncio.sleep(delay * (2 ** attempt))
            else:
                raise
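
# Note: raise_for_status() raises aiohttp.ClientResponseError, a subclass of
# ClientError, so HTTP error responses are retried just like connection failures.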


async def scan_scrobbles(limit=200):
    """Scan all scrobbles and cache them locally."""
    all_scrobbles = []
    async with aiohttp.ClientSession() as session:
        params = {
            "method": "user.getRecentTracks",
            "user": USERNAME,
            "api_key": API_KEY,
            "format": "json",
            "limit": limit,
        }

        # Fetch the first page to get total page count
        initial_data = await fetch_with_retries(session, BASE_URL, {**params, "page": 1})
        total_pages = int(initial_data["recenttracks"]["@attr"]["totalPages"])
        print(f"Total pages to scan: {total_pages}")

        # Fetch all pages
        for page in range(1, total_pages + 1):
            print(f"Scanning page {page} of {total_pages}...")
            data = await fetch_with_retries(session, BASE_URL, {**params, "page": page})
            tracks = data["recenttracks"]["track"]
            scrobbles = [
                {
                    "artist": track["artist"]["#text"],
                    "track": track["name"],
                    "timestamp": track.get("date", {}).get("uts"),
                }
                for track in tracks
                if "date" in track
            ]
            all_scrobbles.extend(scrobbles)

    # Save scrobbles to cache
    with open(CACHE_FILE, "w") as f:
        json.dump(all_scrobbles, f, indent=4)
    print(f"Scanned and cached {len(all_scrobbles)} scrobbles.")
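
# Note: user.getRecentTracks may include a "now playing" entry that has no
# "date" field; the "date" check above skips it so only completed scrobbles
# are cached.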


def search_scrobbles_by_artist(artist):
    """Search scrobbles by artist and group by track."""
    if not CACHE_FILE.exists():
        print("No cached scrobbles found. Run the scan command first.")
        return

    with open(CACHE_FILE, "r") as f:
        scrobbles = json.load(f)

    filtered = [s for s in scrobbles if s["artist"].lower() == artist.lower()]
    grouped = {}
    for scrobble in filtered:
        track = scrobble["track"]
        grouped[track] = grouped.get(track, 0) + 1

    print(f"Found {len(filtered)} scrobbles for artist {artist}:")
    for track, count in grouped.items():
        print(f"{track}: {count}")


def search_scrobbles_by_track(track):
    """Search scrobbles by track."""
    if not CACHE_FILE.exists():
        print("No cached scrobbles found. Run the scan command first.")
        return

    with open(CACHE_FILE, "r") as f:
        scrobbles = json.load(f)

    filtered = [s for s in scrobbles if s["track"].lower() == track.lower()]
    print(f"Found {len(filtered)} scrobbles for track {track}:")
    for scrobble in filtered:
        print(f"{scrobble['artist']} - {scrobble['track']} (timestamp: {scrobble['timestamp']})")


async def delete_scrobble(session, artist, track, timestamp):
    """Delete a specific scrobble using the artist, track, and timestamp."""
    params = {
        "method": "track.removeScrobble",
        "artist": artist,
        "track": track,
        "timestamp": timestamp,
        "api_key": API_KEY,
        "sk": SESSION_KEY,
    }
    params["api_sig"] = get_signature(params)  # Sign before adding "format"
    params["format"] = "json"

    async with session.post(BASE_URL, data=params) as response:
        response.raise_for_status()
        return await response.json()
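
# The "timestamp" here is the scrobble's original "uts" value cached by
# scan_scrobbles(); it identifies which play of the track to remove.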


async def delete_scrobbles(filter_fn):
    """Delete scrobbles matching a filter function."""
    if not CACHE_FILE.exists():
        print("No cached scrobbles found. Run the scan command first.")
        return

    with open(CACHE_FILE, "r") as f:
        scrobbles = json.load(f)

    filtered = [s for s in scrobbles if filter_fn(s)]
    print(f"Found {len(filtered)} matching scrobbles to delete.")

    async with aiohttp.ClientSession() as session:
        delete_tasks = [
            delete_scrobble(session, s["artist"], s["track"], s["timestamp"])
            for s in filtered
        ]
        results = await asyncio.gather(*delete_tasks, return_exceptions=True)

    for scrobble, result in zip(filtered, results):
        if isinstance(result, Exception):
            print(f"Failed to delete scrobble: {scrobble['track']} - {result}")
        else:
            print(f"Deleted scrobble: {scrobble['track']}")

    # Remove deleted scrobbles from cache
    remaining = [s for s in scrobbles if not filter_fn(s)]
    with open(CACHE_FILE, "w") as f:
        json.dump(remaining, f, indent=4)
    print(f"Deleted {len(filtered)} scrobbles.")
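
# Caveat: the cache rewrite above drops every scrobble matching filter_fn, even
# ones whose deletion request failed; rerun the scan command to resync the
# cache with Last.fm after errors.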


async def delete_by_artist(artist):
    """Delete all scrobbles for a specific artist."""
    await delete_scrobbles(lambda s: s["artist"].lower() == artist.lower())


async def delete_by_track(track):
    """Delete all scrobbles for a specific track."""
    await delete_scrobbles(lambda s: s["track"].lower() == track.lower())


def main():
    parser = argparse.ArgumentParser(description="Manage Last.fm scrobbles.")
    subparsers = parser.add_subparsers(dest="command", required=True)

    subparsers.add_parser("scan", help="Scan and cache all scrobbles.")
    search_parser = subparsers.add_parser("search", help="Search cached scrobbles by artist.")
    search_parser.add_argument("artist", help="Artist to search for.")
    search_track_parser = subparsers.add_parser("search-track", help="Search cached scrobbles by track.")
    search_track_parser.add_argument("track", help="Track to search for.")
    del_artist_parser = subparsers.add_parser("delete-artist", help="Delete scrobbles by artist.")
    del_artist_parser.add_argument("artist", help="Artist to delete scrobbles for.")
    del_track_parser = subparsers.add_parser("delete-track", help="Delete scrobbles by track.")
    del_track_parser.add_argument("track", help="Track to delete scrobbles for.")

    args = parser.parse_args()

    if args.command == "scan":
        asyncio.run(scan_scrobbles())
    elif args.command == "search":
        search_scrobbles_by_artist(args.artist)
    elif args.command == "search-track":
        search_scrobbles_by_track(args.track)
    elif args.command == "delete-artist":
        asyncio.run(delete_by_artist(args.artist))
    elif args.command == "delete-track":
        asyncio.run(delete_by_track(args.track))


if __name__ == "__main__":
    main()
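
# Example usage (assuming this script is saved as, e.g., scrobbles.py):
#   python scrobbles.py scan
#   python scrobbles.py search "Some Artist"
#   python scrobbles.py search-track "Some Track"
#   python scrobbles.py delete-artist "Some Artist"
#   python scrobbles.py delete-track "Some Track"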