Skip to content

Instantly share code, notes, and snippets.

@5AMsan
Created May 27, 2025 15:44
Show Gist options
  • Save 5AMsan/e72388eafbe9839203790212a35b6edd to your computer and use it in GitHub Desktop.
Save 5AMsan/e72388eafbe9839203790212a35b6edd to your computer and use it in GitHub Desktop.
FIP stream by date and duration to YTMusic playlist
import tkinter as tk
from tkinter import ttk, messagebox, simpledialog # ttk for themed widgets, messagebox for popups
import requests
import json
import time
from datetime import datetime, timedelta
import sys
# --- Your YouTube Music API Integration (Assuming ytmusicapi) ---
# Make sure you have ytmusicapi installed: pip install ytmusicapi
from ytmusicapi import YTMusic, OAuthCredentials
# Build the shared YTMusic client used by every function below.
# YT_MUSIC_AUTH_FILE is the credentials JSON handed to YTMusic together with
# the OAuth client id/secret.
# NOTE(review): an earlier comment said this file comes from
# 'ytmusicapi browser', but OAuthCredentials are passed here, so it is
# presumably the file produced by the OAuth setup flow -- confirm.
client_id = 'CLIENT_ID'
client_secret = 'CLIENT_SECRET'
YT_MUSIC_AUTH_FILE = "oauth.json"
try:
    _oauth_credentials = OAuthCredentials(client_id=client_id, client_secret=client_secret)
    yt = YTMusic(YT_MUSIC_AUTH_FILE, oauth_credentials=_oauth_credentials)
    print(f"Successfully initialized YTMusic with {YT_MUSIC_AUTH_FILE}")
except Exception as exc:
    # Without a working client nothing else in the script can run, so report
    # the problem in a dialog and stop with a non-zero exit code.
    messagebox.showerror(
        "Initialization Error",
        f"Failed to initialize YTMusic. Make sure '{YT_MUSIC_AUTH_FILE}' exists and is valid.\nError: {exc}",
    )
    sys.exit(1)
# --- Source API endpoint for song data (cursor-paginated) ---
# Queried by main() with station / time-range parameters.
BASE_API_URL = "https://www.radiofrance.fr/api/songs"
# --- Core Logic Functions ---
def fetch_all_songs_paginated(base_url, params, *, session=None, timeout=30, delay=1.0):
    """
    Fetch every song from an endpoint that paginates with a 'next' cursor.

    Each response is expected to be a JSON object with a "songs" list and an
    optional "next" cursor; the cursor is sent back as the 'pageCursor' query
    parameter to obtain the following page.

    Args:
        base_url (str): URL of the API endpoint.
        params (dict): Query parameters for the first request. The dict is
            copied, so the caller's object is never modified.
        session: Optional object exposing a requests-compatible ``get``
            method (e.g. ``requests.Session``). Defaults to ``requests.get``.
        timeout (float): Seconds to wait for each HTTP request.
        delay (float): Seconds to pause between two page requests.

    Returns:
        list: All song dictionaries collected so far. On any request or
        decoding error the error is printed to stderr and the songs
        retrieved up to that point are returned.
    """
    # Copy so that adding 'pageCursor' does not leak into the caller's dict.
    query = dict(params or {})
    http_get = session.get if session is not None else requests.get
    all_songs = []
    next_cursor = None  # No cursor for the first request
    page_count = 0
    print("Starting to fetch songs from API...")
    while True:
        page_count += 1
        if next_cursor:
            query['pageCursor'] = next_cursor
        response = None
        try:
            print(f"Fetching page {page_count} (cursor: {next_cursor if next_cursor else 'initial'})...")
            response = http_get(base_url, params=query, timeout=timeout)
            response.raise_for_status()  # Raise an HTTPError for 4xx/5xx responses
            data = response.json()
        except requests.exceptions.HTTPError as e:
            print(f"HTTP error occurred: {e} - Response: {response.text}", file=sys.stderr)
            break
        except requests.exceptions.ConnectionError as e:
            print(f"Connection error occurred: {e}", file=sys.stderr)
            break
        except requests.exceptions.Timeout as e:
            print(f"Timeout error occurred: {e}", file=sys.stderr)
            break
        except json.JSONDecodeError as e:
            # Checked before RequestException: recent requests versions raise
            # a decode error that is also a RequestException subclass.
            print(f"Failed to decode JSON response: {e} - Response text: {response.text}", file=sys.stderr)
            break
        except requests.exceptions.RequestException as e:
            print(f"An unexpected request error occurred: {e}", file=sys.stderr)
            break
        except Exception as e:
            # Best effort: keep whatever was fetched so far.
            print(f"An unknown error occurred: {e}", file=sys.stderr)
            break

        current_page_songs = data.get("songs") or []
        all_songs.extend(current_page_songs)
        print(f"  - Added {len(current_page_songs)} songs from this page.")

        upcoming_cursor = data.get("next")
        if not upcoming_cursor:
            print("No more pages found (next cursor is null/empty).")
            break
        if upcoming_cursor == next_cursor:
            # The API handed back the cursor we just used; stop instead of
            # requesting the same page forever.
            print("Next cursor did not advance; stopping pagination.", file=sys.stderr)
            break
        next_cursor = upcoming_cursor
        if delay:
            time.sleep(delay)  # Be polite to the API between requests
    print(f"\nFinished fetching. Total songs retrieved: {len(all_songs)}")
    return all_songs
def perform_search_ytm(query, yt_instance):
    """
    Run a songs-only YouTube Music search for ``query``.

    Args:
        query (str): Free-text search string.
        yt_instance: Client object exposing ``search(query, filter=..., limit=...)``.

    Returns:
        Whatever ``yt_instance.search`` returns (requested with limit=10),
        or an empty list when the search raises.
    """
    print(f"\nSearching YouTube Music for '{query}'...")
    try:
        # Restrict to song entries and keep the candidate list short enough
        # to choose from.
        return yt_instance.search(query, filter='songs', limit=10)
    except Exception as err:
        print(f"An error occurred during YouTube Music search: {err}", file=sys.stderr)
    return []
def _playlist_toast_text(response):
    """Return the notification text nested in an add-to-playlist response, or ''."""
    try:
        renderer = response['actions'][0]['addToToastAction']['item']['notificationActionRenderer']
        return renderer['responseText']['runs'][0]['text']
    except (KeyError, IndexError, TypeError):
        return ''


def add_song_to_playlist(song_info, yt_instance, target_playlist_id):
    """
    Add the selected song to a YouTube Music playlist.

    Args:
        song_info (dict): Search result entry; its 'videoId' is the item
            added, 'title' and 'artists' are only used for messages.
        yt_instance: Client object exposing
            ``add_playlist_items(playlist_id, [video_id])``.
        target_playlist_id (str): Playlist that receives the song.

    Returns:
        bool: True when the response status is 'STATUS_SUCCEEDED' or when the
        response reports that the track is already in the playlist; False
        when the song has no video ID, the response is anything else, or the
        API call raises (in which case an error dialog is also shown).
    """
    title = song_info.get('title')
    item_id_to_add = song_info.get('videoId')
    if not item_id_to_add:
        print(f"Error: Selected item '{title}' has no video ID. Cannot add to playlist.")
        return False

    # 'artists' may be missing, None or an empty list; never index blindly.
    artists = song_info.get('artists') or [{}]
    artist_name = artists[0].get('name', 'N/A')
    print(f"\nAdding '{title}' by '{artist_name}' to playlist...")

    try:
        # add_playlist_items expects a list of video IDs.
        response = yt_instance.add_playlist_items(target_playlist_id, [item_id_to_add])
    except Exception as e:
        print(f"An error occurred while adding to playlist: {e}", file=sys.stderr)
        messagebox.showerror("Playlist Add Error", f"Error adding '{title}' to playlist: {e}")
        return False

    # The response is either a bare status string or a dict with a 'status' key.
    if isinstance(response, str):
        status = response
    elif isinstance(response, dict):
        status = response.get('status')
    else:
        status = None

    if status == 'STATUS_SUCCEEDED':
        print(f"Successfully added '{title}' to the playlist.")
        return True
    if status == 'STATUS_FAILED' and _playlist_toast_text(response) == "This track is already in the playlist":
        # A duplicate is not an error for our purposes: the song is there.
        print(f"Track '{title}' is already in the playlist. Skipping.")
        return True
    print(f"Failed to add '{title}' to the playlist. Unexpected response: {response}")
    return False
# --- Tkinter GUI Functions ---
class ParameterInputWindow:
    """
    Form that collects the scraping parameters from the user.

    After the window closes, ``params`` holds a dict with the keys
    'start_dt_obj' (datetime), 'duration_minutes' (int), 'auto_accept'
    (bool) and 'playlist_id' (str, possibly empty), or None when the user
    pressed Exit or never submitted valid input.
    """

    def __init__(self, master):
        """Build the form widgets inside ``master`` (a Tk window)."""
        self.master = master
        master.title("Configure Music Scraper")
        self.params = None  # Filled in by on_start() on valid input
        # Default start: today's date with the time fixed at 22:00:00.
        current_time_str = datetime.now().strftime("%Y-%m-%d 22:00:00")

        # Start Date/Time
        ttk.Label(master, text='Start Date and Time (YYYY-MM-DD HH:MM:SS):').grid(row=0, column=0, sticky='w', padx=5, pady=5)
        self.start_datetime_entry = ttk.Entry(master, width=30)
        self.start_datetime_entry.insert(0, current_time_str)
        self.start_datetime_entry.grid(row=0, column=1, padx=5, pady=5)

        # Duration
        ttk.Label(master, text='Duration of Livestream to Scrap (minutes):').grid(row=1, column=0, sticky='w', padx=5, pady=5)
        self.duration_minutes_entry = ttk.Entry(master, width=10)
        self.duration_minutes_entry.insert(0, '150')
        self.duration_minutes_entry.grid(row=1, column=1, sticky='w', padx=5, pady=5)

        # Auto-Accept Checkbox
        self.auto_accept_var = tk.BooleanVar(value=True)
        self.auto_accept_checkbox = ttk.Checkbutton(master, text='Auto-Accept First YouTube Music fuzzy match', variable=self.auto_accept_var)
        self.auto_accept_checkbox.grid(row=2, column=0, columnspan=2, sticky='w', padx=5, pady=5)

        # Playlist ID
        ttk.Label(master, text='Existing YouTube Music Playlist ID (empty to create a new one):').grid(row=3, column=0, sticky='w', padx=5, pady=5)
        self.playlist_id_entry = ttk.Entry(master, width=30)
        self.playlist_id_entry.insert(0, "")
        self.playlist_id_entry.grid(row=3, column=1, padx=5, pady=5)

        # Buttons
        self.start_button = ttk.Button(master, text='Start Scraping', command=self.on_start)
        self.start_button.grid(row=4, column=0, padx=5, pady=10)
        self.exit_button = ttk.Button(master, text='Exit', command=self.on_exit)
        self.exit_button.grid(row=4, column=1, padx=5, pady=10)

    def on_start(self):
        """Validate the form; on success store ``params`` and close the window.

        Invalid date/duration input shows an error dialog and leaves the
        window open so the user can correct it.
        """
        # Strip stray whitespace: strptime rejects padded input, and a
        # whitespace-only playlist ID must count as "create a new one".
        start_datetime_str = self.start_datetime_entry.get().strip()
        duration_minutes_str = self.duration_minutes_entry.get().strip()
        auto_accept = self.auto_accept_var.get()
        playlist_id = self.playlist_id_entry.get().strip()
        try:
            start_dt_obj = datetime.strptime(start_datetime_str, "%Y-%m-%d %H:%M:%S")
            duration = int(duration_minutes_str)
        except ValueError as e:
            messagebox.showerror("Input Error", f"Invalid input format. Please check date/time or duration.\nError: {e}")
            return
        if duration <= 0:
            messagebox.showerror("Input Error", "Duration must be a positive number.")
            return
        self.params = {
            "start_dt_obj": start_dt_obj,
            "duration_minutes": duration,
            "auto_accept": auto_accept,
            "playlist_id": playlist_id,
        }
        self.master.destroy()  # Close the window

    def on_exit(self):
        """Cancel: clear ``params`` and close the window."""
        self.params = None  # Indicate cancellation
        self.master.destroy()  # Close the window
def get_parameters_from_gui():
    """
    Show the parameter form and block until it is closed.

    Returns:
        dict | None: The ``params`` dict stored by ParameterInputWindow, or
        None when the user cancelled or closed the window without submitting.
    """
    root = tk.Tk()
    # The root window is intentionally left visible (no root.withdraw()).
    param_window = tk.Toplevel(root)  # Separate top-level window for the form
    app = ParameterInputWindow(param_window)
    root.wait_window(param_window)  # Block until the form window is gone
    try:
        root.destroy()
    except tk.TclError:
        # The user closed the root window itself, which already tore down
        # the whole application (and the form with it).
        pass
    return app.params
class SongSelectionWindow:
    """
    Modal list of YouTube Music search results for the user to pick from.

    After the window closes, ``selected_song_info`` holds the chosen result
    dict, or None when the user skipped or nothing valid was selected.
    """

    def __init__(self, master, search_results):
        """Build the selection widgets inside ``master`` (a Toplevel window).

        Args:
            master: Window to populate; made transient to its own parent.
            search_results (list[dict] | None): Candidate songs to display.
        """
        self.master = master
        master.title("Select YouTube Music Song")
        master.transient(master.master)  # Tie the window to its parent
        master.grab_set()  # Route all events here until the window closes
        self.selected_song_info = None
        ttk.Label(master, text="Multiple YouTube Music results found. Please select the correct song:").pack(padx=10, pady=10)

        # Listbox and scrollbar share a frame so the scrollbar sits beside
        # the list instead of underneath it.
        list_frame = ttk.Frame(master)
        list_frame.pack(padx=10, pady=5, fill=tk.BOTH, expand=True)
        scrollbar = ttk.Scrollbar(list_frame, orient="vertical")
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        self.listbox = tk.Listbox(list_frame, width=90, height=15, selectmode=tk.SINGLE, yscrollcommand=scrollbar.set)
        self.listbox.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        scrollbar.config(command=self.listbox.yview)

        # Populate listbox
        if search_results:
            for position, item in enumerate(search_results):
                self.listbox.insert(tk.END, self._format_entry(position, item))
        else:
            self.listbox.insert(tk.END, "No results to display.")

        # Buttons
        button_frame = ttk.Frame(master)
        button_frame.pack(pady=10)
        ttk.Button(button_frame, text='Select Song', command=lambda: self.on_select(search_results)).pack(side=tk.LEFT, padx=5)
        ttk.Button(button_frame, text='Skip Song', command=self.on_skip).pack(side=tk.LEFT, padx=5)

    @staticmethod
    def _artist_name(item):
        """Return the first artist's name, or 'N/A' when none is available."""
        artists = item.get('artists') or [{}]
        return artists[0].get('name', 'N/A')

    @staticmethod
    def _format_entry(position, item):
        """Return the listbox line for ``item`` at zero-based ``position``."""
        title = item.get('title', 'N/A')
        artist = SongSelectionWindow._artist_name(item)
        # 'album' may be present but None, hence the `or {}`.
        album = (item.get('album') or {}).get('name', 'N/A')
        duration = item.get('duration', 'N/A')
        return f"{position + 1}. {title} - {artist} | Album: {album} | Duration: {duration}"

    def on_select(self, search_results):
        """Store the highlighted result and close; warn if nothing is highlighted."""
        selected_indices = self.listbox.curselection()  # Tuple of selected indices
        if not selected_indices:
            messagebox.showwarning("No Selection", "Please select an item first.")
            return
        selected_index = selected_indices[0]
        # The placeholder row shown for an empty result list maps to no entry.
        if 0 <= selected_index < len(search_results or []):
            self.selected_song_info = search_results[selected_index]
            print(f"User selected: {self.selected_song_info.get('title')} by {self._artist_name(self.selected_song_info)}")
        self.master.destroy()  # Close the window

    def on_skip(self):
        """Record that no song was chosen and close the window."""
        self.selected_song_info = None  # Indicate skip
        print("Song selection skipped by user.")
        self.master.destroy()  # Close the window
def choose_song_from_results_gui(search_results):
    """
    Let the user pick one entry from ``search_results`` in a modal window.

    Args:
        search_results (list[dict]): Candidate songs to display.

    Returns:
        dict | None: The chosen result, or None when the user skipped or
        closed the window without choosing.
    """
    root = tk.Tk()
    # The root window is intentionally left visible (no root.withdraw()).
    selection_window = tk.Toplevel(root)  # Separate top-level window for the list
    app = SongSelectionWindow(selection_window, search_results)
    root.wait_window(selection_window)  # Block until the selection window is gone
    try:
        root.destroy()
    except tk.TclError:
        # The user closed the root window itself, which already tore down
        # the whole application (and the selection window with it).
        pass
    return app.selected_song_info
# --- Main Script Execution ---
def _contains_either_way(expected, actual):
    """Return True when either string contains the other, ignoring case."""
    if not expected or not actual:
        return False
    left, right = expected.casefold(), actual.casefold()
    return left in right or right in left


def _first_artist_name(item):
    """Return the first artist's name of a search result, or 'N/A'."""
    artists = item.get('artists') or [{}]
    return artists[0].get('name', 'N/A')


