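"""HTMLExtractor: scrapes HTML from a directory or URL, splits it into header-based
sections, enriches each section with LLM-generated summaries, image descriptions, and
table metadata, and stores the results in ArangoDB."""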
import os
import requests
import asyncio
import regex as re
from requests.exceptions import RequestException
import json
from bs4 import BeautifulSoup
from pandas import read_html
from uuid import uuid4
import pandas as pd
from tenacity import retry, stop_after_attempt, wait_fixed
from dotenv import load_dotenv
import importlib
from tqdm.asyncio import tqdm
import multiprocessing
from utils import check_memory_usage # Assuming you save it in a utils.py file
from verifaix.llm_client.get_litellm_response import get_litellm_response
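# NOTE (assumption): get_litellm_response is used below as an async helper that takes
# (messages, llm_params) and returns a completion-style mapping, so the model output
# is read from response['choices'][0]['message']['content'].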
# Load environment variables from .env file
load_dotenv('../.env')
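# Minimal example .env for reference (assumed placeholder values -- adjust host,
# credentials, database, and collection to match your ArangoDB deployment):
#   ARANGO_DB_HOST=http://localhost:8529
#   ARANGO_DB_USERNAME=root
#   ARANGO_DB_PASSWORD=changeme
#   ARANGO_DB_NAME=my_database
#   ARANGO_DB_COLLECTION_NAME=topics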

class HTMLExtractor:
    """Extracts sectioned content (text, images, tables) from HTML files or URLs and stores it in ArangoDB."""

    def __init__(self, directory=None, url=None, arango_db=None, memory_threshold_mb=500, task_timeout_seconds=60):
        self.directory = directory
        self.url = url
        self.arango_db = arango_db
        self.section_hierarchy = []
        self._logger = None
        self._text_normalizer = None
        self._regex_patterns = None
        self._arango_db_helper = None
        self._table_quality_evaluator = None  # Lazy-loaded by the table_quality_evaluator property
        self.memory_threshold_mb = memory_threshold_mb  # Memory threshold in MB
        self.task_timeout_seconds = task_timeout_seconds  # Task timeout in seconds

        # Validate required environment variables
        required_vars = ['ARANGO_DB_HOST', 'ARANGO_DB_USERNAME', 'ARANGO_DB_PASSWORD', 'ARANGO_DB_NAME', 'ARANGO_DB_COLLECTION_NAME']
        for var in required_vars:
            if not os.getenv(var):
                raise ValueError(f"Environment variable {var} is missing or not set.")

    ###
    # Logger and Utility Loaders
    ###
    @property
    def logger(self):
        """Lazy-load the ColoredLogger."""
        if self._logger is None:
            try:
                ColoredLogger = importlib.import_module('verifaix.utils.colored_logger_mini').setup_logger
                self._logger = ColoredLogger(__name__)
            except ImportError as e:
                print(f"Failed to import ColoredLogger: {e}. Using standard logger.")
                import logging
                self._logger = logging.getLogger(__name__)
                self._logger.setLevel(logging.INFO)
        return self._logger

    @property
    def text_normalizer(self):
        """Lazy-load the TextNormalizer with a fallback to basic whitespace normalization."""
        if self._text_normalizer is None:
            try:
                TextNormalizer = importlib.import_module('verifaix.utils.text_normalizer').TextNormalizer
                TextNormalizerConfig = importlib.import_module('verifaix.utils.text_normalizer').TextNormalizerConfig
                config = TextNormalizerConfig(settings_type='advanced')
                self._text_normalizer = TextNormalizer(config=config)
            except ImportError as e:
                self.logger.error(f"Failed to import TextNormalizer: {e}. Falling back to basic text normalization.")
                self._text_normalizer = self._basic_text_normalizer
        return self._text_normalizer

    def _basic_text_normalizer(self, text):
        """Fallback text normalizer that removes excessive whitespace."""
        self.logger.info("Using basic text normalization (whitespace removal).")
        return re.sub(r'\s+', ' ', text).strip()

    @property
    def regex_patterns(self):
        """Lazy-load the RegexPatterns."""
        if self._regex_patterns is None:
            try:
                RegexPatterns = importlib.import_module('verifaix.utils.regex_patterns').RegexPatterns
                self._regex_patterns = RegexPatterns()
            except ImportError as e:
                self.logger.error(f"Failed to import RegexPatterns: {e}")
                self._regex_patterns = {
                    "email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
                    "phone": r"\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}",
                    "address": r"\d+\s+\w+\s+\w+\s+\w+",
                    "date": r"\d{1,2}/\d{1,2}/\d{2,4}"
                }
        return self._regex_patterns

    @property
    def arango_db_helper(self):
        """Lazy-load the ArangoDBHelper."""
        if self._arango_db_helper is None:
            try:
                ArangoDBHelper = importlib.import_module('verifaix.utils.arango_db_helper').ArangoDBHelper
                self._arango_db_helper = ArangoDBHelper()
                self.logger.debug("ArangoDBHelper loaded successfully.")
            except ImportError as e:
                self.logger.error(f"Failed to import ArangoDBHelper: {e}")
                raise
        return self._arango_db_helper

    @property
    def table_quality_evaluator(self):
        """Lazy-load the TableQualityEvaluator with a fallback."""
        if self._table_quality_evaluator is None:
            try:
                TableQualityEvaluator = importlib.import_module('verifaix.camelot_extractor.table_quality_evaluator').TableQualityEvaluator
                self._table_quality_evaluator = TableQualityEvaluator(pdf_path=self.url or self.directory)
            except ImportError as e:
                self.logger.error(f"Failed to import TableQualityEvaluator: {e}. Using fallback evaluator.")
                self._table_quality_evaluator = self._basic_table_evaluator()  # Fall back to the basic evaluator
        return self._table_quality_evaluator

    def _basic_table_evaluator(self):
        """Fallback table quality evaluator that returns default scores."""
        class BasicTableEvaluator:
            def calculate_table_quality(self, tables):
                return {'average_table_extraction_quality': 0.5, 'table_scores': [{'score': 0.5}]}
        return BasicTableEvaluator()

    ###
    # HTML Scraping and Processing Methods
    ###
    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
    async def scrape_page(self, url):
        """Scrapes the page asynchronously and handles retries."""
        try:
            # Offload the blocking requests.get() to a separate thread
            response = await asyncio.to_thread(requests.get, url)
            response.raise_for_status()
            return response.content
        except RequestException as e:
            self.logger.error(f"Error fetching page {url}: {e}")
            raise

    async def process_directory(self):
        """Processes all HTML files in the provided directory concurrently with memory-aware scheduling.

        - Files are fed through an asyncio.Queue so batch sizes can be controlled dynamically.
        - Memory checks ensure a new batch starts only when enough memory is available.
        - Detailed logging tracks file submission and processing progress.
        """
        if not self.directory or not os.path.isdir(self.directory):
            self.logger.error(f"Provided directory '{self.directory}' is invalid.")
            return

        # Get the list of HTML files in the directory
        files = [f for f in os.listdir(self.directory) if f.endswith(".html")]
        num_threads = multiprocessing.cpu_count()  # Use the CPU count to size batches

        # Define a chunk size to avoid overwhelming system resources:
        # process files in batches of up to twice the number of available threads.
        chunk_size = min(len(files), num_threads * 2)

        # Create an asyncio Queue for task management and load all files into it
        task_queue = asyncio.Queue()
        for file_name in files:
            await task_queue.put(file_name)
        self.logger.info(f"Added {task_queue.qsize()} files to the task queue for processing.")

        # Process batches dynamically, checking memory usage before each one.
        # _process_single_file is a coroutine, so it is scheduled with create_task;
        # its blocking file I/O is offloaded to threads internally via asyncio.to_thread.
        while not task_queue.empty():
            # Wait until memory usage drops below the threshold before starting a new batch
            while not check_memory_usage(self.memory_threshold_mb, self.logger):
                self.logger.warning("Memory usage is high. Waiting for memory to clear...")
                await asyncio.sleep(5)

            # Pull the next chunk of files off the queue
            file_chunk = []
            for _ in range(min(chunk_size, task_queue.qsize())):
                file_chunk.append(await task_queue.get())
            self.logger.info(f"Processing a chunk of {len(file_chunk)} files.")

            # Schedule the file-processing coroutines for this chunk
            tasks = {
                asyncio.create_task(self._process_single_file(os.path.join(self.directory, file_name))): file_name
                for file_name in file_chunk
            }

            # Small sleep to yield control and avoid spiking memory while tasks start
            await asyncio.sleep(0.1)

            # Await each task in the chunk and log the outcome per file
            for task, file_name in tasks.items():
                try:
                    await task
                    self.logger.info(f"Successfully processed {file_name}")
                except Exception as e:
                    self.logger.error(f"Error processing file {file_name}: {e}")

            # Log the number of remaining files in the queue
            self.logger.info(f"{task_queue.qsize()} files remaining in the task queue.")

        self.logger.info("Finished processing all files in the directory.")

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
    async def _process_single_file(self, file_path):
        """Reads and processes a single HTML file.

        - Blocking file I/O is offloaded with asyncio.to_thread() so the event loop is never blocked.
        - The HTML content is read, sections are extracted, and the data is stored in ArangoDB asynchronously.

        This coroutine is scheduled concurrently by process_directory(), one task per file.

        Args:
            file_path (str): The path to the HTML file being processed.
        """
        if not check_memory_usage(self.memory_threshold_mb, self.logger):
            await asyncio.sleep(2)  # Wait for the system to free up memory
            return  # Skip processing this file while memory pressure is high

        try:
            # Read the file content without blocking the event loop
            html_content = await asyncio.to_thread(self._read_file, file_path)

            # Extract sections and store them
            await self.extract_and_store_sections(html_content, file_path)

            # Evaluate the extracted tables for quality
            section = next((s for s in self.section_hierarchy if 'tables' in s['content'] and s['content']['tables']), None)
            if section:
                tables = section['content']['tables']
                quality_result = self.evaluate_tables(tables)
                self.logger.info(f"Table quality evaluation result: {quality_result}")
        except Exception as e:
            self.logger.error(f"Error processing file {file_path}: {e}")

    def evaluate_tables(self, tables):
        """Run the extracted tables through TableQualityEvaluator and log the results."""
        self.logger.info(f"Evaluating {len(tables)} extracted tables for quality.")
        try:
            quality_result = self.table_quality_evaluator.calculate_table_quality(tables)
            self.logger.info(f"Table quality evaluation completed. Average quality: {quality_result['average_table_extraction_quality']}")
            return quality_result
        except Exception as e:
            self.logger.error(f"Error during table quality evaluation: {e}")
            return {'average_table_extraction_quality': 0.0, 'table_scores': []}  # Default to a neutral result

    def _read_file(self, file_path):
        """Helper method to read files synchronously for use with asyncio.to_thread."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except FileNotFoundError:
            self.logger.error(f"File not found: {file_path}")
            return ""
        except IOError as e:
            self.logger.error(f"IO error while reading file {file_path}: {e}")
            return ""

    async def process_url(self):
        """Scrapes the main page, then follows and processes every link found on it."""
        main_page = await self.scrape_page(self.url)
        soup = BeautifulSoup(main_page, 'html.parser')
        links = [a['href'] for a in soup.find_all('a', href=True)]

        await self.extract_and_store_sections(main_page, self.url)

        async for link in tqdm(links, desc="Processing linked pages"):
            full_url = requests.compat.urljoin(self.url, link)
            try:
                page_content = await self.scrape_page(full_url)
                await self.extract_and_store_sections(page_content, full_url)
            except Exception as e:
                self.logger.error(f"Failed to scrape {full_url}: {e}")

    ###
    # Section and Content Extraction Methods
    ###
    async def extract_and_store_sections(self, html_content, page_url=None):
        """Parses sections, extracts content, runs the LLM tasks, and stores the results in ArangoDB."""
        section_stack = self._parse_sections(html_content, page_url)
        self._extract_content(html_content, section_stack)

        tasks = self._generate_tasks_for_sections(section_stack)
        # tqdm.asyncio wraps the as_completed iterator; each yielded awaitable still needs to be awaited
        async for completed in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Processing sections"):
            await completed

        await self._store_sections_in_arango()

    def _parse_sections(self, html_content, page_url):
        """Parses headers (h1-h6) and builds the section hierarchy."""
        soup = BeautifulSoup(html_content, 'html.parser')
        page_id = page_url or 'local_file'
        section_stack = []

        for header in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']):
            level = int(header.name[1])
            title = header.get_text(strip=True)

            # Pop sections at the same or deeper level before choosing the parent,
            # so siblings share the same parent instead of nesting under each other.
            while section_stack and section_stack[-1]['level'] >= level:
                section_stack.pop()

            section = {
                'id': str(uuid4()),
                'title': title,
                'level': level,
                'page_id': page_id,
                'parent_section_id': section_stack[-1]['id'] if section_stack else None,
                'content': {
                    'text': [],
                    'images': [],
                    'tables': []
                }
            }
            self.section_hierarchy.append(section)
            section_stack.append(section)

        return section_stack

    def _extract_content(self, html_content, section_stack):
        """Extracts text, images, and tables and attaches them to the current (innermost) section."""
        if not section_stack:
            return  # No headers were found, so there is no section to attach content to

        soup = BeautifulSoup(html_content, 'html.parser')
        section = section_stack[-1]

        for p in soup.find_all('p'):
            section['content']['text'].append(p.get_text(strip=True))

        for img in soup.find_all('img'):
            if img.get('src'):
                section['content']['images'].append(img['src'])

        try:
            tables = read_html(html_content)
        except ValueError:
            tables = []  # pandas.read_html raises ValueError when the page contains no tables
        for table in tables:
            section['content']['tables'].append(table)

    def _generate_tasks_for_sections(self, section_stack):
        """Generates async tasks for the per-section LLM operations (summaries, images, tables)."""
        tasks = []
        for section in section_stack:
            tasks.append(self._generate_summary_for_section(section))
            tasks += [self._generate_image_description(section, img) for img in section['content']['images']]
            tasks += [self._generate_table_metadata(section, table) for table in section['content']['tables']]
        return tasks

    async def _store_sections_in_arango(self):
        """Stores every parsed section in ArangoDB, tracking any failures."""
        failed_sections = []
        async for section in tqdm(self.section_hierarchy, desc="Storing sections in ArangoDB"):
            success = await self.insert_section_to_arango(section)
            if not success:
                failed_sections.append(section)

        if failed_sections:
            self.logger.error(f"Failed to store {len(failed_sections)} sections. Tracking for potential retry.")
            self._log_failed_sections(failed_sections)
        else:
            self.logger.info("All sections stored successfully.")

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
    async def insert_section_to_arango(self, section):
        try:
            # Sections can contain pandas DataFrames (raw tables), which are not JSON-serializable.
            # Coerce non-serializable values to strings before insertion; adjust this if
            # structured table data needs to be preserved in ArangoDB.
            serializable_section = json.loads(json.dumps(section, default=str))
            result = await self.insert_bulk_json(os.getenv('ARANGO_DB_COLLECTION_NAME', 'topics'), [serializable_section])
            return result
        except Exception as e:
            self.logger.error(f"Failed to insert section '{section['title']}' into ArangoDB: {e}")
            return False

    def _log_failed_sections(self, failed_sections):
        """Logs failed sections for reprocessing or manual inspection."""
        failed_file = "failed_sections.json"
        try:
            with open(failed_file, 'w', encoding='utf-8') as f:
                # default=str keeps the dump from failing on DataFrames and other non-JSON types
                json.dump(failed_sections, f, ensure_ascii=False, indent=4, default=str)
            self.logger.info(f"Failed sections logged in {failed_file}")
        except Exception as e:
            self.logger.error(f"Error logging failed sections: {e}")

    ###
    # Asynchronous LLM Methods for Summaries, Tables, and Images
    ###
    async def _generate_image_description(self, section: dict, img_url: str):
        """Generates an image description using the LLM and attaches it to the section content."""
        system_message = "You are an AI assistant tasked with generating descriptions for images."
        user_message = f"Generate a description for the image '{img_url}' in the section '{section['title']}'"
        messages = [
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message}
        ]
        llm_params = {"model": "openai/gpt-4o", "temperature": 0.7}
        try:
            response = await get_litellm_response(messages, llm_params)
            description = response['choices'][0]['message']['content']
            # Replace the bare URL entry with a dict carrying the URL and its description
            section['content']['images'] = [
                {'url': img, 'description': description} if img == img_url else img
                for img in section['content']['images']
            ]
        except Exception as e:
            self.logger.error(f"Error generating image description for {img_url}: {e}")

    async def _generate_table_metadata(self, section: dict, table: pd.DataFrame):
        """Generates metadata for a table using the LLM, including title, description, and AQL queries."""
        # Define the schema for table metadata extraction
        table_schema = {
            "section_title": section.get("title", "Unknown Section Title"),
            "section_summary": section.get("summary", "No summary available"),
            "tables": [
                {
                    "title": (
                        "Title of Table extracted verbatim from surrounding text if it exists. "
                        "If the Table title does not exist, infer a Table Title and prepend it with 'Inferred:'. "
                        "If a title cannot be inferred, leave this field as 'Unknown'."
                    ),
                    "description": "Description of the Table using the table data, title (if it exists), and surrounding text",
                    "data": {
                        "header": "Well-formatted header columns (if they exist). If headers do not exist or are incomplete, output None. Use the header text verbatim.",
                        "data": "Well-formatted table data. Use the table text verbatim. Beyond formatting, do not make any changes to the table data."
                    },
                    "queries": [
                        {
                            "minValue": 3,
                            "maxValue": 5,
                            "aql": "Generate 3-5 distinct AQL queries that accurately describe the table data. Each query should explore a different aspect of the table, such as filtering by connections, types, or signal descriptions. The queries should vary from simple filters to more complex techniques such as joins, subqueries, or sorting. For each AQL query, group it with its corresponding human questions.",
                            "human_questions": "For each AQL query, generate 3-5 human questions that represent the specific query. These questions should cover various levels of detail and be phrased differently to explore the same query in multiple ways. Some questions should reflect expert-level domain knowledge of hardware verification, like an understanding of prolog and state machines, while others should reflect a layperson's understanding, such as a Project Manager's level of knowledge. Make sure the questions are grouped with their corresponding AQL query."
                        }
                    ]
                }
            ]
        }

        # Prepare the system message
        system_message = (
            "You are an expert hardware verification engineer tasked with extracting table information from a PDF document.\n"
            "Only answer in well-formatted JSON--do not include any other text.\n"
            "**BACKGROUND**:\n"
            "PDF documents are notoriously difficult to extract tables from, and tables are often split across multiple pages.\n"
            "**Task**:\n"
            "Analyze the provided table within the following section context and output well-formatted JSON based on the table schema:\n"
            f"{json.dumps(table_schema, indent=4)}"
        )
        user_message = f"Generate metadata for this table in the section '{section['title']}': {table}"
        messages = [
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message}
        ]

        # Use a stronger model and a low temperature for table metadata extraction
        llm_params = {"model": "openai/gpt-4o", "temperature": 0.3}
        try:
            response = await get_litellm_response(messages, llm_params)
            metadata = response['choices'][0]['message']['content']
            # Replace the raw DataFrame entry with a dict pairing the table and its metadata
            for idx, existing in enumerate(section['content']['tables']):
                if existing is table:
                    section['content']['tables'][idx] = {'table': table, 'metadata': metadata}
                    break
        except Exception as e:
            self.logger.error(f"Error generating table metadata for section {section['title']}: {e}")

    async def _generate_summary_for_section(self, section: dict):
        """Generates a summary for a section using the LLM."""
        section_content = "\n".join(section['content']['text'])
        system_message = "You are an AI assistant tasked with summarizing sections of a document."
        user_message = f"Generate a summary for the section '{section['title']}' with the following content: {section_content}"
        messages = [
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message}
        ]
        llm_params = {"model": "openai/gpt-4o", "temperature": 0.3}
        try:
            response = await get_litellm_response(messages, llm_params)
            summary = response['choices'][0]['message']['content']
            section['summary'] = summary
        except Exception as e:
            self.logger.error(f"Error generating summary for section {section['title']}: {e}")

    ###
    # ArangoDB Methods
    ###
    async def insert_bulk_json(self, collection_name: str, json_data: list):
        """Inserts bulk JSON data into the specified ArangoDB collection."""
        try:
            await self.arango_db_helper.ensure_database_exists()

            # Check if the collection exists, and create it if it doesn't
            if not await asyncio.to_thread(self.arango_db_helper.db.has_collection, collection_name):
                await asyncio.to_thread(self.arango_db_helper.db.create_collection, collection_name)
                self.logger.debug(f"Created collection: {collection_name}")

            collection = self.arango_db_helper.db.collection(collection_name)

            # Insert the bulk data without blocking the event loop
            result = await asyncio.to_thread(collection.insert_many, json_data)
            self.logger.debug(f"Successfully inserted {len(result)} documents into {collection_name}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to insert data into collection {collection_name}: {str(e)}")
            return False

    ###
    # Main Method
    ###
    async def run(self):
        """Main method to either process the directory or the URL."""
        if self.directory:
            self.logger.info(f"Processing directory: {self.directory}")
            await self.process_directory()
        elif self.url:
            self.logger.info(f"Processing URL: {self.url}")
            await self.process_url()
        else:
            raise ValueError("Either 'directory' or 'url' must be provided.")


###
# Example Usage
###
async def main():
    # Example usage: point the extractor at a directory of HTML files.
    # An ArangoDB client can be passed in explicitly, e.g.:
    #   arango_db = ArangoClient().db('my_database')
    # Otherwise, the lazily loaded ArangoDBHelper uses the connection details from .env.
    extractor = HTMLExtractor(directory="/path/to/html_files", arango_db=None)  # Replace None with an actual ArangoDB client if desired
    await extractor.run()  # Await the asynchronous run method


if __name__ == "__main__":
    # Use asyncio.run() to drive the async main function
    asyncio.run(main())
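
# ---------------------------------------------------------------------------
# For reference: a minimal sketch of the check_memory_usage helper imported from
# utils.py near the top of this file. This is an assumption about its contract
# (return True when the current process is using less than memory_threshold_mb of
# RAM), not the original implementation. It relies on psutil. Kept commented out
# here so it does not shadow the real import.
# ---------------------------------------------------------------------------
# import psutil
#
# def check_memory_usage(memory_threshold_mb, logger):
#     """Return True if the current process's RSS is below the threshold (in MB)."""
#     rss_mb = psutil.Process().memory_info().rss / (1024 * 1024)
#     if rss_mb >= memory_threshold_mb:
#         logger.warning(f"Memory usage {rss_mb:.1f} MB exceeds threshold {memory_threshold_mb} MB.")
#         return False
#     return True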