@lennier1 · Last active October 25, 2024
Scrape Veoh video IDs from a specified category
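The script below drives headless Chromium through Playwright for Python; before first use it needs the package and browser installed (pip install playwright, then playwright install chromium). A sample invocation appears after the script.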
import argparse
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
import time
import sys


def scrape_veoh_videos(category, output_file, min_page=1, max_page=None, length_filter='all',
                       reverse=False, language=None, subtitle=None, sort='recent', log_file=None):
    video_ids = set()
    total_videos_found = 0

    # Function to handle logging
    def log_message(message):
        print(message)
        if log_file:
            with open(log_file, 'a') as log:
                log.write(message + '\n')

    # Map length filter arguments to select option values
    length_options = {
        'all': '0',
        'lt4': '1',
        '4to20': '2',
        '20to60': '3',
        'gt60': '4'
    }

    # Map sort options to select option values
    sort_options = {
        'recent': 'most recent',
        'popular': 'most viewed',
        'length': 'run length',
        'alpha': 'title'
    }

    # List of length filters used when length_filter is 'each'
    length_filters_list = ['lt4', '4to20', '20to60', 'gt60']

    # List of language codes (excluding empty string)
    language_codes = [
        'en', 'ja', 'de', 'fr', 'es', 'ab', 'aa', 'af', 'ak', 'sq', 'am', 'ar', 'an', 'hy', 'as', 'av', 'ae',
        'ay', 'az', 'bm', 'ba', 'eu', 'be', 'bn', 'bh', 'bi', 'bs', 'br', 'bg', 'my', 'ca', 'km', 'ch', 'ce', 'zh',
        'cv', 'kw', 'co', 'cr', 'hr', 'cs', 'da', 'dv', 'nl', 'dz', 'et', 'ee', 'fo', 'fj', 'fi', 'ff', 'gd', 'gl',
        'lg', 'ka', 'el', 'gn', 'gu', 'ht', 'ha', 'hw', 'he', 'hz', 'hi', 'ho', 'hu', 'is', 'io', 'ig', 'id', 'ie',
        'iu', 'ik', 'ga', 'it', 'jv', 'kn', 'kr', 'ks', 'kk', 'ki', 'rw', 'kv', 'kg', 'ko', 'ku', 'ky', 'lo', 'la',
        'lv', 'li', 'ln', 'lt', 'lu', 'lb', 'mk', 'mg', 'ms', 'ml', 'mt', 'gv', 'mi', 'mr', 'mh', 'mo', 'mn', 'na',
        'ng', 'ne', 'no', 'oj', 'or', 'om', 'pi', 'fa', 'pl', 'pt', 'ps', 'qu', 'ro', 'rm', 'rn', 'ru', 'sm', 'sg',
        'sa', 'sc', 'sr', 'sn', 'ii', 'sd', 'sk', 'sl', 'so', 'su', 'sw', 'ss', 'sv', 'tl', 'ty', 'tg', 'ta', 'tt',
        'te', 'th', 'bo', 'ti', 'to', 'ts', 'tn', 'tr', 'tk', 'tw', 'uk', 'ur', 'uz', 've', 'vi', 'vo', 'wa', 'cy',
        'fy', 'wo', 'xh', 'yi', 'yo', 'zu'
    ]

    if length_filter != 'each' and length_filter not in length_options:
        log_message(f"Invalid length filter '{length_filter}'. Valid options are: {', '.join(length_options.keys())}.")
        sys.exit(1)

    # Map a language argument to the select option value ('hw' is handled separately by label)
    def get_language_value(code):
        if code == 'hw':
            return None  # We will handle 'hw' separately
        elif code == 'all' or code is None:
            return 'all'
        else:
            return code

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        context = browser.new_context()
        page = context.new_page()

        # Function to scrape videos for given filters
        def scrape_with_filters(length_filter, language, subtitle):
            nonlocal total_videos_found
            nonlocal video_ids

            # Map length filter to value
            if length_filter != 'all':
                length_value = length_options[length_filter]
            else:
                length_value = '0'

            # Initialize retries for base page
            base_url = f'https://www.veoh.com/list/videos/{category}'
            base_page_loaded = False
            retries = 0
            max_retries = 25
            backoff = 2
            max_backoff = 8
            while not base_page_loaded and retries < max_retries:
                try:
                    log_message(f"\nNavigating to base URL: {base_url}")
                    page.goto(base_url, wait_until='networkidle', timeout=60000)
                    time.sleep(3)
                    base_page_loaded = True
                except Exception as e:
                    retries += 1
                    delay = min(backoff * (2 ** (retries - 1)), max_backoff)
                    log_message(f"Error loading base page: {e}")
                    log_message(f"Retrying in {delay} seconds (Retry {retries}/{max_retries})...")
                    time.sleep(delay)
            if not base_page_loaded:
                log_message(f"Failed to load base page after {max_retries} attempts. Exiting.")
                browser.close()
                sys.exit(1)

            # Apply the filters
            filters_applied = False
            if length_filter != 'all':
                # Apply the length filter
                try:
                    # Locate the select element for length filter
                    length_select_selector = 'div.box:has-text("Length (min)") select'
                    # Verify that the select element exists
                    page.wait_for_selector(length_select_selector, timeout=5000)
                    # Select the option with the appropriate value
                    page.select_option(length_select_selector, value=length_value)
                    time.sleep(3)  # Wait for the page to update with filtered results
                    log_message(f"Length filter '{length_filter}' applied successfully.")
                    filters_applied = True
                except Exception as e:
                    log_message(f"Error applying length filter: {e}")
                    browser.close()
                    sys.exit(1)
            else:
                log_message("No length filter applied (length 'all').")

            if language and language != 'all':
                # Apply language filter
                try:
                    language_value = get_language_value(language)
                    language_select_selector = 'div.box:has-text("Language") select'
                    page.wait_for_selector(language_select_selector, timeout=5000)
                    if language == 'hw':
                        # For Hawaiian, select by label
                        page.select_option(language_select_selector, label='Hawaiian')
                    else:
                        page.select_option(language_select_selector, value=language_value)
                    time.sleep(3)
                    log_message(f"Language filter '{language}' applied successfully.")
                    filters_applied = True
                except Exception as e:
                    log_message(f"Error applying language filter: {e}")
                    browser.close()
                    sys.exit(1)
            else:
                log_message("No language filter applied.")

            if subtitle and subtitle != 'all':
                # Apply subtitle filter
                try:
                    subtitle_value = get_language_value(subtitle)
                    subtitle_select_selector = 'div.box:has-text("Subtitle") select'
                    page.wait_for_selector(subtitle_select_selector, timeout=5000)
                    if subtitle == 'hw':
                        # For Hawaiian, select by label
                        page.select_option(subtitle_select_selector, label='Hawaiian')
                    else:
                        page.select_option(subtitle_select_selector, value=subtitle_value)
                    time.sleep(3)
                    log_message(f"Subtitle filter '{subtitle}' applied successfully.")
                    filters_applied = True
                except Exception as e:
                    log_message(f"Error applying subtitle filter: {e}")
                    browser.close()
                    sys.exit(1)
            else:
                log_message("No subtitle filter applied.")

            if sort != 'recent':
                # Apply sort option
                try:
                    sort_value = sort_options[sort]
                    sort_select_selector = 'div.box:has-text("Sort by") select'
                    page.wait_for_selector(sort_select_selector, timeout=5000)
                    page.select_option(sort_select_selector, value=sort_value)
                    time.sleep(3)
                    log_message(f"Sort option '{sort}' applied successfully.")
                    filters_applied = True
                except Exception as e:
                    log_message(f"Error applying sort option: {e}")
                    browser.close()
                    sys.exit(1)
            else:
                log_message("Default sort option applied ('recent').")

            # Determine if we can navigate directly using URL
            can_use_direct_navigation = not filters_applied

            # Check if there are no results before attempting to get the last page number
            no_result_selector = 'p.no-result'
            if page.query_selector(no_result_selector):
                log_message("No results found for the current filters.")
                return  # Exit the function and proceed to the next filter

            # Get the last page number after filters are applied
            try:
                page.wait_for_selector('ul', timeout=10000)
                pagination_uls = page.query_selector_all('ul')
                last_page_num = None
                for ul in pagination_uls:
                    lis = ul.query_selector_all('li')
                    for li in lis:
                        a = li.query_selector('a')
                        if a:
                            text = a.inner_text().strip()
                            if text.isdigit():
                                num = int(text)
                                if last_page_num is None or num > last_page_num:
                                    last_page_num = num
                if last_page_num:
                    log_message(f"Detected last page number: {last_page_num}")
                else:
                    log_message("Could not find last page number. Exiting.")
                    browser.close()
                    sys.exit(1)
            except Exception as e:
                log_message(f"Error detecting last page number: {e}")
                browser.close()
                sys.exit(1)

            # Navigate to the starting page
            if reverse:
                current_page_num = last_page_num
                if can_use_direct_navigation:
                    # Navigate directly to the last page URL
                    page.goto(f'{base_url}?page={current_page_num}', wait_until='networkidle', timeout=60000)
                    time.sleep(3)
            else:
                current_page_num = min_page
                if can_use_direct_navigation and current_page_num != 1:
                    # Navigate directly to the starting page URL
                    page.goto(f'{base_url}?page={current_page_num}', wait_until='networkidle', timeout=60000)
                    time.sleep(3)

            # Main scraping loop
            while True:
                if max_page is not None:
                    if not reverse and current_page_num > max_page:
                        log_message(f"Reached the maximum page number: {max_page}. Exiting.")
                        break
                    elif reverse and current_page_num < min_page:
                        log_message(f"Reached the minimum page number: {min_page}. Exiting.")
                        break
                # Stop at the last page in forward mode
                if not reverse and current_page_num > last_page_num:
                    log_message(f"Reached the last page number: {last_page_num}. Exiting.")
                    break

                success = False
                retries = 0
                max_retries = 25
                backoff = 2  # Initial backoff time in seconds
                max_backoff = 8  # Cap on the backoff delay in seconds
                while not success and retries < max_retries:
                    try:
                        log_message(f"\nProcessing page {current_page_num} (Attempt {retries + 1})...")
                        if can_use_direct_navigation:
                            # Navigate directly to page URL
                            page.goto(f'{base_url}?page={current_page_num}', wait_until='networkidle', timeout=60000)
                            time.sleep(3)

                        # Check if there are no results
                        no_result_selector = 'p.no-result'
                        if page.query_selector(no_result_selector):
                            log_message("No results found for the current filters.")
                            return  # Exit the function and proceed to the next filter

                        # Wait for the video elements to load
                        page.wait_for_selector("a[href*='/watch/']", timeout=10000)
                        time.sleep(3)

                        # Extract video elements
                        video_elements = page.query_selector_all("a[href*='/watch/']")
                        page_videos = []
                        new_videos = []
                        for element in video_elements:
                            href = element.get_attribute('href')
                            if href and '/watch/' in href:
                                video_id = href.split('/watch/')[-1]
                                page_videos.append(video_id)  # Add all videos found on the page
                                if video_id not in video_ids:
                                    new_videos.append(video_id)  # Keep track of new videos
                                    video_ids.add(video_id)  # Add to the set of unique videos

                        # If no new videos detected, assume page didn't load correctly
                        if len(new_videos) == 0:
                            retries += 1
                            log_message(f"No new videos detected on page {current_page_num}. Retrying (Attempt {retries}/{max_retries})...")
                            # Try reloading the page using the alternative method
                            try:
                                if reverse:
                                    # Click "Next" then "Prev"
                                    next_button_selector = 'a.pager_button.next_arrow'
                                    prev_button_selector = 'a.pager_button.prev_arrow'
                                    # Click "Next"
                                    if page.is_visible(next_button_selector):
                                        with page.expect_navigation(wait_until='networkidle', timeout=60000):
                                            page.click(next_button_selector)
                                        time.sleep(3)
                                    else:
                                        log_message("Next button not found during retry.")
                                        break
                                    # Click "Prev"
                                    if page.is_visible(prev_button_selector):
                                        with page.expect_navigation(wait_until='networkidle', timeout=60000):
                                            page.click(prev_button_selector)
                                        time.sleep(3)
                                    else:
                                        log_message("Prev button not found during retry.")
                                        break
                                else:
                                    # Click "Prev" then "Next"
                                    prev_button_selector = 'a.pager_button.prev_arrow'
                                    next_button_selector = 'a.pager_button.next_arrow'
                                    # Click "Prev"
                                    if page.is_visible(prev_button_selector):
                                        with page.expect_navigation(wait_until='networkidle', timeout=60000):
                                            page.click(prev_button_selector)
                                        time.sleep(3)
                                    else:
                                        log_message("Prev button not found during retry.")
                                        break
                                    # Click "Next"
                                    if page.is_visible(next_button_selector):
                                        with page.expect_navigation(wait_until='networkidle', timeout=60000):
                                            page.click(next_button_selector)
                                        time.sleep(3)
                                    else:
                                        log_message("Next button not found during retry.")
                                        break
                                continue  # Retry the page
                            except Exception as e:
                                log_message(f"Error reloading page {current_page_num}: {e}")
                                delay = min(backoff * (2 ** (retries - 1)), max_backoff)
                                log_message(f"Retrying in {delay} seconds...")
                                time.sleep(delay)
                                continue
                        else:
                            # Update total videos found
                            total_videos_found += len(new_videos)
                            # Progress update with number of new videos
                            log_message(f"Found {len(page_videos)} videos on page {current_page_num}, {len(new_videos)} new. Total unique videos found so far: {len(video_ids)}.")
                            # Append the videos found on this page to the output file
                            with open(output_file, 'a') as f:
                                for vid in new_videos:  # Write only new videos
                                    f.write(f"video:{vid}\n")
                            # Reset retries and mark success
                            retries = 0
                            success = True
                    except Exception as e:
                        retries += 1
                        delay = min(backoff * (2 ** (retries - 1)), max_backoff)
                        log_message(f"Error processing page {current_page_num}: {e}")
                        log_message(f"Retrying in {delay} seconds (Retry {retries}/{max_retries})...")
                        time.sleep(delay)

                if not success:
                    log_message(f"Failed to process page {current_page_num} after {max_retries} attempts. Skipping to the next page.")

                # Update current page number from the page
                current_page_number_element = page.query_selector('ul li.current a')
                if current_page_number_element:
                    current_page_num_on_page = int(current_page_number_element.inner_text().strip())
                    log_message(f"Current page number on site: {current_page_num_on_page}")
                else:
                    log_message("Could not find current page number after processing page. Exiting.")
                    break

                # Check for navigation end conditions
                try:
                    if reverse:
                        if current_page_num <= min_page:
                            log_message(f"Reached the minimum page number: {min_page}. Exiting.")
                            break
                        else:
                            # Try clicking the "Prev" button
                            prev_button_selector = 'a.pager_button.prev_arrow'
                            if page.is_visible(prev_button_selector):
                                current_page_num -= 1
                                with page.expect_navigation(wait_until='networkidle', timeout=60000):
                                    page.click(prev_button_selector)
                                time.sleep(3)
                            else:
                                log_message("No more previous pages to navigate.")
                                break
                    else:
                        if current_page_num >= last_page_num:
                            log_message(f"Reached the last page number: {last_page_num}. Exiting.")
                            break
                        else:
                            # Try clicking the "Next" button
                            next_button_selector = 'a.pager_button.next_arrow'
                            if page.is_visible(next_button_selector):
                                current_page_num += 1
                                with page.expect_navigation(wait_until='networkidle', timeout=60000):
                                    page.click(next_button_selector)
                                time.sleep(3)
                            else:
                                log_message("No more next pages to navigate.")
                                break
                except Exception as e:
                    log_message(f"Error navigating to next/previous page from page {current_page_num}: {e}")
                    break  # Exit the loop if unable to navigate further

            log_message(f"\nFinished scraping with length='{length_filter}', language='{language}', subtitle='{subtitle}'. Total unique videos found: {len(video_ids)}.")

        try:
            # Handle 'each' options for length, language, and subtitle.
            # (Without expanding length 'each' here, scrape_with_filters('each', ...) would
            # raise a KeyError; length_filters_list exists for exactly this case.)
            lengths_to_process = [length_filter]
            languages_to_process = [language]
            subtitles_to_process = [subtitle]
            if length_filter == 'each':
                lengths_to_process = length_filters_list
            if language == 'each':
                languages_to_process = language_codes
            if subtitle == 'each':
                subtitles_to_process = language_codes
            # Iterate over every combination of filters
            for lf in lengths_to_process:
                for lang in languages_to_process:
                    for sub in subtitles_to_process:
                        log_message(f"\nStarting scraping with length='{lf}', language='{lang}', subtitle='{sub}'...")
                        scrape_with_filters(lf, lang, sub)
                        # Do not clear video_ids between iterations to accumulate total unique videos
        except Exception as e:
            log_message(f"An unexpected error occurred: {e}")
            browser.close()
            sys.exit(1)
        browser.close()

    log_message(f"\nExtraction complete. Total unique videos found: {len(video_ids)}.")


def main():
    parser = argparse.ArgumentParser(description='Scrape video IDs from Veoh.')
    parser.add_argument('-c', '--category', required=True, help='Video category (e.g., action_adventure)')
    parser.add_argument('-o', '--output', required=True, help='Output file to save video IDs')
    parser.add_argument('-min', type=int, help='Starting page number (default: 1)')
    parser.add_argument('-max', type=int, help='Ending page number (default: until last page)')
    parser.add_argument('-length', choices=['all', 'lt4', '4to20', '20to60', 'gt60', 'each'], default='all',
                        help='Length filter to apply (default: all)')
    parser.add_argument('-r', '--reverse', action='store_true', help='Scrape pages in reverse order')
    parser.add_argument('-language', help="Language filter (e.g., en, ja, or 'each')")
    parser.add_argument('-subtitle', help="Subtitle filter (e.g., en, ja, or 'each')")
    parser.add_argument('-sort', choices=['recent', 'length', 'popular', 'alpha'], default='recent',
                        help='Sort option (default: recent)')
    parser.add_argument('-l', '--log', help='Log file to append progress printouts')
    args = parser.parse_args()

    # Set default min_page to 1 if not provided
    min_page = args.min if args.min is not None else 1
    # Ensure min_page is at least 1
    if min_page < 1:
        print("Error: -min must be at least 1.")
        sys.exit(1)
    # Check for invalid combinations
    if args.length == 'each' and (args.min is not None or args.max is not None):
        print("Error: -length each option cannot be used with -min or -max.")
        sys.exit(1)

    scrape_veoh_videos(
        category=args.category,
        output_file=args.output,
        min_page=min_page,
        max_page=args.max,
        length_filter=args.length,
        reverse=args.reverse,
        language=args.language,
        subtitle=args.subtitle,
        sort=args.sort,
        log_file=args.log
    )


if __name__ == '__main__':
    main()
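
Usage: a hypothetical invocation, assuming the script is saved as scrape_veoh_ids.py (the filename and output names below are placeholders; action_adventure is the category slug the script's own help text uses as an example):

    python scrape_veoh_ids.py -c action_adventure -o ids.txt -sort recent -l scrape.log
    python scrape_veoh_ids.py -c action_adventure -o ids.txt -length each -language en

The second form scrapes each length bucket (lt4, 4to20, 20to60, gt60) in a separate pass; note that -min and -max cannot be combined with -length each. IDs are appended to the output file as "video:ID" lines, one per newly seen video.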