Created
November 12, 2024 09:45
-
-
Save oazabir/818a7d6412430d9eca2bd62ddfce610f to your computer and use it in GitHub Desktop.
Python Youtube channel, playlist and video fetch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime, timedelta | |
import time | |
from google.oauth2.service_account import Credentials | |
from googleapiclient.discovery import build | |
import os | |
import json | |
from tenacity import RetryError, retry, stop_after_attempt, wait_fixed | |
# YouTube Data API client, authenticated with a plain developer key.
API_KEY = ''  # put API key here
youtube = build(
    'youtube',
    'v3',
    developerKey=API_KEY,
)

# Google Sheets client authorised through a service-account credential file.
# NOTE(review): `gspread`, `CREDS_FILE` and `SCOPE` are not imported or defined
# in the visible part of this file - confirm they are provided elsewhere,
# otherwise the two statements below fail with NameError at import time.
creds = Credentials.from_service_account_file(
    CREDS_FILE,
    scopes=SCOPE,
)
client = gspread.authorize(creds)
def get_playlist_id(playlist_url):
    """Return the playlist ID carried in the ``list`` parameter of a URL.

    The ID is read from the URL's query string, so any parameters that
    follow it (``&index=2``, ``&t=10s``, a ``#fragment``) are not included
    in the result.

    Args:
        playlist_url: A YouTube URL such as
            ``https://www.youtube.com/playlist?list=PL123`` or a bare
            ``list=PL123`` string.

    Returns:
        The playlist ID as a string.

    Raises:
        IndexError: If the input contains no ``list=`` parameter. The
            exception type matches what the previous ``split``-based
            implementation raised, so existing callers keep working.
    """
    from urllib.parse import parse_qs, urlparse

    ids = parse_qs(urlparse(playlist_url).query).get("list")
    if ids:
        return ids[0]

    # Fallback for inputs that are not full URLs (no "?" before the query),
    # e.g. "list=PL123&index=2".
    _, marker, tail = playlist_url.partition("list=")
    if not marker:
        raise IndexError(f"no 'list=' parameter in playlist URL: {playlist_url!r}")
    for separator in ("&", "#"):
        tail = tail.partition(separator)[0]
    return tail
# get playlists in a channel
def fetch_channel_playlists(api_key, channel_id):
    """Collect the playlist URLs of a channel, following result pages.

    Each page request is retried up to 3 times with a 1 second wait, and a
    1 second pause follows every successful request. If a page cannot be
    fetched or processed, the error is printed and whatever was collected
    up to that point is returned.

    Args:
        api_key: Developer key passed to ``build`` for the YouTube Data API.
        channel_id: ID of the channel whose playlists are listed.

    Returns:
        A list of ``https://www.youtube.com/playlist?list=<id>`` URLs.
    """
    service = build('youtube', 'v3', developerKey=api_key)

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
    def request_page(token):
        # One page (max 50 entries) of the channel's playlists.
        return service.playlists().list(
            part='snippet',
            channelId=channel_id,
            maxResults=50,
            pageToken=token
        ).execute()

    playlist_urls = []
    page_token = None
    more_pages = True
    while more_pages:
        try:
            page = request_page(page_token)
            time.sleep(1)
            playlist_urls.extend(
                f"https://www.youtube.com/playlist?list={entry['id']}"
                for entry in page.get('items', [])
            )
            page_token = page.get('nextPageToken')
            # A missing/empty token means the last page has been consumed.
            more_pages = bool(page_token)
        except Exception as e:
            print(f"Error fetching playlists for channel {channel_id}: {e}")
            more_pages = False

    print(f"Found {len(playlist_urls)} playlists for channel {channel_id}")
    return playlist_urls
def fetch_playlist_videos(api_key, playlist_id, skip_video_ids):
    """Fetch a playlist's title and the videos collected from its items.

    The playlist's metadata is requested first; then every page of playlist
    items is handed to ``process_youtube_items``, which appends the videos
    it accepts to the result list. Each request is retried up to 3 times
    with a 5 second wait and followed by a 1 second pause.

    Args:
        api_key: Developer key passed to ``build`` for the YouTube Data API.
        playlist_id: ID of the playlist to read.
        skip_video_ids: Container of video IDs forwarded to
            ``process_youtube_items`` so those videos are ignored.

    Returns:
        ``None`` if the playlist details cannot be fetched or the response
        contains no playlist; otherwise a dict with the keys
        ``playlist_name``, ``playlist_url`` and ``videos``. If fetching a
        page of items fails after the retries, the videos gathered from the
        earlier pages are still returned.
    """
    youtube = build('youtube', 'v3', developerKey=api_key)

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
    def fetch_playlist_details():
        return youtube.playlists().list(
            part='snippet',
            id=playlist_id,
            maxResults=50
        ).execute()

    # Defined once, outside the paging loop; the page token is an argument
    # instead of a variable captured from the enclosing scope.
    @retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
    def fetch_playlist_items(page_token):
        return youtube.playlistItems().list(
            part='snippet,contentDetails',
            playlistId=playlist_id,
            maxResults=50,
            pageToken=page_token
        ).execute()

    try:
        playlist_response = fetch_playlist_details()
        time.sleep(1)
    except RetryError:
        print("Failed to fetch playlist details after 3 attempts")
        return None

    # .get() so a response without an 'items' key is treated like an
    # empty result rather than raising KeyError.
    playlist_entries = playlist_response.get('items')
    if not playlist_entries:
        return None

    playlist_name = playlist_entries[0]['snippet']['title']
    playlist_url = f"https://www.youtube.com/playlist?list={playlist_id}"

    # Fetch videos in the playlist
    videos = []
    next_page_token = None
    while True:
        try:
            playlist_items_response = fetch_playlist_items(next_page_token)
            time.sleep(1)
        except RetryError:
            print("Failed to fetch playlist items after 3 attempts")
            break

        process_youtube_items(youtube, videos, playlist_items_response, skip_video_ids)
        time.sleep(1)

        next_page_token = playlist_items_response.get('nextPageToken')
        if not next_page_token:
            break

    # Tag every collected video with the playlist it came from.
    for video in videos:
        video['playlist_url'] = playlist_url

    return {
        'playlist_name': playlist_name,
        'playlist_url': playlist_url,
        'videos': videos
    }
