Skip to content

Instantly share code, notes, and snippets.

@jasontucker
Last active July 24, 2024 18:05
Show Gist options
  • Save jasontucker/c954f6a48e9ca0a9d11782ba5b5e7de4 to your computer and use it in GitHub Desktop.
Save jasontucker/c954f6a48e9ca0a9d11782ba5b5e7de4 to your computer and use it in GitHub Desktop.
with pushover support
#!/usr/bin/env python3
import os
import pickle
import requests
import google.auth.transport.requests
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from datetime import datetime, timedelta
# Path to your OAuth 2.0 credentials file
CLIENT_SECRETS_FILE = "client_secret.json"
SCOPES = ["https://www.googleapis.com/auth/youtube"]
# Replace with your custom playlist ID and Pushover details
CUSTOM_PLAYLIST_ID = "YOUR_CUSTOM_PLAYLIST_ID"
PUSHOVER_USER_KEY = "YOUR_PUSHOVER_USER_KEY"
PUSHOVER_API_TOKEN = "YOUR_PUSHOVER_API_TOKEN"
# Token file to store access and refresh tokens
TOKEN_FILE = "token.pickle"
# Time interval to check for new videos
TIME_INTERVAL = timedelta(days=7)
# Quota cost per request type
QUOTA_COSTS = {
'search_list': 100, # Cost for each search.list request
'playlistItems_list': 1, # Cost for each playlistItems.list request
'playlistItems_insert': 50, # Cost for each playlistItems.insert request
'playlistItems_delete': 50, # Cost for each playlistItems.delete request
}
# Maximum quota per day (default 10,000 units, adjust according to your quota)
MAX_QUOTA_PER_DAY = 10000
def get_authenticated_service():
credentials = None
# Check if token.pickle file exists
if os.path.exists(TOKEN_FILE):
with open(TOKEN_FILE, "rb") as token:
credentials = pickle.load(token)
else:
flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRETS_FILE, SCOPES)
credentials = flow.run_local_server(port=0)
# Save the credentials for the next run
with open(TOKEN_FILE, "wb") as token:
pickle.dump(credentials, token)
# Refresh the token if expired
if credentials.expired and credentials.refresh_token:
credentials.refresh(google.auth.transport.requests.Request())
with open(TOKEN_FILE, "wb") as token:
pickle.dump(credentials, token)
return build("youtube", "v3", credentials=credentials)
def get_latest_videos_from_channels(youtube, channel_ids):
video_ids = []
titles = []
now = datetime.utcnow()
published_after = (now - TIME_INTERVAL).isoformat("T") + "Z"
quota_used = 0
for channel_id in channel_ids:
request = youtube.search().list(
part="snippet",
channelId=channel_id,
order="date",
publishedAfter=published_after,
type="video",
maxResults=10
)
response = request.execute()
quota_used += QUOTA_COSTS['search_list']
for item in response['items']:
video_ids.append(item["id"]["videoId"])
titles.append(item["snippet"]["title"])
return video_ids, titles, quota_used
def get_playlist_items(youtube, playlist_id):
playlist_items = []
next_page_token = None
quota_used = 0
while True:
request = youtube.playlistItems().list(
part="snippet",
maxResults=50,
playlistId=playlist_id,
pageToken=next_page_token
)
response = request.execute()
quota_used += QUOTA_COSTS['playlistItems_list']
playlist_items += response['items']
next_page_token = response.get('nextPageToken')
if not next_page_token:
break
return playlist_items, quota_used
def remove_playlist_item(youtube, playlist_item_id):
request = youtube.playlistItems().delete(
id=playlist_item_id
)
request.execute()
return QUOTA_COSTS['playlistItems_delete']
def add_to_custom_playlist(youtube, video_id, playlist_id):
request = youtube.playlistItems().insert(
part="snippet",
body={
"snippet": {
"playlistId": playlist_id,
"resourceId": {
"kind": "youtube#video",
"videoId": video_id
}
}
}
)
response = request.execute()
return QUOTA_COSTS['playlistItems_insert']
def send_pushover_notification(user_key, api_token, title, message):
url = "https://api.pushover.net/1/messages.json"
data = {
"token": api_token,
"user": user_key,
"title": title,
"message": message
}
response = requests.post(url, data=data)
return response.status_code
def main():
youtube = get_authenticated_service()
# Get current playlist items
current_playlist_items, quota_used = get_playlist_items(youtube, CUSTOM_PLAYLIST_ID)
current_video_ids = {item['snippet']['resourceId']['videoId'] for item in current_playlist_items}
# Remove all items in the playlist
for item in current_playlist_items:
quota_used += remove_playlist_item(youtube, item['id'])
# List of channel IDs you want to check
channel_ids = [
"CHANNEL_ID_1",
"CHANNEL_ID_2",
"CHANNEL_ID_3"
]
added_videos = []
# Fetch latest videos from channels
latest_video_ids, titles, quota_cost = get_latest_videos_from_channels(youtube, channel_ids)
quota_used += quota_cost
for video_id, title in zip(latest_video_ids, titles):
# Add the video to the custom playlist if it's not a duplicate
if video_id not in current_video_ids:
quota_used += add_to_custom_playlist(youtube, video_id, CUSTOM_PLAYLIST_ID)
added_videos.append(title)
print(f"Added '{title.encode('ascii', 'ignore').decode()}' to custom playlist")
# Check if quota exceeded
if quota_used > MAX_QUOTA_PER_DAY:
print(f"Quota exceeded: Used {quota_used} units today")
send_pushover_notification(PUSHOVER_USER_KEY, PUSHOVER_API_TOKEN, "YouTube Playlist Update", f"Quota exceeded: Used {quota_used} units today")
else:
# Send Pushover notification
if added_videos:
message = f"Added {len(added_videos)} new video(s) to the custom playlist:\n" + "\n".join(added_videos)
else:
message = "No new videos were added to the custom playlist."
send_pushover_notification(PUSHOVER_USER_KEY, PUSHOVER_API_TOKEN, "YouTube Playlist Update", message)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment