A practical guide to building a concurrent, multi-source news scraper using
scrapling — a modern HTML parsing + HTTP library with browser impersonation (no BeautifulSoup/requests needed).
pip install "scrapling>=0.4.0" "curl_cffi>=0.5.0"

What you get:

| Package | Role |
|---|---|
| scrapling | HTTP fetching + CSS selector parsing |
| curl_cffi | Low-level HTTP engine with TLS fingerprint impersonation |
| browserforge | Generates realistic browser fingerprints (transitive) |
| rookiepy | Cookie/session management (transitive) |
The key method is Fetcher.get(url, impersonate='chrome') which mimics a
real Chrome browser — no manual headers, no Cloudflare fighting.
news_api.py
│
├── BaseNewsScraper ← abstract base
│ ├── SiteScraperA ← one per source
│ ├── SiteScraperB
│ └── SiteScraperC
│
├── NewsAggregator ← concurrent fetch via ThreadPoolExecutor
│
└── Public API
├── fetch_news(source=None, limit=25)
├── fetch_article_content(url)
└── NEWS_SOURCES
The base class handles:
- URL fetching with browser impersonation
- Article ID generation (MD5 hash of URL)
- Generic extraction fallbacks (ld+json, meta tags)
- Flexible date parsing (multiple formats)
import re, json, hashlib
from datetime import datetime
from scrapling.fetchers import Fetcher
class BaseNewsScraper:
    """Common base for the per-site scrapers.

    Subclasses customise the three class attributes below.
    """

    # Short identifier; copied into the 'source' field of every scraped item.
    source_id: str = "base"
    # Human-readable name of the source.
    source_name: str = "Base"
    # Listing page that fetch_latest_news() scans for article links.
    base_url: str = ""
def _generate_id(self, url: str) -> str:
    """Return a short, stable article ID derived from *url*.

    The ID is the first 12 hex characters of the MD5 digest of the encoded
    URL, so the same URL always yields the same ID.
    """
    digest = hashlib.md5(url.encode())
    return digest.hexdigest()[:12]
def _fetch_page(self, url: str):
    """Download *url* while impersonating Chrome.

    Returns whatever ``Fetcher.get`` returns, or ``None`` when the call
    raises; the error is printed instead of being propagated.
    """
    try:
        response = Fetcher.get(url, impersonate='chrome')
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None
    return response
