@syntaxhacker
Last active June 4, 2026 13:12
Build a multi-source news scraper with scrapling — base class, site-specific scrapers, concurrent aggregator, article extraction with ld+json fallbacks.

Building a Multi-Source News Scraper with scrapling

A practical guide to building a concurrent, multi-source news scraper using scrapling — a modern HTML parsing + HTTP library with browser impersonation (no BeautifulSoup/requests needed).


Installation

pip install "scrapling>=0.4.0" "curl_cffi>=0.5.0"

What you get:

Package        Role
scrapling      HTTP fetching + CSS selector parsing
curl_cffi      Low-level HTTP engine with TLS fingerprint impersonation
browserforge   Generates realistic browser fingerprints (transitive)
rookiepy       Cookie/session management (transitive)

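The key call is Fetcher.get(url, impersonate='chrome'), which makes the request look like it comes from a real Chrome browser, so you don't have to hand-craft headers. It gets past many basic bot checks, though not every Cloudflare challenge (see Caveats).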


Architecture Pattern

news_api.py
  │
  ├── BaseNewsScraper          ← abstract base
  │   ├── SiteScraperA         ← one per source
  │   ├── SiteScraperB
  │   └── SiteScraperC
  │
  ├── NewsAggregator           ← concurrent fetch via ThreadPoolExecutor
  │
  └── Public API
      ├── fetch_news(source=None, limit=25)
      ├── fetch_article_content(url)
      └── NEWS_SOURCES

Base Scraper

The base class handles:

  • URL fetching with browser impersonation
  • Article ID generation (first 12 hex characters of the URL's MD5 hash)
  • Generic extraction fallbacks (ld+json, meta tags)
  • Flexible date parsing (multiple formats)
import re, json, hashlib
from datetime import datetime
from scrapling.fetchers import Fetcher

class BaseNewsScraper:
    source_id: str = "base"
    source_name: str = "Base"
    base_url: str = ""

    def _generate_id(self, url: str) -> str:
        return hashlib.md5(url.encode()).hexdigest()[:12]

    def _fetch_page(self, url: str):
        try:
            return Fetcher.get(url, impersonate='chrome')
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return None

    def _parse_date_generic(self, date_str: str):
        """Try multiple date formats."""
        if not date_str:
            return None
        date_str = date_str.strip()
        date_str = re.sub(r' IST$', '', date_str).strip()
        formats = [
            '%B %d, %Y %H:%M', '%b %d, %Y %H:%M',
            '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d',
            '%b %d, %Y', '%B %d, %Y',
            '%d %b %Y', '%d %B %Y',
        ]
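        # ISO 8601 values with a UTC offset (common in meta tags) are not
        # covered by the strptime formats above, so try fromisoformat first.
        try:
            return datetime.fromisoformat(date_str.replace('Z', '+00:00'))
        except ValueError:
            pass
        for fmt in formats:
            try:
                return datetime.strptime(date_str, fmt)
            except ValueError:
                continue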
        return None
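
The two helpers above can be tried in isolation. The following is a minimal standalone sketch using only the standard library; the function names, the shortened format list, and the example URL are illustrative, not part of the guide's module.

```python
import hashlib
import re
from datetime import datetime

def generate_id(url: str) -> str:
    """First 12 hex characters of the URL's MD5 digest."""
    return hashlib.md5(url.encode()).hexdigest()[:12]

def parse_date(date_str: str):
    """Try a short list of formats; return None if none match."""
    if not date_str:
        return None
    cleaned = re.sub(r' IST$', '', date_str.strip()).strip()
    for fmt in ('%b %d, %Y %H:%M', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d', '%d %B %Y'):
        try:
            return datetime.strptime(cleaned, fmt)
        except ValueError:
            continue
    return None

article_id = generate_id('https://www.example-news.com/markets/story-12345.html')
print(len(article_id))                      # 12
print(parse_date('Jan 5, 2024 14:30 IST'))  # 2024-01-05 14:30:00
print(parse_date('2 hours ago'))            # None
```

The same URL always yields the same ID, which is what lets the aggregator deduplicate articles across runs.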

Fetching Latest Headlines

    def fetch_latest_news(self, limit: int = 25):
        page = self._fetch_page(self.base_url)
        if not page:
            return []

        news_items = []
        seen_urls = set()

        for link in page.css('a'):
            href = link.css('::attr(href)').get()
            text = link.css('::text').get()

            if not href or not text:
                continue

            href_str = str(href).strip()
            text_str = str(text).strip()

            if not self._is_news_article(href_str):
                continue
            if len(text_str) < 25:
                continue

            # Normalize relative URLs
            if not href_str.startswith('http'):
                if href_str.startswith('/'):
                    domain = '/'.join(self.base_url.split('/')[:3])
                    href_str = f"{domain}{href_str}"
                else:
                    continue

            if href_str in seen_urls:
                continue
            seen_urls.add(href_str)

            news_items.append({
                'id': self._generate_id(href_str),
                'headline': text_str,
                'description': '',
                'source': self.source_id,
                'sourceUrl': href_str,
                'publishedAt': datetime.now().isoformat(),
                'fetchedAt': datetime.now().isoformat(),
            })
            if len(news_items) >= limit:
                break

        return news_items
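
The URL normalization step can also be written with urllib.parse. This is an alternative sketch, not the method used above: unlike the loop above, it also resolves hrefs without a leading slash, and it drops non-HTTP links such as javascript: or mailto:. The helper name normalize_link is an assumption.

```python
from urllib.parse import urljoin, urlparse

def normalize_link(base_url: str, href: str):
    """Resolve href against base_url; return None for non-HTTP(S) links."""
    absolute = urljoin(base_url, href.strip())
    if urlparse(absolute).scheme not in ('http', 'https'):
        return None
    return absolute

base = 'https://www.example-news.com/markets/'
print(normalize_link(base, '/markets/story-12345.html'))
# https://www.example-news.com/markets/story-12345.html
print(normalize_link(base, 'story-67890.html'))
# https://www.example-news.com/markets/story-67890.html
print(normalize_link(base, 'javascript:void(0)'))
# None
```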

Fetching Full Article Content

    def fetch_article_content(self, url: str):
        page = self._fetch_page(url)
        if not page:
            return {
                'id': self._generate_id(url),
                'error': 'Failed to fetch article',
                'headline': '', 'description': '',
                'source': self.source_id, 'sourceUrl': url,
                'publishedAt': datetime.now().isoformat(),
                'fetchedAt': datetime.now().isoformat(),
                'symbols': [],
            }

        title, text, date, symbols = self._extract_specifics(page, url)
        if not title or not text:
            # Fall back to generic extraction for any field still missing
            title_fb, text_fb, date_fb, sym_fb = self._generic_article_extraction(page)
            title = title or title_fb
            text = text or text_fb
            date = date or date_fb
            symbols = symbols or sym_fb

        return {
            'id': self._generate_id(url),
            'headline': title,
            'description': text,
            'source': self.source_id,
            'sourceUrl': url,
            'publishedAt': date or datetime.now().isoformat(),
            'fetchedAt': datetime.now().isoformat(),
            'symbols': symbols,
        }

Generic Fallback Extraction

Used when the site-specific extractor fails:

    def _generic_article_extraction(self, page):
        if not page:
            return "", "", None, []

        title = (page.css('h1::text').get() or '').strip() \
             or (page.css('h2.title::text').get() or '').strip()

        # Try multiple content selectors
        paragraphs = (page.css('article p::text').getall()
                   or page.css('.story p::text').getall()
                   or page.css('div p::text').getall())
        text = '\n\n'.join(p.strip() for p in paragraphs if len(p.strip()) > 50)

        # Date from <time> or .date elements
        published_at = None
        for el in page.css('time::text').getall() + page.css('.date::text').getall():
            parsed = self._parse_date_generic(el)
            if parsed:
                published_at = parsed.isoformat()
                break

        # Fallback to ld+json or meta tags
        if not title or not text:
            ld_data = self._extract_ld_json(page)
            meta_title, meta_desc = self._extract_from_meta(page)
            if not title:
                title = next((item.get('headline') or item.get('name')
                            for item in ld_data
                            if item.get('headline') or item.get('name')), meta_title)
            if not text:
                # Prefer the full articleBody, then the shorter description
                text = next((item.get('articleBody') or item.get('description')
                           for item in ld_data
                           if item.get('articleBody') or item.get('description')), meta_desc)

        return title.strip(), text, published_at, []

ld+json & Meta Extraction

    def _extract_ld_json(self, page):
        results = []
        for script in page.css('script[type="application/ld+json"]'):
            text = ' '.join(script.css('::text').getall()).strip()
            if not text:
                continue
            text = re.sub(r'[\x00-\x1f]', ' ', text)
            try:
                data = json.loads(text)
                if isinstance(data, list):
                    # Keep only objects; callers use item.get(...)
                    results.extend(d for d in data if isinstance(d, dict))
                elif isinstance(data, dict):
                    results.append(data)
            except json.JSONDecodeError:
                pass
        return results

    def _extract_from_meta(self, page):
        title = desc = ""
        for meta in page.css('meta'):
            attrs = meta.attrib if hasattr(meta, 'attrib') else {}
            prop = (attrs.get('property', '') or attrs.get('name', '')).lower()
            content = attrs.get('content', '')
            if prop == 'og:title' and not title:
                title = content.strip()
            elif prop in ('og:description', 'description') and not desc:
                desc = content.strip()
        return title, desc
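
To experiment with JSON-LD extraction without fetching a page, the same idea can be sketched with the standard library's html.parser. This is an illustrative stand-in for the scrapling selectors above, not a replacement for them; the class name LdJsonCollector and the sample HTML are assumptions, and the "@graph" unwrapping is an extra step the method above does not perform.

```python
import json
from html.parser import HTMLParser

class LdJsonCollector(HTMLParser):
    """Collect decoded JSON-LD objects from <script type="application/ld+json">."""

    def __init__(self):
        super().__init__()
        self.items = []
        self._buffer = None  # list of text chunks while inside a JSON-LD script

    def handle_starttag(self, tag, attrs):
        if tag == 'script' and dict(attrs).get('type') == 'application/ld+json':
            self._buffer = []

    def handle_data(self, data):
        if self._buffer is not None:
            self._buffer.append(data)

    def handle_endtag(self, tag):
        if tag != 'script' or self._buffer is None:
            return
        raw, self._buffer = ''.join(self._buffer), None
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            return  # skip malformed blocks
        for node in (data if isinstance(data, list) else [data]):
            if not isinstance(node, dict):
                continue
            # Some sites nest their objects under "@graph"
            graph = node.get('@graph')
            nested = graph if isinstance(graph, list) else [node]
            self.items.extend(n for n in nested if isinstance(n, dict))

html = '''
<script type="application/ld+json">
{"@type": "NewsArticle", "headline": "Markets rally", "articleBody": "Stocks rose."}
</script>
<script type="application/ld+json">{"@graph": [{"@type": "WebPage", "name": "Home"}]}</script>
<script type="application/ld+json">{not valid json</script>
'''

collector = LdJsonCollector()
collector.feed(html)
headline = next((i['headline'] for i in collector.items if i.get('headline')), '')
print(len(collector.items), headline)  # 2 Markets rally
```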

Abstract Methods (Override Per Site)

    def _is_news_article(self, url: str) -> bool:
        """Filter URLs — only scrape actual articles."""
        return True

    def _extract_specifics(self, page, url: str):
        """Site-specific extraction. Override in subclasses."""
        return "", "", None, []

Concrete Scraper Example

Here's an example scraper for a news site (the domain is a placeholder):

class ExampleSiteScraper(BaseNewsScraper):
    source_id = "example"
    source_name = "Example News"
    base_url = "https://www.example-news.com/markets/"

    def _is_news_article(self, url: str) -> bool:
        return bool(re.search(r'-\d+\.html', url))

    def _extract_specifics(self, page, url: str):
        # Title
        title = page.css('h1::text').get() or ''

        # Date from meta tag
        published_at = None
        for meta in page.css('meta'):
            attrs = meta.attrib if hasattr(meta, 'attrib') else {}
            if attrs.get('property') == 'article:published_time':
                parsed = self._parse_date_generic(attrs['content'])
                if parsed:
                    published_at = parsed.isoformat()
                    break

        # Content
        paras = page.css('article p::text').getall()
        text = '\n\n'.join(p.strip() for p in paras if len(p.strip()) > 50)

        # Extract mentioned stock symbols from links
        symbols = []
        for link in page.css('.content a'):
            href = link.css('::attr(href)').get() or ''
            # Use a separate name so the article text above is not overwritten
            link_text = link.css('::text').get() or ''
            if '/stocks/' in href:
                symbols.append({
                    'name': link_text.strip(),
                    'code': href.rstrip('/').split('/')[-1],
                    'url': href if href.startswith('http') else f"https://www.example-news.com{href}"
                })

        return title.strip(), text, published_at, symbols
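
The URL filter and the symbol-code rule in this scraper are plain string operations, so they are easy to check on their own. A small sketch follows; the sample URLs and the ACME symbol are made up for illustration.

```python
import re

ARTICLE_URL = re.compile(r'-\d+\.html')

def is_news_article(url: str) -> bool:
    """Article URLs on this hypothetical site end in '-<digits>.html'."""
    return bool(ARTICLE_URL.search(url))

def symbol_code(href: str) -> str:
    """Last path segment of a stock link, ignoring a trailing slash."""
    return href.rstrip('/').split('/')[-1]

print(is_news_article('https://www.example-news.com/markets/sensex-gains-12345.html'))  # True
print(is_news_article('https://www.example-news.com/markets/'))                         # False
print(symbol_code('/stocks/acme-corp/ACME/'))                                           # ACME
```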

Using Browser Cookies with rookiepy

Some sites require authenticated sessions or pass Cloudflare challenges via cookies. rookiepy extracts cookies from your installed browsers so you can pass them to scrapling requests.

# rookiepy is a transitive dep of scrapling, but pin it explicitly if needed
pip install "rookiepy>=0.2.0"

Extract Cookies from Chrome/Firefox

import rookiepy

# Extract cookies for a specific domain from Chrome
cookies_raw = rookiepy.chrome(['.example.com'])
cookies = rookiepy.to_cookiejar(cookies_raw)  # → CookieJar

# Fallback to Firefox if Chrome has none
if not list(cookies):
    cookies_raw = rookiepy.firefox(['.example.com'])
    cookies = rookiepy.to_cookiejar(cookies_raw)

# Or load from any installed browser
cookies_raw = rookiepy.load(['.example.com'])
cookies = rookiepy.to_cookiejar(cookies_raw)

Pass Cookies to scrapling's Fetcher

import rookiepy
from scrapling.fetchers import Fetcher

url = 'https://www.example.com/news/'  # placeholder URL
cookies = rookiepy.to_cookiejar(
    rookiepy.chrome(['.example.com'])
)
page = Fetcher.get(url, impersonate='chrome', cookies=cookies)

Using a Session with Cookies

For multi-page scraping that needs persistent auth:

import rookiepy
from scrapling.fetchers import FetcherSession

# Extract cookies from the browser once
cookies = rookiepy.to_cookiejar(
    rookiepy.chrome(['.target-site.com'])
)

with FetcherSession() as session:
    # Pass the extracted cookies with each request; the session also keeps
    # any cookies the server sets along the way
    page1 = session.get('https://target-site.com/dashboard', cookies=cookies)
    page2 = session.get('https://target-site.com/settings', cookies=cookies)

Integrate into the Base Scraper

class BaseNewsScraper:
    def __init__(self):
        self._cookies = None

    def _load_browser_cookies(self, domain: str):
        """Extract cookies from installed browser."""
        try:
            import rookiepy
            raw = rookiepy.chrome([domain])
            if not list(raw):
                raw = rookiepy.firefox([domain])
            self._cookies = rookiepy.to_cookiejar(raw)
        except Exception:
            self._cookies = None

    def _fetch_page(self, url: str):
        try:
            kwargs = {'impersonate': 'chrome'}
            if self._cookies:
                kwargs['cookies'] = self._cookies
            return Fetcher.get(url, **kwargs)
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return None

Then enable cookies per scraper:

class SiteWithAuthScraper(BaseNewsScraper):
    source_id = "authenticated_site"
    base_url = "https://target-site.com/news/"

    def __init__(self):
        super().__init__()
        # Load cookies on init — user must be logged into Chrome
        self._load_browser_cookies('.target-site.com')

Supported Browsers

Function                     Source
rookiepy.chrome(domains)     Google Chrome / Chromium
rookiepy.firefox(domains)    Mozilla Firefox
rookiepy.edge(domains)       Microsoft Edge
rookiepy.brave(domains)      Brave Browser
rookiepy.load(domains)       Any available browser (auto-detect)

Caveats

  • The user must be logged into the site in their browser for cookies to contain valid session tokens.
  • Cookies expire — re-extract periodically for long-running scrapers.
  • Headless environments (servers, Docker) have no browser to extract from. In that case, pass cookies via env vars or a cookie file instead.
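
For the headless case, one possible approach is to read cookies from an environment variable or from a Netscape-format cookies.txt file using only the standard library. The sketch below assumes a hypothetical variable name NEWS_SCRAPER_COOKIES holding a "name=value; name2=value2" string; whether the result can be passed straight to the fetcher's cookies argument should be checked against the scrapling version in use.

```python
import os
from http.cookies import SimpleCookie
from http.cookiejar import MozillaCookieJar

def cookies_from_env(var_name: str = 'NEWS_SCRAPER_COOKIES') -> dict:
    """Parse a 'name=value; name2=value2' string from an env var into a dict."""
    raw = os.environ.get(var_name, '')
    parsed = SimpleCookie()
    parsed.load(raw)
    return {name: morsel.value for name, morsel in parsed.items()}

def cookies_from_file(path: str) -> MozillaCookieJar:
    """Load a Netscape-format cookies.txt file exported from a browser."""
    jar = MozillaCookieJar(path)
    jar.load(ignore_discard=True, ignore_expires=True)
    return jar

# Demo only: in a real deployment the variable is set outside the program.
os.environ['NEWS_SCRAPER_COOKIES'] = 'sessionid=abc123; csrftoken=xyz789'
print(cookies_from_env())  # {'sessionid': 'abc123', 'csrftoken': 'xyz789'}
```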

Aggregator (Concurrent Multi-Source)

Fetches from all scrapers in parallel and assembles results in a fixed order:

import concurrent.futures

class NewsAggregator:
    def __init__(self):
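        # AnotherSiteScraper and ThirdSiteScraper are placeholders; define
        # them the same way as ExampleSiteScraper.
        self.scrapers = {
            'example':    ExampleSiteScraper(),
            'another':    AnotherSiteScraper(),
            'third':      ThirdSiteScraper(),
        }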

    def fetch_from_source(self, source_id: str, limit: int = 25):
        scraper = self.scrapers.get(source_id)
        if not scraper:
            return []
        return scraper.fetch_latest_news(limit)

    def fetch_all(self, limit_per_source: int = 10):
        """Fetch from all sources concurrently."""
        source_order = list(self.scrapers.keys())
        all_news = []

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=max(1, len(self.scrapers))  # must be >= 1 even with no scrapers
        ) as executor:
            futures = {
                executor.submit(self.scrapers[sid].fetch_latest_news, limit_per_source): sid
                for sid in source_order
            }
            results = {}
            for future in concurrent.futures.as_completed(futures):
                sid = futures[future]
                try:
                    results[sid] = future.result()
                except Exception as e:
                    print(f"Error from {sid}: {e}")
                    results[sid] = []

        # Assemble in fixed order for deterministic output
        for sid in source_order:
            all_news.extend(results.get(sid, []))

        return all_news

    def fetch_article(self, source_id: str, url: str):
        scraper = self.scrapers.get(source_id)
        if not scraper:
            scraper = BaseNewsScraper()  # generic fallback
        return scraper.fetch_article_content(url)

    def group_news_by_symbol(self, articles: list) -> dict:
        """Group articles by stock symbol code."""
        groups = {}
        for a in articles:
            for sym in a.get('symbols', []):
                code = sym.get('code')
                if not code:
                    continue
                if code not in groups:
                    groups[code] = []
                if not any(ex['id'] == a['id'] for ex in groups[code]):
                    groups[code].append({
                        'id': a['id'],
                        'headline': a.get('headline', ''),
                        'source': a.get('source', ''),
                        'sourceUrl': a.get('sourceUrl', ''),
                        'publishedAt': a.get('publishedAt', ''),
                    })
        return groups
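
The concurrency pattern in fetch_all can be exercised without any network access. The sketch below swaps the scrapers for hypothetical stub functions (make_stub is not part of the guide) to show that results come back in registration order regardless of which source finishes first, and that a failing source contributes an empty list.

```python
import concurrent.futures
import time

def make_stub(source_id: str, delay: float, fail: bool = False):
    """Hypothetical stand-in for scraper.fetch_latest_news."""
    def fetch(limit: int):
        time.sleep(delay)
        if fail:
            raise RuntimeError(f"{source_id} is down")
        return [f"{source_id}-{i}" for i in range(limit)]
    return fetch

def fetch_all(fetchers: dict, limit: int):
    order = list(fetchers)
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max(1, len(order))) as pool:
        futures = {pool.submit(fetchers[sid], limit): sid for sid in order}
        for future in concurrent.futures.as_completed(futures):
            sid = futures[future]
            try:
                results[sid] = future.result()
            except Exception:
                results[sid] = []  # one failing source does not sink the rest
    # Assemble in registration order, not completion order
    return [item for sid in order for item in results[sid]]

fetchers = {
    'slow': make_stub('slow', 0.2),
    'broken': make_stub('broken', 0.0, fail=True),
    'fast': make_stub('fast', 0.05),
}
print(fetch_all(fetchers, 2))
# ['slow-0', 'slow-1', 'fast-0', 'fast-1']
```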

Public API

_aggregator = NewsAggregator()

NEWS_SOURCES = [
    {'id': s.source_id, 'name': s.source_name, 'url': s.base_url}
    for s in _aggregator.scrapers.values()
]

def fetch_news(source: str = None, limit: int = 25):
    """Fetch headlines. `source=None` or `'all'` fetches from every source."""
    if source is None or source == 'all':
        # Ask each source for at least 5 items, then trim to the requested total
        per_source = max(limit // max(len(_aggregator.scrapers), 1), 5)
        return _aggregator.fetch_all(per_source)[:limit]
    return _aggregator.fetch_from_source(source, limit)

def fetch_article_content(url: str):
    """Fetch full article. Source is auto-detected from URL domain."""
    source_id = 'unknown'
    for sid, scraper in _aggregator.scrapers.items():
        domain = scraper.base_url.split('/')[2]
        if domain in url:
            source_id = sid
            break
    return _aggregator.fetch_article(source_id, url)
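
The source detection above uses a substring test, which also matches when the domain appears only in a query string. A stricter variant, offered as a sketch rather than the guide's implementation, compares parsed hostnames; detect_source and BASE_URLS are illustrative names.

```python
from urllib.parse import urlparse

def detect_source(url: str, base_urls: dict, default: str = 'unknown') -> str:
    """Return the source id whose base_url host matches the URL's host."""
    host = (urlparse(url).hostname or '').lower()
    for source_id, base_url in base_urls.items():
        base_host = (urlparse(base_url).hostname or '').lower()
        if base_host and (host == base_host or host.endswith('.' + base_host)):
            return source_id
    return default

BASE_URLS = {'example': 'https://www.example-news.com/markets/'}
print(detect_source('https://www.example-news.com/markets/story-1.html', BASE_URLS))  # example
print(detect_source('https://evil.test/?ref=www.example-news.com', BASE_URLS))        # unknown
```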

Usage

from news_api import fetch_news, fetch_article_content, NEWS_SOURCES

# List sources
for s in NEWS_SOURCES:
    print(f"  {s['id']:20s}{s['name']}")

# Headlines from one source
items = fetch_news(source="example", limit=5)
for item in items:
    print(item['headline'], item['sourceUrl'])

# Headlines from ALL sources
all_items = fetch_news(limit=30)

# Full article
article = fetch_article_content(all_items[0]["sourceUrl"])
print(article["headline"])
print(article["description"][:300])

Adding a Custom Scraper

# 1. Subclass
class MyScraper(BaseNewsScraper):
    source_id = "mysite"
    source_name = "My Site"
    base_url = "https://mysite.com/news/"

    def _is_news_article(self, url):
        return "/story/" in url

    def _extract_specifics(self, page, url):
        title = page.css("h1::text").get() or ""
        meta = page.css('meta[property="article:published_time"]')
        date = meta[0].attrib.get("content") if meta else None
        paras = page.css("article p::text").getall()
        text = "\n\n".join(p.strip() for p in paras if len(p) > 50)
        return title, text, date, []

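# 2. Register (NEWS_SOURCES is built at import time, so update it as well)
from news_api import _aggregator, NEWS_SOURCES
scraper = MyScraper()
_aggregator.scrapers[scraper.source_id] = scraper
NEWS_SOURCES.append({'id': scraper.source_id, 'name': scraper.source_name, 'url': scraper.base_url})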

Selector Cheat Sheet (scrapling CSS)

Goal                        Code
Text of first <h1>          page.css('h1::text').get()
All <p> text                page.css('p::text').getall()
All <a> elements            page.css('a')
Attribute of first match    page.css('.class::attr(href)').get()
Text inside a div           page.css('.article p::text').getall()
JSON-LD blocks              page.css('script[type="application/ld+json"]')
Meta tag content            page.css('meta[property="og:title"]::attr(content)').get()

Extraction Fallback Chain

When the site-specific extractor returns empty fields, extraction falls through to the generic path:

_extract_specifics(page, url)      ← your override
  │ returns empty
  ▼
_generic_article_extraction(page)
  ├─ h1 / h2.title for title
  ├─ article p / .story p / div p for content
  ├─ <time> / .date elements for date
  └─ ld+json → meta tags (og:title, og:description)
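
Reduced to its core, the cascade is a per-field "first non-empty value wins" merge. The sketch below illustrates that rule with plain dictionaries; merge_extractions and the sample values are hypothetical and stand in for the tuples the scraper methods return.

```python
FIELDS = ('title', 'text', 'date', 'symbols')

def merge_extractions(specific: dict, generic: dict) -> dict:
    """Prefer site-specific values; fill empty ones from the generic pass."""
    return {field: specific.get(field) or generic.get(field) for field in FIELDS}

specific = {'title': 'Markets rally', 'text': '', 'date': None, 'symbols': []}
generic = {
    'title': 'Markets rally | Example News',
    'text': 'Stocks rose on Friday.',
    'date': '2024-01-05T14:30:00',
    'symbols': [],
}

merged = merge_extractions(specific, generic)
print(merged['title'])  # Markets rally
print(merged['text'])   # Stocks rose on Friday.
print(merged['date'])   # 2024-01-05T14:30:00
```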

Caveats

  • Paywalls: Many news sites truncate the visible article body. The ld+json block often still carries articleBody, or at least a description, which is why it serves as a fallback.
  • DOM breakage: Sites change layouts. Monitor and update selectors.
  • Rate limits: No built-in throttling. Add time.sleep(1) between calls if you get HTTP 429s.
  • Cloudflare blocks: The impersonate='chrome' flag helps, but some networks still trigger 1015 errors. Use a VPN if needed.
  • Date parsing: Generic parser handles common formats, but relative dates ("2 hours ago") won't parse.
  • Blocking calls: The scraper code in this guide is synchronous. Wrap calls in asyncio.to_thread() for async apps.
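
For the relative-date caveat, a small helper could run before the generic parser. The following is a sketch under narrow assumptions: it only handles English phrases of the form "<number> <unit>(s) ago" with the units listed, and parse_relative_date is a hypothetical name, not part of the base class.

```python
import re
from datetime import datetime, timedelta

# Map the unit words we accept to timedelta keyword names
_UNITS = {'minute': 'minutes', 'min': 'minutes', 'hour': 'hours',
          'hr': 'hours', 'day': 'days', 'week': 'weeks'}
_RELATIVE = re.compile(r'^(\d+)\s*(minute|min|hour|hr|day|week)s?\s+ago$', re.IGNORECASE)

def parse_relative_date(text: str, now: datetime = None):
    """Convert e.g. '2 hours ago' to a datetime; return None if not recognised."""
    now = now or datetime.now()
    match = _RELATIVE.match(text.strip())
    if not match:
        return None
    amount, unit = int(match.group(1)), match.group(2).lower()
    return now - timedelta(**{_UNITS[unit]: amount})

reference = datetime(2024, 1, 5, 12, 0)
print(parse_relative_date('2 hours ago', reference))  # 2024-01-05 10:00:00
print(parse_relative_date('3 days ago', reference))   # 2024-01-02 12:00:00
print(parse_relative_date('yesterday', reference))    # None
```

Passing the reference time explicitly keeps the function testable; in the scraper it would default to the fetch time.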

Minimal Standalone Script

#!/usr/bin/env python3
from news_api import fetch_news, fetch_article_content

items = fetch_news(limit=30)

for item in items:
    print(f"[{item['source']:16s}] {item['headline']}")
    print(f"         {item['sourceUrl']}\n")

if items:
    detail = fetch_article_content(items[0]["sourceUrl"])
    print("--- DETAIL ---")
    print(detail["headline"])
    print(detail.get("description", "(no content)")[:500])
    if detail.get("symbols"):
        print("Symbols:", ", ".join(s["code"] for s in detail["symbols"]))