import os
import requests
import asyncio
import regex as re
from requests.exceptions import RequestException
import json
from bs4 import BeautifulSoup
from pandas import read_html
from uuid import uuid4
import pandas as pd
from tenacity import retry, stop_after_attempt, wait_fixed
from dotenv import load_dotenv
import importlib
from tqdm.asyncio import tqdm
import multiprocessing
from utils import check_memory_usage  # Assuming you save it in a utils.py file
from verifaix.llm_client.get_litellm_response import get_litellm_response

# Load environment variables from .env file
load_dotenv('../.env')

class HTMLExtractor:
    def __init__(self, directory=None, url=None, arango_db=None, memory_threshold_mb=500, task_timeout_seconds=60):
        self.directory = directory
        self.url = url
        self.arango_db = arango_db
        self.section_hierarchy = []
        self._logger = None
        self._text_normalizer = None
        self._regex_patterns = None
        self._arango_db_helper = None
        self._table_quality_evaluator = None  # Lazy-loaded by the table_quality_evaluator property
        self.memory_threshold_mb = memory_threshold_mb  # Set memory threshold in MB
        self.task_timeout_seconds = task_timeout_seconds  # Set task timeout in seconds

        # Validate environment variables
        required_vars = ['ARANGO_DB_HOST', 'ARANGO_DB_USERNAME', 'ARANGO_DB_PASSWORD', 'ARANGO_DB_NAME', 'ARANGO_DB_COLLECTION_NAME']
        for var in required_vars:
            if not os.getenv(var):
                raise ValueError(f"Environment variable {var} is missing or not set.")
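
    # A minimal sketch of the ../.env file this validation expects. The variable
    # names come from required_vars above; the values below are placeholders,
    # not real settings (8529 is ArangoDB's default port, 'topics' is the
    # fallback collection name used later in insert_section_to_arango):
    #
    #   ARANGO_DB_HOST=http://localhost:8529
    #   ARANGO_DB_USERNAME=root
    #   ARANGO_DB_PASSWORD=your-password
    #   ARANGO_DB_NAME=my_database
    #   ARANGO_DB_COLLECTION_NAME=topics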

    ###
    # Logger and Utility Loaders
    ###
    @property
    def logger(self):
        """Lazy-load the ColoredLogger."""
        if self._logger is None:
            try:
                ColoredLogger = importlib.import_module('verifaix.utils.colored_logger_mini').setup_logger
                self._logger = ColoredLogger(__name__)
            except ImportError as e:
                print(f"Failed to import ColoredLogger: {e}. Using standard logger.")
                import logging
                logging.basicConfig(level=logging.INFO)  # Ensure the fallback logger actually emits INFO messages
                self._logger = logging.getLogger(__name__)
                self._logger.setLevel(logging.INFO)
        return self._logger

    @property
    def text_normalizer(self):
        """Lazy-load the TextNormalizer with a fallback to basic whitespace normalization."""
        if self._text_normalizer is None:
            try:
                TextNormalizer = importlib.import_module('verifaix.utils.text_normalizer').TextNormalizer
                TextNormalizerConfig = importlib.import_module('verifaix.utils.text_normalizer').TextNormalizerConfig
                config = TextNormalizerConfig(settings_type='advanced')
                self._text_normalizer = TextNormalizer(config=config)
            except ImportError as e:
                self.logger.error(f"Failed to import TextNormalizer: {e}. Falling back to basic text normalization.")
                self._text_normalizer = self._basic_text_normalizer
        return self._text_normalizer

    def _basic_text_normalizer(self, text):
        """Fallback text normalizer that removes excessive whitespace."""
        self.logger.info("Using basic text normalization (whitespace removal).")
        return re.sub(r'\s+', ' ', text).strip()

    @property
    def regex_patterns(self):
        """Lazy-load the RegexPatterns."""
        if self._regex_patterns is None:
            try:
                RegexPatterns = importlib.import_module('verifaix.utils.regex_patterns').RegexPatterns
                self._regex_patterns = RegexPatterns()
            except ImportError as e:
                self.logger.error(f"Failed to import RegexPatterns: {e}")
                self._regex_patterns = {
                    "email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
                    "phone": r"\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}",
                    "address": r"\d+\s+\w+\s+\w+\s+\w+",
                    "date": r"\d{1,2}/\d{1,2}/\d{2,4}"
                }
        return self._regex_patterns

    @property
    def arango_db_helper(self):
        """Lazy-load the ArangoDBHelper."""
        if self._arango_db_helper is None:
            try:
                ArangoDBHelper = importlib.import_module('verifaix.utils.arango_db_helper').ArangoDBHelper
                self._arango_db_helper = ArangoDBHelper()
                self.logger.debug("ArangoDBHelper loaded successfully.")
            except ImportError as e:
                self.logger.error(f"Failed to import ArangoDBHelper: {e}")
                raise
        return self._arango_db_helper

    @property
    def table_quality_evaluator(self):
        """Lazy-load the TableQualityEvaluator with a fallback."""
        if self._table_quality_evaluator is None:
            try:
                TableQualityEvaluator = importlib.import_module('verifaix.camelot_extractor.table_quality_evaluator').TableQualityEvaluator
                self._table_quality_evaluator = TableQualityEvaluator(pdf_path=self.url or self.directory)
            except ImportError as e:
                self.logger.error(f"Failed to import TableQualityEvaluator: {e}. Using fallback evaluator.")
                self._table_quality_evaluator = self._basic_table_evaluator()  # Fallback to basic evaluator
        return self._table_quality_evaluator

    def _basic_table_evaluator(self):
        """Fallback table quality evaluator that returns default scores."""
        class BasicTableEvaluator:
            def calculate_table_quality(self, tables):
                return {'average_table_extraction_quality': 0.5, 'table_scores': [{'score': 0.5}]}
        return BasicTableEvaluator()

    ###
    # HTML Scraping and Processing Methods
    ###
    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
    async def scrape_page(self, url):
        """Scrapes the page asynchronously and handles retries."""
        try:
            # Offload the blocking requests.get() to a separate thread
            response = await asyncio.to_thread(requests.get, url)
            response.raise_for_status()
            return response.content
        except RequestException as e:
            self.logger.error(f"Error fetching page {url}: {e}")
            raise

    async def process_directory(self):
        """Processes all HTML files in the provided directory concurrently with enhanced memory management.

        - Files are processed using a queue-based system to dynamically control memory usage.
        - Memory checks ensure tasks are processed only when enough memory is available.
        - Detailed logging tracks the progress of file submissions and processing.
        """
        if not self.directory or not os.path.isdir(self.directory):
            self.logger.error(f"Provided directory '{self.directory}' is invalid.")
            return

        # Get list of HTML files in the directory
        files = [f for f in os.listdir(self.directory) if f.endswith(".html")]
        num_threads = multiprocessing.cpu_count()  # Use available CPU threads to size the batches

        # Define chunk size for files to avoid overwhelming system resources
        chunk_size = min(len(files), num_threads * 2)  # Process files in batches, using double the number of available threads

        # Create an asyncio Queue for task management
        task_queue = asyncio.Queue()

        # Load all files into the task queue
        for file_name in files:
            await task_queue.put(file_name)
        self.logger.info(f"Added {task_queue.qsize()} files to the task queue for processing.")

        # Process tasks dynamically, checking memory usage before each chunk.
        # _process_single_file is a coroutine that offloads its blocking file I/O
        # via asyncio.to_thread, so it is scheduled directly as an asyncio task.
        while not task_queue.empty():
            # Intelligent memory management: wait until memory usage drops below threshold
            while not check_memory_usage(self.memory_threshold_mb, self.logger):
                self.logger.warning("Memory usage is high. Waiting for memory to clear...")
                await asyncio.sleep(5)  # Wait for memory to clear before processing the next batch of files

            # Process files in chunks
            file_chunk = []
            for _ in range(min(chunk_size, task_queue.qsize())):
                file_chunk.append(await task_queue.get())
            self.logger.info(f"Processing a chunk of {len(file_chunk)} files.")

            # Schedule file processing tasks for the chunk
            tasks = {
                asyncio.create_task(self._process_single_file(os.path.join(self.directory, file_name))): file_name
                for file_name in file_chunk
            }

            # Throttle mechanism to ensure memory is not overused
            await asyncio.sleep(0.1)  # Small sleep to release resources and avoid overwhelming memory

            # Gather results from the chunk; the tasks run concurrently and are awaited in turn
            for task, file_name in tasks.items():
                try:
                    await task  # Await the completion of each task
                    self.logger.info(f"Successfully processed {file_name}")
                except Exception as e:
                    self.logger.error(f"Error processing file {file_name}: {e}")

            # Log the number of remaining files in the queue
            self.logger.info(f"{task_queue.qsize()} files remaining in the task queue.")

        self.logger.info("Finished processing all files in the directory.")

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
    async def _process_single_file(self, file_path):
        """Helper method to read and process a single file.

        - Uses asyncio.to_thread() to offload blocking file I/O to a separate thread, preventing the event loop from being blocked.
        - The HTML content is read, sections are extracted, and data is stored in ArangoDB asynchronously.

        This coroutine is scheduled concurrently by process_directory so multiple files can be processed in parallel.

        Args:
            file_path (str): The path to the HTML file being processed.
        """
        if not check_memory_usage(self.memory_threshold_mb, self.logger):
            await asyncio.sleep(2)  # Wait for system to free up memory
            return  # Skip processing if memory is too high

        try:
            # Read the file content asynchronously
            html_content = await asyncio.to_thread(self._read_file, file_path)

            # Extract sections and store them
            await self.extract_and_store_sections(html_content, file_path)

            # Evaluate the tables for quality
            section = next((s for s in self.section_hierarchy if 'tables' in s['content'] and s['content']['tables']), None)
            if section:
                tables = section['content']['tables']
                quality_result = self.evaluate_tables(tables)
                self.logger.info(f"Table quality evaluation result: {quality_result}")
        except Exception as e:
            self.logger.error(f"Error processing file {file_path}: {e}")

    def evaluate_tables(self, tables):
        """Run the extracted tables through TableQualityEvaluator and log the results."""
        self.logger.info(f"Evaluating {len(tables)} extracted tables for quality.")
        try:
            quality_result = self.table_quality_evaluator.calculate_table_quality(tables)
            self.logger.info(f"Table quality evaluation completed. Average quality: {quality_result['average_table_extraction_quality']}")
            return quality_result
        except Exception as e:
            self.logger.error(f"Error during table quality evaluation: {e}")
            return {'average_table_extraction_quality': 0.0, 'table_scores': []}  # Default to a neutral result

    def _read_file(self, file_path):
        """Helper method to read files synchronously for use with asyncio.to_thread."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except FileNotFoundError:
            self.logger.error(f"File not found: {file_path}")
            return ""
        except IOError as e:
            self.logger.error(f"IO error while reading file {file_path}: {e}")
            return ""

    async def process_url(self):
        main_page = await self.scrape_page(self.url)
        soup = BeautifulSoup(main_page, 'html.parser')
        links = [a['href'] for a in soup.find_all('a', href=True)]

        await self.extract_and_store_sections(main_page, self.url)

        # links is a plain list, so iterate synchronously while awaiting each scrape
        for link in tqdm(links, desc="Processing linked pages"):
            full_url = requests.compat.urljoin(self.url, link)
            try:
                page_content = await self.scrape_page(full_url)
                await self.extract_and_store_sections(page_content, full_url)
            except Exception as e:
                self.logger.error(f"Failed to scrape {full_url}: {e}")

    ###
    # Section and Content Extraction Methods
    ###
    async def extract_and_store_sections(self, html_content, page_url=None):
        section_stack = self._parse_sections(html_content, page_url)
        self._extract_content(html_content, section_stack)

        tasks = self._generate_tasks_for_sections(section_stack)
        # asyncio.as_completed yields awaitables synchronously, so a plain for-loop is used
        for task in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Processing sections"):
            await task

        await self._store_sections_in_arango()

    def _parse_sections(self, html_content, page_url):
        """Parses headers and creates section hierarchy."""
        soup = BeautifulSoup(html_content, 'html.parser')
        page_id = page_url or 'local_file'
        section_stack = []

        for header in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']):
            level = int(header.name[1])
            title = header.get_text(strip=True)

            # Pop completed sections before choosing the parent, so the parent is the
            # nearest enclosing section rather than a sibling at the same level
            while section_stack and section_stack[-1]['level'] >= level:
                section_stack.pop()

            section = {
                'id': str(uuid4()),
                'title': title,
                'level': level,
                'page_id': page_id,
                'parent_section_id': section_stack[-1]['id'] if section_stack else None,
                'content': {
                    'text': [],
                    'images': [],
                    'tables': []
                }
            }

            self.section_hierarchy.append(section)
            section_stack.append(section)

        return section_stack

    def _extract_content(self, html_content, section_stack):
        """Extracts text, images, and tables and associates them with sections."""
        if not section_stack:
            self.logger.warning("No sections found in document; skipping content extraction.")
            return

        soup = BeautifulSoup(html_content, 'html.parser')

        for p in soup.find_all('p'):
            section = section_stack[-1]
            section['content']['text'].append(p.get_text(strip=True))

        for img in soup.find_all('img'):
            section = section_stack[-1]
            img_url = img.get('src')
            if img_url:
                section['content']['images'].append(img_url)

        try:
            tables = read_html(html_content)
        except ValueError:
            tables = []  # pandas raises ValueError when no tables are found
        for table in tables:
            section = section_stack[-1]
            section['content']['tables'].append(table)

    def _generate_tasks_for_sections(self, section_stack):
        """Generates async tasks for section LLM operations."""
        tasks = []
        for section in section_stack:
            tasks.append(self._generate_summary_for_section(section))
            tasks += [self._generate_image_description(section, img) for img in section['content']['images']]
            tasks += [self._generate_table_metadata(section, table) for table in section['content']['tables']]
        return tasks

    async def _store_sections_in_arango(self):
        failed_sections = []
        for section in tqdm(self.section_hierarchy, desc="Storing sections in ArangoDB"):
            success = await self.insert_section_to_arango(section)
            if not success:
                failed_sections.append(section)

        if failed_sections:
            self.logger.error(f"Failed to store {len(failed_sections)} sections. Tracking for potential retry.")
            self._log_failed_sections(failed_sections)
        else:
            self.logger.info("All sections stored successfully.")

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
    async def insert_section_to_arango(self, section):
        try:
            result = await self.insert_bulk_json(os.getenv('ARANGO_DB_COLLECTION_NAME', 'topics'), [section])
            return result
        except Exception as e:
            self.logger.error(f"Failed to insert section '{section['title']}' into ArangoDB: {e}")
            return False

    def _log_failed_sections(self, failed_sections):
        """Logs failed sections for reprocessing or manual inspection."""
        failed_file = "failed_sections.json"
        try:
            with open(failed_file, 'w', encoding='utf-8') as f:
                # default=str keeps non-JSON-serializable values (e.g. pandas DataFrames) from aborting the dump
                json.dump(failed_sections, f, ensure_ascii=False, indent=4, default=str)
            self.logger.info(f"Failed sections logged in {failed_file}")
        except Exception as e:
            self.logger.error(f"Error logging failed sections: {e}")

    ###
    # Asynchronous LLM Methods for Summaries, Tables, and Images
    ###
    async def _generate_image_description(self, section: dict, img_url: str):
        """Generates image description using LLM and adds it to the section content."""
        system_message = "You are an AI assistant tasked with generating descriptions for images."
        user_message = f"Generate a description for the image '{img_url}' in the section '{section['title']}'"

        messages = [
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message}
        ]
        llm_params = {"model": "openai/gpt-4o", "temperature": 0.7}

        try:
            response = await get_litellm_response(messages, llm_params)
            description = response['choices'][0]['message']['content']

            # Replace the bare URL entry with a dict holding the URL and its description
            images = section['content']['images']
            for i, img in enumerate(images):
                if img == img_url:
                    images[i] = {'url': img_url, 'description': description}
        except Exception as e:
            self.logger.error(f"Error generating image description for {img_url}: {e}")

    async def _generate_table_metadata(self, section: dict, table: pd.DataFrame):
        """Generates metadata for a table using LLM, including title, description, and AQL queries."""
        # Define the schema for table metadata extraction
        table_schema = {
            "section_title": section.get("title", "Unknown Section Title"),
            "section_summary": section.get("summary", "No summary available"),
            "tables": [
                {
                    "title": (
                        "Title of Table extracted verbatim from surrounding text if it exists. "
                        "If the Table title does not exist, Infer a Table Title and prepend with 'Inferred:' \n"
                        "If a title cannot be inferred, leave this field as 'Unknown'."
                    ),
                    "description": "Description of Table using the table data, title (if exists), and surrounding text",
                    "data": {
                        "header": "Well-formatted header columns (if they exist). If headers do not exist or are incomplete, output None. Use the header text verbatim.",
                        "data": "Well-formatted table data. Use table text verbatim. Beyond formatting, do not make any changes to the table data."
                    },
                    "queries": [
                        {
                            "minValue": 3,
                            "maxValue": 5,
                            "aql": "Generate 3-5 distinct AQL queries that accurately describe the table data. Each query should explore a different aspect of the table, such as filtering by connections, types, or signal descriptions. The queries should vary from simple filters to more complex techniques such as joins, subqueries, or sorting. For each AQL query, group it with its corresponding human questions.",
                            "human_questions": "For each AQL query, generate 3-5 human questions that represent the specific query. These questions should cover various levels of detail and be phrased differently to explore the same query in multiple ways. Some questions should reflect expert-level domain knowledge of hardware verification, like an understanding of prolog and state machines, while others should reflect a layperson's understanding, such as a Project Manager's level of knowledge. Make sure the questions are grouped with their corresponding AQL query."
                        }
                    ]
                }
            ]
        }

        # Prepare system message
        system_message = (
            "You are an expert hardware verification engineer tasked with extracting table information from a PDF document.\n"
            "Only Answer in Well-formatted JSON--do not include any other text.\n"
            "**BACKGROUND**:\n"
            "PDF documents are notoriously difficult to extract tables from, and often tables are split across multiple pages.\n"
            "**Task**:\n"
            "Analyze the provided table within the following section context and output well-formatted JSON based on the table schema:\n"
            f"{json.dumps(table_schema, indent=4)}"
        )
        user_message = f"Generate metadata for this table in the section '{section['title']}': {table}"

        messages = [
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message}
        ]

        # Use the best model for table metadata extraction
        llm_params = {"model": "openai/gpt-4o", "temperature": 0.3}

        try:
            response = await get_litellm_response(messages, llm_params)
            metadata = response['choices'][0]['message']['content']
            section['content']['tables'].append({'table': table, 'metadata': metadata})
        except Exception as e:
            self.logger.error(f"Error generating table metadata for section {section['title']}: {e}")

    async def _generate_summary_for_section(self, section: dict):
        """Generates a summary for a section using LLM."""
        section_content = "\n".join(section['content']['text'])
        system_message = "You are an AI assistant tasked with summarizing sections of a document."
        user_message = f"Generate a summary for the section '{section['title']}' with the following content: {section_content}"

        messages = [
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message}
        ]
        llm_params = {"model": "openai/gpt-4o", "temperature": 0.3}

        try:
            response = await get_litellm_response(messages, llm_params)
            summary = response['choices'][0]['message']['content']
            section['summary'] = summary
        except Exception as e:
            self.logger.error(f"Error generating summary for section {section['title']}: {e}")

    ###
    # ArangoDB Methods
    ###
    async def insert_bulk_json(self, collection_name: str, json_data: list):
        """Inserts bulk JSON data into the specified ArangoDB collection."""
        try:
            await self.arango_db_helper.ensure_database_exists()

            # Check if the collection exists, create it if it doesn't
            if not await asyncio.to_thread(self.arango_db_helper.db.has_collection, collection_name):
                await asyncio.to_thread(self.arango_db_helper.db.create_collection, collection_name)
                self.logger.debug(f"Created collection: {collection_name}")

            collection = self.arango_db_helper.db.collection(collection_name)

            # Insert the bulk data
            result = await asyncio.to_thread(collection.insert_many, json_data)
            self.logger.debug(f"Successfully inserted {len(result)} documents into {collection_name}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to insert data into collection {collection_name}: {str(e)}")
            return False

    ###
    # Main Method
    ###
    async def run(self):
        """Main method to either process directory or URL."""
        if self.directory:
            self.logger.info(f"Processing directory: {self.directory}")
            await self.process_directory()
        elif self.url:
            self.logger.info(f"Processing URL: {self.url}")
            await self.process_url()
        else:
            raise ValueError("Either 'directory' or 'url' must be provided.")


###
# Example Usage
###
async def main():
    # Example usage:
    # Assuming an ArangoDB client is properly set up
    # arango_db = ArangoClient().db('my_database')
    extractor = HTMLExtractor(directory="/path/to/html_files", arango_db=None)  # Replace with the actual ArangoDB client
    await extractor.run()  # Await the asynchronous run method


if __name__ == "__main__":
    # Simply use asyncio.run() to run the main function
    asyncio.run(main())