def _parse_date_generic(self, date_str: str):
"""Try multiple date formats."""
if not date_str:
return None
date_str = date_str.strip()
date_str = re.sub(r' IST$', '', date_str).strip()
formats = [
'%B %d, %Y %H:%M', '%b %d, %Y %H:%M',
'%Y-%m-%dT%H:%M:%S', '%Y-%m-%d',
'%b %d, %Y', '%B %d, %Y',
'%d %b %Y', '%d %B %Y',
]
for fmt in formats:
try:
return datetime.strptime(date_str, fmt)
except ValueError:
continue
return None def fetch_latest_news(self, limit: int = 25):
page = self._fetch_page(self.base_url)
if not page:
return []
news_items = []
seen_urls = set()
for link in page.css('a'):
href = link.css('::attr(href)').get()
text = link.css('::text').get()
if not href or not text:
continue
href_str = str(href).strip()
text_str = str(text).strip()
if not self._is_news_article(href_str):
continue
if len(text_str) < 25:
continue
# Normalize relative URLs
if not href_str.startswith('http'):
if href_str.startswith('/'):
domain = '/'.join(self.base_url.split('/')[:3])
href_str = f"{domain}{href_str}"
else:
continue
if href_str in seen_urls:
continue
seen_urls.add(href_str)
news_items.append({
'id': self._generate_id(href_str),
'headline': text_str.strip(),
'description': '',
'source': self.source_id,
'sourceUrl': href_str,
'publishedAt': datetime.now().isoformat(),
'fetchedAt': datetime.now().isoformat(),
})
if len(news_items) >= limit:
break
return news_items def fetch_article_content(self, url: str):
page = self._fetch_page(url)
if not page:
return {
'id': self._generate_id(url),
'error': 'Failed to fetch article',
'headline': '', 'description': '',
'source': self.source_id, 'sourceUrl': url,
'publishedAt': datetime.now().isoformat(),
'fetchedAt': datetime.now().isoformat(),
'symbols': [],
}
title, text, date, symbols = self._extract_specifics(page, url)
if not title:
# Fallback to generic extraction
title, text_fb, date_fb, sym_fb = self._generic_article_extraction(page)
if not text: text = text_fb
if not date: date = date_fb
if not symbols: symbols = sym_fb
return {
'id': self._generate_id(url),
'headline': title,
'description': text,
'source': self.source_id,
'sourceUrl': url,
'publishedAt': date or datetime.now().isoformat(),
'fetchedAt': datetime.now().isoformat(),
'symbols': symbols,
}Used when the site-specific extractor fails:
def _generic_article_extraction(self, page):
    """Best-effort article extraction that relies only on common markup.

    Lookup order:
      * title: ``h1``, then ``h2.title``, then a JSON-LD ``headline``, then
        the ``og:title`` meta tag;
      * text: paragraphs longer than 50 characters under ``article``,
        ``.story`` or any ``div``; then a JSON-LD ``articleBody`` longer than
        50 characters; then the meta description; then a JSON-LD
        ``description``;
      * date: the first ``<time>`` or ``.date`` text that
        ``_parse_date_generic`` can parse.

    Returns:
        ``(title, text, published_at, symbols)`` where *published_at* is an
        ISO string or ``None`` and *symbols* is always an empty list.
    """
    if not page:
        return "", "", None, []

    title = (page.css('h1::text').get() or '').strip() \
        or (page.css('h2.title::text').get() or '').strip()

    # Try progressively less specific containers for the body text.
    paragraphs = (page.css('article p::text').getall()
                  or page.css('.story p::text').getall()
                  or page.css('div p::text').getall())
    text = '\n\n'.join(p.strip() for p in paragraphs if len(p.strip()) > 50)

    published_at = None
    for raw_date in page.css('time::text').getall() + page.css('.date::text').getall():
        parsed = self._parse_date_generic(raw_date)
        if parsed:
            published_at = parsed.isoformat()
            break

    if not title or not text:
        # JSON-LD is site-controlled data: keep only dict nodes and check
        # value types before calling str methods or len() on them.
        ld_items = [item for item in self._extract_ld_json(page)
                    if isinstance(item, dict)]
        meta_title, meta_desc = self._extract_from_meta(page)
        if not title:
            title = next(
                (item['headline'] for item in ld_items
                 if isinstance(item.get('headline'), str)
                 and item['headline'].strip()),
                meta_title,
            ) or ''
        if not text:
            text = next(
                (item['articleBody'] for item in ld_items
                 if isinstance(item.get('articleBody'), str)
                 and len(item['articleBody']) > 50),
                '',
            )
            if not text:
                text = meta_desc or next(
                    (item['description'] for item in ld_items
                     if isinstance(item.get('description'), str)
                     and item['description'].strip()),
                    '',
                )
    return title.strip(), text, published_at, []


def _extract_ld_json(self, page):
    """Collect the JSON-LD objects embedded in *page*.

    Each ``<script type="application/ld+json">`` is parsed; scripts that are
    empty or not valid JSON are skipped.  Top-level lists are flattened, and
    the members of a node's ``@graph`` list are added after the node itself.

    Returns:
        A list containing only dicts.
    """
    results = []
    for script in page.css('script[type="application/ld+json"]'):
        raw = ' '.join(script.css('::text').getall()).strip()
        if not raw:
            continue
        # Raw control characters (newlines, tabs) inside string values make
        # json.loads fail, so blank them out first.
        raw = re.sub(r'[\x00-\x1f]', ' ', raw)
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            continue
        nodes = data if isinstance(data, list) else [data]
        for node in nodes:
            if not isinstance(node, dict):
                continue
            results.append(node)
            graph = node.get('@graph')
            if isinstance(graph, list):
                results.extend(g for g in graph if isinstance(g, dict))
    return results
def _extract_from_meta(self, page):
    """Read a title and a description from the page's ``<meta>`` tags.

    The first ``og:title`` supplies the title; the first ``og:description``
    or ``description`` supplies the description.  The tag key is taken from
    the ``property`` attribute, or ``name`` when that is empty, and is
    compared case-insensitively.

    Returns:
        ``(title, desc)``; either is ``""`` when no matching tag exists.
    """
    title = ""
    desc = ""
    for meta in page.css('meta'):
        attrs = getattr(meta, 'attrib', {})
        key = (attrs.get('property', '') or attrs.get('name', '')).lower()
        content = attrs.get('content', '')
        if not title and key == 'og:title':
            title = content.strip()
        elif not desc and key in ('og:description', 'description'):
            desc = content.strip()
    return title, desc


def _is_news_article(self, url: str) -> bool:
    """Decide whether *url* is an article worth scraping.

    The base implementation accepts every URL.
    """
    accepted = True
    return accepted
def _extract_specifics(self, page, url: str):
"""Site-specific extraction. Override in subclasses."""
return "", "", None, []Here's an example scraper for a real news site:
class ExampleSiteScraper(BaseNewsScraper):
source_id = "example"
source_name = "Example News"
base_url = "https://www.example-news.com/markets/"
def _is_news_article(self, url: str) -> bool:
return bool(re.search(r'-\d+\.html', url))
def _extract_specifics(self, page, url: str):
# Title
title = page.css('h1::text').get() or ''
# Date from meta tag
published_at = None
for meta in page.css('meta'):
attrs = meta.attrib if hasattr(meta, 'attrib') else {}
if attrs.get('property') == 'article:published_time':
parsed = self._parse_date_generic(attrs['content'])
if parsed:
published_at = parsed.isoformat()
break
# Content
paras = page.css('article p::text').getall()
text = '\n\n'.join(p.strip() for p in paras if len(p.strip()) > 50)
# Extract mentioned stock symbols from links
symbols = []
for link in page.css('.content a'):
href = link.css('::attr(href)').get() or ''
text = link.css('::text').get() or ''
if '/stocks/' in href:
symbols.append({
'name': text.strip(),
'code': href.rstrip('/').split('/')[-1],
'url': href if href.startswith('http') else f"https://example.com{href}"
})
return title.strip(), text, published_at, symbolsSome sites require authenticated sessions or pass Cloudflare challenges via
cookies. rookiepy extracts cookies from your installed browsers so you
can pass them to scrapling requests.
# rookiepy is a transitive dep of scrapling, but pin it explicitly if needed
pip install "rookiepy>=0.2.0"

import rookiepy
from http.cookiejar import CookieJar
# Extract cookies for a specific domain from Chrome
cookies_raw = rookiepy.chrome(['.example.com'])
cookies = rookiepy.to_cookiejar(cookies_raw) # → CookieJar
# Fallback to Firefox if Chrome has none
if not list(cookies):
cookies_raw = rookiepy.firefox(['.example.com'])
cookies = rookiepy.to_cookiejar(cookies_raw)
# Or use the generic extractor
cookies_raw = rookiepy.get('https://www.example.com')
cookies = rookiepy.to_cookiejar(cookies_raw)from scrapling.fetchers import Fetcher
cookies = rookiepy.to_cookiejar(
rookiepy.chrome(['.example.com'])
)
page = Fetcher.get(url, impersonate='chrome', cookies=cookies)For multi-page scraping that needs persistent auth:
from scrapling.fetchers import FetcherSession
with FetcherSession() as session:
# Extract cookies from browser and attach
cookies = rookiepy.to_cookiejar(
rookiepy.chrome(['.target-site.com'])
)
# Session maintains cookies across requests
page1 = session.get('https://target-site.com/dashboard')
page2 = session.get('https://target-site.com/settings')class BaseNewsScraper:
def __init__(self):
self._cookies = None
def _load_browser_cookies(self, domain: str):
"""Extract cookies from installed browser."""
try:
import rookiepy
raw = rookiepy.chrome([domain])
if not list(raw):
raw = rookiepy.firefox([domain])
self._cookies = rookiepy.to_cookiejar(raw)
except Exception:
self._cookies = None
def _fetch_page(self, url: str):
try:
kwargs = {'impersonate': 'chrome'}
if self._cookies:
kwargs['cookies'] = self._cookies
return Fetcher.get(url, **kwargs)
except Exception as e:
print(f"Error fetching {url}: {e}")
return NoneThen enable cookies per scraper:
class SiteWithAuthScraper(BaseNewsScraper):
source_id = "authenticated_site"
base_url = "https://target-site.com/news/"
def __init__(self):
super().__init__()
# Load cookies on init — user must be logged into Chrome
self._load_browser_cookies('.target-site.com')| Function | Source |
|---|---|
| rookiepy.chrome(domains) | Google Chrome / Chromium |
| rookiepy.firefox(domains) | Mozilla Firefox |
| rookiepy.edge(domains) | Microsoft Edge |
| rookiepy.brave(domains) | Brave Browser |
| rookiepy.get(url) | Any available browser (auto-detect) |
- The user must be logged into the site in their browser for cookies to contain valid session tokens.
- Cookies expire — re-extract periodically for long-running scrapers.
- Headless environments (servers, Docker) have no browser to extract from. In that case, pass cookies via env vars or a cookie file instead.
Fetches from all scrapers in parallel and assembles results in a fixed order:
import concurrent.futures
class NewsAggregator:
    """Registry of scrapers plus helpers that query them together."""

    def __init__(self):
        # Insertion order doubles as the output order of fetch_all().
        self.scrapers = {
            'example': ExampleSiteScraper(),
            'another': AnotherSiteScraper(),
            'third': ThirdSiteScraper(),
        }

    def fetch_from_source(self, source_id: str, limit: int = 25):
        """Return up to *limit* headlines from one source, or ``[]`` if it is unknown."""
        scraper = self.scrapers.get(source_id)
        if not scraper:
            return []
        return scraper.fetch_latest_news(limit)

    def fetch_all(self, limit_per_source: int = 10):
        """Fetch from all sources concurrently.

        One worker thread is used per registered scraper.  A scraper that
        raises contributes an empty list (the error is printed).  Results
        are concatenated in registration order, not completion order.
        """
        source_order = list(self.scrapers)
        if not source_order:
            # ThreadPoolExecutor rejects max_workers=0 with ValueError.
            return []

        results = {}
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=len(source_order)
        ) as executor:
            futures = {
                executor.submit(self.scrapers[sid].fetch_latest_news, limit_per_source): sid
                for sid in source_order
            }
            for future in concurrent.futures.as_completed(futures):
                sid = futures[future]
                try:
                    results[sid] = future.result()
                except Exception as e:
                    print(f"Error from {sid}: {e}")
                    results[sid] = []

        # Assemble in fixed order for deterministic output
        all_news = []
        for sid in source_order:
            all_news.extend(results.get(sid, []))
        return all_news

    def fetch_article(self, source_id: str, url: str):
        """Fetch one article with the scraper for *source_id*.

        An unknown *source_id* falls back to a plain ``BaseNewsScraper``.
        """
        scraper = self.scrapers.get(source_id)
        if not scraper:
            scraper = BaseNewsScraper()  # generic fallback
        return scraper.fetch_article_content(url)

    def group_news_by_symbol(self, articles: list) -> dict:
        """Group articles by stock symbol code.

        Returns:
            A dict mapping each symbol code to a list of summary dicts
            (``id``, ``headline``, ``source``, ``sourceUrl``,
            ``publishedAt``).  An article appears at most once per code.
            Symbols without a code are skipped, and a missing or ``None``
            ``symbols`` value is treated as no symbols.
        """
        groups = {}
        seen_ids = {}  # code -> ids already listed, for O(1) duplicate checks
        for article in articles:
            for sym in article.get('symbols') or []:
                code = sym.get('code')
                if not code:
                    continue
                entries = groups.setdefault(code, [])
                ids = seen_ids.setdefault(code, set())
                if article['id'] in ids:
                    continue
                ids.add(article['id'])
                entries.append({
                    'id': article['id'],
                    'headline': article.get('headline', ''),
                    'source': article.get('source', ''),
                    'sourceUrl': article.get('sourceUrl', ''),
                    'publishedAt': article.get('publishedAt', ''),
                })
        return groups


_aggregator = NewsAggregator()

NEWS_SOURCES = [
    {'id': s.source_id, 'name': s.source_name, 'url': s.base_url}
    for s in _aggregator.scrapers.values()
]
def fetch_news(source: str = None, limit: int = 25):
    """Fetch headlines. `source=None` or `'all'` fetches from every source.

    When all sources are queried, each one is asked for
    ``max(limit // number_of_sources, 5)`` items, so the combined list can
    hold more than *limit* entries.  With no registered sources the result
    is an empty list.
    """
    if source is None or source == 'all':
        source_count = len(_aggregator.scrapers)
        if not source_count:
            # Avoids dividing by zero below.
            return []
        per_source = max(limit // source_count, 5)
        return _aggregator.fetch_all(per_source)
    return _aggregator.fetch_from_source(source, limit)
def fetch_article_content(url: str):
    """Fetch full article. Source is auto-detected from URL domain.

    The host of *url* is compared with the host of every registered
    scraper's ``base_url``; an exact match, or a sub-domain of that host,
    selects the scraper.  When *url* has no parseable host, the base host is
    searched for anywhere in the string instead.  If nothing matches, the
    source id ``'unknown'`` is handed to ``_aggregator.fetch_article``.
    """
    from urllib.parse import urlparse

    url_host = (urlparse(url).hostname or '').lower()
    source_id = 'unknown'
    for sid, scraper in _aggregator.scrapers.items():
        base_host = (urlparse(scraper.base_url).hostname or '').lower()
        if not base_host:
            # A scraper with an empty or scheme-less base_url cannot match.
            continue
        if url_host:
            # Compare hosts rather than substrings, so a URL that merely
            # mentions the domain in its path or query does not match.
            matched = url_host == base_host or url_host.endswith('.' + base_host)
        else:
            matched = base_host in url
        if matched:
            source_id = sid
            break
    return _aggregator.fetch_article(source_id, url)

from news_api import fetch_news, fetch_article_content, NEWS_SOURCES
# List sources
for s in NEWS_SOURCES:
    print(f" {s['id']:20s} → {s['name']}")

# Headlines from one source
items = fetch_news(source="example", limit=5)
for item in items:
    print(item['headline'], item['sourceUrl'])

# Headlines from ALL sources
all_items = fetch_news(limit=30)

# Full article (fetch_news can return an empty list, so guard the index)
if all_items:
    article = fetch_article_content(all_items[0]["sourceUrl"])
    print(article["headline"])
    print(article["description"][:300])

# 1. Subclass
class MyScraper(BaseNewsScraper):
source_id = "mysite"
source_name = "My Site"
base_url = "https://mysite.com/news/"
def _is_news_article(self, url):
return "/story/" in url
def _extract_specifics(self, page, url):
title = page.css("h1::text").get() or ""
meta = page.css('meta[property="article:published_time"]')
date = meta[0].attrib.get("content") if meta else None
paras = page.css("article p::text").getall()
text = "\n\n".join(p.strip() for p in paras if len(p) > 50)
return title, text, date, []
# 2. Register
from news_api import _aggregator
_aggregator.scrapers["mysite"] = MyScraper()| Goal | Code |
|---|---|
| Text of first <h1> | page.css('h1::text').get() |
| All <p> text | page.css('p::text').getall() |
| All <a> elements | page.css('a') |
| Attribute of first match | page.css('.class').attrib.get('href') |
| Text inside a div | page.css('.article p::text').getall() |
| JSON-LD blocks | page.css('script[type="application/ld+json"]') |
| Meta tag content | page.css('meta[property="og:title"]').attrib.get('content') |
When a site-specific extractor returns empty fields, it cascades:
_extract_specifics(page, url) ← your override
│ returns empty
▼
_generic_article_extraction(page)
├─ h1 / h2.title for title
├─ article p / .story p / div p for content
├─ <time> / .date elements for date
└─ ld+json → meta tags (og:title, og:description)
- Paywalls: Many news sites truncate content. ld+json often contains the full description as a fallback.
- DOM breakage: Sites change layouts. Monitor and update selectors.
- Rate limits: No built-in throttling. Add `time.sleep(1)` between calls if you get HTTP 429s.
- Cloudflare blocks: The `impersonate='chrome'` flag helps, but some networks still trigger 1015 errors. Use a VPN if needed.
- Date parsing: Generic parser handles common formats, but relative dates ("2 hours ago") won't parse.
- Non-blocking: The library is synchronous. Wrap in `asyncio.to_thread()` for async apps.
#!/usr/bin/env python3
# End-to-end example: list the latest headlines, then print the first
# article in full.
from news_api import fetch_news, fetch_article_content

items = fetch_news(limit=30)
for item in items:
    print(f"[{item['source']:16s}] {item['headline']}")
    print(f" {item['sourceUrl']}\n")

# Only fetch a detail page when at least one headline came back.
if items:
    first_url = items[0]["sourceUrl"]
    detail = fetch_article_content(first_url)
    print("--- DETAIL ---")
    print(detail["headline"])
    body = detail.get("description", "(no content)")
    print(body[:500])
    symbols = detail.get("symbols")
    if symbols:
        codes = [s["code"] for s in symbols]
        print("Symbols:", ", ".join(codes))