Scrape Veoh video IDs from a specified category
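The script drives a headless Chromium instance via Playwright, pages through a category listing, and appends each newly seen ID to the output file as a "video:ID" line. A typical invocation might look like the following (scrape_veoh.py is an assumed save name, not part of the gist):

    python scrape_veoh.py -c action_adventure -o video_ids.txt -length each -sort popular -l progress.log

Playwright and a browser build must be installed first (pip install playwright, then playwright install chromium).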
import argparse
import sys
import time

from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
def scrape_veoh_videos(category, output_file, min_page=1, max_page=None, length_filter='all', reverse=False, language=None, subtitle=None, sort='recent', log_file=None):
    video_ids = set()
    total_videos_found = 0

    # Print each progress message and optionally append it to a log file
    def log_message(message):
        print(message)
        if log_file:
            with open(log_file, 'a') as log:
                log.write(message + '\n')

    # Map length filter arguments to select option values
    length_options = {
        'all': '0',
        'lt4': '1',
        '4to20': '2',
        '20to60': '3',
        'gt60': '4'
    }

    # Map sort options to select option values
    sort_options = {
        'recent': 'most recent',
        'popular': 'most viewed',
        'length': 'run length',
        'alpha': 'title'
    }

    # Length filters to iterate over when length_filter is 'each'
    length_filters_list = ['lt4', '4to20', '20to60', 'gt60']

    # List of language codes (excluding the empty string)
    language_codes = [
        'en', 'ja', 'de', 'fr', 'es', 'ab', 'aa', 'af', 'ak', 'sq', 'am', 'ar', 'an', 'hy', 'as', 'av', 'ae',
        'ay', 'az', 'bm', 'ba', 'eu', 'be', 'bn', 'bh', 'bi', 'bs', 'br', 'bg', 'my', 'ca', 'km', 'ch', 'ce', 'zh',
        'cv', 'kw', 'co', 'cr', 'hr', 'cs', 'da', 'dv', 'nl', 'dz', 'et', 'ee', 'fo', 'fj', 'fi', 'ff', 'gd', 'gl',
        'lg', 'ka', 'el', 'gn', 'gu', 'ht', 'ha', 'hw', 'he', 'hz', 'hi', 'ho', 'hu', 'is', 'io', 'ig', 'id', 'ie',
        'iu', 'ik', 'ga', 'it', 'jv', 'kn', 'kr', 'ks', 'kk', 'ki', 'rw', 'kv', 'kg', 'ko', 'ku', 'ky', 'lo', 'la',
        'lv', 'li', 'ln', 'lt', 'lu', 'lb', 'mk', 'mg', 'ms', 'ml', 'mt', 'gv', 'mi', 'mr', 'mh', 'mo', 'mn', 'na',
        'ng', 'ne', 'no', 'oj', 'or', 'om', 'pi', 'fa', 'pl', 'pt', 'ps', 'qu', 'ro', 'rm', 'rn', 'ru', 'sm', 'sg',
        'sa', 'sc', 'sr', 'sn', 'ii', 'sd', 'sk', 'sl', 'so', 'su', 'sw', 'ss', 'sv', 'tl', 'ty', 'tg', 'ta', 'tt',
        'te', 'th', 'bo', 'ti', 'to', 'ts', 'tn', 'tr', 'tk', 'tw', 'uk', 'ur', 'uz', 've', 'vi', 'vo', 'wa', 'cy',
        'fy', 'wo', 'xh', 'yi', 'yo', 'zu'
    ]

    if length_filter != 'each' and length_filter not in length_options:
        log_message(f"Invalid length filter '{length_filter}'. Valid options are: {', '.join(length_options.keys())}, each.")
        sys.exit(1)

    # Map a language argument to the select option value
    def get_language_value(code):
        if code == 'hw':
            return None  # 'hw' (Hawaiian) is handled separately by label
        elif code == 'all' or code is None:
            return 'all'
        else:
            return code
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        context = browser.new_context()
        page = context.new_page()
        # Scrape all pages for one combination of filters
        def scrape_with_filters(length_filter, language, subtitle):
            nonlocal total_videos_found
            nonlocal video_ids

            # Map length filter to value
            if length_filter != 'all':
                length_value = length_options[length_filter]
            else:
                length_value = '0'

            # Load the base category page, retrying with exponential backoff
            base_url = f'https://www.veoh.com/list/videos/{category}'
            base_page_loaded = False
            retries = 0
            max_retries = 25
            backoff = 2
            max_backoff = 8
            while not base_page_loaded and retries < max_retries:
                try:
                    log_message(f"\nNavigating to base URL: {base_url}")
                    page.goto(base_url, wait_until='networkidle', timeout=60000)
                    time.sleep(3)
                    base_page_loaded = True
                except Exception as e:
                    retries += 1
                    delay = min(backoff * (2 ** (retries - 1)), max_backoff)
                    log_message(f"Error loading base page: {e}")
                    log_message(f"Retrying in {delay} seconds (Retry {retries}/{max_retries})...")
                    time.sleep(delay)
            if not base_page_loaded:
                log_message(f"Failed to load base page after {max_retries} attempts. Exiting.")
                browser.close()
                sys.exit(1)

            # Apply the filters
            filters_applied = False
            if length_filter != 'all':
                # Apply the length filter
                try:
                    # Locate the select element for the length filter
                    length_select_selector = 'div.box:has-text("Length (min)") select'
                    # Verify that the select element exists
                    page.wait_for_selector(length_select_selector, timeout=5000)
                    # Select the option with the appropriate value
                    page.select_option(length_select_selector, value=length_value)
                    time.sleep(3)  # Wait for the page to update with filtered results
                    log_message(f"Length filter '{length_filter}' applied successfully.")
                    filters_applied = True
                except Exception as e:
                    log_message(f"Error applying length filter: {e}")
                    browser.close()
                    sys.exit(1)
            else:
                log_message("No length filter applied (length 'all').")

            if language and language != 'all':
                # Apply the language filter
                try:
                    language_value = get_language_value(language)
                    language_select_selector = 'div.box:has-text("Language") select'
                    page.wait_for_selector(language_select_selector, timeout=5000)
                    if language == 'hw':
                        # For Hawaiian, select by label
                        page.select_option(language_select_selector, label='Hawaiian')
                    else:
                        page.select_option(language_select_selector, value=language_value)
                    time.sleep(3)
                    log_message(f"Language filter '{language}' applied successfully.")
                    filters_applied = True
                except Exception as e:
                    log_message(f"Error applying language filter: {e}")
                    browser.close()
                    sys.exit(1)
            else:
                log_message("No language filter applied.")

            if subtitle and subtitle != 'all':
                # Apply the subtitle filter
                try:
                    subtitle_value = get_language_value(subtitle)
                    subtitle_select_selector = 'div.box:has-text("Subtitle") select'
                    page.wait_for_selector(subtitle_select_selector, timeout=5000)
                    if subtitle == 'hw':
                        # For Hawaiian, select by label
                        page.select_option(subtitle_select_selector, label='Hawaiian')
                    else:
                        page.select_option(subtitle_select_selector, value=subtitle_value)
                    time.sleep(3)
                    log_message(f"Subtitle filter '{subtitle}' applied successfully.")
                    filters_applied = True
                except Exception as e:
                    log_message(f"Error applying subtitle filter: {e}")
                    browser.close()
                    sys.exit(1)
            else:
                log_message("No subtitle filter applied.")

            if sort != 'recent':
                # Apply the sort option
                try:
                    sort_value = sort_options[sort]
                    sort_select_selector = 'div.box:has-text("Sort by") select'
                    page.wait_for_selector(sort_select_selector, timeout=5000)
                    page.select_option(sort_select_selector, value=sort_value)
                    time.sleep(3)
                    log_message(f"Sort option '{sort}' applied successfully.")
                    filters_applied = True
                except Exception as e:
                    log_message(f"Error applying sort option: {e}")
                    browser.close()
                    sys.exit(1)
            else:
                log_message("Default sort option applied ('recent').")
            # Direct URL navigation only works when no filters have been applied
            can_use_direct_navigation = not filters_applied

            # Check for an empty result set before looking for pagination
            no_result_selector = 'p.no-result'
            if page.query_selector(no_result_selector):
                log_message("No results found for the current filters.")
                return  # Exit the function and proceed to the next filter

            # Find the highest page number among the pagination links
            try:
                page.wait_for_selector('ul', timeout=10000)
                pagination_uls = page.query_selector_all('ul')
                last_page_num = None
                for ul in pagination_uls:
                    lis = ul.query_selector_all('li')
                    for li in lis:
                        a = li.query_selector('a')
                        if a:
                            text = a.inner_text().strip()
                            if text.isdigit():
                                num = int(text)
                                if last_page_num is None or num > last_page_num:
                                    last_page_num = num
                if last_page_num:
                    log_message(f"Detected last page number: {last_page_num}")
                else:
                    log_message("Could not find last page number. Exiting.")
                    browser.close()
                    sys.exit(1)
            except Exception as e:
                log_message(f"Error detecting last page number: {e}")
                browser.close()
                sys.exit(1)
            # Navigate to the starting page
            if reverse:
                current_page_num = last_page_num
                if can_use_direct_navigation:
                    # Navigate directly to the last page URL
                    page.goto(f'{base_url}?page={current_page_num}', wait_until='networkidle', timeout=60000)
                    time.sleep(3)
            else:
                current_page_num = min_page
                if can_use_direct_navigation and current_page_num != 1:
                    # Navigate directly to the starting page URL
                    page.goto(f'{base_url}?page={current_page_num}', wait_until='networkidle', timeout=60000)
                    time.sleep(3)

            # Main scraping loop
            while True:
                if max_page is not None:
                    if not reverse and current_page_num > max_page:
                        log_message(f"Reached the maximum page number: {max_page}. Exiting.")
                        break
                    elif reverse and current_page_num < min_page:
                        log_message(f"Reached the minimum page number: {min_page}. Exiting.")
                        break
                # Stop at the last page in forward mode
                if not reverse and current_page_num > last_page_num:
                    log_message(f"Reached the last page number: {last_page_num}. Exiting.")
                    break

                success = False
                retries = 0
                max_retries = 25
                backoff = 2  # Initial backoff time in seconds
                max_backoff = 8  # Cap on the backoff delay
                while not success and retries < max_retries:
                    try:
                        log_message(f"\nProcessing page {current_page_num} (Attempt {retries + 1})...")
                        if can_use_direct_navigation:
                            # Navigate directly to the page URL
                            page.goto(f'{base_url}?page={current_page_num}', wait_until='networkidle', timeout=60000)
                            time.sleep(3)
                        # Check if there are no results
                        no_result_selector = 'p.no-result'
                        if page.query_selector(no_result_selector):
                            log_message("No results found for the current filters.")
                            return  # Exit the function and proceed to the next filter
                        # Wait for the video elements to load
                        page.wait_for_selector("a[href*='/watch/']", timeout=10000)
                        time.sleep(3)
                        # Extract video elements
                        video_elements = page.query_selector_all("a[href*='/watch/']")
                        page_videos = []
                        new_videos = []
                        for element in video_elements:
                            href = element.get_attribute('href')
                            if href and '/watch/' in href:
                                video_id = href.split('/watch/')[-1]
                                page_videos.append(video_id)  # All videos found on the page
                                if video_id not in video_ids:
                                    new_videos.append(video_id)  # Videos not seen before
                                    video_ids.add(video_id)  # Add to the set of unique videos
                        # If no new videos were detected, assume the page didn't load correctly
                        if len(new_videos) == 0:
                            retries += 1
                            log_message(f"No new videos detected on page {current_page_num}. Retrying (Attempt {retries}/{max_retries})...")
                            # Try to force a reload by paging away and back
                            try:
                                if reverse:
                                    # Click "Next" then "Prev"
                                    next_button_selector = 'a.pager_button.next_arrow'
                                    prev_button_selector = 'a.pager_button.prev_arrow'
                                    # Click "Next"
                                    if page.is_visible(next_button_selector):
                                        with page.expect_navigation(wait_until='networkidle', timeout=60000):
                                            page.click(next_button_selector)
                                        time.sleep(3)
                                    else:
                                        log_message("Next button not found during retry.")
                                        break
                                    # Click "Prev"
                                    if page.is_visible(prev_button_selector):
                                        with page.expect_navigation(wait_until='networkidle', timeout=60000):
                                            page.click(prev_button_selector)
                                        time.sleep(3)
                                    else:
                                        log_message("Prev button not found during retry.")
                                        break
                                else:
                                    # Click "Prev" then "Next"
                                    prev_button_selector = 'a.pager_button.prev_arrow'
                                    next_button_selector = 'a.pager_button.next_arrow'
                                    # Click "Prev"
                                    if page.is_visible(prev_button_selector):
                                        with page.expect_navigation(wait_until='networkidle', timeout=60000):
                                            page.click(prev_button_selector)
                                        time.sleep(3)
                                    else:
                                        log_message("Prev button not found during retry.")
                                        break
                                    # Click "Next"
                                    if page.is_visible(next_button_selector):
                                        with page.expect_navigation(wait_until='networkidle', timeout=60000):
                                            page.click(next_button_selector)
                                        time.sleep(3)
                                    else:
                                        log_message("Next button not found during retry.")
                                        break
                                continue  # Retry the page
                            except Exception as e:
                                log_message(f"Error reloading page {current_page_num}: {e}")
                                delay = min(backoff * (2 ** (retries - 1)), max_backoff)
                                log_message(f"Retrying in {delay} seconds...")
                                time.sleep(delay)
                                continue
                        else:
                            # Update total videos found
                            total_videos_found += len(new_videos)
                            # Progress update with the number of new videos
                            log_message(f"Found {len(page_videos)} videos on page {current_page_num}, {len(new_videos)} new. Total unique videos found so far: {len(video_ids)}.")
                            # Append the videos found on this page to the output file
                            with open(output_file, 'a') as f:
                                for vid in new_videos:  # Write only new videos
                                    f.write(f"video:{vid}\n")
                            # Reset retries and mark success
                            retries = 0
                            success = True
                    except Exception as e:
                        retries += 1
                        delay = min(backoff * (2 ** (retries - 1)), max_backoff)
                        log_message(f"Error processing page {current_page_num}: {e}")
                        log_message(f"Retrying in {delay} seconds (Retry {retries}/{max_retries})...")
                        time.sleep(delay)
                if not success:
                    log_message(f"Failed to process page {current_page_num} after {max_retries} attempts. Skipping to the next page.")

                # Report the current page number as shown on the site
                current_page_number_element = page.query_selector('ul li.current a')
                if current_page_number_element:
                    current_page_num_on_page = int(current_page_number_element.inner_text().strip())
                    log_message(f"Current page number on site: {current_page_num_on_page}")
                else:
                    log_message("Could not find current page number after processing page. Exiting.")
                    break
                # Check for navigation end conditions
                try:
                    if reverse:
                        if current_page_num <= min_page:
                            log_message(f"Reached the minimum page number: {min_page}. Exiting.")
                            break
                        else:
                            # Try clicking the "Prev" button
                            prev_button_selector = 'a.pager_button.prev_arrow'
                            if page.is_visible(prev_button_selector):
                                current_page_num -= 1
                                with page.expect_navigation(wait_until='networkidle', timeout=60000):
                                    page.click(prev_button_selector)
                                time.sleep(3)
                            else:
                                log_message("No more previous pages to navigate.")
                                break
                    else:
                        if current_page_num >= last_page_num:
                            log_message(f"Reached the last page number: {last_page_num}. Exiting.")
                            break
                        else:
                            # Try clicking the "Next" button
                            next_button_selector = 'a.pager_button.next_arrow'
                            if page.is_visible(next_button_selector):
                                current_page_num += 1
                                with page.expect_navigation(wait_until='networkidle', timeout=60000):
                                    page.click(next_button_selector)
                                time.sleep(3)
                            else:
                                log_message("No more next pages to navigate.")
                                break
                except Exception as e:
                    log_message(f"Error navigating to next/previous page from page {current_page_num}: {e}")
                    break  # Exit the loop if unable to navigate further

            log_message(f"\nFinished scraping with length='{length_filter}', language='{language}', subtitle='{subtitle}'. Total unique videos found: {len(video_ids)}.")
        try:
            # Expand 'each' options into the full filter lists
            lengths_to_process = [length_filter]
            languages_to_process = [language]
            subtitles_to_process = [subtitle]
            if length_filter == 'each':
                lengths_to_process = length_filters_list
            if language == 'each':
                languages_to_process = language_codes
            if subtitle == 'each':
                subtitles_to_process = language_codes
            # Iterate over every combination of length, language, and subtitle
            for length in lengths_to_process:
                for lang in languages_to_process:
                    for sub in subtitles_to_process:
                        log_message(f"\nStarting scraping with length='{length}', language='{lang}', subtitle='{sub}'...")
                        scrape_with_filters(length, lang, sub)
                        # Do not clear video_ids between iterations, so the unique total accumulates
        except Exception as e:
            log_message(f"An unexpected error occurred: {e}")
            browser.close()
            sys.exit(1)
        browser.close()
        log_message(f"\nExtraction complete. Total unique videos found: {len(video_ids)}.")
def main():
    parser = argparse.ArgumentParser(description='Scrape video IDs from Veoh.')
    parser.add_argument('-c', '--category', required=True, help='Video category (e.g., action_adventure)')
    parser.add_argument('-o', '--output', required=True, help='Output file to save video IDs')
    parser.add_argument('-min', type=int, help='Starting page number (default: 1)')
    parser.add_argument('-max', type=int, help='Ending page number (default: until last page)')
    parser.add_argument('-length', choices=['all', 'lt4', '4to20', '20to60', 'gt60', 'each'], default='all',
                        help='Length filter to apply (default: all)')
    parser.add_argument('-r', '--reverse', action='store_true', help='Scrape pages in reverse order')
    parser.add_argument('-language', help="Language filter (e.g., en, ja, or 'each')")
    parser.add_argument('-subtitle', help="Subtitle filter (e.g., en, ja, or 'each')")
    parser.add_argument('-sort', choices=['recent', 'length', 'popular', 'alpha'], default='recent',
                        help='Sort option (default: recent)')
    parser.add_argument('-l', '--log', help='Log file to append progress printouts')
    args = parser.parse_args()

    # Default min_page to 1 if not provided
    min_page = args.min if args.min is not None else 1
    # Ensure min_page is at least 1
    if min_page < 1:
        print("Error: -min must be at least 1.")
        sys.exit(1)
    # Check for invalid combinations
    if args.length == 'each' and (args.min is not None or args.max is not None):
        print("Error: -length each option cannot be used with -min or -max.")
        sys.exit(1)

    scrape_veoh_videos(
        category=args.category,
        output_file=args.output,
        min_page=min_page,
        max_page=args.max,
        length_filter=args.length,
        reverse=args.reverse,
        language=args.language,
        subtitle=args.subtitle,
        sort=args.sort,
        log_file=args.log
    )


if __name__ == '__main__':
    main()
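Note that uniqueness is only tracked in memory for a single run, and the output file is opened in append mode, so repeated runs against the same file can write duplicate lines. A minimal post-processing sketch for deduplicating results, assuming the output was written to video_ids.txt as in the example invocation above:

    # Hypothetical cleanup step, not part of the scraper itself
    seen = set()
    with open('video_ids.txt') as f:
        for line in f:
            line = line.strip()
            if line.startswith('video:'):
                seen.add(line.split(':', 1)[1])
    print(f'{len(seen)} unique video IDs')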