Created
August 15, 2025 00:26
-
-
Save wch/677307026b8fb0098967795a179c53fd to your computer and use it in GitHub Desktop.
Data extraction with Ollama
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# pyright: strict | |
# Usage: | |
# python extract.py [-o output.txt] [-m model] [-c chunk_size] [-r overlap] [-q] <file_path> "<extraction_prompt>" | |
import argparse | |
import sys | |
from pathlib import Path | |
from typing import Generator | |
from chatlas import ChatOllama | |
DEFAULT_MODEL = "gemma3:270m"
def estimate_tokens(text: str) -> int:
    """Return a rough token estimate for *text* (~4 characters per token)."""
    char_count = len(text)
    return char_count // 4
def chunk_text(
    text: str, chunk_size: int = 20000, overlap: int = 1000
) -> Generator[str, None, None]:
    """Split text into overlapping chunks based on estimated token count.

    Args:
        text: The full text to split.
        chunk_size: Target chunk size in estimated tokens (~4 chars/token).
        overlap: Overlap between consecutive chunks in estimated tokens.

    Yields:
        Consecutive chunks of ``text``. Adjacent chunks share roughly
        ``overlap`` tokens so content near a boundary is not lost.
    """
    char_chunk_size = chunk_size * 4  # Convert tokens to characters
    char_overlap = overlap * 4  # Convert tokens to characters
    start = 0
    while start < len(text):
        end = start + char_chunk_size
        # If this isn't the last chunk, try to break at a sentence or paragraph
        if end < len(text):
            # Look for a sentence ending within the last 500 characters.
            # Clamp the search window to the chunk's own start: a negative
            # index would wrap under slice semantics and search the wrong
            # (or an empty) range for small chunk sizes.
            window_start = max(start, end - 500)
            break_point = text.rfind(".", window_start, end)
            if break_point == -1:
                break_point = text.rfind("\n", window_start, end)
            if break_point != -1:
                end = break_point + 1
        yield text[start:end]
        if end >= len(text):
            break
        # Move start forward, accounting for overlap.  Always advance by at
        # least one character so a large overlap (or a break point that
        # shrank the chunk below the overlap) cannot cause an infinite loop.
        start = max(end - char_overlap, start + 1)
def extract_from_file(
    file_path: Path,
    extraction_prompt: str,
    model: str = DEFAULT_MODEL,
    chunk_size: int = 20000,
    overlap: int = 1000,
    progress: bool = True,
) -> list[str]:
    """Extract data from a large file by processing it in chunks.

    Args:
        file_path: File whose contents are fed to the model.
        extraction_prompt: Instruction describing what to extract.
        model: Ollama model name used for every chunk.
        chunk_size: Chunk size in estimated tokens.
        overlap: Overlap between chunks in estimated tokens.
        progress: When True, write progress messages to stderr.

    Returns:
        One result string per chunk; a chunk that raised an error yields an
        "[Error processing chunk N: ...]" placeholder instead of aborting
        the whole run.
    """
    # Read the entire file; fall back to latin-1 (which accepts any byte
    # value) if the content is not valid UTF-8.
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read()
    except UnicodeDecodeError:
        with open(file_path, "r", encoding="latin-1") as f:
            content = f.read()
    # Process chunks
    chunks = list(chunk_text(content, chunk_size, overlap))
    results: list[str] = []
    if progress:
        print(f"Processing {len(chunks)} chunks...", file=sys.stderr)
    for i, chunk in enumerate(chunks, 1):
        # A fresh chat per chunk keeps earlier chunks out of the model's
        # context and bounds the context-window usage.
        chat = ChatOllama(
            model=model,
            system_prompt="You are a helpful assistant that extracts specific information from text data.",
        )
        if progress:
            print(f"Processing chunk {i}/{len(chunks)}", file=sys.stderr)
        # Create the full prompt for this chunk
        full_prompt = f"{extraction_prompt}\nExtract the information from the following text:\n<data>\n{chunk}\n</data>"
        try:
            response = chat.chat(full_prompt, stream=True)
            results.append(str(response))
        except Exception as e:
            # Best-effort: record the failure and keep processing the
            # remaining chunks rather than aborting the whole extraction.
            print(f"Error processing chunk {i}: {e}", file=sys.stderr)
            results.append(f"[Error processing chunk {i}: {e}]")
    return results
def main():
    """CLI entry point: parse arguments, run the extraction, emit results."""
    parser = argparse.ArgumentParser(
        description="Extract data from large files using a local LLM with chunking"
    )
    parser.add_argument(
        "file_path", type=Path, help="Path to the input file to process"
    )
    parser.add_argument("prompt", help="Extraction prompt/query to apply to each chunk")
    parser.add_argument(
        "-o", "--output", type=Path, help="Output file path (default: stdout)"
    )
    parser.add_argument(
        "-m",
        "--model",
        default=DEFAULT_MODEL,
        help=f"Model to use (default: {DEFAULT_MODEL})",
    )
    parser.add_argument(
        "-c",
        "--chunk-size",
        type=int,
        default=20000,
        help="Chunk size in tokens (default: 20000)",
    )
    parser.add_argument(
        "-r",
        "--overlap",
        type=int,
        default=1000,
        help="Overlap between chunks in tokens (default: 1000)",
    )
    parser.add_argument(
        "-q", "--quiet", action="store_true", help="Suppress progress messages"
    )
    args = parser.parse_args()

    # Fail fast with a clear message if the input path is unusable.
    if not args.file_path.exists():
        print(f"Error: File '{args.file_path}' does not exist", file=sys.stderr)
        sys.exit(1)
    if not args.file_path.is_file():
        print(f"Error: '{args.file_path}' is not a file", file=sys.stderr)
        sys.exit(1)

    try:
        results = extract_from_file(
            args.file_path,
            args.prompt,
            args.model,
            args.chunk_size,
            args.overlap,
            progress=not args.quiet,
        )
        # Emit one labelled section per chunk, to the output file if one
        # was given, otherwise to stdout.
        if args.output:
            with open(args.output, "w", encoding="utf-8") as out:
                for idx, chunk_result in enumerate(results, 1):
                    out.write(f"=== Chunk {idx} ===\n{chunk_result}\n\n")
            if not args.quiet:
                print(f"Results written to {args.output}", file=sys.stderr)
        else:
            for idx, chunk_result in enumerate(results, 1):
                print(f"=== Chunk {idx} ===\n{chunk_result}\n")
    except KeyboardInterrupt:
        print("\nInterrupted by user", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment