Advanced web scraper template with content segmentation.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=W0611
import json
import logging
import os
import random
import re
import subprocess
import sys
import time
from pathlib import Path
from typing import Dict, List, Optional, Any, TypedDict, Union
from urllib.parse import urlparse, urljoin, quote
import requests
from bs4 import BeautifulSoup
from requests.exceptions import RequestException, HTTPError
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
from webdriver_manager.chrome import ChromeDriverManager
import fs
from fs import open_fs
from fs.errors import ResourceNotFound
# Set up logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler("scraper.log"), logging.StreamHandler()],
)
logger = logging.getLogger("WebScraper")
# Type definitions
class ContentLink(TypedDict):
"""Represents a link to content with title and URL."""
title: str
url: str
class Section(TypedDict):
"""Represents a section of content with a title and list of links."""
title: str
links: List[ContentLink]
class SiteStructure(TypedDict):
"""Represents the overall structure of a site with title, URL, and sections."""
title: str
url: str
sections: List[Section]
class ContentData(TypedDict):
"""Represents scraped content data with title, content text, and source URL."""
title: str
content: str
url: str
# Firewall detection patterns
FIREWALL_PATTERNS = {
"cloudflare": [
"cloudflare",
"checking your browser",
"just a moment",
"cf-browser-verification",
"cf_clearance",
"cf-ray",
"cf-chl-",
],
"akamai": [
"akamai",
"ak_bmsc",
"bm_sv",
"bot management",
"_abck",
],
"imperva": [
"incapsula",
"imperva",
"visid_incap",
"_icl_current",
"incap_ses",
"nlbi_",
],
"distil": [
"distil",
"distil_r_captcha",
"dstl",
],
"perimeterx": [
"perimeterx",
"_pxhd",
"_px3",
"_px",
"px-captcha",
],
"datadome": [
"datadome",
"datadome-challenge",
"__ddg",
"datadome.co",
],
"vercel": [
"vercel",
"vercel-protection",
"vercel-edge",
"vercel-analytics",
],
"fastly": [
"fastly",
"fastly-error",
"fastly-restarts",
],
"sucuri": [
"sucuri",
"sucuri_cloudproxy",
],
"reblaze": [
"reblaze",
"rbzid",
],
"aws_waf": [
"aws-waf",
"awswaf",
"aws-waf-token",
],
"generic": [
"captcha",
"bot detected",
"bot protection",
"security check",
"automated access",
"unusual traffic",
"suspicious activity",
"access denied",
"blocked",
"challenge",
"human verification",
],
}
def random_sleep(min_seconds: int = 1, max_seconds: int = 3) -> None:
"""Sleep for a random amount of time between min and max seconds.
Args:
min_seconds: Minimum sleep time in seconds
max_seconds: Maximum sleep time in seconds
"""
time.sleep(random.uniform(min_seconds, max_seconds))
def wait_for_new_window(
driver: webdriver.Chrome, current_window: str, timeout: int = 10
) -> bool:
"""Wait for a new window to appear and switch to it.
Args:
driver: Selenium WebDriver instance
current_window: Handle of the current window
timeout: Maximum time to wait in seconds
Returns:
True if a new window was found and switched to, False otherwise
"""
start_time = time.time()
while time.time() - start_time < timeout:
for window_handle in driver.window_handles:
if window_handle != current_window:
driver.switch_to.window(window_handle)
return True
time.sleep(0.5)
return False
def parse_cookies_file(file_path: str) -> Dict[str, str]:
"""Parse the cookies file and return a dictionary of cookies.
Args:
file_path: Path to the cookies file
Returns:
Dictionary of cookie name-value pairs
Raises:
Exception: If there's an error parsing the cookies file
"""
cookies: Dict[str, str] = {}
try:
with open(file_path, "r", encoding="utf-8") as f:
lines = f.readlines()
for line in lines:
if line.strip():
parts = line.strip().split("\t")
if len(parts) >= 2:
name = parts[0]
value = parts[1]
cookies[name] = value
logger.info("Successfully parsed %d cookies from %s", len(cookies), file_path)
return cookies
except Exception as e:
logger.error("Error parsing cookies file: %s", str(e))
raise
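# Example (illustrative): parse_cookies_file expects one tab-separated name/value pair
# per line, so a hypothetical cookies.txt containing
#
#     session_id<TAB>abc123
#     csrf_token<TAB>def456
#
# (where <TAB> is a literal tab character) would yield
# {"session_id": "abc123", "csrf_token": "def456"}.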
def detect_firewall(response_text: str, headers: Dict[str, str]) -> Optional[str]:
"""Detect which firewall or anti-bot system is being used.
Args:
response_text: HTML content of the response
headers: Response headers
Returns:
Name of the detected firewall or None if no firewall is detected
"""
# Convert headers to lowercase for easier matching
headers_lower = {k.lower(): v for k, v in headers.items()}
headers_str = str(headers_lower)
# Check response text and headers against patterns
for firewall, patterns in FIREWALL_PATTERNS.items():
for pattern in patterns:
if (
pattern.lower() in response_text.lower()
or pattern.lower() in headers_str
):
logger.info("Detected %s firewall/protection", firewall)
return firewall
return None
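# Example (illustrative): detect_firewall can be run against any plain requests
# response before deciding whether a bypass strategy is needed; the URL here is a
# placeholder.
#
#     resp = requests.get("https://example.com", timeout=30)
#     firewall = detect_firewall(resp.text, dict(resp.headers))
#     if firewall:
#         logger.info("Protected by %s, switching to a bypass method", firewall)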
def setup_selenium_driver(
undetected: bool = False,
random_window_size: bool = True,
user_agent: Optional[str] = None,
) -> webdriver.Chrome:
"""Set up and return a Selenium Chrome driver.
Args:
undetected: Whether to use undetected-chromedriver
random_window_size: Whether to use a random window size
user_agent: Custom user agent to use
Returns:
Configured Chrome WebDriver instance
"""
if undetected:
try:
import undetected_chromedriver as uc
options = uc.ChromeOptions()
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
if user_agent:
options.add_argument(f"--user-agent={user_agent}")
driver = uc.Chrome(options=options)
# Set random window size if requested
if random_window_size:
width = 1024 + random.randint(0, 200)
height = 768 + random.randint(0, 200)
driver.set_window_size(width, height)
return driver
except ImportError:
logger.warning(
"undetected_chromedriver not installed. Falling back to regular ChromeDriver"
)
# Regular ChromeDriver setup
options = Options()
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
# Randomize window size to avoid fingerprinting
if random_window_size:
width = 1024 + random.randint(0, 200)
height = 768 + random.randint(0, 200)
options.add_argument(f"--window-size={width},{height}")
else:
options.add_argument("--window-size=1920,1080")
# Set custom user agent if provided
if user_agent:
options.add_argument(f"--user-agent={user_agent}")
else:
options.add_argument(
"--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
)
# Disable automation flags to avoid detection
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option("useAutomationExtension", False)
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=options)
# Execute CDP commands to disable webdriver flags
driver.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
"""
},
)
return driver
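# Example (illustrative): a minimal manual session using the helper above; the target
# URL is a placeholder.
#
#     driver = setup_selenium_driver(undetected=False, random_window_size=True)
#     try:
#         driver.get("https://example.com")
#         print(driver.title)
#     finally:
#         driver.quit()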
def handle_2fa(driver: webdriver.Chrome, timeout: int = 120) -> bool:
"""Handle 2FA authentication by waiting for user input.
This function will detect common 2FA elements and wait for the user to complete the process.
Args:
driver: Selenium WebDriver instance
timeout: Maximum time to wait for 2FA completion in seconds
Returns:
True if 2FA was completed successfully or not needed, False otherwise
"""
logger.info("Checking for 2FA verification...")
# Common 2FA indicators
two_fa_indicators = [
"//input[@type='text' and contains(@placeholder, 'code')]",
"//input[@type='text' and contains(@id, 'code')]",
"//input[@type='text' and contains(@name, 'code')]",
"//div[contains(text(), 'verification code')]",
"//div[contains(text(), 'two-factor')]",
"//div[contains(text(), '2FA')]",
"//div[contains(text(), 'authenticator')]",
]
for indicator in two_fa_indicators:
try:
element = driver.find_element(By.XPATH, indicator)
if element:
logger.info(
"2FA verification detected. Waiting for user to complete..."
)
print("\n2FA verification required!")
print(
f"Please complete the 2FA process in the browser within {timeout} seconds."
)
# Save screenshot to help user
screenshots_dir = Path("debug_screenshots")
screenshots_dir.mkdir(exist_ok=True)
driver.save_screenshot(str(screenshots_dir / "2fa_verification.png"))
print(f"Screenshot saved to {screenshots_dir}/2fa_verification.png")
# Wait for success indicators
success_indicators = [
"//div[contains(@class, 'dashboard')]",
"//div[contains(@class, 'home')]",
"//a[contains(text(), 'logout')]",
"//a[contains(text(), 'sign out')]",
]
start_time = time.time()
while time.time() - start_time < timeout:
for success in success_indicators:
try:
if driver.find_element(By.XPATH, success):
logger.info("2FA verification completed successfully")
return True
except NoSuchElementException:
pass
time.sleep(2)
logger.warning("2FA verification timed out")
return False
except NoSuchElementException:
continue
logger.info("No 2FA verification detected")
return True
def handle_captcha(driver: webdriver.Chrome, timeout: int = 120) -> bool:
"""Handle CAPTCHA challenges by waiting for user input.
Args:
driver: Selenium WebDriver instance
timeout: Maximum time to wait for CAPTCHA completion in seconds
Returns:
True if CAPTCHA was completed successfully or not needed, False otherwise
"""
logger.info("Checking for CAPTCHA challenges...")
# Common CAPTCHA indicators
captcha_indicators = [
"//iframe[contains(@src, 'recaptcha')]",
"//iframe[contains(@src, 'captcha')]",
"//div[contains(@class, 'g-recaptcha')]",
"//div[contains(@class, 'h-captcha')]",
"//div[contains(@class, 'captcha')]",
"//button[contains(text(), 'I am human')]",
"//button[contains(text(), 'Verify')]",
]
for indicator in captcha_indicators:
try:
element = driver.find_element(By.XPATH, indicator)
if element:
logger.info(
"CAPTCHA challenge detected. Waiting for user to complete..."
)
print("\nCAPTCHA verification required!")
print(
f"Please complete the CAPTCHA in the browser within {timeout} seconds."
)
# Save screenshot to help user
screenshots_dir = Path("debug_screenshots")
screenshots_dir.mkdir(exist_ok=True)
driver.save_screenshot(str(screenshots_dir / "captcha_challenge.png"))
print(f"Screenshot saved to {screenshots_dir}/captcha_challenge.png")
# Wait for success indicators (page changes after CAPTCHA)
start_time = time.time()
current_url = driver.current_url
while time.time() - start_time < timeout:
# Check if URL changed or if CAPTCHA element is gone
try:
driver.find_element(By.XPATH, indicator)
except NoSuchElementException:
logger.info(
"CAPTCHA element no longer found, verification likely completed"
)
return True
# Check if URL changed
if driver.current_url != current_url:
logger.info(
"URL changed after CAPTCHA, verification likely completed"
)
return True
time.sleep(2)
logger.warning("CAPTCHA verification timed out")
return False
except NoSuchElementException:
continue
logger.info("No CAPTCHA challenge detected")
return True
def login_to_site(driver: webdriver.Chrome, email: str, password: str) -> bool:
"""Login to a website using credentials.
This is a template function that needs to be customized for specific sites.
Includes handling for 2FA authentication.
Args:
driver: Selenium WebDriver instance
email: User email or username
password: User password
Returns:
True if login was successful, False otherwise
"""
try:
screenshots_dir = Path("debug_screenshots")
screenshots_dir.mkdir(exist_ok=True)
# Find and click login button
login_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CLASS_NAME, "login-button-class"))
)
login_button.click()
random_sleep()
# Enter email
email_input = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.ID, "email-field-id"))
)
email_input.clear()
email_input.send_keys(email)
random_sleep()
# Click next or continue button if needed
next_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CSS_SELECTOR, "#next-button-selector"))
)
next_button.click()
random_sleep(2, 4)
# Enter password
password_input = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CSS_SELECTOR, "input[type='password']"))
)
password_input.clear()
password_input.send_keys(password)
random_sleep()
# Click login submit button
submit_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CSS_SELECTOR, "#submit-button-selector"))
)
submit_button.click()
random_sleep(3, 5)
# Handle CAPTCHA if needed
if not handle_captcha(driver):
logger.warning("CAPTCHA verification failed or timed out")
return False
# Handle 2FA if needed
if not handle_2fa(driver):
logger.warning("2FA verification failed or timed out")
return False
# Check for success indicators
success_indicators = ["profile-element", "user-avatar", "logout-text"]
for indicator in success_indicators:
try:
if indicator.startswith("."):
WebDriverWait(driver, 5).until(
EC.presence_of_element_located((By.CSS_SELECTOR, indicator))
)
elif "_" in indicator:
WebDriverWait(driver, 5).until(
EC.presence_of_element_located((By.CLASS_NAME, indicator))
)
else:
WebDriverWait(driver, 5).until(
EC.presence_of_element_located(
(By.XPATH, f"//*[contains(text(), '{indicator}')]")
)
)
logger.info("Login successful - found indicator: %s", indicator)
return True
except (TimeoutException, NoSuchElementException):
continue
logger.warning("Could not confirm successful login")
return False
except (TimeoutException, NoSuchElementException) as e:
logger.error("Error during login process: %s", str(e))
driver.save_screenshot(str(screenshots_dir / "login_error.png"))
return False
def extract_site_structure(html_content: str, url: str) -> SiteStructure:
"""Extract site structure from HTML content.
This is a template function that needs to be customized for specific sites.
Args:
html_content: HTML content of the page
url: URL of the page
Returns:
SiteStructure object containing the site's structure
"""
soup = BeautifulSoup(html_content, "html.parser")
# Find the title of the page
title_element = soup.find("h1")
title = title_element.text.strip() if title_element else "Unknown Title"
sections = []
# Create a default section for main content
main_section = {"title": "Main Content", "links": []}
sections.append(main_section)
# Find all section headers (customize selectors for your target site)
section_headers = soup.select("h2.section-header")
current_section = main_section
# Process each section header
for header in section_headers:
section_title = header.text.strip()
new_section = {"title": section_title, "links": []}
sections.append(new_section)
current_section = new_section
# Find all content links in this section (customize selectors)
        next_div = header.find_next("div")
        content_links = next_div.select("a.content-link") if next_div else []
for link in content_links:
link_title = link.text.strip()
link_url = urljoin(url, link.get("href", ""))
current_section["links"].append({"title": link_title, "url": link_url})
return {"title": title, "url": url, "sections": sections}
def is_flaresolverr_running() -> bool:
"""Check if FlareSolverr is running.
Returns:
True if FlareSolverr is running, False otherwise
"""
try:
        # FlareSolverr only accepts POST on /v1; its root endpoint serves a small
        # status message, which is enough for a liveness check
        response = requests.get("http://localhost:8191/", timeout=5)
return response.status_code == 200
except (RequestException, HTTPError):
return False
def start_flaresolverr() -> bool:
"""Start FlareSolverr if it's not already running.
Returns:
True if FlareSolverr is running or was started successfully, False otherwise
"""
if is_flaresolverr_running():
logger.info("FlareSolverr is already running")
return True
logger.info("Starting FlareSolverr...")
try:
# Adjust the command based on your system and how FlareSolverr is installed
subprocess.Popen(
[
"docker",
"run",
"-p",
"8191:8191",
"ghcr.io/flaresolverr/flaresolverr:latest",
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
# Wait for FlareSolverr to start
for _ in range(30): # Wait up to 30 seconds
if is_flaresolverr_running():
logger.info("FlareSolverr started successfully")
return True
time.sleep(1)
logger.error("FlareSolverr failed to start within the timeout period")
return False
    except (OSError, subprocess.SubprocessError) as e:
logger.error("Error starting FlareSolverr: %s", str(e))
return False
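# Note: the same container can also be started manually and kept running between runs
# with the command used above, e.g.:
#
#     docker run -d -p 8191:8191 ghcr.io/flaresolverr/flaresolverr:latest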
def get_with_flaresolverr(url: str, cookies: Dict[str, str]) -> Optional[str]:
"""Get a URL using FlareSolverr to bypass Cloudflare protection.
Args:
url: URL to access
cookies: Dictionary of cookies to use
Returns:
HTML content if successful, None otherwise
"""
if not is_flaresolverr_running():
if not start_flaresolverr():
logger.error("Failed to start FlareSolverr, cannot proceed")
return None
logger.info("Using FlareSolverr to access %s", url)
    # FlareSolverr expects cookies as a list of {"name": ..., "value": ...} objects
    # rather than a single "Cookie" header string
    cookie_list = [{"name": name, "value": value} for name, value in cookies.items()]
    payload = {
        "cmd": "request.get",
        "url": url,
        "maxTimeout": 60000,
        "cookies": cookie_list,
"headers": {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Referer": "https://www.example.com/",
},
}
try:
response = requests.post("http://localhost:8191/v1", json=payload, timeout=70)
response.raise_for_status()
result = response.json()
if result.get("status") == "ok":
return result.get("solution", {}).get("response", "")
else:
logger.error(
"FlareSolverr error: %s", result.get("message", "Unknown error")
)
return None
except (RequestException, HTTPError) as e:
logger.error("Error using FlareSolverr: %s", str(e))
return None
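# For reference, a successful FlareSolverr reply is shaped roughly like the following,
# abridged to the fields read above (the exact payload may vary by version):
#
#     {"status": "ok", "message": "...",
#      "solution": {"url": "...", "status": 200, "response": "<html>...</html>"}}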
def get_with_selenium(url: str, cookies: Dict[str, str]) -> Optional[str]:
"""Get a URL using Selenium to bypass protection.
Args:
url: URL to access
cookies: Dictionary of cookies to use
Returns:
HTML content if successful, None otherwise
"""
logger.info("Using Selenium to access %s", url)
try:
# Try with undetected-chromedriver first
driver = setup_selenium_driver(undetected=True, random_window_size=True)
# Navigate to the domain first to set cookies
parsed_url = urlparse(url)
domain_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
driver.get(domain_url)
# Set cookies
for name, value in cookies.items():
driver.add_cookie(
{"name": name, "value": value, "domain": parsed_url.netloc}
)
# Now navigate to the actual URL
driver.get(url)
# Handle CAPTCHA if needed
if not handle_captcha(driver):
logger.warning("CAPTCHA verification failed or timed out")
driver.quit()
return None
# Wait for page to load
random_sleep(3, 5)
# Get the page source
html_content = driver.page_source
driver.quit()
return html_content
    except WebDriverException as e:
logger.error("Error using Selenium to access %s: %s", url, str(e))
return None
def get_with_curl_impersonate(url: str, cookies: Dict[str, str]) -> Optional[str]:
"""Get a URL using curl_cffi with impersonation to bypass protection.
Args:
url: URL to access
cookies: Dictionary of cookies to use
Returns:
HTML content if successful, None otherwise
"""
try:
from curl_cffi import requests as curl_requests
logger.info("Using curl_cffi with impersonation to access %s", url)
# Prepare cookies
cookie_string = "; ".join(
[f"{name}={value}" for name, value in cookies.items()]
)
# Use curl_cffi with Chrome impersonation
response = curl_requests.get(
url,
headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Referer": "https://www.example.com/",
"Cookie": cookie_string,
},
impersonate="chrome110",
timeout=30,
)
return response.text
except ImportError:
logger.warning("curl_cffi not installed. Cannot use curl impersonation.")
return None
    except Exception as e:  # curl_cffi raises its own request/transport error types
logger.error("Error using curl_cffi to access %s: %s", url, str(e))
return None
@retry(
retry=retry_if_exception_type((RequestException, HTTPError, TimeoutException, NoSuchElementException)),
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
)
def get_site_structure(url: str, cookies: Dict[str, str]) -> SiteStructure:
"""Get the site structure with all sections and links.
Args:
url: URL of the site
cookies: Dictionary of cookies to use
Returns:
SiteStructure object containing the site's structure
Raises:
RequestException, HTTPError: If there's an error fetching the site structure
"""
logger.info("Fetching site structure from %s", url)
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Referer": "https://www.example.com/",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
}
# Clean headers to ensure they're ASCII-compatible
clean_headers = {}
for key, value in headers.items():
clean_value = value.encode("ascii", "ignore").decode("ascii")
clean_headers[key] = clean_value
# Encode URL to handle special characters
parsed_url = urlparse(url)
encoded_path = quote(parsed_url.path)
encoded_url = parsed_url._replace(path=encoded_path).geturl()
# Clean cookies to ensure they're ASCII-compatible
clean_cookies = {}
for name, value in cookies.items():
if isinstance(value, str):
clean_value = value.encode("ascii", "ignore").decode("ascii")
clean_cookies[name] = clean_value
else:
clean_cookies[name] = value
try:
# First try with regular requests
session = requests.Session()
response = session.get(
encoded_url, cookies=clean_cookies, headers=clean_headers, timeout=30
)
response.raise_for_status()
html_content = response.text
# Detect firewall
firewall = detect_firewall(html_content, response.headers)
if firewall:
logger.info("Detected %s protection, trying bypass methods", firewall)
# Try different bypass methods based on the detected firewall
if firewall == "cloudflare":
# Try FlareSolverr first for Cloudflare
flare_content = get_with_flaresolverr(encoded_url, clean_cookies)
if flare_content:
html_content = flare_content
else:
# Fall back to curl impersonation
curl_content = get_with_curl_impersonate(encoded_url, clean_cookies)
if curl_content:
html_content = curl_content
else:
# Last resort: Selenium
selenium_content = get_with_selenium(encoded_url, clean_cookies)
if selenium_content:
html_content = selenium_content
elif firewall in ["akamai", "imperva", "distil", "perimeterx", "datadome"]:
# These firewalls often need browser fingerprinting evasion
# Try curl impersonation first
curl_content = get_with_curl_impersonate(encoded_url, clean_cookies)
if curl_content:
html_content = curl_content
else:
# Fall back to Selenium
selenium_content = get_with_selenium(encoded_url, clean_cookies)
if selenium_content:
html_content = selenium_content
else:
# For other firewalls, try Selenium directly
selenium_content = get_with_selenium(encoded_url, clean_cookies)
if selenium_content:
html_content = selenium_content
# Save the HTML for debugging
with open("debug_site_structure.html", "w", encoding="utf-8") as f:
f.write(html_content)
return extract_site_structure(html_content, url)
except Exception as e:
logger.error("Error fetching site structure from %s: %s", url, e, exc_info=True)
raise
def extract_content_from_html(html_content: str) -> str:
"""Extract content from HTML.
This is a template function that needs to be customized for specific sites.
Args:
html_content: HTML content of the page
Returns:
Extracted text content
"""
soup = BeautifulSoup(html_content, "html.parser")
# Find the main content container (customize selector for your target site)
content_elements = soup.select("div.main-content")
content = ""
for element in content_elements:
content += element.get_text(separator="\n\n") + "\n\n"
return content
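# Example customization (illustrative): many sites keep the article body under an
# <article> tag or a site-specific container, so the selector above is usually the
# only thing to adapt, e.g.:
#
#     content_elements = soup.select("article, div#post-body")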
@retry(
retry=retry_if_exception_type((RequestException, HTTPError)),
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
)
def scrape_content(url: str, cookies: Dict[str, str]) -> Optional[ContentData]:
"""Scrape the content from a specific URL.
Args:
url: URL to scrape
cookies: Dictionary of cookies to use
Returns:
ContentData object if successful, None otherwise
Raises:
RequestException, HTTPError: If there's an error scraping the content
"""
logger.info("Scraping content from %s", url)
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Referer": "https://www.example.com/",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
}
# Clean headers to ensure they're ASCII-compatible
clean_headers = {}
for key, value in headers.items():
clean_value = value.encode("ascii", "ignore").decode("ascii")
clean_headers[key] = clean_value
# Encode URL to handle special characters
parsed_url = urlparse(url)
encoded_path = quote(parsed_url.path)
encoded_url = parsed_url._replace(path=encoded_path).geturl()
# Clean cookies to ensure they're ASCII-compatible
clean_cookies = {}
for name, value in cookies.items():
if isinstance(value, str):
clean_value = value.encode("ascii", "ignore").decode("ascii")
clean_cookies[name] = clean_value
else:
clean_cookies[name] = value
try:
# First try with regular requests
session = requests.Session()
response = session.get(
encoded_url, cookies=clean_cookies, headers=clean_headers, timeout=30
)
response.raise_for_status()
html_content = response.text
# Check if we got a Cloudflare challenge
if "Just a moment" in html_content or "Checking your browser" in html_content:
logger.info("Cloudflare protection detected, using FlareSolverr")
flare_content = get_with_flaresolverr(encoded_url, clean_cookies)
if flare_content:
html_content = flare_content
else:
logger.warning("FlareSolverr failed, proceeding with original response")
soup = BeautifulSoup(html_content, "html.parser")
# Find the title (customize selector for your target site)
title_element = soup.select_one("h1.title")
title = title_element.text.strip() if title_element else "Untitled Content"
# Extract the content
content = extract_content_from_html(html_content)
if not content:
logger.warning("No content found at %s", url)
return None
return {"title": title, "content": content, "url": url}
except (RequestException, HTTPError) as e:
logger.error("Error scraping content from %s: %s", url, e, exc_info=True)
return None
def process_content(content_data: ContentData) -> str:
"""Process the content data.
This is a template function that can be customized to transform content
(e.g., format it, enhance it with AI, etc.)
Args:
content_data: ContentData object to process
Returns:
Processed content as a string
"""
# Simple example: just format with markdown
processed_content = f"# {content_data['title']}\n\n{content_data['content']}"
return processed_content
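# Example customization (illustrative): append the source URL so the saved Markdown
# stays traceable to the page it came from.
#
#     def process_content(content_data: ContentData) -> str:
#         body = f"# {content_data['title']}\n\n{content_data['content']}"
#         return f"{body}\n\n---\nSource: {content_data['url']}"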
def save_site_content(
site_structure: SiteStructure, content_data: Dict[str, ContentData]
) -> None:
"""Save site content using PyFilesystem2.
Args:
site_structure: SiteStructure object containing the site's structure
content_data: Dictionary mapping URLs to ContentData objects
"""
parsed_url = urlparse(site_structure["url"])
path_parts = parsed_url.path.strip("/").split("/")
    # Fall back to a generic name when the URL has no path component
    site_name = path_parts[-1] if path_parts and path_parts[-1] else "site"
site_name = sanitize_filename(site_name)
site_fs = open_fs(f"osfs://{site_name}", create=True)
# Save the site structure as JSON
with site_fs.open("site_structure.json", "w", encoding="utf-8") as f:
json.dump(site_structure, f, indent=4)
# Create a README with links to all content
with site_fs.open("README.md", "w", encoding="utf-8") as f:
f.write(f"# {site_structure['title']}\n\n")
for section in site_structure["sections"]:
f.write(f"## {section['title']}\n\n")
for link in section["links"]:
content_filename = sanitize_filename(link["title"])
section_dir_name = sanitize_filename(section["title"])
f.write(
f"- [{link['title']}]({section_dir_name}/{content_filename}.md) ([Original]({link['url']}))\n"
)
f.write("\n")
# Save each section's content
for section in site_structure["sections"]:
section_dir = sanitize_filename(section["title"])
site_fs.makedirs(section_dir, recreate=True)
for link in section["links"]:
content_url = link["url"]
if content_url in content_data:
data = content_data[content_url]
content_filename = sanitize_filename(link["title"])
# Save original content
with site_fs.open(
f"{section_dir}/{content_filename}_original.md",
"w",
encoding="utf-8",
) as f:
f.write(f"# {data['title']}\n\n")
f.write(data["content"])
# Save processed content
with site_fs.open(
f"{section_dir}/{content_filename}.md", "w", encoding="utf-8"
) as f:
f.write(data["processed_content"])
def sanitize_filename(filename: str) -> str:
"""Remove invalid characters from a filename.
Args:
filename: Original filename
Returns:
Sanitized filename with invalid characters replaced by underscores
"""
# Replace invalid characters with underscores
invalid_chars = r'[<>:"/\\|?*]'
return re.sub(invalid_chars, "_", filename)
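# For example, sanitize_filename('Intro: Part 1/2') returns 'Intro_ Part 1_2'.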
def main() -> None:
"""Main function to run the web scraper.
This function handles the entire scraping process:
1. Get the target URL from command line or user input
2. Load cookies if available
3. Check and start FlareSolverr if needed
4. Get the site structure
5. Scrape content for each link
6. Process and save the content
"""
try:
if len(sys.argv) > 1:
target_url = sys.argv[1]
else:
target_url = input("Enter the target URL: ")
# Ensure URL has proper format
if not target_url.startswith("http"):
target_url = f"https://{target_url}"
# Load cookies if available
cookies = {}
cookies_file = "cookies.txt"
if os.path.exists(cookies_file):
cookies = parse_cookies_file(cookies_file)
logger.info("Cookies loaded from %s", cookies_file)
else:
logger.warning("No cookies file found at %s", cookies_file)
print("\nNo cookies file found. You may need to create one.")
print("Format should be: name<tab>value for each cookie on a new line.")
# Check if FlareSolverr is available
if not is_flaresolverr_running():
print("FlareSolverr is not running. Attempting to start it...")
if not start_flaresolverr():
print(
"Warning: Could not start FlareSolverr. Some sites may not be accessible."
)
# Get site structure
site_structure = get_site_structure(target_url, cookies)
if site_structure["sections"]:
content_data = {}
total_links = sum(
len(section["links"]) for section in site_structure["sections"]
)
processed_links = 0
for section in site_structure["sections"]:
section_title = section["title"]
logger.info("Processing section: %s", section_title)
print(
f"\n[{processed_links}/{total_links}] Processing section: {section_title}"
)
for link in section["links"]:
link_title = link["title"]
link_url = link["url"]
processed_links += 1
progress = int(processed_links / total_links * 50)
print(
f"\r[{'#' * progress}{' ' * (50-progress)}] {processed_links}/{total_links} - {link_title}",
end="",
)
data = scrape_content(link_url, cookies)
if data:
processed_content = process_content(data)
data["processed_content"] = processed_content
content_data[link_url] = data
logger.info("Processed content for: %s", link_title)
time.sleep(1) # Avoid rate limiting
else:
logger.warning("Failed to scrape content: %s", link_title)
print(f"\nFailed to scrape content: {link_title}")
print() # New line after section
save_site_content(site_structure, content_data)
logger.info("Content has been successfully scraped and processed!")
print("\nContent has been successfully scraped and processed!")
            print(
                "Check the output directory (named after the last URL path segment) for the results."
            )
else:
logger.error("No sections found in the site structure")
print(
"No sections found in the site structure. Check the logs for details."
)
    except Exception as e:
logger.error("An error occurred in the main function: %s", e, exc_info=True)
print(f"\nError: {e}")
print("Check the log file for more details.")
if __name__ == "__main__":
main()
Dependencies (from the author's comment):

requests>=2.25.1
beautifulsoup4>=4.9.3
selenium>=4.0.0
webdriver-manager>=3.5.2
tenacity>=8.0.1
fs>=2.4.16
undetected-chromedriver>=3.1.5
curl-cffi>=0.5.5
lxml>=4.6.3
urllib3>=1.26.5
python-dotenv>=0.19.0
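To use this, save the list above as a pip requirements file and install it (for example, pip install -r requirements.txt), then run the script with the target URL as its first argument; with no argument it prompts for the URL interactively. An optional cookies.txt of tab-separated name/value pairs and a locally running FlareSolverr instance are picked up automatically if present.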
