# Generate article/document summaries with the Google Gemini API
"""
A comprehensive web scraping and content analysis tool that extracts, summarizes, and documents technical articles.
This module provides functionality to:
- Scrape technical content from web pages
- Extract metadata and images
- Generate AI-powered summaries using Google's Gemini model
- Create and update Google Docs with the processed content
The tool uses type hints throughout and follows strict type safety practices.
Dependencies:
- requests: For making HTTP requests
- beautifulsoup4: For HTML parsing
- google-generativeai: For AI content generation
- google-auth: For Google API authentication
- google-api-python-client: For Google Docs integration
- tenacity: For retry logic
- python-dotenv: For environment variable management
"""
import requests
from requests.exceptions import RequestException, HTTPError
from bs4 import BeautifulSoup, Tag
import google.generativeai as genai
from google.oauth2 import service_account
from googleapiclient.discovery import build
from typing import Dict, List, Tuple, Optional, Any, TypedDict
import logging
import os
import re
from urllib.parse import urljoin, urlparse
from tenacity import retry, stop_after_attempt, wait_exponential
from dotenv import load_dotenv, find_dotenv
from concurrent.futures import ThreadPoolExecutor
load_dotenv(find_dotenv())
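# Expected .env layout (illustrative values; the variable names match what
# ConfigManager reads below):
#
#   GOOGLE_API_KEY=your-gemini-api-key
#   GOOGLE_CREDENTIALS_FILE=/path/to/service-account.json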
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler("scraper.log"), logging.StreamHandler()],
)
logger = logging.getLogger(__name__)
class Config(TypedDict):
"""Configuration type definition for API credentials.
Attributes:
GOOGLE_API_KEY: API key for Google services
GOOGLE_CREDENTIALS_FILE: Path to service account credentials file
"""
GOOGLE_API_KEY: str
GOOGLE_CREDENTIALS_FILE: str
class Metadata(TypedDict):
"""Type definition for article metadata.
Attributes:
title: Article title
description: Article description/summary
author: Content author
date_published: Publication date
category: Content category
emoji: Category emoji indicator
"""
title: Optional[str]
description: Optional[str]
author: Optional[str]
date_published: Optional[str]
category: Optional[str]
emoji: Optional[str]
class Image(TypedDict):
"""Type definition for image data.
Attributes:
url: Image source URL
alt: Alt text description
context: Surrounding text context
is_blob: Whether image is stored as blob
"""
url: str
alt: str
context: str
is_blob: bool
class ConfigManager:
"""Manages configuration and credential loading.
Handles loading and validation of required API credentials and configuration values.
"""
def __init__(self) -> None:
"""Initialize configuration manager and load config."""
self.config = self._load_config()
def _load_config(self) -> Config:
"""Load and validate configuration from environment.
Returns:
Config: Validated configuration dictionary
Raises:
ValueError: If required environment variables are missing
FileNotFoundError: If credentials file doesn't exist
"""
try:
google_creds_file = os.getenv("GOOGLE_CREDENTIALS_FILE", "")
if not google_creds_file:
raise ValueError("GOOGLE_CREDENTIALS_FILE environment variable not set")
if not os.path.exists(google_creds_file):
raise FileNotFoundError(
f"Credentials file not found at: {google_creds_file}"
)
google_api_key = os.getenv("GOOGLE_API_KEY", "")
if not google_api_key:
raise ValueError("GOOGLE_API_KEY environment variable not set")
config = {
"GOOGLE_API_KEY": google_api_key,
"GOOGLE_CREDENTIALS_FILE": google_creds_file,
}
return config
except Exception as e:
logger.error("Error loading configuration: %s", str(e))
raise
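# Minimal usage sketch (assumes the two environment variables above are set):
#
#   config = ConfigManager().config
#   config["GOOGLE_API_KEY"]           # Gemini API key
#   config["GOOGLE_CREDENTIALS_FILE"]  # path to the service-account JSON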
class FileMetadata:
"""Container for file metadata information.
Attributes:
filename: Name of the file
metadata: Dictionary of metadata key-value pairs
error: Optional error message
"""
def __init__(
self, filename: str, metadata: Dict[str, List[str]], error: Optional[str] = None
) -> None:
"""Initialize file metadata container.
Args:
filename: Name of the file
metadata: Dictionary of metadata
error: Optional error message
"""
self.filename = filename
self.metadata = metadata
self.error = error
def to_dict(self) -> Dict[str, Any]:
"""Convert metadata to dictionary format.
Returns:
Dict containing the metadata fields
"""
return {
"filename": self.filename,
"metadata": self.metadata,
"error": self.error,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "FileMetadata":
"""Create FileMetadata instance from dictionary.
Args:
data: Dictionary containing metadata fields
Returns:
New FileMetadata instance
"""
return cls(
filename=data.get("filename", ""),
metadata=data.get("metadata", {}),
error=data.get("error"),
)
class LLMQueryManager:
"""Manages interactions with Google's Gemini LLM.
Handles configuration and querying of the Gemini model for content generation.
"""
def __init__(self, config: Config) -> None:
"""Initialize LLM query manager.
Args:
config: Configuration dictionary with API credentials
"""
self.config = config
genai.configure(api_key=config["GOOGLE_API_KEY"])
self.model = genai.GenerativeModel("gemini-2.0-flash")
async def query_model(
self, query: str, max_tokens: int = 2048, temperature: float = 0.7
) -> Dict[str, Any]:
"""Query the Gemini model with enhanced prompting.
Args:
query: Input text prompt
max_tokens: Maximum response length
temperature: Sampling temperature
Returns:
Dictionary containing model response
Raises:
Exception: If model query fails
"""
try:
enhanced_query = (
f"Please provide a comprehensive and detailed response to the following query. "
f"Include examples, technical details, and relevant context where applicable.\n\n"
f"{query}"
)
generation_config = {
"temperature": temperature,
"max_output_tokens": max_tokens,
"top_p": 0.9,
"top_k": 40,
}
response = self.model.generate_content(
enhanced_query, generation_config=generation_config
)
if len(response.text.split()) < 200:
follow_up = (
f"The previous response was too brief. Please expand on the following points:\n"
f"1. Provide more technical details about the key concepts.\n"
f"2. Include additional examples or use cases.\n"
f"3. Add more context about the author's perspective or industry trends.\n"
f"Here is the original response:\n{response.text}"
)
response = self.model.generate_content(
follow_up, generation_config=generation_config
)
return {"response": response.text}
except Exception as e:
logger.error("Error querying Gemini Pro: %s", str(e))
raise
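# Usage sketch (hypothetical prompt; the coroutine must be awaited, e.g. from
# main() or via asyncio.run):
#
#   manager = LLMQueryManager(ConfigManager().config)
#   result = await manager.query_model("Explain WebSockets in 300 words")
#   print(result["response"])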
class ContentScraper:
"""A web content scraper that extracts text, metadata, and images from technical articles.
This class handles fetching web pages, parsing HTML content, and extracting relevant
information using BeautifulSoup. It includes retry logic for failed requests and
concurrent processing of different content types.
Attributes:
session (requests.Session): Configured session for making HTTP requests
"""
def __init__(self):
"""Initialize the scraper with a configured requests session."""
self.session = requests.Session()
self.session.headers.update(
{
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate, br",
}
)
@retry(
stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)
)
def scrape_url(self, url: str) -> Tuple[str, List[Dict[str, Any]], Metadata]:
"""
Scrapes URL with advanced content extraction and metadata
Returns: (text_content, images, metadata)
"""
try:
# Use HEAD request first to check content type and size
head_response = self.session.head(url, timeout=10, allow_redirects=True)
content_type = head_response.headers.get("Content-Type", "").lower()
# More flexible content type check
if not any(
html_type in content_type
for html_type in ["text/html", "application/xhtml+xml"]
):
logger.warning(
"Content type '%s' may not be HTML, proceeding anyway", content_type
)
# Get content with streaming for large pages
response = self.session.get(url, timeout=15)
response.raise_for_status()
# Use the response content directly instead of iter_content
soup = BeautifulSoup(response.content, "html.parser")
# Extract metadata and content in parallel
with ThreadPoolExecutor() as executor:
metadata_future = executor.submit(self._extract_metadata, soup)
content_future = executor.submit(self._extract_main_content, soup)
images_future = executor.submit(self._extract_images, soup, url)
metadata = metadata_future.result()
content = content_future.result()
images = images_future.result()
# Categorize the content; _extract_metadata leaves category/emoji unset.
category, emoji = self._categorize_content(content)
metadata["category"] = category
metadata["emoji"] = emoji
# Enhanced web-specific categorization
if "web" in category.lower():
metadata = self._enhance_web_metadata(metadata, content)
return content, images, metadata
except Exception as e:
logger.error("Error scraping URL %s: %s", url, str(e))
raise
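# Illustrative call (placeholder URL); scrape_url returns the cleaned article
# text, a list of image dicts, and the extracted metadata:
#
#   content, images, metadata = ContentScraper().scrape_url("https://example.com/post")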
def _enhance_web_metadata(self, metadata: Metadata, content: str) -> Metadata:
"""Enhance metadata for web development related content.
Detects web frameworks and technologies mentioned in the content
and adds them to the metadata.
Args:
metadata: The current metadata dictionary
content: The article content to analyze
Returns:
Enhanced metadata dictionary with framework and technology information
"""
frameworks = self._detect_web_frameworks(content)
if frameworks:
metadata["frameworks"] = ", ".join(frameworks)
technologies = self._detect_web_technologies(content)
if technologies:
metadata["technologies"] = ", ".join(technologies)
return metadata
def _detect_web_frameworks(self, content: str) -> List[str]:
"""Detect mentions of web frameworks in the content.
Args:
content: The text content to analyze
Returns:
List of detected framework names
"""
framework_patterns = {
"React": r"\bReact(\.js)?\b",
"Angular": r"\bAngular\b",
"Vue": r"\bVue(\.js)?\b",
"Svelte": r"\bSvelte\b",
"Next.js": r"\bNext\.js\b",
"Nuxt.js": r"\bNuxt\.js\b",
"Express": r"\bExpress(\.js)?\b",
"Django": r"\bDjango\b",
"Flask": r"\bFlask\b",
"Laravel": r"\bLaravel\b",
}
return [
fw
for fw, pattern in framework_patterns.items()
if re.search(pattern, content, re.I)
]
def _detect_web_technologies(self, content: str) -> List[str]:
"""Detect mentions of web technologies in the content.
Args:
content: The text content to analyze
Returns:
List of detected technology names
"""
tech_patterns: Dict[str, str] = {
"TypeScript": r"\bTypeScript\b",
"Webpack": r"\bWebpack\b",
"GraphQL": r"\bGraphQL\b",
"REST": r"\bREST(ful)?\b",
"WebSocket": r"\bWebSocket\b",
"PWA": r"\bPWA\b",
"SPA": r"\bSPA\b",
"SSR": r"\bSSR\b",
"CSR": r"\bCSR\b",
}
return [
tech
for tech, pattern in tech_patterns.items()
if re.search(pattern, content, re.I)
]
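# Example of what the two detectors return (illustrative inputs):
#
#   _detect_web_frameworks("Migrating a React app to Next.js")
#       -> ["React", "Next.js"]
#   _detect_web_technologies("The SPA talks to a GraphQL API over WebSocket")
#       -> ["GraphQL", "WebSocket", "SPA"]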
def _extract_main_content(self, soup: BeautifulSoup) -> str:
"""Extract the main content from the parsed HTML.
Attempts to find the article content first, falling back to a more
general content extraction approach if needed.
Args:
soup: Parsed BeautifulSoup HTML document
Returns:
Extracted and cleaned main content text
"""
article = self._extract_article(soup)
if article:
return self._clean_content(article.get_text())
return self._extract_main_content_fallback(soup)
def _extract_main_content_fallback(self, soup: BeautifulSoup) -> str:
"""Fallback method for extracting main content when article extraction fails.
Removes common non-content elements and attempts to find content
based on common HTML patterns.
Args:
soup: Parsed BeautifulSoup HTML document
Returns:
Extracted and cleaned content text
"""
for element in soup.find_all(
[
"script",
"style",
"nav",
"header",
"footer",
"iframe",
"aside",
"form",
"button",
"noscript",
]
):
element.decompose()
main_content = soup.find(
["article", "main", "div"],
{"class": re.compile(r"article|post|content|entry", re.I)},
)
if not main_content:
main_content = soup
content_elements = main_content.find_all(
["p", "h1", "h2", "h3", "h4", "h5", "h6"]
)
content = "\n\n".join([elem.get_text().strip() for elem in content_elements])
lines = content.splitlines()
unique_lines = []
seen = set()
for line in lines:
if line and line not in seen:
seen.add(line)
unique_lines.append(line)
return "\n".join(unique_lines)
def _extract_article(self, soup: BeautifulSoup) -> Optional[Tag]:
"""Extract the main article content using a scoring system.
Scores HTML elements based on various heuristics to identify
the most likely article content container.
Args:
soup: Parsed BeautifulSoup HTML document
Returns:
The highest scoring article element, or None if no suitable element is found
"""
candidates = soup.find_all(["article", "div", "section"])
if not candidates:
return None
scored = []
for elem in candidates:
score = 0
text = elem.get_text().strip()
score += min(len(text.split()) / 100, 10)
score += len(elem.find_all("p")) * 0.5
score += len(elem.find_all(["h1", "h2", "h3"])) * 0.5
score -= len(elem.find_all("a")) * 0.1
scored.append((score, elem))
return max(scored, key=lambda x: x[0])[1]
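# Worked scoring example (hypothetical element): a <div> with ~500 words,
# 10 <p> tags, 2 headings and 20 links scores
#   min(500/100, 10) + 10*0.5 + 2*0.5 - 20*0.1 = 5 + 5 + 1 - 2 = 9.0
# so text-heavy containers outrank link-heavy navigation blocks.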
def _extract_metadata(self, soup: BeautifulSoup) -> Metadata:
"""Extract metadata from the HTML document.
Extracts standard metadata like title, description, author,
and publication date from meta tags and common HTML patterns.
Args:
soup: Parsed BeautifulSoup HTML document
Returns:
Dictionary containing the extracted metadata
"""
metadata: Metadata = {
"title": soup.title.string if soup.title else None,
"description": None,
"author": None,
"date_published": None,
"category": "", # Ensure category is always a string
"emoji": None,
}
meta_desc = soup.find("meta", {"name": "description"})
if meta_desc:
metadata["description"] = meta_desc.get("content")
# Passing class and name together would require both attributes to match, so
# check byline elements first and fall back to <meta name="author">.
author_elements = soup.find_all(
["a", "span"], class_=re.compile(r"author|byline", re.I)
) or soup.find_all("meta", attrs={"name": re.compile(r"author", re.I)})
if author_elements:
metadata["author"] = (
author_elements[0].get("content") or author_elements[0].get_text()
).strip()
# Same pattern for dates: try class-flagged <time>/<meta> elements, then fall
# back to Open Graph style <meta property="...published_time">.
date_elements = soup.find_all(
["time", "meta"], class_=re.compile(r"date|published|posted", re.I)
) or soup.find_all("meta", attrs={"property": re.compile(r"published_time", re.I)})
if date_elements:
metadata["date_published"] = date_elements[0].get(
"datetime",
date_elements[0].get("content") or date_elements[0].get_text().strip(),
)
return metadata
def _extract_images(
self, soup: BeautifulSoup, base_url: str
) -> List[Dict[str, Any]]:
"""Extract images and their metadata from the HTML document.
Finds all images, resolves their URLs, and extracts surrounding
context and alt text.
Args:
soup: Parsed BeautifulSoup HTML document
base_url: The base URL for resolving relative image URLs
Returns:
List of dictionaries containing image metadata
"""
images: List[Dict[str, Any]] = []
for img in soup.find_all("img"):
try:
src = img.get("src", "")
if not src:
continue
if not bool(urlparse(src).netloc):
src = urljoin(base_url, src)
context = self._get_image_context(img)
images.append(
{
"url": src,
"alt": img.get("alt", ""),
"context": context,
"is_blob": "blob" in src,
}
)
except RequestException as e:
logger.warning("Error processing image: %s", str(e))
continue
return images
def _get_image_context(self, img_tag: Any) -> str:
"""Extract contextual text surrounding an image.
Looks for captions and surrounding paragraphs to provide
context for the image.
Args:
img_tag: BeautifulSoup Tag object for the image
Returns:
String containing the combined contextual text
"""
context = []
caption = img_tag.find_next(["figcaption", "caption"])
if caption:
context.append(caption.get_text().strip())
prev_p = img_tag.find_previous("p")
next_p = img_tag.find_next("p")
if prev_p:
context.append(prev_p.get_text().strip())
if next_p:
context.append(next_p.get_text().strip())
return " ".join(context)
def _categorize_content(self, content: str) -> Tuple[str, str]:
"""Categorize the content based on keyword analysis.
Analyzes the content for technology-related keywords to determine
the primary category and assigns an appropriate emoji.
Args:
content: The text content to analyze
Returns:
Tuple of (category_name, category_emoji)
"""
content_lower = content.lower()
if any(
word in content_lower
for word in ["web", "frontend", "backend", "full stack"]
):
return "web", "🌐"
elif any(
word in content_lower for word in ["mobile", "ios", "android", "flutter"]
):
return "mobile", "📱"
elif any(
word in content_lower
for word in ["ai", "machine learning", "deep learning", "llm"]
):
return "ai", "🤖"
elif any(
word in content_lower
for word in ["cyber", "security", "hack", "vulnerability"]
):
return "cybersecurity", "🔒"
elif any(word in content_lower for word in ["cloud", "aws", "azure", "gcp"]):
return "cloud", "☁️"
elif any(
word in content_lower
for word in ["devops", "ci/cd", "kubernetes", "docker"]
):
return "devops", "🛠️"
elif any(
word in content_lower for word in ["data", "database", "sql", "big data"]
):
return "data", "📊"
elif any(
word in content_lower
for word in ["hardware", "iot", "embedded", "raspberry pi"]
):
return "hardware", "🖥️"
elif any(
word in content_lower
for word in ["software", "programming", "code", "developer"]
):
return "software", "💻"
else:
return "general", "📰"
def _clean_content(self, text: str) -> str:
"""Clean and deduplicate extracted text content.
Normalizes whitespace and removes duplicate lines while
preserving meaningful content.
Args:
text: The raw text content to clean
Returns:
Cleaned and deduplicated text content
"""
# Collapse whitespace per line rather than globally; replacing every "\s+"
# with a space would also remove the newlines the deduplication relies on.
lines = [re.sub(r"[ \t]+", " ", line).strip() for line in text.splitlines()]
unique_lines = []
seen = set()
for line in lines:
if line and line not in seen:
seen.add(line)
unique_lines.append(line)
return "\n".join(unique_lines)
class ContentSummarizer:
"""A content summarization service that uses LLM to generate technical article summaries.
This class handles generating comprehensive summaries of technical content using
an LLM query manager. It includes prompt engineering and follow-up queries to
ensure high quality, detailed summaries.
Attributes:
config (Config): Configuration object containing API settings
llm_query_manager (LLMQueryManager): Manager for LLM API interactions
"""
def __init__(self, config: Config):
"""Initialize the summarizer with config and LLM query manager.
Args:
config (Config): Configuration object containing API settings
"""
self.config = config
self.llm_query_manager = LLMQueryManager(config)
async def summarize(self, content: str, metadata: Metadata, url: str) -> str:
"""Generate a comprehensive summary of technical content using LLM.
Sends the content to the LLM with a specialized system prompt and handles
follow-up queries if the initial summary is too brief.
Args:
content (str): The technical content to summarize
metadata (Metadata): Article metadata like title, author etc.
url (str): Source URL of the content
Returns:
str: The generated summary text
Raises:
Exception: If summarization fails
"""
system_prompt = self._get_system_prompt(url, metadata)
full_query = system_prompt + "\n\n" + content
try:
response = await self.llm_query_manager.query_model(full_query)
summary = response["response"]
if len(summary.split()) < 200:
logger.info("Summary is too short, requesting more details")
follow_up_query = (
f"The previous summary was too brief. Please expand on the following points:\n"
f"1. Provide more technical details about the key concepts.\n"
f"2. Include additional examples or use cases.\n"
f"3. Add more context about the author's perspective or industry trends.\n"
f"Here is the original summary:\n{summary}"
)
response = await self.llm_query_manager.query_model(follow_up_query)
summary = response["response"]
return summary
except Exception as e:
logger.error("Summarization error: %s", str(e))
raise
def _get_system_prompt(self, url: str, metadata: Metadata) -> str:
"""Generate the system prompt for the LLM summarization request.
Creates a detailed prompt that guides the LLM to generate a comprehensive
technical summary with specific sections and requirements.
Args:
url (str): Source URL of the content
metadata (Metadata): Article metadata containing title, author etc.
Returns:
str: The formatted system prompt
"""
return f"""You are an expert technical content analyzer and summarizer, specializing in software engineering,
full-stack development, app development, cybersecurity, and related technical fields.
Article Context:
- URL: {url}
- Title: {metadata.get('title') or 'N/A'}
- Author: {metadata.get('author') or 'N/A'}
- Date Published: {metadata.get('date_published') or 'N/A'}
Your task is to:
1. Provide a comprehensive yet concise summary of the technical content (minimum 300 words).
2. Identify and explain key technical concepts, frameworks, or tools mentioned in detail.
3. Add relevant context and background information where necessary.
4. Include links to official documentation or relevant specifications when applicable.
5. Highlight any best practices or important technical considerations.
6. Note any potential limitations, challenges, or alternative approaches.
7. Structure the output with clear sections:
- Executive Summary (3-4 sentences)
- Key Technical Points (minimum 5 bullet points with detailed explanations)
- Detailed Analysis (in-depth discussion of the main concepts)
- Additional Resources & References (links to relevant documentation, tools, or further reading)
Please maintain technical accuracy while making the content accessible to developers
of varying experience levels. Include code examples or technical specifications only
if they are crucial for understanding the core concepts.
Article Content to Analyze:"""
class GoogleDocRequest(TypedDict):
"""Type definition for Google Docs text insertion request.
Attributes:
insertText (Dict[str, Any]): Text insertion parameters
"""
insertText: Dict[str, Any]
class GoogleDocImageRequest(TypedDict):
"""Type definition for Google Docs image insertion request.
Attributes:
insertInlineImage (Dict[str, Any]): Image insertion parameters
"""
insertInlineImage: Dict[str, Any]
class GoogleDocBatchUpdateRequest(TypedDict):
"""Type definition for Google Docs batch update request.
Attributes:
requests (List[Dict[str, Any]]): List of update operations
"""
requests: List[Dict[str, Any]]
class GoogleDocResponse(TypedDict):
"""Type definition for Google Docs API response.
Attributes:
documentId (str): ID of the created/updated document
title (str): Title of the document
"""
documentId: str
title: str
class GeminiResponse(TypedDict):
"""Type definition for Gemini API response.
Attributes:
candidates (List[Dict[str, Any]]): List of response candidates
"""
candidates: List[Dict[str, Any]]
class GoogleDocsManager:
"""Manager for Google Docs operations including document creation and updates.
Handles authentication and provides methods for creating new documents,
sharing them with users, and updating their content with summaries and images.
Attributes:
credentials (Credentials): Google service account credentials
docs_service (Resource): Google Docs API service
drive_service (Resource): Google Drive API service
markdown_converter (MarkdownConverter): Converter for markdown formatting
"""
def __init__(self, config: Config):
"""Initialize the manager with Google API credentials and services.
Args:
config (Config): Configuration containing Google credentials
"""
self.credentials = service_account.Credentials.from_service_account_file(
config["GOOGLE_CREDENTIALS_FILE"],
scopes=[
"https://www.googleapis.com/auth/documents",
"https://www.googleapis.com/auth/drive",
],
)
self.docs_service = build(
"docs", "v1", credentials=self.credentials, static_discovery=False
)
self.drive_service = build(
"drive", "v3", credentials=self.credentials, static_discovery=False
)
self.markdown_converter = MarkdownConverter()
def create_document(self, title: str, user_email: str) -> str:
"""Create a new Google Doc and share it with a user.
Args:
title (str): Title for the new document
user_email (str): Email address to share the document with
Returns:
str: ID of the created document
Raises:
Exception: If document creation or sharing fails
"""
try:
# pylint: disable=no-member
doc: GoogleDocResponse = (
self.docs_service.documents().create(body={"title": title}).execute()
)
document_id = str(doc["documentId"])
# pylint: disable=no-member
self.drive_service.permissions().create( # type: ignore
fileId=document_id,
body={"type": "user", "role": "writer", "emailAddress": user_email},
).execute()
return document_id
except Exception as e:
logger.error("Error creating Google Doc: %s", str(e))
raise
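# Sketch (hypothetical e-mail address): the service account owns the new file,
# so it is shared immediately to make it visible in the user's Drive:
#
#   doc_id = docs_manager.create_document("Article Summary", "reader@example.com")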
def update_document(
self,
document_id: str,
summary: str,
images: List[FileMetadata],
metadata: Metadata,
) -> None:
"""Update a Google Doc with formatted content and images.
Args:
document_id (str): ID of the document to update
summary (str): Formatted summary text
images (List[FileMetadata]): List of image metadata
metadata (Metadata): Article metadata
Raises:
Exception: If document update fails
"""
try:
formatted_summary = self.markdown_converter.convert(summary)
document_requests: List[Dict[str, Any]] = self._create_update_requests(
formatted_summary, images, metadata
)
# pylint: disable=no-member
self.docs_service.documents().batchUpdate(
documentId=document_id, body={"requests": document_requests}
).execute()
logger.info("Google Doc updated successfully")
except Exception as e:
logger.error("Error updating Google Doc: %s", str(e))
raise
def _create_update_requests(
self, summary: str, images: List[FileMetadata], metadata: Metadata
) -> List[Dict[str, Any]]:
"""Create a list of update requests for document content.
Generates requests to insert title, metadata, summary text and images
in the correct order and format.
Args:
summary (str): Formatted summary text
images (List[FileMetadata]): List of image metadata
metadata (Metadata): Article metadata
Returns:
List[Dict[str, Any]]: List of update operation requests
"""
update_requests: List[Dict[str, Any]] = []
title_request: Dict[str, Any] = {
"insertText": {
"location": {"index": 1},
"text": f"{metadata.get('emoji', '📰')} Technical Article Summary: {metadata.get('title', 'Untitled')}\n\n",
}
}
update_requests.append(title_request)
metadata_text = (
f"Source: {metadata.get('title', 'N/A')}\n"
f"Author: {metadata.get('author', 'N/A')}\n"
f"Date Published: {metadata.get('date_published', 'N/A')}\n"
f"Category: {metadata.get('category', 'general')}\n\n"
)
metadata_request: Dict[str, Any] = {
"insertText": {"location": {"index": 1}, "text": metadata_text}
}
update_requests.append(metadata_request)
update_requests.append(
{"insertText": {"location": {"index": 1}, "text": summary + "\n\n"}}
)
for img in images:
if img.metadata.get("is_blob"):
try:
image_request: Dict[str, Any] = {
"insertInlineImage": {
"location": {"index": 1},
"uri": img.filename,
"objectSize": {
"height": {"magnitude": 300, "unit": "PT"},
"width": {"magnitude": 400, "unit": "PT"},
},
}
}
update_requests.append(image_request)
except (RequestException, HTTPError) as e:
logger.warning("Error adding image %s: %s", img.filename, str(e))
continue
return update_requests
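# Note on ordering: every request above inserts at index 1 and batchUpdate
# applies requests sequentially, so each later insert pushes earlier ones down.
# The finished document should therefore read images, then summary, then
# metadata, then the title line (the reverse of the order appended here).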
class MarkdownConverter:
"""A class for converting Markdown formatted text to plain text.
This class handles converting common Markdown syntax elements like headers, bold/italic text,
links, images, code blocks, blockquotes, and lists into plain text while preserving the
content and readability.
Attributes:
patterns (Dict[str, Pattern]): Dictionary of compiled regex patterns for matching
different Markdown syntax elements. Keys are pattern names and values are
compiled regex Pattern objects.
"""
def __init__(self):
"""Initialize the MarkdownConverter with regex patterns for Markdown syntax."""
self.patterns = {
"header": re.compile(r"^(#{1,6})\s*(.*)"),
"bold": re.compile(r"\*\*(.*?)\*\*"),
"italic": re.compile(r"\*(.*?)\*"),
"link": re.compile(r"\[(.*?)\]\((.*?)\)"),
"image": re.compile(r"!\[(.*?)\]\((.*?)\)"),
"code": re.compile(r"`(.*?)`"),
"blockquote": re.compile(r"^>\s*(.*)"),
"unordered_list": re.compile(r"^[\*\-\+]\s*(.*)"),
"ordered_list": re.compile(r"^\d+\.\s*(.*)"),
"horizontal_rule": re.compile(r"^---|___|\*\*\*"),
"line_break": re.compile(r" $"),
}
def convert(self, markdown: str) -> str:
"""Convert Markdown formatted text to plain text.
Processes the input Markdown text line by line, removing Markdown syntax elements
while preserving the content and structure of the text.
Args:
markdown (str): The Markdown formatted text to convert.
Returns:
str: The converted plain text with Markdown syntax removed.
Example:
>>> converter = MarkdownConverter()
>>> markdown = "# Title\\n**Bold text** and *italic*\\n> Quote"
>>> converter.convert(markdown)
'Title\\nBold text and italic\\nQuote'
"""
lines = markdown.splitlines()
plain_lines = []
for line in lines:
line = line.strip()
if not line:
plain_lines.append("")
continue
header_match = self.patterns["header"].match(line)
if header_match:
line = header_match.group(2)
line = self.patterns["bold"].sub(r"\1", line)
line = self.patterns["italic"].sub(r"\1", line)
line = self.patterns["link"].sub(r"\1", line)
line = self.patterns["image"].sub(r"\1", line)
line = self.patterns["code"].sub(r"\1", line)
blockquote_match = self.patterns["blockquote"].match(line)
if blockquote_match:
line = blockquote_match.group(1)
unordered_match = self.patterns["unordered_list"].match(line)
if unordered_match:
line = unordered_match.group(1)
ordered_match = self.patterns["ordered_list"].match(line)
if ordered_match:
line = ordered_match.group(1)
if self.patterns["horizontal_rule"].match(line):
continue
if self.patterns["line_break"].search(line):
continue
plain_lines.append(line)
return "\n".join(plain_lines)
async def main() -> None:
try:
config_manager = ConfigManager()
scraper = ContentScraper()
summarizer = ContentSummarizer(config_manager.config)
docs_manager = GoogleDocsManager(config_manager.config)
url = input("Enter the URL to scrape: ").strip()
user_email = input("Enter your Google email address: ").strip()
if not url.startswith(("http://", "https://")):
raise ValueError("Invalid URL. Please include http:// or https://")
logger.info("Scraping content from %s", url)
content, images, metadata = scraper.scrape_url(url)
file_metadata_images = [
FileMetadata(
filename=img["url"],
metadata={
"url": img["url"],
"alt": img["alt"],
"context": img["context"],
"is_blob": img["is_blob"]
}
)
for img in images
]
logger.info("Generating summary")
summary = await summarizer.summarize(content, metadata, url)
logger.info("Creating new Google Doc")
doc_title = metadata.get("title") or "Technical Article Summary"
document_id = docs_manager.create_document(doc_title, user_email)
logger.info("Updating Google Doc")
docs_manager.update_document(document_id, summary, file_metadata_images, metadata)
logger.info("Process completed successfully! Document ID: %s", document_id)
except Exception as e:
logger.error("Error in main process: %s", str(e))
raise
if __name__ == "__main__":
import asyncio
asyncio.run(main())
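# Typical invocation (assuming the file is saved as scraper.py and the .env
# described near the top of the module is in place):
#
#   $ python scraper.py
#   Enter the URL to scrape: https://example.com/post
#   Enter your Google email address: reader@example.com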