def main():
    """
    Run the whole workflow: ask for parameters, fetch the songs played in the
    requested time window from the source API, look each one up on YouTube
    Music and add the chosen match to the target playlist.

    With auto-accept enabled the first search result is taken when its artist
    or its title loosely matches the source entry (case-insensitive
    containment in either direction); otherwise the user chooses from the
    result list. Songs with no search results are collected and listed in
    the final summary dialog instead of interrupting the run.
    """
    # 1. Get parameters from GUI
    params = get_parameters_from_gui()
    if not params:
        print("Script execution cancelled by user.")
        return
    start_dt_obj = params["start_dt_obj"]
    duration_minutes = params["duration_minutes"]
    auto_accept = params["auto_accept"]
    playlist_id = params["playlist_id"]

    if not playlist_id:
        new_playlist = yt.create_playlist("My Automated Music", "Songs added via script")
        if not isinstance(new_playlist, str):
            # Anything other than an ID string is treated as an error
            # response and must not be used as a playlist ID.
            messagebox.showerror("Playlist Error", f"Could not create a new playlist. Response: {new_playlist}")
            return
        playlist_id = new_playlist

    start_timestamp = int(start_dt_obj.timestamp())
    end_dt_obj = start_dt_obj + timedelta(minutes=duration_minutes)
    end_timestamp = int(end_dt_obj.timestamp())

    # 2. Fetch all songs from the source API for the requested time window
    all_source_songs = fetch_all_songs_paginated(
        BASE_API_URL,
        {"station": "fip", "isPad": "true", "start": start_timestamp, "stop": end_timestamp},
    )
    if not all_source_songs:
        messagebox.showinfo("No Songs Found", "No songs found from the source API within the specified time range.")
        return

    total_songs = len(all_source_songs)
    print(f"\nProcessing {total_songs} songs retrieved from source API...")
    tracks_not_found = []
    tracks_skipped_count = 0
    tracks_added_count = 0

    # 3. Process each song for YouTube Music and add to playlist
    for number, source_song in enumerate(all_source_songs, start=1):
        artist = source_song.get("firstLine")
        title = source_song.get("secondLine")
        if not artist or not title:
            continue  # Entry lacks one of the two text lines; nothing to search for
        query = f"{title} {artist}"
        print(f"\n--- Processing song {number}/{total_songs}: '{title}' by '{artist}' ---")

        ytm_search_results = perform_search_ytm(query, yt)
        if not ytm_search_results:
            print(f"No YouTube Music results found for '{query}'. Skipping.")
            tracks_not_found.append(query)
            continue

        first_result = ytm_search_results[0]
        res_artist = _first_artist_name(first_result)
        res_title = first_result.get('title')
        plausible = _contains_either_way(artist, res_artist) or _contains_either_way(title, res_title)
        if auto_accept and plausible:
            selected_ytm_song = first_result
            print(f"Auto-accepting first result for entry #{number}: '{res_title}' by '{res_artist}'")
        else:
            # Either manual mode, or neither artist nor title matched:
            # let the user decide and keep their choice.
            selected_ytm_song = choose_song_from_results_gui(ytm_search_results)

        if selected_ytm_song:
            if add_song_to_playlist(selected_ytm_song, yt, playlist_id):
                tracks_added_count += 1
        else:
            print(f"No song selected or skipped for '{query}'.")
            tracks_skipped_count += 1
        time.sleep(0.5)  # Small delay between songs for YTM API politeness

    # --- Display Summary in GUI ---
    # Joined outside the f-string: a backslash / nested quote inside an
    # f-string expression is a syntax error before Python 3.12.
    not_found_details = "\n".join(tracks_not_found)
    summary_message = (
        f"Script finished processing all songs!\n\n"
        f"Summary:\n"
        f"Tracks processed from source: {total_songs}\n"
        f"Tracks successfully added to YouTube Music: {tracks_added_count}\n"
        f"Tracks not found on YouTube Music (details below): {len(tracks_not_found)}\n"
        f"Tracks skipped by user: {tracks_skipped_count}\n"
        f"\t🔪---------------------------------\n"
        f"Not found details:\n"
        f"{not_found_details}"
    )
    messagebox.showinfo("Processing Complete", summary_message)
# --- Run the main function ---
if __name__ == "__main__":
    # Only start the workflow when executed as a script, not on import.
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment