Advanced web scraper template with content segmentation.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=W0611

import json
import logging
import os
import random
import re
import subprocess
import sys
import time
from pathlib import Path
from typing import Dict, List, Optional, Any, TypedDict, Union
from urllib.parse import urlparse, urljoin, quote

import requests
from bs4 import BeautifulSoup
from requests.exceptions import RequestException, HTTPError
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
)
from webdriver_manager.chrome import ChromeDriverManager
import fs
from fs import open_fs
from fs.errors import ResourceNotFound

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.FileHandler("scraper.log"), logging.StreamHandler()],
)
logger = logging.getLogger("WebScraper")


# Type definitions
class ContentLink(TypedDict):
    """Represents a link to content with title and URL."""

    title: str
    url: str


class Section(TypedDict):
    """Represents a section of content with a title and list of links."""

    title: str
    links: List[ContentLink]


class SiteStructure(TypedDict):
    """Represents the overall structure of a site with title, URL, and sections."""

    title: str
    url: str
    sections: List[Section]


class ContentData(TypedDict):
    """Represents scraped content data with title, content text, and source URL."""

    title: str
    content: str
    url: str


# Firewall detection patterns
FIREWALL_PATTERNS = {
    "cloudflare": [
        "cloudflare",
        "checking your browser",
        "just a moment",
        "cf-browser-verification",
        "cf_clearance",
        "cf-ray",
        "cf-chl-",
    ],
    "akamai": [
        "akamai",
        "ak_bmsc",
        "bm_sv",
        "bot management",
        "_abck",
    ],
    "imperva": [
        "incapsula",
        "imperva",
        "visid_incap",
        "_icl_current",
        "incap_ses",
        "nlbi_",
    ],
    "distil": [
        "distil",
        "distil_r_captcha",
        "dstl",
    ],
    "perimeterx": [
        "perimeterx",
        "_pxhd",
        "_px3",
        "_px",
        "px-captcha",
    ],
    "datadome": [
        "datadome",
        "datadome-challenge",
        "__ddg",
        "datadome.co",
    ],
    "vercel": [
        "vercel",
        "vercel-protection",
        "vercel-edge",
        "vercel-analytics",
    ],
    "fastly": [
        "fastly",
        "fastly-error",
        "fastly-restarts",
    ],
    "sucuri": [
        "sucuri",
        "sucuri_cloudproxy",
    ],
    "reblaze": [
        "reblaze",
        "rbzid",
    ],
    "aws_waf": [
        "aws-waf",
        "awswaf",
        "aws-waf-token",
    ],
    "generic": [
        "captcha",
        "bot detected",
        "bot protection",
        "security check",
        "automated access",
        "unusual traffic",
        "suspicious activity",
        "access denied",
        "blocked",
        "challenge",
        "human verification",
    ],
}
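

# The pattern table above can be extended without touching detect_firewall():
# keys are arbitrary vendor labels and values are case-insensitive substrings
# matched against the response body and headers. Illustrative sketch with a
# hypothetical vendor and signatures:
#
#     FIREWALL_PATTERNS["example_waf"] = [
#         "example-waf",
#         "x-example-protection",
#     ]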


def random_sleep(min_seconds: int = 1, max_seconds: int = 3) -> None:
    """Sleep for a random amount of time between min and max seconds.

    Args:
        min_seconds: Minimum sleep time in seconds
        max_seconds: Maximum sleep time in seconds
    """
    time.sleep(random.uniform(min_seconds, max_seconds))


def wait_for_new_window(
    driver: webdriver.Chrome, current_window: str, timeout: int = 10
) -> bool:
    """Wait for a new window to appear and switch to it.

    Args:
        driver: Selenium WebDriver instance
        current_window: Handle of the current window
        timeout: Maximum time to wait in seconds

    Returns:
        True if a new window was found and switched to, False otherwise
    """
    start_time = time.time()
    while time.time() - start_time < timeout:
        for window_handle in driver.window_handles:
            if window_handle != current_window:
                driver.switch_to.window(window_handle)
                return True
        time.sleep(0.5)
    return False


def parse_cookies_file(file_path: str) -> Dict[str, str]:
    """Parse the cookies file and return a dictionary of cookies.

    Args:
        file_path: Path to the cookies file

    Returns:
        Dictionary of cookie name-value pairs

    Raises:
        Exception: If there's an error parsing the cookies file
    """
    cookies: Dict[str, str] = {}
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            lines = f.readlines()
        for line in lines:
            if line.strip():
                parts = line.strip().split("\t")
                if len(parts) >= 2:
                    name = parts[0]
                    value = parts[1]
                    cookies[name] = value
        logger.info("Successfully parsed %d cookies from %s", len(cookies), file_path)
        return cookies
    except Exception as e:
        logger.error("Error parsing cookies file: %s", str(e))
        raise
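

# parse_cookies_file() expects one tab-separated "name<tab>value" pair per line
# (any extra tab-separated fields after the value are ignored). Illustrative
# cookies.txt contents and usage ("cookies.txt" is the default name main() loads):
#
#     session_id<tab>abc123def456
#     cf_clearance<tab>xyz789
#
#     cookies = parse_cookies_file("cookies.txt")
#     print(f"Loaded {len(cookies)} cookies")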


def detect_firewall(response_text: str, headers: Dict[str, str]) -> Optional[str]:
    """Detect which firewall or anti-bot system is being used.

    Args:
        response_text: HTML content of the response
        headers: Response headers

    Returns:
        Name of the detected firewall or None if no firewall is detected
    """
    # Convert headers to lowercase for easier matching
    headers_lower = {k.lower(): v for k, v in headers.items()}
    headers_str = str(headers_lower)
    # Check response text and headers against patterns
    for firewall, patterns in FIREWALL_PATTERNS.items():
        for pattern in patterns:
            if (
                pattern.lower() in response_text.lower()
                or pattern.lower() in headers_str
            ):
                logger.info("Detected %s firewall/protection", firewall)
                return firewall
    return None
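

# Usage sketch (illustrative; not part of the scraping pipeline): detect_firewall()
# works on any (body, headers) pair, e.g. a plain requests response.
def _example_detect_firewall_usage(url: str) -> Optional[str]:
    """Illustrative helper: fetch a page and report which protection vendor,
    if any, its response matches."""
    response = requests.get(url, timeout=30)
    return detect_firewall(response.text, dict(response.headers))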


def setup_selenium_driver(
    undetected: bool = False,
    random_window_size: bool = True,
    user_agent: Optional[str] = None,
) -> webdriver.Chrome:
    """Set up and return a Selenium Chrome driver.

    Args:
        undetected: Whether to use undetected-chromedriver
        random_window_size: Whether to use a random window size
        user_agent: Custom user agent to use

    Returns:
        Configured Chrome WebDriver instance
    """
    if undetected:
        try:
            import undetected_chromedriver as uc

            options = uc.ChromeOptions()
            options.add_argument("--no-sandbox")
            options.add_argument("--disable-dev-shm-usage")
            if user_agent:
                options.add_argument(f"--user-agent={user_agent}")
            driver = uc.Chrome(options=options)
            # Set random window size if requested
            if random_window_size:
                width = 1024 + random.randint(0, 200)
                height = 768 + random.randint(0, 200)
                driver.set_window_size(width, height)
            return driver
        except ImportError:
            logger.warning(
                "undetected_chromedriver not installed. Falling back to regular ChromeDriver"
            )
    # Regular ChromeDriver setup
    options = Options()
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    # Randomize window size to avoid fingerprinting
    if random_window_size:
        width = 1024 + random.randint(0, 200)
        height = 768 + random.randint(0, 200)
        options.add_argument(f"--window-size={width},{height}")
    else:
        options.add_argument("--window-size=1920,1080")
    # Set custom user agent if provided
    if user_agent:
        options.add_argument(f"--user-agent={user_agent}")
    else:
        options.add_argument(
            "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        )
    # Disable automation flags to avoid detection
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option("useAutomationExtension", False)
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=options)
    # Execute CDP commands to disable webdriver flags
    driver.execute_cdp_cmd(
        "Page.addScriptToEvaluateOnNewDocument",
        {
            "source": """
                Object.defineProperty(navigator, 'webdriver', {
                    get: () => undefined
                });
            """
        },
    )
    return driver
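

# Usage sketch (illustrative): the driver factory can be exercised on its own,
# independent of the rest of the pipeline:
#
#     driver = setup_selenium_driver(undetected=False, random_window_size=True)
#     try:
#         driver.get("https://www.example.com/")
#         print(driver.title)
#     finally:
#         driver.quit()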


def handle_2fa(driver: webdriver.Chrome, timeout: int = 120) -> bool:
    """Handle 2FA authentication by waiting for user input.

    This function will detect common 2FA elements and wait for the user to
    complete the process.

    Args:
        driver: Selenium WebDriver instance
        timeout: Maximum time to wait for 2FA completion in seconds

    Returns:
        True if 2FA was completed successfully or not needed, False otherwise
    """
    logger.info("Checking for 2FA verification...")
    # Common 2FA indicators
    two_fa_indicators = [
        "//input[@type='text' and contains(@placeholder, 'code')]",
        "//input[@type='text' and contains(@id, 'code')]",
        "//input[@type='text' and contains(@name, 'code')]",
        "//div[contains(text(), 'verification code')]",
        "//div[contains(text(), 'two-factor')]",
        "//div[contains(text(), '2FA')]",
        "//div[contains(text(), 'authenticator')]",
    ]
    for indicator in two_fa_indicators:
        try:
            element = driver.find_element(By.XPATH, indicator)
            if element:
                logger.info(
                    "2FA verification detected. Waiting for user to complete..."
                )
                print("\n2FA verification required!")
                print(
                    f"Please complete the 2FA process in the browser within {timeout} seconds."
                )
                # Save screenshot to help user
                screenshots_dir = Path("debug_screenshots")
                screenshots_dir.mkdir(exist_ok=True)
                driver.save_screenshot(str(screenshots_dir / "2fa_verification.png"))
                print(f"Screenshot saved to {screenshots_dir}/2fa_verification.png")
                # Wait for success indicators
                success_indicators = [
                    "//div[contains(@class, 'dashboard')]",
                    "//div[contains(@class, 'home')]",
                    "//a[contains(text(), 'logout')]",
                    "//a[contains(text(), 'sign out')]",
                ]
                start_time = time.time()
                while time.time() - start_time < timeout:
                    for success in success_indicators:
                        try:
                            if driver.find_element(By.XPATH, success):
                                logger.info("2FA verification completed successfully")
                                return True
                        except NoSuchElementException:
                            pass
                    time.sleep(2)
                logger.warning("2FA verification timed out")
                return False
        except NoSuchElementException:
            continue
    logger.info("No 2FA verification detected")
    return True


def handle_captcha(driver: webdriver.Chrome, timeout: int = 120) -> bool:
    """Handle CAPTCHA challenges by waiting for user input.

    Args:
        driver: Selenium WebDriver instance
        timeout: Maximum time to wait for CAPTCHA completion in seconds

    Returns:
        True if CAPTCHA was completed successfully or not needed, False otherwise
    """
    logger.info("Checking for CAPTCHA challenges...")
    # Common CAPTCHA indicators
    captcha_indicators = [
        "//iframe[contains(@src, 'recaptcha')]",
        "//iframe[contains(@src, 'captcha')]",
        "//div[contains(@class, 'g-recaptcha')]",
        "//div[contains(@class, 'h-captcha')]",
        "//div[contains(@class, 'captcha')]",
        "//button[contains(text(), 'I am human')]",
        "//button[contains(text(), 'Verify')]",
    ]
    for indicator in captcha_indicators:
        try:
            element = driver.find_element(By.XPATH, indicator)
            if element:
                logger.info(
                    "CAPTCHA challenge detected. Waiting for user to complete..."
                )
                print("\nCAPTCHA verification required!")
                print(
                    f"Please complete the CAPTCHA in the browser within {timeout} seconds."
                )
                # Save screenshot to help user
                screenshots_dir = Path("debug_screenshots")
                screenshots_dir.mkdir(exist_ok=True)
                driver.save_screenshot(str(screenshots_dir / "captcha_challenge.png"))
                print(f"Screenshot saved to {screenshots_dir}/captcha_challenge.png")
                # Wait for success indicators (page changes after CAPTCHA)
                start_time = time.time()
                current_url = driver.current_url
                while time.time() - start_time < timeout:
                    # Check if URL changed or if CAPTCHA element is gone
                    try:
                        driver.find_element(By.XPATH, indicator)
                    except NoSuchElementException:
                        logger.info(
                            "CAPTCHA element no longer found, verification likely completed"
                        )
                        return True
                    # Check if URL changed
                    if driver.current_url != current_url:
                        logger.info(
                            "URL changed after CAPTCHA, verification likely completed"
                        )
                        return True
                    time.sleep(2)
                logger.warning("CAPTCHA verification timed out")
                return False
        except NoSuchElementException:
            continue
    logger.info("No CAPTCHA challenge detected")
    return True


def login_to_site(driver: webdriver.Chrome, email: str, password: str) -> bool:
    """Login to a website using credentials.

    This is a template function that needs to be customized for specific sites.
    Includes handling for 2FA authentication.

    Args:
        driver: Selenium WebDriver instance
        email: User email or username
        password: User password

    Returns:
        True if login was successful, False otherwise
    """
    try:
        screenshots_dir = Path("debug_screenshots")
        screenshots_dir.mkdir(exist_ok=True)
        # Find and click login button
        login_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.CLASS_NAME, "login-button-class"))
        )
        login_button.click()
        random_sleep()
        # Enter email
        email_input = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.ID, "email-field-id"))
        )
        email_input.clear()
        email_input.send_keys(email)
        random_sleep()
        # Click next or continue button if needed
        next_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, "#next-button-selector"))
        )
        next_button.click()
        random_sleep(2, 4)
        # Enter password
        password_input = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, "input[type='password']"))
        )
        password_input.clear()
        password_input.send_keys(password)
        random_sleep()
        # Click login submit button
        submit_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, "#submit-button-selector"))
        )
        submit_button.click()
        random_sleep(3, 5)
        # Handle CAPTCHA if needed
        if not handle_captcha(driver):
            logger.warning("CAPTCHA verification failed or timed out")
            return False
        # Handle 2FA if needed
        if not handle_2fa(driver):
            logger.warning("2FA verification failed or timed out")
            return False
        # Check for success indicators
        success_indicators = ["profile-element", "user-avatar", "logout-text"]
        for indicator in success_indicators:
            try:
                if indicator.startswith("."):
                    WebDriverWait(driver, 5).until(
                        EC.presence_of_element_located((By.CSS_SELECTOR, indicator))
                    )
                elif "_" in indicator:
                    WebDriverWait(driver, 5).until(
                        EC.presence_of_element_located((By.CLASS_NAME, indicator))
                    )
                else:
                    WebDriverWait(driver, 5).until(
                        EC.presence_of_element_located(
                            (By.XPATH, f"//*[contains(text(), '{indicator}')]")
                        )
                    )
                logger.info("Login successful - found indicator: %s", indicator)
                return True
            except (TimeoutException, NoSuchElementException):
                continue
        logger.warning("Could not confirm successful login")
        return False
    except (TimeoutException, NoSuchElementException) as e:
        logger.error("Error during login process: %s", str(e))
        driver.save_screenshot(str(screenshots_dir / "login_error.png"))
        return False
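

# Customization sketch (hypothetical markup): the selectors in login_to_site()
# are placeholders, so adapting it usually means swapping them for the target
# site's own locators. For a login form with id="username", id="password" and a
# single submit button, the middle of the function might reduce to:
#
#     email_input = WebDriverWait(driver, 10).until(
#         EC.element_to_be_clickable((By.ID, "username"))
#     )
#     email_input.send_keys(email)
#     driver.find_element(By.ID, "password").send_keys(password)
#     driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()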


def extract_site_structure(html_content: str, url: str) -> SiteStructure:
    """Extract site structure from HTML content.

    This is a template function that needs to be customized for specific sites.

    Args:
        html_content: HTML content of the page
        url: URL of the page

    Returns:
        SiteStructure object containing the site's structure
    """
    soup = BeautifulSoup(html_content, "html.parser")
    # Find the title of the page
    title_element = soup.find("h1")
    title = title_element.text.strip() if title_element else "Unknown Title"
    sections = []
    # Create a default section for main content
    main_section = {"title": "Main Content", "links": []}
    sections.append(main_section)
    # Find all section headers (customize selectors for your target site)
    section_headers = soup.select("h2.section-header")
    current_section = main_section
    # Process each section header
    for header in section_headers:
        section_title = header.text.strip()
        new_section = {"title": section_title, "links": []}
        sections.append(new_section)
        current_section = new_section
        # Find all content links in this section (customize selectors)
        content_links = header.find_next("div").select("a.content-link")
        for link in content_links:
            link_title = link.text.strip()
            link_url = urljoin(url, link.get("href", ""))
            current_section["links"].append({"title": link_title, "url": link_url})
    return {"title": title, "url": url, "sections": sections}
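

# Customization sketch (hypothetical markup): for a documentation-style site whose
# chapters are <h2 class="chapter"> headings each followed by a <ul class="toc">
# list of links, the two selectors in extract_site_structure() would become:
#
#     section_headers = soup.select("h2.chapter")
#     content_links = header.find_next("ul", class_="toc").select("a")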


def is_flaresolverr_running() -> bool:
    """Check if FlareSolverr is running.

    Returns:
        True if FlareSolverr is running, False otherwise
    """
    try:
        response = requests.get("http://localhost:8191/v1", timeout=5)
        return response.status_code == 200
    except (RequestException, HTTPError):
        return False


def start_flaresolverr() -> bool:
    """Start FlareSolverr if it's not already running.

    Returns:
        True if FlareSolverr is running or was started successfully, False otherwise
    """
    if is_flaresolverr_running():
        logger.info("FlareSolverr is already running")
        return True
    logger.info("Starting FlareSolverr...")
    try:
        # Adjust the command based on your system and how FlareSolverr is installed
        subprocess.Popen(
            [
                "docker",
                "run",
                "-p",
                "8191:8191",
                "ghcr.io/flaresolverr/flaresolverr:latest",
            ],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        # Wait for FlareSolverr to start
        for _ in range(30):  # Wait up to 30 seconds
            if is_flaresolverr_running():
                logger.info("FlareSolverr started successfully")
                return True
            time.sleep(1)
        logger.error("FlareSolverr failed to start within the timeout period")
        return False
    except (OSError, subprocess.SubprocessError) as e:
        # Popen raises OSError (e.g. docker not installed) rather than requests errors
        logger.error("Error starting FlareSolverr: %s", str(e))
        return False


def get_with_flaresolverr(url: str, cookies: Dict[str, str]) -> Optional[str]:
    """Get a URL using FlareSolverr to bypass Cloudflare protection.

    Args:
        url: URL to access
        cookies: Dictionary of cookies to use

    Returns:
        HTML content if successful, None otherwise
    """
    if not is_flaresolverr_running():
        if not start_flaresolverr():
            logger.error("Failed to start FlareSolverr, cannot proceed")
            return None
    logger.info("Using FlareSolverr to access %s", url)
    # Prepare cookies for FlareSolverr
    # NOTE: depending on the FlareSolverr version, cookies may need to be sent as a
    # list of {"name": ..., "value": ...} objects instead of a single header string.
    cookie_string = "; ".join([f"{name}={value}" for name, value in cookies.items()])
    payload = {
        "cmd": "request.get",
        "url": url,
        "maxTimeout": 60000,
        "cookies": cookie_string,
        "headers": {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Referer": "https://www.example.com/",
        },
    }
    try:
        response = requests.post("http://localhost:8191/v1", json=payload, timeout=70)
        response.raise_for_status()
        result = response.json()
        if result.get("status") == "ok":
            return result.get("solution", {}).get("response", "")
        else:
            logger.error(
                "FlareSolverr error: %s", result.get("message", "Unknown error")
            )
            return None
    except (RequestException, HTTPError) as e:
        logger.error("Error using FlareSolverr: %s", str(e))
        return None
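

# Usage sketch (illustrative): FlareSolverr listens on http://localhost:8191 by
# default, and the helper returns raw HTML or None, so callers can chain it with
# the other bypass strategies:
#
#     html = get_with_flaresolverr("https://protected.example.com/page", cookies)
#     if html is None:
#         html = get_with_selenium("https://protected.example.com/page", cookies)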


def get_with_selenium(url: str, cookies: Dict[str, str]) -> Optional[str]:
    """Get a URL using Selenium to bypass protection.

    Args:
        url: URL to access
        cookies: Dictionary of cookies to use

    Returns:
        HTML content if successful, None otherwise
    """
    logger.info("Using Selenium to access %s", url)
    driver = None
    try:
        # Try with undetected-chromedriver first
        driver = setup_selenium_driver(undetected=True, random_window_size=True)
        # Navigate to the domain first to set cookies
        parsed_url = urlparse(url)
        domain_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
        driver.get(domain_url)
        # Set cookies
        for name, value in cookies.items():
            driver.add_cookie(
                {"name": name, "value": value, "domain": parsed_url.netloc}
            )
        # Now navigate to the actual URL
        driver.get(url)
        # Handle CAPTCHA if needed
        if not handle_captcha(driver):
            logger.warning("CAPTCHA verification failed or timed out")
            return None
        # Wait for page to load
        random_sleep(3, 5)
        # Get the page source
        return driver.page_source
    except (TimeoutException, NoSuchElementException) as e:
        logger.error("Error using Selenium to access %s: %s", url, str(e))
        return None
    finally:
        # Always close the browser, even when an exception is raised
        if driver is not None:
            driver.quit()


def get_with_curl_impersonate(url: str, cookies: Dict[str, str]) -> Optional[str]:
    """Get a URL using curl_cffi with impersonation to bypass protection.

    Args:
        url: URL to access
        cookies: Dictionary of cookies to use

    Returns:
        HTML content if successful, None otherwise
    """
    try:
        from curl_cffi import requests as curl_requests

        logger.info("Using curl_cffi with impersonation to access %s", url)
        # Prepare cookies
        cookie_string = "; ".join(
            [f"{name}={value}" for name, value in cookies.items()]
        )
        # Use curl_cffi with Chrome impersonation
        response = curl_requests.get(
            url,
            headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                "Accept-Language": "en-US,en;q=0.5",
                "Referer": "https://www.example.com/",
                "Cookie": cookie_string,
            },
            impersonate="chrome110",
            timeout=30,
        )
        return response.text
    except ImportError:
        logger.warning("curl_cffi not installed. Cannot use curl impersonation.")
        return None
    except Exception as e:  # curl_cffi raises its own error types, not Selenium's
        logger.error("Error using curl_cffi to access %s: %s", url, str(e))
        return None


@retry(
    retry=retry_if_exception_type(
        (RequestException, HTTPError, TimeoutException, NoSuchElementException)
    ),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
)
def get_site_structure(url: str, cookies: Dict[str, str]) -> SiteStructure:
    """Get the site structure with all sections and links.

    Args:
        url: URL of the site
        cookies: Dictionary of cookies to use

    Returns:
        SiteStructure object containing the site's structure

    Raises:
        RequestException, HTTPError: If there's an error fetching the site structure
    """
    logger.info("Fetching site structure from %s", url)
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Referer": "https://www.example.com/",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
    }
    # Clean headers to ensure they're ASCII-compatible
    clean_headers = {}
    for key, value in headers.items():
        clean_value = value.encode("ascii", "ignore").decode("ascii")
        clean_headers[key] = clean_value
    # Encode URL to handle special characters
    parsed_url = urlparse(url)
    encoded_path = quote(parsed_url.path)
    encoded_url = parsed_url._replace(path=encoded_path).geturl()
    # Clean cookies to ensure they're ASCII-compatible
    clean_cookies = {}
    for name, value in cookies.items():
        if isinstance(value, str):
            clean_value = value.encode("ascii", "ignore").decode("ascii")
            clean_cookies[name] = clean_value
        else:
            clean_cookies[name] = value
    try:
        # First try with regular requests
        session = requests.Session()
        response = session.get(
            encoded_url, cookies=clean_cookies, headers=clean_headers, timeout=30
        )
        response.raise_for_status()
        html_content = response.text
        # Detect firewall
        firewall = detect_firewall(html_content, response.headers)
        if firewall:
            logger.info("Detected %s protection, trying bypass methods", firewall)
            # Try different bypass methods based on the detected firewall
            if firewall == "cloudflare":
                # Try FlareSolverr first for Cloudflare
                flare_content = get_with_flaresolverr(encoded_url, clean_cookies)
                if flare_content:
                    html_content = flare_content
                else:
                    # Fall back to curl impersonation
                    curl_content = get_with_curl_impersonate(encoded_url, clean_cookies)
                    if curl_content:
                        html_content = curl_content
                    else:
                        # Last resort: Selenium
                        selenium_content = get_with_selenium(encoded_url, clean_cookies)
                        if selenium_content:
                            html_content = selenium_content
            elif firewall in ["akamai", "imperva", "distil", "perimeterx", "datadome"]:
                # These firewalls often need browser fingerprinting evasion
                # Try curl impersonation first
                curl_content = get_with_curl_impersonate(encoded_url, clean_cookies)
                if curl_content:
                    html_content = curl_content
                else:
                    # Fall back to Selenium
                    selenium_content = get_with_selenium(encoded_url, clean_cookies)
                    if selenium_content:
                        html_content = selenium_content
            else:
                # For other firewalls, try Selenium directly
                selenium_content = get_with_selenium(encoded_url, clean_cookies)
                if selenium_content:
                    html_content = selenium_content
        # Save the HTML for debugging
        with open("debug_site_structure.html", "w", encoding="utf-8") as f:
            f.write(html_content)
        return extract_site_structure(html_content, url)
    except Exception as e:
        logger.error(
            "Error fetching site structure from %s: %s", url, e, exc_info=True
        )
        raise


def extract_content_from_html(html_content: str) -> str:
    """Extract content from HTML.

    This is a template function that needs to be customized for specific sites.

    Args:
        html_content: HTML content of the page

    Returns:
        Extracted text content
    """
    soup = BeautifulSoup(html_content, "html.parser")
    # Find the main content container (customize selector for your target site)
    content_elements = soup.select("div.main-content")
    content = ""
    for element in content_elements:
        content += element.get_text(separator="\n\n") + "\n\n"
    return content


@retry(
    retry=retry_if_exception_type((RequestException, HTTPError)),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
)
def scrape_content(url: str, cookies: Dict[str, str]) -> Optional[ContentData]:
    """Scrape the content from a specific URL.

    Args:
        url: URL to scrape
        cookies: Dictionary of cookies to use

    Returns:
        ContentData object if successful, None otherwise

    Raises:
        RequestException, HTTPError: If there's an error scraping the content
    """
    logger.info("Scraping content from %s", url)
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Referer": "https://www.example.com/",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
    }
    # Clean headers to ensure they're ASCII-compatible
    clean_headers = {}
    for key, value in headers.items():
        clean_value = value.encode("ascii", "ignore").decode("ascii")
        clean_headers[key] = clean_value
    # Encode URL to handle special characters
    parsed_url = urlparse(url)
    encoded_path = quote(parsed_url.path)
    encoded_url = parsed_url._replace(path=encoded_path).geturl()
    # Clean cookies to ensure they're ASCII-compatible
    clean_cookies = {}
    for name, value in cookies.items():
        if isinstance(value, str):
            clean_value = value.encode("ascii", "ignore").decode("ascii")
            clean_cookies[name] = clean_value
        else:
            clean_cookies[name] = value
    try:
        # First try with regular requests
        session = requests.Session()
        response = session.get(
            encoded_url, cookies=clean_cookies, headers=clean_headers, timeout=30
        )
        response.raise_for_status()
        html_content = response.text
        # Check if we got a Cloudflare challenge
        if "Just a moment" in html_content or "Checking your browser" in html_content:
            logger.info("Cloudflare protection detected, using FlareSolverr")
            flare_content = get_with_flaresolverr(encoded_url, clean_cookies)
            if flare_content:
                html_content = flare_content
            else:
                logger.warning("FlareSolverr failed, proceeding with original response")
        soup = BeautifulSoup(html_content, "html.parser")
        # Find the title (customize selector for your target site)
        title_element = soup.select_one("h1.title")
        title = title_element.text.strip() if title_element else "Untitled Content"
        # Extract the content
        content = extract_content_from_html(html_content)
        if not content:
            logger.warning("No content found at %s", url)
            return None
        return {"title": title, "content": content, "url": url}
    except (RequestException, HTTPError) as e:
        logger.error("Error scraping content from %s: %s", url, e, exc_info=True)
        return None


def process_content(content_data: ContentData) -> str:
    """Process the content data.

    This is a template function that can be customized to transform content
    (e.g., format it, enhance it with AI, etc.)

    Args:
        content_data: ContentData object to process

    Returns:
        Processed content as a string
    """
    # Simple example: just format with markdown
    processed_content = f"# {content_data['title']}\n\n{content_data['content']}"
    return processed_content
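

# Customization sketch: process_content() is the hook for per-project
# post-processing. An illustrative variant (not wired into main()) that collapses
# runs of blank lines and appends a source link:
def _example_process_content(content_data: ContentData) -> str:
    """Illustrative alternative to process_content()."""
    body = re.sub(r"\n{3,}", "\n\n", content_data["content"]).strip()
    return f"# {content_data['title']}\n\n{body}\n\n[Source]({content_data['url']})"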


def save_site_content(
    site_structure: SiteStructure, content_data: Dict[str, ContentData]
) -> None:
    """Save site content using PyFilesystem2.

    Args:
        site_structure: SiteStructure object containing the site's structure
        content_data: Dictionary mapping URLs to ContentData objects
    """
    parsed_url = urlparse(site_structure["url"])
    path_parts = parsed_url.path.strip("/").split("/")
    site_name = path_parts[-1] if path_parts else "site"
    site_name = sanitize_filename(site_name)
    site_fs = open_fs(f"osfs://{site_name}", create=True)
    # Save the site structure as JSON
    with site_fs.open("site_structure.json", "w", encoding="utf-8") as f:
        json.dump(site_structure, f, indent=4)
    # Create a README with links to all content
    with site_fs.open("README.md", "w", encoding="utf-8") as f:
        f.write(f"# {site_structure['title']}\n\n")
        for section in site_structure["sections"]:
            f.write(f"## {section['title']}\n\n")
            for link in section["links"]:
                content_filename = sanitize_filename(link["title"])
                section_dir_name = sanitize_filename(section["title"])
                f.write(
                    f"- [{link['title']}]({section_dir_name}/{content_filename}.md) ([Original]({link['url']}))\n"
                )
            f.write("\n")
    # Save each section's content
    for section in site_structure["sections"]:
        section_dir = sanitize_filename(section["title"])
        site_fs.makedirs(section_dir, recreate=True)
        for link in section["links"]:
            content_url = link["url"]
            if content_url in content_data:
                data = content_data[content_url]
                content_filename = sanitize_filename(link["title"])
                # Save original content
                with site_fs.open(
                    f"{section_dir}/{content_filename}_original.md",
                    "w",
                    encoding="utf-8",
                ) as f:
                    f.write(f"# {data['title']}\n\n")
                    f.write(data["content"])
                # Save processed content ("processed_content" is attached to the
                # dict by main() before this function is called)
                with site_fs.open(
                    f"{section_dir}/{content_filename}.md", "w", encoding="utf-8"
                ) as f:
                    f.write(data["processed_content"])


def sanitize_filename(filename: str) -> str:
    """Remove invalid characters from a filename.

    Args:
        filename: Original filename

    Returns:
        Sanitized filename with invalid characters replaced by underscores
    """
    # Replace invalid characters with underscores
    invalid_chars = r'[<>:"/\\|?*]'
    return re.sub(invalid_chars, "_", filename)
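

# Illustrative behaviour of sanitize_filename():
#
#     sanitize_filename("Intro: Getting Started?")  ->  "Intro_ Getting Started_"
#     sanitize_filename("a/b\\c")                   ->  "a_b_c"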


def main() -> None:
    """Main function to run the web scraper.

    This function handles the entire scraping process:
    1. Get the target URL from command line or user input
    2. Load cookies if available
    3. Check and start FlareSolverr if needed
    4. Get the site structure
    5. Scrape content for each link
    6. Process and save the content
    """
    try:
        if len(sys.argv) > 1:
            target_url = sys.argv[1]
        else:
            target_url = input("Enter the target URL: ")
        # Ensure URL has proper format
        if not target_url.startswith("http"):
            target_url = f"https://{target_url}"
        # Load cookies if available
        cookies = {}
        cookies_file = "cookies.txt"
        if os.path.exists(cookies_file):
            cookies = parse_cookies_file(cookies_file)
            logger.info("Cookies loaded from %s", cookies_file)
        else:
            logger.warning("No cookies file found at %s", cookies_file)
            print("\nNo cookies file found. You may need to create one.")
            print("Format should be: name<tab>value for each cookie on a new line.")
        # Check if FlareSolverr is available
        if not is_flaresolverr_running():
            print("FlareSolverr is not running. Attempting to start it...")
            if not start_flaresolverr():
                print(
                    "Warning: Could not start FlareSolverr. Some sites may not be accessible."
                )
        # Get site structure
        site_structure = get_site_structure(target_url, cookies)
        if site_structure["sections"]:
            content_data = {}
            total_links = sum(
                len(section["links"]) for section in site_structure["sections"]
            )
            processed_links = 0
            for section in site_structure["sections"]:
                section_title = section["title"]
                logger.info("Processing section: %s", section_title)
                print(
                    f"\n[{processed_links}/{total_links}] Processing section: {section_title}"
                )
                for link in section["links"]:
                    link_title = link["title"]
                    link_url = link["url"]
                    processed_links += 1
                    progress = int(processed_links / total_links * 50)
                    print(
                        f"\r[{'#' * progress}{' ' * (50 - progress)}] {processed_links}/{total_links} - {link_title}",
                        end="",
                    )
                    data = scrape_content(link_url, cookies)
                    if data:
                        processed_content = process_content(data)
                        data["processed_content"] = processed_content
                        content_data[link_url] = data
                        logger.info("Processed content for: %s", link_title)
                        time.sleep(1)  # Avoid rate limiting
                    else:
                        logger.warning("Failed to scrape content: %s", link_title)
                        print(f"\nFailed to scrape content: {link_title}")
                print()  # New line after section
            save_site_content(site_structure, content_data)
            logger.info("Content has been successfully scraped and processed!")
            print("\nContent has been successfully scraped and processed!")
            # save_site_content() names the output directory after the last
            # segment of the target URL's path, so mirror that here
            output_dir = sanitize_filename(
                urlparse(site_structure["url"]).path.strip("/").split("/")[-1]
            )
            print(f"Check the '{output_dir}' directory for the results.")
        else:
            logger.error("No sections found in the site structure")
            print(
                "No sections found in the site structure. Check the logs for details."
            )
    except (RequestException, HTTPError) as e:
        logger.error("An error occurred in the main function: %s", e, exc_info=True)
        print(f"\nError: {e}")
        print("Check the log file for more details.")


if __name__ == "__main__":
main() |
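
Dependencies for the template (typically saved as requirements.txt):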
requests>=2.25.1
beautifulsoup4>=4.9.3
selenium>=4.0.0
webdriver-manager>=3.5.2
tenacity>=8.0.1
fs>=2.4.16
undetected-chromedriver>=3.1.5
curl-cffi>=0.5.5
lxml>=4.6.3
urllib3>=1.26.5
python-dotenv>=0.19.0