Generate article/document summaries with the Google Gemini API
""" | |
A comprehensive web scraping and content analysis tool that extracts, summarizes, and documents technical articles. | |
This module provides functionality to: | |
- Scrape technical content from web pages | |
- Extract metadata and images | |
- Generate AI-powered summaries using Google's Gemini model | |
- Create and update Google Docs with the processed content | |
The tool uses type hints throughout and follows strict type safety practices. | |
Dependencies: | |
- requests: For making HTTP requests | |
- beautifulsoup4: For HTML parsing | |
- google-generativeai: For AI content generation | |
- google-auth: For Google API authentication | |
- google-api-python-client: For Google Docs integration | |
- tenacity: For retry logic | |
- python-dotenv: For environment variable management | |
""" | |
import requests
from requests.exceptions import RequestException, HTTPError
from bs4 import BeautifulSoup, Tag
import google.generativeai as genai
from google.oauth2 import service_account
from googleapiclient.discovery import build
from typing import Dict, List, Tuple, Optional, Any, TypedDict
import logging
import os
import re
from urllib.parse import urljoin, urlparse
from tenacity import retry, stop_after_attempt, wait_exponential
from dotenv import load_dotenv, find_dotenv
from concurrent.futures import ThreadPoolExecutor

load_dotenv(find_dotenv())

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.FileHandler("scraper.log"), logging.StreamHandler()],
)
logger = logging.getLogger(__name__)

class Config(TypedDict):
    """Configuration type definition for API credentials.

    Attributes:
        GOOGLE_API_KEY: API key for Google services
        GOOGLE_CREDENTIALS_FILE: Path to service account credentials file
    """

    GOOGLE_API_KEY: str
    GOOGLE_CREDENTIALS_FILE: str


class Metadata(TypedDict):
    """Type definition for article metadata.

    Attributes:
        title: Article title
        description: Article description/summary
        author: Content author
        date_published: Publication date
        category: Content category
        emoji: Category emoji indicator
    """

    title: Optional[str]
    description: Optional[str]
    author: Optional[str]
    date_published: Optional[str]
    category: Optional[str]
    emoji: Optional[str]


class Image(TypedDict):
    """Type definition for image data.

    Attributes:
        url: Image source URL
        alt: Alt text description
        context: Surrounding text context
        is_blob: Whether image is stored as blob
    """

    url: str
    alt: str
    context: str
    is_blob: bool

class ConfigManager:
    """Manages configuration and credential loading.

    Handles loading and validation of required API credentials and configuration values.
    """

    def __init__(self) -> None:
        """Initialize configuration manager and load config."""
        self.config = self._load_config()

    def _load_config(self) -> Config:
        """Load and validate configuration from environment.

        Returns:
            Config: Validated configuration dictionary

        Raises:
            ValueError: If required environment variables are missing
            FileNotFoundError: If credentials file doesn't exist
        """
        try:
            google_creds_file = os.getenv("GOOGLE_CREDENTIALS_FILE", "")
            if not google_creds_file:
                raise ValueError("GOOGLE_CREDENTIALS_FILE environment variable not set")
            if not os.path.exists(google_creds_file):
                raise FileNotFoundError(
                    f"Credentials file not found at: {google_creds_file}"
                )
            google_api_key = os.getenv("GOOGLE_API_KEY", "")
            if not google_api_key:
                raise ValueError("GOOGLE_API_KEY environment variable not set")
            config = {
                "GOOGLE_API_KEY": google_api_key,
                "GOOGLE_CREDENTIALS_FILE": google_creds_file,
            }
            return config
        except Exception as e:
            logger.error("Error loading configuration: %s", str(e))
            raise

class FileMetadata:
    """Container for file metadata information.

    Attributes:
        filename: Name of the file
        metadata: Dictionary of metadata key-value pairs
        error: Optional error message
    """

    def __init__(
        self, filename: str, metadata: Dict[str, Any], error: Optional[str] = None
    ) -> None:
        """Initialize file metadata container.

        Args:
            filename: Name of the file
            metadata: Dictionary of metadata
            error: Optional error message
        """
        self.filename = filename
        self.metadata = metadata
        self.error = error

    def to_dict(self) -> Dict[str, Any]:
        """Convert metadata to dictionary format.

        Returns:
            Dict containing the metadata fields
        """
        return {
            "filename": self.filename,
            "metadata": self.metadata,
            "error": self.error,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "FileMetadata":
        """Create FileMetadata instance from dictionary.

        Args:
            data: Dictionary containing metadata fields

        Returns:
            New FileMetadata instance
        """
        return cls(
            filename=data.get("filename", ""),
            metadata=data.get("metadata", {}),
            error=data.get("error"),
        )

class LLMQueryManager:
    """Manages interactions with Google's Gemini LLM.

    Handles configuration and querying of the Gemini model for content generation.
    """
    def __init__(self, config: Config) -> None:
        """Initialize LLM query manager.

        Args:
            config: Configuration dictionary with API credentials
        """
        self.config = config
        genai.configure(api_key=config["GOOGLE_API_KEY"])
        self.model = genai.GenerativeModel("gemini-2.0-flash")

    async def query_model(
        self, query: str, max_tokens: int = 2048, temperature: float = 0.7
    ) -> Dict[str, Any]:
        """Query the Gemini model with enhanced prompting.

        Args:
            query: Input text prompt
            max_tokens: Maximum response length
            temperature: Sampling temperature

        Returns:
            Dictionary containing model response

        Raises:
            Exception: If model query fails
        """
        try:
            enhanced_query = (
                f"Please provide a comprehensive and detailed response to the following query. "
                f"Include examples, technical details, and relevant context where applicable.\n\n"
                f"{query}"
            )
            generation_config = {
                "temperature": temperature,
                "max_output_tokens": max_tokens,
                "top_p": 0.9,
                "top_k": 40,
            }
            response = self.model.generate_content(
                enhanced_query, generation_config=generation_config
            )
            if len(response.text.split()) < 200:
                follow_up = (
                    f"The previous response was too brief. Please expand on the following points:\n"
                    f"1. Provide more technical details about the key concepts.\n"
                    f"2. Include additional examples or use cases.\n"
                    f"3. Add more context about the author's perspective or industry trends.\n"
                    f"Here is the original response:\n{response.text}"
                )
                response = self.model.generate_content(
                    follow_up, generation_config=generation_config
                )
            return {"response": response.text}
        except Exception as e:
            logger.error("Error querying Gemini Pro: %s", str(e))
            raise

class ContentScraper:
    """A web content scraper that extracts text, metadata, and images from technical articles.

    This class handles fetching web pages, parsing HTML content, and extracting relevant
    information using BeautifulSoup. It includes retry logic for failed requests and
    concurrent processing of different content types.

    Attributes:
        session (requests.Session): Configured session for making HTTP requests
    """
    def __init__(self):
        """Initialize the scraper with a configured requests session."""
        self.session = requests.Session()
        self.session.headers.update(
            {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                "Accept-Language": "en-US,en;q=0.5",
                "Accept-Encoding": "gzip, deflate, br",
            }
        )

    @retry(
        stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    def scrape_url(self, url: str) -> Tuple[str, List[Dict[str, Any]], Metadata]:
        """Scrape a URL with advanced content extraction and metadata.

        Returns:
            Tuple of (text_content, images, metadata)
        """
        try:
            # Use a HEAD request first to check the content type before downloading
            head_response = self.session.head(url, timeout=10, allow_redirects=True)
            content_type = head_response.headers.get("Content-Type", "").lower()
            # More flexible content type check: warn but continue for non-HTML types
            if not any(
                html_type in content_type
                for html_type in ["text/html", "application/xhtml+xml"]
            ):
                logger.warning(
                    "Content type '%s' may not be HTML, proceeding anyway", content_type
                )
            # Download the page and parse the response content directly
            response = self.session.get(url, timeout=15)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, "html.parser")
            # Extract metadata, main content, and images concurrently
            with ThreadPoolExecutor() as executor:
                metadata_future = executor.submit(self._extract_metadata, soup)
                content_future = executor.submit(self._extract_main_content, soup)
                images_future = executor.submit(self._extract_images, soup, url)
                metadata = metadata_future.result()
                content = content_future.result()
                images = images_future.result()
            # Categorize the content so downstream consumers always get a
            # category and emoji, even when the page metadata provides none
            category, emoji = self._categorize_content(content)
            metadata["category"] = metadata.get("category") or category
            metadata["emoji"] = metadata.get("emoji") or emoji
            # Enhanced web-specific categorization
            if metadata["category"] and "web" in metadata["category"].lower():
                metadata = self._enhance_web_metadata(metadata, content)
            return content, images, metadata
        except Exception as e:
            logger.error("Error scraping URL %s: %s", url, str(e))
            raise
    def _enhance_web_metadata(self, metadata: Metadata, content: str) -> Metadata:
        """Enhance metadata for web development related content.

        Detects web frameworks and technologies mentioned in the content
        and adds them to the metadata.

        Args:
            metadata: The current metadata dictionary
            content: The article content to analyze

        Returns:
            Enhanced metadata dictionary with framework and technology information
        """
        frameworks = self._detect_web_frameworks(content)
        if frameworks:
            metadata["frameworks"] = ", ".join(frameworks)
        technologies = self._detect_web_technologies(content)
        if technologies:
            metadata["technologies"] = ", ".join(technologies)
        return metadata

    def _detect_web_frameworks(self, content: str) -> List[str]:
        """Detect mentions of web frameworks in the content.

        Args:
            content: The text content to analyze

        Returns:
            List of detected framework names
        """
        framework_patterns = {
            "React": r"\bReact(\.js)?\b",
            "Angular": r"\bAngular\b",
            "Vue": r"\bVue(\.js)?\b",
            "Svelte": r"\bSvelte\b",
            "Next.js": r"\bNext\.js\b",
            "Nuxt.js": r"\bNuxt\.js\b",
            "Express": r"\bExpress(\.js)?\b",
            "Django": r"\bDjango\b",
            "Flask": r"\bFlask\b",
            "Laravel": r"\bLaravel\b",
        }
        return [
            fw
            for fw, pattern in framework_patterns.items()
            if re.search(pattern, content, re.I)
        ]

    def _detect_web_technologies(self, content: str) -> List[str]:
        """Detect mentions of web technologies in the content.

        Args:
            content: The text content to analyze

        Returns:
            List of detected technology names
        """
        tech_patterns: Dict[str, str] = {
            "TypeScript": r"\bTypeScript\b",
            "Webpack": r"\bWebpack\b",
            "GraphQL": r"\bGraphQL\b",
            "REST": r"\bREST(ful)?\b",
            "WebSocket": r"\bWebSocket\b",
            "PWA": r"\bPWA\b",
            "SPA": r"\bSPA\b",
            "SSR": r"\bSSR\b",
            "CSR": r"\bCSR\b",
        }
        return [
            tech
            for tech, pattern in tech_patterns.items()
            if re.search(pattern, content, re.I)
        ]
    def _extract_main_content(self, soup: BeautifulSoup) -> str:
        """Extract the main content from the parsed HTML.

        Attempts to find the article content first, falling back to a more
        general content extraction approach if needed.

        Args:
            soup: Parsed BeautifulSoup HTML document

        Returns:
            Extracted and cleaned main content text
        """
        article = self._extract_article(soup)
        if article:
            return self._clean_content(article.get_text())
        return self._extract_main_content_fallback(soup)

    def _extract_main_content_fallback(self, soup: BeautifulSoup) -> str:
        """Fallback method for extracting main content when article extraction fails.

        Removes common non-content elements and attempts to find content
        based on common HTML patterns.

        Args:
            soup: Parsed BeautifulSoup HTML document

        Returns:
            Extracted and cleaned content text
        """
        for element in soup.find_all(
            [
                "script",
                "style",
                "nav",
                "header",
                "footer",
                "iframe",
                "aside",
                "form",
                "button",
                "noscript",
            ]
        ):
            element.decompose()
        main_content = soup.find(
            ["article", "main", "div"],
            {"class": re.compile(r"article|post|content|entry", re.I)},
        )
        if not main_content:
            main_content = soup
        content_elements = main_content.find_all(
            ["p", "h1", "h2", "h3", "h4", "h5", "h6"]
        )
        content = "\n\n".join([elem.get_text().strip() for elem in content_elements])
        lines = content.splitlines()
        unique_lines = []
        seen = set()
        for line in lines:
            if line and line not in seen:
                seen.add(line)
                unique_lines.append(line)
        return "\n".join(unique_lines)

    def _extract_article(self, soup: BeautifulSoup) -> Optional[Tag]:
        """Extract the main article content using a scoring system.

        Scores HTML elements based on various heuristics to identify
        the most likely article content container.

        Args:
            soup: Parsed BeautifulSoup HTML document

        Returns:
            The highest scoring article element, or None if no suitable element is found
        """
        candidates = soup.find_all(["article", "div", "section"])
        if not candidates:
            return None
        scored = []
        for elem in candidates:
            score = 0
            text = elem.get_text().strip()
            score += min(len(text.split()) / 100, 10)
            score += len(elem.find_all("p")) * 0.5
            score += len(elem.find_all(["h1", "h2", "h3"])) * 0.5
            score -= len(elem.find_all("a")) * 0.1
            scored.append((score, elem))
        return max(scored, key=lambda x: x[0])[1]
    def _extract_metadata(self, soup: BeautifulSoup) -> Metadata:
        """Extract metadata from the HTML document.

        Extracts standard metadata like title, description, author,
        and publication date from meta tags and common HTML patterns.

        Args:
            soup: Parsed BeautifulSoup HTML document

        Returns:
            Dictionary containing the extracted metadata
        """
        metadata: Metadata = {
            "title": soup.title.string if soup.title else None,
            "description": None,
            "author": None,
            "date_published": None,
            "category": "",  # Ensure category is always a string
            "emoji": None,
        }
        meta_desc = soup.find("meta", {"name": "description"})
        if meta_desc:
            metadata["description"] = meta_desc.get("content")
        author_elements = soup.find_all(
            ["a", "span", "meta"],
            {
                "class": re.compile(r"author|byline", re.I),
                "name": re.compile(r"author", re.I),
            },
        )
        if author_elements:
            metadata["author"] = author_elements[0].get_text().strip()
        date_elements = soup.find_all(
            ["time", "meta"],
            {
                "class": re.compile(r"date|published|posted", re.I),
                "property": re.compile(r"published_time", re.I),
            },
        )
        if date_elements:
            metadata["date_published"] = date_elements[0].get(
                "datetime", date_elements[0].get_text().strip()
            )
        return metadata
    def _extract_images(
        self, soup: BeautifulSoup, base_url: str
    ) -> List[Dict[str, Any]]:
        """Extract images and their metadata from the HTML document.

        Finds all images, resolves their URLs, and extracts surrounding
        context and alt text.

        Args:
            soup: Parsed BeautifulSoup HTML document
            base_url: The base URL for resolving relative image URLs

        Returns:
            List of dictionaries containing image metadata
        """
        images: List[Dict[str, Any]] = []
        for img in soup.find_all("img"):
            try:
                src = img.get("src", "")
                if not src:
                    continue
                if not bool(urlparse(src).netloc):
                    src = urljoin(base_url, src)
                context = self._get_image_context(img)
                images.append(
                    {
                        "url": src,
                        "alt": img.get("alt", ""),
                        "context": context,
                        "is_blob": "blob" in src,
                    }
                )
            except RequestException as e:
                logger.warning("Error processing image: %s", str(e))
                continue
        return images

    def _get_image_context(self, img_tag: Any) -> str:
        """Extract contextual text surrounding an image.

        Looks for captions and surrounding paragraphs to provide
        context for the image.

        Args:
            img_tag: BeautifulSoup Tag object for the image

        Returns:
            String containing the combined contextual text
        """
        context = []
        caption = img_tag.find_next(["figcaption", "caption"])
        if caption:
            context.append(caption.get_text().strip())
        prev_p = img_tag.find_previous("p")
        next_p = img_tag.find_next("p")
        if prev_p:
            context.append(prev_p.get_text().strip())
        if next_p:
            context.append(next_p.get_text().strip())
        return " ".join(context)
    def _categorize_content(self, content: str) -> Tuple[str, str]:
        """Categorize the content based on keyword analysis.

        Analyzes the content for technology-related keywords to determine
        the primary category and assigns an appropriate emoji.

        Args:
            content: The text content to analyze

        Returns:
            Tuple of (category_name, category_emoji)
        """
        content_lower = content.lower()
        if any(
            word in content_lower
            for word in ["web", "frontend", "backend", "full stack"]
        ):
            return "web", "🌐"
        elif any(
            word in content_lower for word in ["mobile", "ios", "android", "flutter"]
        ):
            return "mobile", "📱"
        elif any(
            word in content_lower
            for word in ["ai", "machine learning", "deep learning", "llm"]
        ):
            return "ai", "🤖"
        elif any(
            word in content_lower
            for word in ["cyber", "security", "hack", "vulnerability"]
        ):
            return "cybersecurity", "🔒"
        elif any(word in content_lower for word in ["cloud", "aws", "azure", "gcp"]):
            return "cloud", "☁️"
        elif any(
            word in content_lower
            for word in ["devops", "ci/cd", "kubernetes", "docker"]
        ):
            return "devops", "🛠️"
        elif any(
            word in content_lower for word in ["data", "database", "sql", "big data"]
        ):
            return "data", "📊"
        elif any(
            word in content_lower
            for word in ["hardware", "iot", "embedded", "raspberry pi"]
        ):
            return "hardware", "🖥️"
        elif any(
            word in content_lower
            for word in ["software", "programming", "code", "developer"]
        ):
            return "software", "💻"
        else:
            return "general", "📰"
    def _clean_content(self, text: str) -> str:
        """Clean and deduplicate extracted text content.

        Normalizes whitespace and removes duplicate lines while
        preserving meaningful content.

        Args:
            text: The raw text content to clean

        Returns:
            Cleaned and deduplicated text content
        """
        # Collapse runs of spaces/tabs but keep newlines so the line-level
        # deduplication below still has individual lines to work with
        text = re.sub(r"[ \t]+", " ", text).strip()
        lines = text.splitlines()
        unique_lines = []
        seen = set()
        for line in lines:
            line = line.strip()
            if line and line not in seen:
                seen.add(line)
                unique_lines.append(line)
        return "\n".join(unique_lines)

class ContentSummarizer:
    """A content summarization service that uses an LLM to generate technical article summaries.

    This class handles generating comprehensive summaries of technical content using
    an LLM query manager. It includes prompt engineering and follow-up queries to
    ensure high quality, detailed summaries.

    Attributes:
        config (Config): Configuration object containing API settings
        llm_query_manager (LLMQueryManager): Manager for LLM API interactions
    """
    def __init__(self, config: Config):
        """Initialize the summarizer with config and LLM query manager.

        Args:
            config (Config): Configuration object containing API settings
        """
        self.config = config
        self.llm_query_manager = LLMQueryManager(config)

    async def summarize(self, content: str, metadata: Metadata, url: str) -> str:
        """Generate a comprehensive summary of technical content using LLM.

        Sends the content to the LLM with a specialized system prompt and handles
        follow-up queries if the initial summary is too brief.

        Args:
            content (str): The technical content to summarize
            metadata (Metadata): Article metadata like title, author etc.
            url (str): Source URL of the content

        Returns:
            str: The generated summary text

        Raises:
            Exception: If summarization fails
        """
        system_prompt = self._get_system_prompt(url, metadata)
        full_query = system_prompt + "\n\n" + content
        try:
            response = await self.llm_query_manager.query_model(full_query)
            summary = response["response"]
            if len(summary.split()) < 200:
                logger.info("Summary is too short, requesting more details")
                follow_up_query = (
                    f"The previous summary was too brief. Please expand on the following points:\n"
                    f"1. Provide more technical details about the key concepts.\n"
                    f"2. Include additional examples or use cases.\n"
                    f"3. Add more context about the author's perspective or industry trends.\n"
                    f"Here is the original summary:\n{summary}"
                )
                response = await self.llm_query_manager.query_model(follow_up_query)
                summary = response["response"]
            return summary
        except Exception as e:
            logger.error("Summarization error: %s", str(e))
            raise

    def _get_system_prompt(self, url: str, metadata: Metadata) -> str:
        """Generate the system prompt for the LLM summarization request.

        Creates a detailed prompt that guides the LLM to generate a comprehensive
        technical summary with specific sections and requirements.

        Args:
            url (str): Source URL of the content
            metadata (Metadata): Article metadata containing title, author etc.

        Returns:
            str: The formatted system prompt
        """
        return f"""You are an expert technical content analyzer and summarizer, specializing in software engineering,
full-stack development, app development, cybersecurity, and related technical fields.

Article Context:
- URL: {url}
- Title: {metadata.get('title', 'N/A')}
- Author: {metadata.get('author', 'N/A')}
- Date Published: {metadata.get('date_published', 'N/A')}

Your task is to:
1. Provide a comprehensive yet concise summary of the technical content (minimum 300 words).
2. Identify and explain key technical concepts, frameworks, or tools mentioned in detail.
3. Add relevant context and background information where necessary.
4. Include links to official documentation or relevant specifications when applicable.
5. Highlight any best practices or important technical considerations.
6. Note any potential limitations, challenges, or alternative approaches.
7. Structure the output with clear sections:
   - Executive Summary (3-4 sentences)
   - Key Technical Points (minimum 5 bullet points with detailed explanations)
   - Detailed Analysis (in-depth discussion of the main concepts)
   - Additional Resources & References (links to relevant documentation, tools, or further reading)

Please maintain technical accuracy while making the content accessible to developers
of varying experience levels. Include code examples or technical specifications only
if they are crucial for understanding the core concepts.

Article Content to Analyze:"""

class GoogleDocRequest(TypedDict):
    """Type definition for Google Docs text insertion request.

    Attributes:
        insertText (Dict[str, Any]): Text insertion parameters
    """

    insertText: Dict[str, Any]


class GoogleDocImageRequest(TypedDict):
    """Type definition for Google Docs image insertion request.

    Attributes:
        insertInlineImage (Dict[str, Any]): Image insertion parameters
    """

    insertInlineImage: Dict[str, Any]


class GoogleDocBatchUpdateRequest(TypedDict):
    """Type definition for Google Docs batch update request.

    Attributes:
        requests (List[Dict[str, Any]]): List of update operations
    """

    requests: List[Dict[str, Any]]


class GoogleDocResponse(TypedDict):
    """Type definition for Google Docs API response.

    Attributes:
        documentId (str): ID of the created/updated document
        title (str): Title of the document
    """

    documentId: str
    title: str


class GeminiResponse(TypedDict):
    """Type definition for Gemini API response.

    Attributes:
        candidates (List[Dict[str, Any]]): List of response candidates
    """

    candidates: List[Dict[str, Any]]

class GoogleDocsManager:
    """Manager for Google Docs operations including document creation and updates.

    Handles authentication and provides methods for creating new documents,
    sharing them with users, and updating their content with summaries and images.

    Attributes:
        credentials (Credentials): Google service account credentials
        docs_service (Resource): Google Docs API service
        drive_service (Resource): Google Drive API service
        markdown_converter (MarkdownConverter): Converter for markdown formatting
    """
    def __init__(self, config: Config):
        """Initialize the manager with Google API credentials and services.

        Args:
            config (Config): Configuration containing Google credentials
        """
        self.credentials = service_account.Credentials.from_service_account_file(
            config["GOOGLE_CREDENTIALS_FILE"],
            scopes=[
                "https://www.googleapis.com/auth/documents",
                "https://www.googleapis.com/auth/drive",
            ],
        )
        self.docs_service = build(
            "docs", "v1", credentials=self.credentials, static_discovery=False
        )
        self.drive_service = build(
            "drive", "v3", credentials=self.credentials, static_discovery=False
        )
        self.markdown_converter = MarkdownConverter()

    def create_document(self, title: str, user_email: str) -> str:
        """Create a new Google Doc and share it with a user.

        Args:
            title (str): Title for the new document
            user_email (str): Email address to share the document with

        Returns:
            str: ID of the created document

        Raises:
            Exception: If document creation or sharing fails
        """
        try:
            # pylint: disable=no-member
            doc: GoogleDocResponse = (
                self.docs_service.documents().create(body={"title": title}).execute()
            )
            document_id = str(doc["documentId"])
            # pylint: disable=no-member
            self.drive_service.permissions().create(  # type: ignore
                fileId=document_id,
                body={"type": "user", "role": "writer", "emailAddress": user_email},
            ).execute()
            return document_id
        except Exception as e:
            logger.error("Error creating Google Doc: %s", str(e))
            raise
    def update_document(
        self,
        document_id: str,
        summary: str,
        images: List[FileMetadata],
        metadata: Metadata,
    ) -> None:
        """Update a Google Doc with formatted content and images.

        Args:
            document_id (str): ID of the document to update
            summary (str): Formatted summary text
            images (List[FileMetadata]): List of image metadata
            metadata (Metadata): Article metadata

        Raises:
            Exception: If document update fails
        """
        try:
            formatted_summary = self.markdown_converter.convert(summary)
            document_requests: List[Dict[str, Any]] = self._create_update_requests(
                formatted_summary, images, metadata
            )
            # pylint: disable=no-member
            self.docs_service.documents().batchUpdate(
                documentId=document_id, body={"requests": document_requests}
            ).execute()
            logger.info("Google Doc updated successfully")
        except Exception as e:
            logger.error("Error updating Google Doc: %s", str(e))
            raise
    def _create_update_requests(
        self, summary: str, images: List[FileMetadata], metadata: Metadata
    ) -> List[Dict[str, Any]]:
        """Create a list of update requests for document content.

        Generates requests to insert the title, metadata, summary text and images
        so they appear in that order in the finished document.

        Args:
            summary (str): Formatted summary text
            images (List[FileMetadata]): List of image metadata
            metadata (Metadata): Article metadata

        Returns:
            List[Dict[str, Any]]: List of update operation requests
        """
        # Every request inserts at index 1, and batchUpdate applies requests in
        # list order, so the last request applied ends up at the top of the
        # document. Requests are therefore appended in reverse of the desired
        # final order: images first, then summary, then metadata, then title.
        update_requests: List[Dict[str, Any]] = []
        for img in images:
            if img.metadata.get("is_blob"):
                try:
                    image_request: Dict[str, Any] = {
                        "insertInlineImage": {
                            "location": {"index": 1},
                            "uri": img.filename,
                            "objectSize": {
                                "height": {"magnitude": 300, "unit": "PT"},
                                "width": {"magnitude": 400, "unit": "PT"},
                            },
                        }
                    }
                    update_requests.append(image_request)
                except (RequestException, HTTPError) as e:
                    logger.warning("Error adding image %s: %s", img.filename, str(e))
                    continue
        update_requests.append(
            {"insertText": {"location": {"index": 1}, "text": summary + "\n\n"}}
        )
        metadata_text = (
            f"Source: {metadata.get('title', 'N/A')}\n"
            f"Author: {metadata.get('author', 'N/A')}\n"
            f"Date Published: {metadata.get('date_published', 'N/A')}\n"
            f"Category: {metadata.get('category', 'general')}\n\n"
        )
        metadata_request: Dict[str, Any] = {
            "insertText": {"location": {"index": 1}, "text": metadata_text}
        }
        update_requests.append(metadata_request)
        title_request: Dict[str, Any] = {
            "insertText": {
                "location": {"index": 1},
                "text": f"{metadata.get('emoji') or '📰'} Technical Article Summary: {metadata.get('title') or 'Untitled'}\n\n",
            }
        }
        update_requests.append(title_request)
        return update_requests

class MarkdownConverter:
    """A class for converting Markdown formatted text to plain text.

    This class handles converting common Markdown syntax elements like headers, bold/italic text,
    links, images, code blocks, blockquotes, and lists into plain text while preserving the
    content and readability.

    Attributes:
        patterns (Dict[str, Pattern]): Dictionary of compiled regex patterns for matching
            different Markdown syntax elements. Keys are pattern names and values are
            compiled regex Pattern objects.
    """

    def __init__(self):
        """Initialize the MarkdownConverter with regex patterns for Markdown syntax."""
        self.patterns = {
            "header": re.compile(r"^(#{1,6})\s*(.*)"),
            "bold": re.compile(r"\*\*(.*?)\*\*"),
            "italic": re.compile(r"\*(.*?)\*"),
            "link": re.compile(r"\[(.*?)\]\((.*?)\)"),
            "image": re.compile(r"!\[(.*?)\]\((.*?)\)"),
            "code": re.compile(r"`(.*?)`"),
            "blockquote": re.compile(r"^>\s*(.*)"),
            "unordered_list": re.compile(r"^[\*\-\+]\s*(.*)"),
            "ordered_list": re.compile(r"^\d+\.\s*(.*)"),
            # Anchor all three alternatives; the original "^---|___|\*\*\*"
            # only anchored the first and could match mid-line
            "horizontal_rule": re.compile(r"^(?:---|___|\*\*\*)"),
            "line_break": re.compile(r" $"),
        }
    def convert(self, markdown: str) -> str:
        """Convert Markdown formatted text to plain text.

        Processes the input Markdown text line by line, removing Markdown syntax elements
        while preserving the content and structure of the text.

        Args:
            markdown (str): The Markdown formatted text to convert.

        Returns:
            str: The converted plain text with Markdown syntax removed.

        Example:
            >>> converter = MarkdownConverter()
            >>> markdown = "# Title\\n**Bold text** and *italic*\\n> Quote"
            >>> converter.convert(markdown)
            'Title\\nBold text and italic\\nQuote'
        """
        lines = markdown.splitlines()
        plain_lines = []
        for line in lines:
            line = line.strip()
            if not line:
                plain_lines.append("")
                continue
            header_match = self.patterns["header"].match(line)
            if header_match:
                line = header_match.group(2)
            line = self.patterns["bold"].sub(r"\1", line)
            line = self.patterns["italic"].sub(r"\1", line)
            line = self.patterns["link"].sub(r"\1", line)
            line = self.patterns["image"].sub(r"\1", line)
            line = self.patterns["code"].sub(r"\1", line)
            blockquote_match = self.patterns["blockquote"].match(line)
            if blockquote_match:
                line = blockquote_match.group(1)
            unordered_match = self.patterns["unordered_list"].match(line)
            if unordered_match:
                line = unordered_match.group(1)
            ordered_match = self.patterns["ordered_list"].match(line)
            if ordered_match:
                line = ordered_match.group(1)
            if self.patterns["horizontal_rule"].match(line):
                continue
            if self.patterns["line_break"].search(line):
                continue
            plain_lines.append(line)
        return "\n".join(plain_lines)

async def main() -> None:
    try:
        config_manager = ConfigManager()
        scraper = ContentScraper()
        summarizer = ContentSummarizer(config_manager.config)
        docs_manager = GoogleDocsManager(config_manager.config)
        url = input("Enter the URL to scrape: ").strip()
        user_email = input("Enter your Google email address: ").strip()
        if not url.startswith(("http://", "https://")):
            raise ValueError("Invalid URL. Please include http:// or https://")
        logger.info("Scraping content from %s", url)
        content, images, metadata = scraper.scrape_url(url)
        file_metadata_images = [
            FileMetadata(
                filename=img["url"],
                metadata={
                    "url": img["url"],
                    "alt": img["alt"],
                    "context": img["context"],
                    "is_blob": img["is_blob"],
                },
            )
            for img in images
        ]
        logger.info("Generating summary")
        summary = await summarizer.summarize(content, metadata, url)
        logger.info("Creating new Google Doc")
        # Fall back to a generic title when the page metadata has no title
        # (metadata["title"] can be present but None)
        doc_title = metadata.get("title") or "Technical Article Summary"
        document_id = docs_manager.create_document(doc_title, user_email)
        logger.info("Updating Google Doc")
        docs_manager.update_document(document_id, summary, file_metadata_images, metadata)
        logger.info("Process completed successfully! Document ID: %s", document_id)
    except Exception as e:
        logger.error("Error in main process: %s", str(e))
        raise


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
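# To run this script with a recent Python 3 interpreter, set the two .env
# variables described at the top, install the listed dependencies, and invoke
# the file directly (the filename below is a placeholder for whatever name you
# saved this gist under):
#
#   python gemini_summarizer.py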