import asyncio
import random
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional

from bs4 import BeautifulSoup
from openpyxl import Workbook
from openpyxl.utils import get_column_letter
from openpyxl.worksheet.worksheet import Worksheet
from playwright.async_api import ElementHandle, Locator, Page, async_playwright

from .constants import USER_AGENTS as BROWSER_AGENTS
from .constants import url as BASE_URL


def get_random_user_agent() -> str:
    # pick a random browser user agent string
    return random.choice(BROWSER_AGENTS)


async def human_delay(min_: float = 0.8, max_: float = 2.5) -> None:
    # sleep for a random, human-looking interval between actions
    await asyncio.sleep(random.uniform(min_, max_))


async def retry_with_backoff(coro: Callable, retries: int = 3, delay: float = 2):
    # extends the "core" concept of retry but with exponential backoff,
    # usable with any awaitable factory (a zero-argument callable returning a coroutine)
    last_exc: Exception | None = None
    for i in range(retries):
        try:
            return await coro()
        except Exception as exc:
            last_exc = exc
            if i < retries - 1:
                await asyncio.sleep(delay * 2**i)
    raise RuntimeError("All retries failed") from last_exc
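

# Example usage sketch (not called in this module): wrap the coroutine in a lambda so
# each retry attempt creates a fresh coroutine object. The target URL here is a
# hypothetical placeholder, only retry_with_backoff above is real.
async def _example_retry_goto(page: Page) -> None:
    await retry_with_backoff(
        lambda: page.goto("https://example.com", wait_until="domcontentloaded"),
        retries=3,
        delay=2,
    )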


async def goto_with_retry(
    page: Page, target_url: str, retries: int = 3, delay: int = 2
):
    # direct equivalent of page.goto, but with retry logic and exponential backoff
    for i in range(retries):
        try:
            await page.goto(target_url, wait_until="domcontentloaded", timeout=10000)
            return
        except Exception as e:
            print(f"[WARN] Failed loading {target_url}, retry {i + 1}/{retries}: {e}")
            await asyncio.sleep(delay * (2**i))


async def fallback_locator(
    page: Page,
    selectors: List[str],
    *,
    scope: Optional[Locator] = None,
    logger_func: Optional[Callable[[str], None]] = None,
    fallback_attrs: Optional[List[Dict[str, str]]] = None,
) -> Locator:
    """
    Try multiple selectors in order until one matches at least one element.
    Returns the first matching Locator.
    Raises ValueError if none match.
    """
    base = scope or page
    for selector in selectors:
        try:
            loc = base.locator(selector)
            if await loc.count() > 0:
                if logger_func:
                    logger_func(f"[→] Using selector: {selector}")
                return loc
        except Exception:
            continue
    # fall back to matching on attribute values
    if fallback_attrs:
        for attr in fallback_attrs:
            tag_ = attr.get("tag", "*")
            key_ = attr.get("attr")
            value_ = attr.get("value")
            if key_ and value_:
                # build the CSS attribute expression, e.g. a[data-id='42']
                expr = f"{tag_}[{key_}='{value_}']"
                try:
                    locator = base.locator(expr)
                    if await locator.count() > 0:
                        if logger_func:
                            logger_func(f"[→] Using fallback attribute selector: {expr}")
                        return locator
                except Exception:
                    # skip expressions that fail to resolve
                    continue
    raise ValueError(f"No selectors matched any elements: {selectors}")


@asynccontextmanager
async def browser_context(
    headless: bool = False,
    remote_debugging: bool = False,
    slow_mo: int = 50,
    user_agent: Optional[str] = None,
    viewport: Optional[dict] = None,
    bypass_csp: bool = True,
):
    async with async_playwright() as p:
        # Chromium-specific launch flags, kept for the commented-out chromium launch below
        _args = [
            "--no-sandbox",
            "--disable-setuid-sandbox",
            "--disable-dev-shm-usage",
            "--disable-accelerated-2d-canvas",
            "--no-first-run",
            "--no-zygote",
            "--disable-gpu",
            "--disable-features=IsolateOrigins,site-per-process",
            "--window-size=1280,800",
            "--start-maximized",
        ]
        # browser = await p.chromium.launch(headless=headless, slow_mo=slow_mo, args=_args)
        browser = await p.firefox.launch(headless=headless, slow_mo=slow_mo)
        ctx = await browser.new_context(
            # user_agent=user_agent,
            # viewport=viewport or {"width": 1280, "height": 800},
            ignore_https_errors=True,
            java_script_enabled=True,
            bypass_csp=bypass_csp,
        )
        try:
            yield ctx
        finally:
            await browser.close()
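

# Example usage sketch (not called in this module): open a context, create a page, and
# navigate with the retry helper above. The target URL is a hypothetical placeholder.
async def _example_browser_context() -> None:
    async with browser_context(headless=True) as ctx:
        page = await ctx.new_page()
        await goto_with_retry(page, "https://example.com")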


async def _get_rendered_html(page: Page, selector: str | None = None) -> BeautifulSoup:
    if selector:
        content = await page.query_selector(selector)
        if content:
            html = await content.inner_html()
            return BeautifulSoup(html, "html.parser")
        print(f"[ERROR] Selector {selector} did not return content.")
    html = await page.content()
    return BeautifulSoup(html, "html.parser")
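

# Example usage sketch (not called in this module): pull heading text out of the parsed
# soup. The container selector is a hypothetical placeholder.
async def _example_get_rendered_html(page: Page) -> list[str]:
    soup = await _get_rendered_html(page, selector="#main-content")
    return [h.get_text(strip=True) for h in soup.find_all("h2")]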


def sanitize_sheet_name(name: str) -> str:
    """Ensure Excel sheet names are valid (max 31 chars, no special chars)."""
    invalid_chars = ["/", "\\", "*", "[", "]", ":", "?"]
    for char in invalid_chars:
        name = name.replace(char, "")
    return name[:31]


def auto_adjust_column_width(sheet: Worksheet):
    # widen each column to fit its longest cell value
    for col in sheet.columns:
        max_length = max(len(str(cell.value or "")) for cell in col)
        adjusted_width = max_length + 2
        col_letter = get_column_letter(col[0].column)
        sheet.column_dimensions[col_letter].width = adjusted_width


def write_subcategory_sheets(wb: Workbook, categories: List[Dict[str, Any]]):
    # one sheet per subcategory, listing its index entries
    for category in categories:
        section = category["section"]
        for sub in category["subcategories"]:
            name = sanitize_sheet_name(sub["name"])
            index_entries = sub.get("index_entries", [])
            if not index_entries:
                continue
            ws = wb.create_sheet(title=name)
            headers = ["Section", "Title", "URL", "Image Src", "Image Alt"]
            ws.append(headers)
            for item in index_entries:
                ws.append(
                    [
                        section,
                        item.get("title", ""),
                        item.get("href", ""),
                        item.get("image_meta", {}).get("src", ""),
                        item.get("image_meta", {}).get("alt", ""),
                    ]
                )
            auto_adjust_column_width(ws)


def write_overview_sheet(wb: Workbook, categories: List[Dict[str, Any]]):
    # first sheet: a catalog of every category/subcategory pair and its URL
    overview = wb.active
    overview.title = "CATEGORIES CATALOG"
    headers = ["Category", "Subcategory", "URL"]
    overview.append(headers)
    for category in categories:
        category_name = category["section"]
        for sub in category["subcategories"]:
            overview.append([category_name, sub["name"], sub["url"]])
    auto_adjust_column_width(overview)


def write_category_to_excel(
    categories: List[Dict[str, Any]],
    filename: str = "scraped_expo_data.xlsx",
    output_dir: Path | None = None,
):
    # write the overview, per-subcategory, and per-entry product sheets into one workbook
    if output_dir is None:
        base_dir = Path(__file__).resolve().parent
        output_dir = base_dir / "exports"
    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / filename
    wb = Workbook()
    write_overview_sheet(wb, categories)
    write_subcategory_sheets(wb, categories)
    write_products_to_excel(wb, categories)
    wb.save(output_path)
    print(f"[✓] Excel file saved to: {output_path}")


def write_products_to_excel(
    wb: Workbook,
    categories: list[dict[str, Any]],
):
    # one sheet per index entry that actually has scraped products
    for category in categories:
        section = category["section"]
        for sub in category.get("subcategories", []):
            for entry in sub.get("index_entries", []):
                products = entry.get("products", [])
                if not products:
                    continue  # skip entries without products
                sheet_name = sanitize_sheet_name(entry["title"])
                ws = wb.create_sheet(title=sheet_name)
                headers = [
                    "Section",
                    "Subcategory",
                    "Entry Title",
                    "Product Name",
                    "Manufacturer",
                    "Price",
                    "Currency",
                    "Model",
                    "Features",
                    "Image Src",
                    "Link",
                ]
                ws.append(headers)
                for p in products:
                    ws.append(
                        [
                            section,
                            sub["name"],
                            entry["title"],
                            p.get("product_title"),
                            p.get("manufacturer_name"),
                            p.get("price"),
                            p.get("currency"),
                            p.get("product_model"),
                            ", ".join(p.get("features") or []),
                            p.get("tile_image_src"),
                            p.get("product_link"),
                        ]
                    )
                auto_adjust_column_width(ws)


def write_product_entry_to_excel(
    entry: Dict[str, Any], section: str, subcategory: str, output_dir: Path
):
    """Alternative writer: one standalone workbook per index entry."""
    if not entry.get("products"):
        return
    title = sanitize_sheet_name(entry["title"])
    filename = f"{section}__{subcategory}__{title}.xlsx".replace(" ", "_")
    output_dir.mkdir(parents=True, exist_ok=True)
    path = output_dir / filename
    wb = Workbook()
    ws = wb.active
    ws.title = "Products"
    headers = [
        "Product Name",
        "Manufacturer",
        "Price",
        "Currency",
        "Model",
        "Features",
        "Image Src",
        "Link",
    ]
    ws.append(headers)
    for p in entry["products"]:
        ws.append(
            [
                p.get("product_title"),
                p.get("manufacturer_name"),
                p.get("price"),
                p.get("currency"),
                p.get("product_model"),
                ", ".join(p.get("features") or []),
                p.get("tile_image_src"),
                p.get("product_link"),
            ]
        )
    auto_adjust_column_width(ws)
    wb.save(path)
    print(f"[✓] Saved: {path}")


async def extract_product_link_from_tile(tile: ElementHandle) -> str | None:
    """
    Extracts the actual product link from a product tile.
    Targets the <a> that wraps an <h3 class="short-name">.
    """
    import re

    try:
        # grab the <h3> element that indicates a product link
        h3_el = await tile.query_selector("a[href] > h3.short-name")
        if h3_el:
            # walk back up to the parent <a> element
            parent_handle = await h3_el.evaluate_handle("node => node.parentElement")
            parent_a = parent_handle.as_element()
            if parent_a:
                href = await parent_a.get_attribute("href")
                if href and re.match(
                    r"^https://www\.medicalexpo\.com/prod/.+/product-\d+-\d+\.html$",
                    href,
                ):
                    return href
    except Exception as e:
        print(f"[ERROR] Could not extract product link: {e}")
    return None


async def safe_inner_text(locator: Locator) -> str | None:
    """Safely read the inner text of a locator without raising if it is missing."""
    try:
        if await locator.count() > 0:
            return (await locator.inner_text()).strip()
    except Exception:
        pass
    return None


async def is_valid_product_page(
    page: Page, logger_func: Optional[Callable] = None
) -> bool:
    # a product page is treated as invalid when the site redirects us back to the homepage
    # title_block = page.locator('span[class^="sc-2mcr2-0"]')
    # return await title_block.count() > 0
    product_url: str = page.url
    homepage_url: str = "https://medicalexpo.com"
    try:
        # short timeout: if no redirect to the homepage happens, we are still on the product page
        await page.wait_for_url(homepage_url, timeout=3000)
    except Exception:
        pass
    if page.url != product_url:
        if logger_func:
            logger_func(f"[DEBUG] Expected {product_url}, but got {page.url}")
        return False
    return True


def chunk_workload(dataset, size: int | None = None):
    """
    Utility generator to chunk any sized iterable, such as tile data or a list of
    product links, so that the chunks can be processed in parallel.
    """
    import itertools

    if not size:
        # default to two chunks; max(1, ...) keeps tiny datasets from producing a zero size
        size = max(1, len(dataset) // 2)
    it = iter(dataset)
    while chunk := list(itertools.islice(it, size)):
        yield chunk
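

# Example usage sketch (not called in this module): split a hypothetical list of
# product links into batches of 10.
def _example_chunk_workload(product_links: List[str]) -> List[List[str]]:
    return [batch for batch in chunk_workload(product_links, size=10)]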


def process_parallel(
    dataset, chunk_size: int, process_function: Callable
):
    """
    Processes data in parallel using multiprocessing.

    Args:
    - dataset: Iterable data (e.g., list of categories or tiles).
    - chunk_size: Size of the chunks to split the dataset into.
    - process_function: The function to apply to each chunk (must be a picklable,
      top-level function).

    Returns:
    - A list with the processed result of each chunk.
    """
    import multiprocessing as mp

    # use all available CPU cores
    num_workers = mp.cpu_count()
    results = []
    with mp.Pool(processes=num_workers) as pool:
        for result_chunk in pool.imap(process_function, chunk_workload(dataset, chunk_size)):
            handle_processed_result(result_chunk)
            results.append(result_chunk)
    return results


def process_chunk(chunk: Iterable, fn: Callable):
    """
    Utility function to apply a function or callback to each element of a chunk.
    For example: extracting tile information from every tile in a chunk, or extracting
    product details from a chunk of product links.
    """
    # minimal default implementation: apply fn to each element and collect the results
    return [fn(item) for item in chunk]


def handle_processed_result(result_chunk):
    """
    Utility function to handle a processed result chunk (e.g. persist or merge it).
    Left as a no-op placeholder for now.
    """
    pass