Automated Website Data Extraction
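# Automated Website Data Extraction: fetch a site's sitemap, ask an LLM which
# pages are relevant to a user-described task, generate a Pydantic schema for
# that task, extract structured data from each relevant page, and consolidate
# the per-page results into a single JSON summary.
#
# Assumed dependencies (not listed in the gist itself): requests,
# beautifulsoup4, openai, tenacity, ultimate-sitemap-parser (the `usp` import),
# and pydantic v2. The OpenAI() client below expects the OPENAI_API_KEY
# environment variable to be set.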
import json
import logging
import time
from typing import Dict, Any

import requests
from bs4 import BeautifulSoup
from openai import OpenAI
from requests.exceptions import RequestException
from tenacity import retry, stop_after_attempt, wait_random_exponential
from usp.tree import sitemap_tree_for_homepage

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

client = OpenAI()


def generate_schema(task_description: str, model: str) -> Dict[str, Any]:
    logger.info("Generating complex schema based on task description")
    prompt = f"""
    Task: Create a detailed Pydantic schema for extracted data based on the following task description:
    "{task_description}"

    Instructions:
    1. Analyze the task description carefully.
    2. Create a JSON object representing a complex schema with nested structures if necessary.
    3. Use the following format for each field:
       "field_name": {{
           "type": "<python_type>",
           "description": "<brief description>",
           "optional": true/false,
           "validation": {{
               // Additional validation rules (if any)
           }}
       }}
    4. For nested structures, use "type": "dict" and include a "properties" key with sub-fields.
    5. For lists of objects, use "type": "list" and include an "items" key with the object schema.
    6. Available Python types: str, int, float, bool, datetime, url, email, list, dict
    7. Always include "url" and "confidence" fields in the root of the schema.

    Example of a valid complex schema:
    {{
        "url": {{
            "type": "str",
            "description": "Source URL of the extracted data",
            "optional": false
        }},
        "title": {{
            "type": "str",
            "description": "Title of the content",
            "optional": false,
            "validation": {{
                "min_length": 5,
                "max_length": 200
            }}
        }},
        "publish_date": {{
            "type": "datetime",
            "description": "Publication date of the content",
            "optional": true
        }},
        "author": {{
            "type": "dict",
            "description": "Author information",
            "optional": true,
            "properties": {{
                "name": {{
                    "type": "str",
                    "description": "Author's name",
                    "optional": false
                }},
                "email": {{
                    "type": "email",
                    "description": "Author's email",
                    "optional": true
                }}
            }}
        }},
        "tags": {{
            "type": "list",
            "description": "List of tags associated with the content",
            "optional": true,
            "items": {{
                "type": "str"
            }}
        }},
        "confidence": {{
            "type": "float",
            "description": "Confidence score of the extraction",
            "optional": false,
            "validation": {{
                "min_value": 0.0,
                "max_value": 1.0
            }}
        }}
    }}

    Now, generate a complex schema for the given task. Respond with only the JSON schema:
    """

    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"}
    )

    try:
        schema = json.loads(response.choices[0].message.content)
        validate_schema(schema)
        logger.info(f"Generated complex schema: {json.dumps(schema, indent=2)}")
        return schema
    except json.JSONDecodeError as e:
        logger.error(f"Error decoding JSON from model output: {str(e)}")
        raise
    except ValueError as e:
        logger.error(f"Invalid schema structure: {str(e)}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error in schema generation: {str(e)}")
        raise


def validate_schema(schema: Dict[str, Any]):
    required_fields = ["url", "confidence"]
    for field in required_fields:
        if field not in schema:
            raise ValueError(f"Schema is missing required field: {field}")

    def validate_field(field_name: str, field_schema: Dict[str, Any]):
        required_keys = ["type", "description", "optional"]
        for key in required_keys:
            if key not in field_schema:
                raise ValueError(f"Field '{field_name}' is missing required key: {key}")

        valid_types = ["str", "int", "float", "bool", "datetime", "url", "email", "list", "dict"]
        if field_schema["type"] not in valid_types:
            raise ValueError(f"Field '{field_name}' has invalid type: {field_schema['type']}")

        if field_schema["type"] == "dict" and "properties" not in field_schema:
            raise ValueError(f"Field '{field_name}' of type 'dict' is missing 'properties'")

        if field_schema["type"] == "list" and "items" not in field_schema:
            raise ValueError(f"Field '{field_name}' of type 'list' is missing 'items'")

    for field_name, field_schema in schema.items():
        validate_field(field_name, field_schema)


from pydantic import BaseModel, Field, ValidationError, create_model, EmailStr, HttpUrl, constr, \
    ConfigDict
from typing import List, Dict, Any, Optional, Literal


def create_pydantic_model(schema: Dict[str, Any]):
    logger.info("Creating complex Pydantic model from schema")

    def create_field(field_schema: Dict[str, Any]):
        field_type = field_schema["type"]
        is_optional = field_schema.get("optional", False)
        validation = field_schema.get("validation", {})

        if field_type == "str":
            if "enum" in validation:
                field = Literal[tuple(validation["enum"])]
            else:
                field = constr(**{k: v for k, v in validation.items() if k in ["min_length", "max_length"]})
        elif field_type == "int":
            field = int
        elif field_type == "float":
            field = float
        elif field_type == "bool":
            field = bool
        elif field_type == "datetime":
            field = datetime
        elif field_type == "url":
            field = HttpUrl
        elif field_type == "email":
            field = EmailStr
        elif field_type == "list":
            item_field = create_field(field_schema["items"])
            field = List[item_field]
        elif field_type == "dict":
            nested_model = create_pydantic_model(field_schema["properties"])
            field = nested_model
        else:
            logger.warning(f"Unknown field type: {field_type}. Defaulting to str.")
            field = str

        return Optional[field] if is_optional else field

    fields = {}
    for field_name, field_schema in schema.items():
        field_type = create_field(field_schema)
        field_description = field_schema.get("description", "")
        is_optional = field_schema.get("optional", False)
        validation = field_schema.get("validation", {})

        field_info = Field(
            default=None if is_optional else ...,
            description=field_description,
            **{k: v for k, v in validation.items() if k in ["gt", "ge", "lt", "le", "min_items", "max_items"]}
        )
        fields[field_name] = (field_type, field_info)

    class ExtractedDataModel(BaseModel):
        model_config = ConfigDict(arbitrary_types_allowed=True)

    ExtractedData = create_model('ExtractedData', __base__=ExtractedDataModel, **fields)
    logger.info(f"Created complex ExtractedData model: {ExtractedData.model_json_schema()}")
    return ExtractedData
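

# A minimal usage sketch (illustrative only; this hypothetical schema is not
# produced by the gist, but follows the format generate_schema asks for):
#
#     DemoModel = create_pydantic_model({
#         "url": {"type": "url", "description": "Source URL", "optional": False},
#         "confidence": {"type": "float", "description": "Extraction confidence",
#                        "optional": False, "validation": {"ge": 0.0, "le": 1.0}},
#     })
#     DemoModel(url="https://example.com", confidence=0.9)   # validates
#     DemoModel(url="not-a-url", confidence=2.0)             # raises ValidationError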


from datetime import datetime, timezone


def get_sitemap(site_url: str, max_urls: int = 100) -> List[str]:
    logger.info(f"Fetching sitemap from {site_url}")
    try:
        def sort_key(page):
            if page.last_modified is None:
                return datetime.min.replace(tzinfo=timezone.utc)
            if page.last_modified.tzinfo is None:
                return page.last_modified.replace(tzinfo=timezone.utc)
            return page.last_modified.astimezone(timezone.utc)

        tree = sitemap_tree_for_homepage(site_url)
        pages = list(tree.all_pages())
        pages = sorted(pages, key=sort_key, reverse=True)
        logger.info(f"Found {len(pages)} pages in sitemap")

        urls = [page.url for page in pages[:max_urls]]
        urls = list(set(urls))
        logger.info(f"Returning {len(urls)} URLs for processing")
        return urls
    except Exception as e:
        logger.error(f"Error fetching sitemap: {e}")
        raise


@retry(stop=stop_after_attempt(3), wait=wait_random_exponential(min=1, max=60))
def analyze_urls(urls: List[str], extraction_task: str, model: str) -> List[Dict[str, Any]]:
    logger.info("Analyzing URLs for relevance")
    prompt = f"""
    Given the following extraction task: '{extraction_task}', analyze these URLs:
    {json.dumps(urls, indent=2)}

    For each URL, determine if it's relevant to the task and provide a brief reason.
    You are currently in crawl mode: "low-budget" - be strict; only mark a URL as relevant if the page is likely to be useful for the task.

    Respond in the following JSON format:
    {{
        "results": [
            {{
                "url": string,
                "reason": string,
                "is_relevant": boolean
            }},
            ...
        ]
    }}
    """
    logger.info(f"Prompt: {prompt}")
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system",
                 "content": "You are an expert data analysis assistant that analyzes URLs based on their relevance to a given task. In the reason, think through step by step why this is relevant. Be concise and critical."},
                {"role": "user", "content": prompt}
            ],
            response_format={"type": "json_object"}
        )
        analyzed_urls = json.loads(response.choices[0].message.content)
        logger.info(f"URL analysis response: {analyzed_urls}")
        validated_urls = [url_data for url_data in analyzed_urls['results'] if url_data['is_relevant']]
        logger.info(f"Analyzed {len(validated_urls)} relevant URLs")
        return validated_urls
    except Exception as e:
        logger.error(f"Error analyzing URLs: {e}")
        raise


@retry(stop=stop_after_attempt(3), wait=wait_random_exponential(min=1, max=60))
def crawl_url(url):
    logger.info(f"Crawling URL: {url}")
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response.text
    except RequestException as e:
        logger.error(f"Error crawling URL {url}: {e}")
        raise


def extract_main_content(html):
    logger.info("Extracting main content from HTML")
    soup = BeautifulSoup(html, 'html.parser')
    for script in soup(["script", "style"]):
        script.decompose()
    return soup.get_text()
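

# Note: get_text() keeps the page's whitespace and navigation text, and
# extract_information below only sends the first 8,000 characters to the model.
# An optional refinement (an assumption, not part of the original script) is to
# collapse whitespace first so more useful content fits into that window, e.g.:
#     " ".join(soup.get_text(separator=" ").split())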


@retry(stop=stop_after_attempt(3), wait=wait_random_exponential(min=1, max=60))
def extract_information(url, content, extraction_task, ExtractedData, model):
    logger.info("Extracting complex information using LLM")
    # Content is truncated to 8,000 characters to keep the prompt within budget.
    prompt = f"""
    Given the following content from {url}, extract information relevant to this task: '{extraction_task}'

    Respond in the following JSON format, adhering strictly to the provided schema:
    {json.dumps(ExtractedData.model_json_schema(), indent=2)}

    Ensure all required fields are filled and optional fields are included only if the information is available.
    For nested structures and lists, provide complete and valid data.

    Content:
    {content[:8000]}
    """
    logger.info(f"Prompt: {prompt}")
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system",
                 "content": "You are a helpful assistant that extracts specific, structured information from text based on a given task and schema. Only use the provided context; do not make up information."},
                {"role": "user", "content": prompt}
            ],
            response_format={"type": "json_object"}
        )
        logger.info(f"Response: {response.choices[0].message.content}")
        extracted_data = json.loads(response.choices[0].message.content)
        validated_data = ExtractedData(**extracted_data)
        return validated_data
    except ValidationError as e:
        logger.error(f"Validation error: {e}")
        raise
    except Exception as e:
        logger.error(f"Error extracting information: {e}")
        raise


# NOTE: this first version of main() only saves per-URL results; it is
# superseded by the extended main() defined further below, which also
# consolidates the results (the later definition is the one that runs).
def main(sitemap_url, extraction_task, model):
    logger.info("Starting complex data extraction workflow")
    try:
        schema = generate_schema(extraction_task, model)
        ExtractedData = create_pydantic_model(schema)
        urls = get_sitemap(sitemap_url)
        analyzed_urls = analyze_urls(urls, extraction_task, model)

        results = []
        for url_analysis in analyzed_urls:
            try:
                html = crawl_url(url_analysis['url'])
                content = extract_main_content(html)
                extracted_info = extract_information(url_analysis['url'], content, extraction_task, ExtractedData,
                                                     model)
                results.append(extracted_info.model_dump(mode="json"))
                time.sleep(1)  # Be polite to the server
            except Exception as e:
                logger.error(f"Error processing URL {url_analysis['url']}: {e}")
                continue

        logger.info("Saving complex extraction results")
        with open("complex_extraction_results.json", "w") as f:
            json.dump(results, f, indent=2)

        logger.info("Complex extraction complete!")
    except Exception as e:
        logger.error(f"An error occurred during the complex extraction process: {e}")


@retry(stop=stop_after_attempt(3), wait=wait_random_exponential(min=1, max=60))
def consolidate_results(results: List[Dict[str, Any]], extraction_task: str, model: str) -> Dict[str, Any]:
    logger.info("Consolidating extraction results")
    prompt = f"""
    Task: Consolidate the following extraction results into a single, comprehensive response.

    Extraction task: '{extraction_task}'

    Extracted data from multiple pages:
    {json.dumps(results, indent=2)}

    Instructions:
    1. Analyze all the extracted data carefully.
    2. Create a consolidated summary that includes all relevant information from all pages.
    3. Organize the information logically and avoid redundancy.
    4. If there are conflicting pieces of information, mention the discrepancies.
    5. Provide an overall confidence score for the consolidated information.
    6. Format your response as a JSON object with appropriate fields based on the extraction task.
    7. Include a 'sources' field that lists the URLs from which information was extracted.

    Respond with only the JSON object containing the consolidated information.
    """
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system",
                 "content": "You are an expert data analyst tasked with consolidating and summarizing information extracted from multiple web pages."},
                {"role": "user", "content": prompt}
            ],
            response_format={"type": "json_object"}
        )
        consolidated_data = json.loads(response.choices[0].message.content)
        logger.info("Successfully consolidated extraction results")
        return consolidated_data
    except Exception as e:
        logger.error(f"Error consolidating results: {e}")
        raise


def main(sitemap_url, extraction_task, model):
    logger.info("Starting complex data extraction workflow")
    try:
        schema = generate_schema(extraction_task, model)
        ExtractedData = create_pydantic_model(schema)
        urls = get_sitemap(sitemap_url)
        analyzed_urls = analyze_urls(urls, extraction_task, model)

        results = []
        for url_analysis in analyzed_urls:
            try:
                html = crawl_url(url_analysis['url'])
                content = extract_main_content(html)
                extracted_info = extract_information(url_analysis['url'], content, extraction_task, ExtractedData,
                                                     model)
                results.append(extracted_info.model_dump(mode="json"))
                time.sleep(1)  # Be polite to the server
            except Exception as e:
                logger.error(f"Error processing URL {url_analysis['url']}: {e}")
                continue

        logger.info("Saving individual extraction results")
        with open("individual_extraction_results.json", "w") as f:
            json.dump(results, f, indent=2)

        logger.info("Consolidating results")
        consolidated_result = consolidate_results(results, extraction_task, model)

        logger.info("Saving consolidated extraction result")
        with open("consolidated_extraction_result.json", "w") as f:
            json.dump(consolidated_result, f, indent=2)

        logger.info("Complex extraction and consolidation complete!")
    except Exception as e:
        logger.error(f"An error occurred during the complex extraction process: {str(e)}")
        logger.exception("Detailed error information:")


if __name__ == "__main__":
    sitemap_url = "https://example.com"
    extraction_task = input("Please describe the complex extraction task: ")
    model = "gpt-4o-mini"  # You can change this to allow user input if desired
    main(sitemap_url, extraction_task, model)
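

# Example run (a sketch under the assumptions noted at the top of the file:
# dependencies installed and OPENAI_API_KEY set). Edit sitemap_url above to
# point at the target site - it is hard-coded to https://example.com - then
# run the script and describe the task when prompted, e.g. "Extract product
# names, prices, and availability from each product page." Per-page results
# are written to individual_extraction_results.json and the merged summary to
# consolidated_extraction_result.json.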