def _parse_youtube_timestamp(value):
    """Parse a ``publishedAt`` string into an aware UTC datetime, or None."""
    from datetime import datetime, timezone

    # Accept timestamps both with and without fractional seconds.
    for fmt in ('%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S.%fZ'):
        try:
            return datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
        except (TypeError, ValueError):
            continue
    return None


def process_youtube_items(youtube, videos, items, skip_video_ids):
    """Append the acceptable recent videos of one playlist-items page.

    A playlist item is a candidate when its ``publishedAt`` is within the
    last two weeks (compared in UTC) and its video ID is not in
    ``skip_video_ids``. Details for all candidates are fetched with a single
    ``videos().list`` call (retried up to 3 times, 5 seconds apart). A
    candidate is then appended to ``videos`` only if it has a ``medium``
    thumbnail, is embeddable, and has no non-empty ``allowed`` region list.

    Args:
        youtube: YouTube Data API client used for the ``videos().list`` call.
        videos: List that accepted video dicts are appended to (mutated
            in place).
        items: A ``playlistItems().list`` response; its ``items`` entries
            are read.
        skip_video_ids: Container of video IDs that must be ignored.

    Returns:
        None. Results are delivered through ``videos``.
    """
    from datetime import datetime, timedelta, timezone

    # publishedAt values end in "Z" (UTC), so the cutoff is computed in UTC too.
    cutoff = datetime.now(timezone.utc) - timedelta(weeks=2)
    playlist_items = items.get('items', [])

    video_ids = []
    for item in playlist_items:
        snippet = item.get('snippet', {})
        video_id = snippet.get('resourceId', {}).get('videoId')
        if not video_id or video_id in skip_video_ids:
            continue
        published = _parse_youtube_timestamp(snippet.get('publishedAt'))
        # Items with a missing or unparseable date are treated as not recent.
        if published is None or published <= cutoff:
            continue
        video_ids.append(video_id)

    if not video_ids:
        return

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
    def fetch_video_details():
        return youtube.videos().list(
            part='snippet,status,contentDetails',
            id=','.join(video_ids)
        ).execute()

    try:
        video_details = fetch_video_details()
    except RetryError:
        print("Failed to fetch video details after 3 attempts")
        video_details = {'items': []}

    # Index the details by video ID; setdefault keeps the first entry for an
    # ID, matching a first-match linear search.
    details_by_id = {}
    for detail in video_details.get('items', []):
        details_by_id.setdefault(detail['id'], detail)

    for item in playlist_items:
        snippet = item.get('snippet', {})
        video_id = snippet.get('resourceId', {}).get('videoId')

        video_thumbnails = snippet.get('thumbnails')
        if not (video_thumbnails and video_thumbnails.get('medium')):
            continue

        # Only the candidates selected above were requested, so filtered-out
        # items (old, skipped, or without an ID) have no details entry.
        video_detail = details_by_id.get(video_id)
        if not video_detail:
            continue

        content_details = video_detail.get('contentDetails', {})

        if not video_detail.get('status', {}).get('embeddable', False):
            print(f"Video {video_id} is not embeddable")
            continue

        region_restriction = content_details.get('regionRestriction', {})
        if region_restriction.get('allowed'):
            print(f"Video {video_id} is region restricted: {region_restriction}")
            continue

        videos.append({
            'video_id': video_id,
            'title': snippet.get('title'),
            'description': snippet.get('description'),
            'url': f"https://www.youtube.com/watch?v={video_id}",
            'published_at': snippet.get('publishedAt'),
            'channel_id': snippet.get('channelId'),
            'thumbnails': video_thumbnails,
            'channel_title': snippet.get('channelTitle', ""),
            'duration': content_details.get('duration', '0'),
            'content_details': content_details
        })
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment