Created
May 27, 2025 15:44
-
-
Save 5AMsan/e72388eafbe9839203790212a35b6edd to your computer and use it in GitHub Desktop.
FIP stream by date and duration to YTMusic playlist
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import tkinter as tk | |
from tkinter import ttk, messagebox, simpledialog # ttk for themed widgets, messagebox for popups | |
import requests | |
import json | |
import time | |
from datetime import datetime, timedelta | |
import sys | |
# --- Your YouTube Music API Integration (Assuming ytmusicapi) --- | |
# Make sure you have ytmusicapi installed: pip install ytmusicapi | |
from ytmusicapi import YTMusic, OAuthCredentials | |
# --- YouTube Music client setup ---
# OAuth client credentials; replace both placeholders with your own values.
client_id = 'CLIENT_ID'
client_secret = 'CLIENT_SECRET'
# Auth file handed to YTMusic together with the OAuth credentials above.
# NOTE(review): the ytmusicapi docs describe creating it with `ytmusicapi oauth`
# (not `ytmusicapi browser`) when OAuth credentials are used - confirm.
YT_MUSIC_AUTH_FILE = "oauth.json"

try:
    _oauth_credentials = OAuthCredentials(client_id=client_id, client_secret=client_secret)
    yt = YTMusic(YT_MUSIC_AUTH_FILE, oauth_credentials=_oauth_credentials)
    print(f"Successfully initialized YTMusic with {YT_MUSIC_AUTH_FILE}")
except Exception as init_error:
    # Nothing below can work without a client: report the problem and stop.
    messagebox.showerror(
        "Initialization Error",
        f"Failed to initialize YTMusic. Make sure '{YT_MUSIC_AUTH_FILE}' exists and is valid.\nError: {init_error}",
    )
    sys.exit(1)

# --- Source API endpoint for the broadcast song list (cursor-paginated) ---
BASE_API_URL = "https://www.radiofrance.fr/api/songs"
# --- Core Logic Functions --- | |
def fetch_all_songs_paginated(base_url, params, *, session=None, delay=1.0, timeout=30):
    """
    Fetch all songs from an API endpoint that paginates with a 'next' cursor.

    Each response is read as a JSON object holding a "songs" list and a
    "next" cursor. The cursor is sent back as the 'pageCursor' query
    parameter until the API stops returning one. Any error ends the loop and
    the songs collected so far are returned.

    Args:
        base_url (str): The URL of the API endpoint.
        params (dict): Query parameters for the request. The dict is copied,
            so the caller's object is never modified.
        session: Optional object exposing a requests-compatible
            ``get(url, params=..., timeout=...)``. Defaults to the
            ``requests`` module.
        delay (float): Seconds to wait between two page requests.
        timeout (float): Per-request timeout in seconds.

    Returns:
        list: All song dictionaries fetched from the pages read.
    """
    http = requests if session is None else session
    # Work on a private copy: the cursor must not leak into the caller's dict.
    query = dict(params or {})
    all_songs = []
    next_cursor = None  # No cursor on the first request
    page_count = 0
    print("Starting to fetch songs from API...")
    while True:
        page_count += 1
        if next_cursor:
            query['pageCursor'] = next_cursor
        try:
            print(f"Fetching page {page_count} (cursor: {next_cursor if next_cursor else 'initial'})...")
            # Without a timeout a stalled connection would hang the script forever.
            response = http.get(base_url, params=query, timeout=timeout)
            response.raise_for_status()  # Raise an HTTPError for 4xx/5xx responses
            data = response.json()

            current_page_songs = data.get("songs") or []
            all_songs.extend(current_page_songs)
            print(f"  - Added {len(current_page_songs)} songs from this page.")

            upcoming_cursor = data.get("next")
            if not upcoming_cursor:
                print("No more pages found (next cursor is null/empty).")
                break
            if upcoming_cursor == next_cursor:
                # Same cursor twice means we would request the same page forever.
                print("API returned the same cursor twice; stopping pagination.", file=sys.stderr)
                break
            next_cursor = upcoming_cursor

            # Be polite to the API: pause before the next request.
            if delay and delay > 0:
                time.sleep(delay)
        except json.JSONDecodeError as e:
            # Checked before RequestException: the decode error raised by
            # requests derives from both classes.
            print(f"Failed to decode JSON response: {e} - Response text: {response.text}", file=sys.stderr)
            break
        except requests.exceptions.HTTPError as e:
            print(f"HTTP error occurred: {e} - Response: {response.text}", file=sys.stderr)
            break
        except requests.exceptions.ConnectionError as e:
            print(f"Connection error occurred: {e}", file=sys.stderr)
            break
        except requests.exceptions.Timeout as e:
            print(f"Timeout error occurred: {e}", file=sys.stderr)
            break
        except requests.exceptions.RequestException as e:
            print(f"An unexpected request error occurred: {e}", file=sys.stderr)
            break
        except Exception as e:
            print(f"An unknown error occurred: {e}", file=sys.stderr)
            break
    print(f"\nFinished fetching. Total songs retrieved: {len(all_songs)}")
    return all_songs
def perform_search_ytm(query, yt_instance):
    """
    Run a song search on YouTube Music.

    Returns whatever ``yt_instance.search`` yields for ``query`` (restricted
    to songs, limit 10), or an empty list when the call raises.
    """
    print(f"\nSearching YouTube Music for '{query}'...")
    try:
        # Only music tracks are wanted; ten hits are plenty for a selection list.
        hits = yt_instance.search(query, filter='songs', limit=10)
    except Exception as search_error:
        print(f"An error occurred during YouTube Music search: {search_error}", file=sys.stderr)
        hits = []
    return hits
def _playlist_toast_text(response):
    """Return the toast message text nested in a failed add response, or ''."""
    node = response
    path = ('actions', 0, 'addToToastAction', 'item', 'notificationActionRenderer',
            'responseText', 'runs', 0, 'text')
    for key in path:
        try:
            node = node[key]
        except (KeyError, IndexError, TypeError):
            return ''
    return node if isinstance(node, str) else ''


def add_song_to_playlist(song_info, yt_instance, target_playlist_id):
    """
    Add the selected song to the YouTube Music playlist.

    Args:
        song_info (dict): A search result; 'videoId', 'title' and 'artists'
            are read from it.
        yt_instance: Client object exposing ``add_playlist_items``.
        target_playlist_id (str): ID of the playlist to add the song to.

    Returns:
        bool: True when the track was added or is already in the playlist,
        False otherwise (missing video ID, failed/unexpected response, or an
        exception raised by the client, which is also shown in a popup).
    """
    title = song_info.get('title')
    item_id_to_add = song_info.get('videoId')
    if not item_id_to_add:
        print(f"Error: Selected item '{title}' has no video ID. Cannot add to playlist.")
        return False

    # 'artists' may be missing, None or an empty list; never index blindly.
    artists = song_info.get('artists') or [{'name': 'N/A'}]
    print(f"\nAdding '{title}' by '{artists[0].get('name')}' to playlist...")

    try:
        # add_playlist_items expects a list of video IDs
        response = yt_instance.add_playlist_items(target_playlist_id, [item_id_to_add])
    except Exception as e:
        print(f"An error occurred while adding to playlist: {e}", file=sys.stderr)
        messagebox.showerror("Playlist Add Error", f"Error adding '{title}' to playlist: {e}")
        return False

    if not isinstance(response, dict):
        print(f"Failed to add '{title}' to the playlist. Unexpected response: {response}")
        return False

    status = response.get('status')
    has_edit_results = bool(response.get('playlistEditResults'))

    if status == 'STATUS_SUCCEEDED':
        if has_edit_results:
            print(f"Successfully added '{title}' to the playlist.")
        else:
            # Some response formats only carry the status field.
            print(f"Successfully added '{title}' to the playlist (old API response format).")
        return True

    if status and has_edit_results:
        print(f"Failed to add '{title}'. No successful edit result found. Response: {response}")
        return False

    if status == 'STATUS_FAILED' and _playlist_toast_text(response) == "This track is already in the playlist":
        print(f"Track '{title}' is already in the playlist. Skipping.")
        return True

    print(f"Failed to add '{title}' to the playlist. Unexpected response: {response}")
    return False
# --- Tkinter GUI Functions --- | |
class ParameterInputWindow:
    """
    Form asking for the scraping parameters.

    After the window closes, ``params`` holds a dict with the keys
    'start_dt_obj', 'duration_minutes', 'auto_accept' and 'playlist_id', or
    None when the user exited without starting.
    """

    def __init__(self, master):
        self.master = master
        master.title("Configure Music Scraper")
        self.params = None  # Filled by on_start when the input is valid

        # Default start: today's date at 22:00:00
        default_start_str = datetime.now().strftime("%Y-%m-%d 22:00:00")

        # Start Date/Time
        ttk.Label(master, text='Start Date and Time (YYYY-MM-DD HH:MM:SS):').grid(row=0, column=0, sticky='w', padx=5, pady=5)
        self.start_datetime_entry = ttk.Entry(master, width=30)
        self.start_datetime_entry.insert(0, default_start_str)
        self.start_datetime_entry.grid(row=0, column=1, padx=5, pady=5)

        # Duration
        ttk.Label(master, text='Duration of Livestream to Scrap (minutes):').grid(row=1, column=0, sticky='w', padx=5, pady=5)
        self.duration_minutes_entry = ttk.Entry(master, width=10)
        self.duration_minutes_entry.insert(0, '150')
        self.duration_minutes_entry.grid(row=1, column=1, sticky='w', padx=5, pady=5)

        # Auto-Accept Checkbox
        self.auto_accept_var = tk.BooleanVar(value=True)
        self.auto_accept_checkbox = ttk.Checkbutton(master, text='Auto-Accept First YouTube Music fuzzy match', variable=self.auto_accept_var)
        self.auto_accept_checkbox.grid(row=2, column=0, columnspan=2, sticky='w', padx=5, pady=5)

        # Playlist ID
        ttk.Label(master, text='Existing YouTube Music Playlist ID (empty to create a new one):').grid(row=3, column=0, sticky='w', padx=5, pady=5)
        self.playlist_id_entry = ttk.Entry(master, width=30)
        self.playlist_id_entry.insert(0, "")
        self.playlist_id_entry.grid(row=3, column=1, padx=5, pady=5)

        # Buttons
        self.start_button = ttk.Button(master, text='Start Scraping', command=self.on_start)
        self.start_button.grid(row=4, column=0, padx=5, pady=10)
        self.exit_button = ttk.Button(master, text='Exit', command=self.on_exit)
        self.exit_button.grid(row=4, column=1, padx=5, pady=10)

    def on_start(self):
        """Validate the form; on success store ``params`` and close the window."""
        # Strip surrounding whitespace so a stray space neither breaks date
        # parsing nor turns an "empty" playlist field into a bogus ID.
        start_datetime_str = self.start_datetime_entry.get().strip()
        duration_minutes_str = self.duration_minutes_entry.get().strip()
        auto_accept = self.auto_accept_var.get()
        playlist_id = self.playlist_id_entry.get().strip()
        try:
            start_dt_obj = datetime.strptime(start_datetime_str, "%Y-%m-%d %H:%M:%S")
            duration = int(duration_minutes_str)
        except ValueError as e:
            messagebox.showerror("Input Error", f"Invalid input format. Please check date/time or duration.\nError: {e}")
            return
        if duration <= 0:
            messagebox.showerror("Input Error", "Duration must be a positive number.")
            return
        self.params = {
            "start_dt_obj": start_dt_obj,
            "duration_minutes": duration,
            "auto_accept": auto_accept,
            "playlist_id": playlist_id,
        }
        self.master.destroy()  # Close the window

    def on_exit(self):
        """Cancel: clear ``params`` and close the window."""
        self.params = None
        self.master.destroy()
def get_parameters_from_gui():
    """
    Show the parameter form and block until it is closed.

    Returns:
        dict | None: The validated parameters, or None if the user cancelled.
    """
    root = tk.Tk()
    # root.withdraw()  # Left disabled as in the original: the root stays visible
    param_window = tk.Toplevel(root)  # The form lives in its own top-level window
    app = ParameterInputWindow(param_window)
    # The root window is visible, so the user can close it. Route that to the
    # form instead of letting Tk destroy the application, which would make the
    # root.destroy() call below raise TclError.
    root.protocol("WM_DELETE_WINDOW", param_window.destroy)
    root.wait_window(param_window)  # Wait for the parameter window to close
    root.destroy()
    return app.params
class SongSelectionWindow:
    """
    Dialog listing YouTube Music search results for manual selection.

    After the window closes, ``selected_song_info`` holds the chosen result
    dict, or None when the user skipped or nothing valid was selected.
    """

    def __init__(self, master, search_results):
        self.master = master
        master.title("Select YouTube Music Song")
        master.transient(master.master)  # Tie the dialog to its parent window
        master.grab_set()  # Route all events here until the dialog is closed
        self.selected_song_info = None

        ttk.Label(master, text="Multiple YouTube Music results found. Please select the correct song:").pack(padx=10, pady=10)

        # Listbox and scrollbar share a frame so the scrollbar sits beside the
        # list instead of being packed underneath it.
        list_frame = ttk.Frame(master)
        list_frame.pack(padx=10, pady=5, fill=tk.BOTH, expand=True)
        self.listbox = tk.Listbox(list_frame, width=90, height=15, selectmode=tk.SINGLE)
        scrollbar = ttk.Scrollbar(list_frame, orient="vertical", command=self.listbox.yview)
        scrollbar.pack(side="right", fill="y")
        self.listbox.pack(side="left", fill=tk.BOTH, expand=True)
        self.listbox.config(yscrollcommand=scrollbar.set)

        # Populate listbox
        if search_results:
            for i, item in enumerate(search_results):
                title = item.get('title', 'N/A')
                artist = self._artist_name(item)
                # 'album' can be present but None; treat that like a missing album.
                album = (item.get('album') or {}).get('name', 'N/A')
                duration = item.get('duration', 'N/A')
                self.listbox.insert(tk.END, f"{i+1}. {title} - {artist} | Album: {album} | Duration: {duration}")
        else:
            self.listbox.insert(tk.END, "No results to display.")

        # Buttons
        button_frame = ttk.Frame(master)
        button_frame.pack(pady=10)
        ttk.Button(button_frame, text='Select Song', command=lambda: self.on_select(search_results)).pack(side=tk.LEFT, padx=5)
        ttk.Button(button_frame, text='Skip Song', command=self.on_skip).pack(side=tk.LEFT, padx=5)

    @staticmethod
    def _artist_name(item):
        """Return the first artist's name, tolerating a missing or empty list."""
        artists = item.get('artists') or [{'name': 'N/A'}]
        return artists[0].get('name')

    def on_select(self, search_results):
        """Store the highlighted result and close, or warn if nothing is selected."""
        selected_indices = self.listbox.curselection()  # Tuple of selected indices
        if not selected_indices:
            messagebox.showwarning("No Selection", "Please select an item first.")
            return
        selected_index = selected_indices[0]
        # The placeholder row shown for an empty result list maps to no result.
        if 0 <= selected_index < len(search_results):
            self.selected_song_info = search_results[selected_index]
            print(f"User selected: {self.selected_song_info.get('title')} by {self._artist_name(self.selected_song_info)}")
        self.master.destroy()  # Close the window

    def on_skip(self):
        """Record that no song was chosen and close the window."""
        self.selected_song_info = None
        print("Song selection skipped by user.")
        self.master.destroy()
def choose_song_from_results_gui(search_results):
    """
    Show the song selection dialog and block until it is closed.

    Args:
        search_results (list): YouTube Music search results to choose from.

    Returns:
        dict | None: The selected result, or None if the user skipped.
    """
    root = tk.Tk()
    # root.withdraw()  # Left disabled as in the original: the root stays visible
    selection_window = tk.Toplevel(root)  # The dialog lives in its own top-level window
    app = SongSelectionWindow(selection_window, search_results)
    # The root window is visible, so the user can close it. Route that to the
    # dialog instead of letting Tk destroy the application, which would make
    # the root.destroy() call below raise TclError.
    root.protocol("WM_DELETE_WINDOW", selection_window.destroy)
    root.wait_window(selection_window)  # Wait for the selection window to close
    root.destroy()
    return app.selected_song_info
# --- Main Script Execution --- | |
def _loosely_matches(expected, found):
    """Return True when one string contains the other (both must be non-empty)."""
    if not expected or not found:
        return False
    return expected in found or found in expected


def main():
    """
    Run the whole workflow: ask for parameters, fetch the broadcast songs for
    the chosen time window, look each one up on YouTube Music, add the chosen
    matches to the playlist and show a summary.
    """
    # 1. Get parameters from GUI
    params = get_parameters_from_gui()
    if not params:
        print("Script execution cancelled by user.")
        return
    start_dt_obj = params["start_dt_obj"]
    duration_minutes = params["duration_minutes"]
    auto_accept = params["auto_accept"]
    playlist_id = params["playlist_id"]

    if not playlist_id:
        new_playlist = yt.create_playlist("My Automated Music", "Songs added via script")
        # NOTE(review): ytmusicapi documents a string ID on success and the raw
        # response otherwise - anything that is not a string is treated as failure.
        if not isinstance(new_playlist, str):
            messagebox.showerror("Playlist Error", f"Could not create a new playlist. Response: {new_playlist}")
            return
        playlist_id = new_playlist

    start_timestamp = int(start_dt_obj.timestamp())
    end_dt_obj = start_dt_obj + timedelta(minutes=duration_minutes)
    end_timestamp = int(end_dt_obj.timestamp())

    # 2. Fetch all songs from the source API for the requested time window
    all_source_songs = fetch_all_songs_paginated(
        BASE_API_URL,
        {"station": "fip", "isPad": "true", "start": start_timestamp, "stop": end_timestamp},
    )
    if not all_source_songs:
        messagebox.showinfo("No Songs Found", "No songs found from the source API within the specified time range.")
        return
    print(f"\nProcessing {len(all_source_songs)} songs retrieved from source API...")

    tracks_not_found = []
    tracks_skipped_count = 0
    tracks_added_count = 0

    # 3. Process each song for YouTube Music and add to playlist
    for i, source_song in enumerate(all_source_songs):
        artist = source_song.get("firstLine")
        title = source_song.get("secondLine")
        if not artist or not title:
            continue  # Entry without both lines cannot be searched

        query = f"{title} {artist}"
        print(f"\n--- Processing song {i+1}/{len(all_source_songs)}: '{title}' by '{artist}' ---")
        ytm_search_results = perform_search_ytm(query, yt)

        if not ytm_search_results:
            print(f"No YouTube Music results found for '{query}'. Skipping.")
            tracks_not_found.append(query)  # Reported in the final summary
            messagebox.showinfo("No YTM Results", f"No YouTube Music results found for '{query}'. Skipping.")
            continue

        if auto_accept:
            first_result = ytm_search_results[0]
            res_artist = (first_result.get('artists') or [{'name': 'N/A'}])[0].get('name')
            res_title = first_result.get('title')
            # Accept the first hit when the artist OR the title loosely matches;
            # only when both differ is the user asked to choose.
            if _loosely_matches(artist, res_artist) or _loosely_matches(title, res_title):
                selected_ytm_song = first_result
                print(f"Auto-accepting first result for entry #{i+1}: '{res_title}' by '{res_artist}'")
            else:
                # Keep the user's choice (it was previously discarded).
                selected_ytm_song = choose_song_from_results_gui(ytm_search_results)
        else:
            selected_ytm_song = choose_song_from_results_gui(ytm_search_results)

        if selected_ytm_song:
            if add_song_to_playlist(selected_ytm_song, yt, playlist_id):
                tracks_added_count += 1
        else:
            print(f"No song selected or skipped for '{query}'.")
            tracks_skipped_count += 1

        time.sleep(0.5)  # Small delay between songs for YTM API politeness

    # --- Display Summary in GUI ---
    # Joined outside the f-string: a backslash / same-type quote inside an
    # f-string expression is a SyntaxError before Python 3.12.
    not_found_details = "\n".join(tracks_not_found)
    summary_message = (
        f"Script finished processing all songs!\n\n"
        f"Summary:\n"
        f"Tracks processed from source: {len(all_source_songs)}\n"
        f"Tracks successfully added to YouTube Music: {tracks_added_count}\n"
        f"Tracks not found on YouTube Music (details below): {len(tracks_not_found)}\n"
        f"Tracks skipped by user: {tracks_skipped_count}\n"
        f"\t🔪---------------------------------\n"
        f"Not found details:\n"
        f"{not_found_details}"
    )
    messagebox.showinfo("Processing Complete", summary_message)
    messagebox.showinfo("Done", "Script finished processing all songs!")


# --- Run the main function ---
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment