Last active
August 5, 2025 05:22
-
-
Save waka-wa/7de8ead906423a0a50b9b99a986d6258 to your computer and use it in GitHub Desktop.
Parallel OCR batch script using Ollama + qwen2.5vl. Extracts text from images, saves cleaned sidecar *.ext.txt files, handles timeouts, and auto-retries slow cases. Creates processed/, failed/, timedout/ folders.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from __future__ import annotations | |
| """Parallel OCR batch processor (Ollama + Qwen 2.5‑VL) | |
| Refactor – 26 Jul 2025 (retry tweak + graceful Ctrl‑C + size ordering) | |
| ==================================================================== | |
| * Persistent `requests.Session` (1 connection reused) | |
| * Single‑stage text cleanup via `_finalize_text()` – no second pass | |
| * Unified `Text:` namespace prefix (NO trailing space) | |
| * Prompt no longer instructs model to output a prefix | |
| * **NEW:** configurable retry strategy – default is **one retry** (2 total attempts) | |
| * **NEW:** graceful KeyboardInterrupt with optional folder open | |
| * **NEW:** batch now queues **smallest files first** (by byte size) | |
| """ | |
| import base64 | |
| import logging | |
| import os | |
| import platform | |
| import shutil | |
| import subprocess | |
| import threading | |
| import time | |
| from pathlib import Path | |
| from typing import Optional, List | |
| import psutil | |
| import requests | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| try: | |
| import GPUtil # type: ignore | |
| except ImportError: # pragma: no cover | |
| GPUtil = None # type: ignore | |
DEFAULT_WORKERS = 4  # Number of concurrent threads – 4 is a good default for most systems (Semaphore will still limit GPU calls)
DEFAULT_TIMEOUT = 24  # seconds - Don't recommend going below 20s unless the pictures are very small
DEFAULT_RETRIES = 1  # Number of retries on timeout – 0 means no retries, 1 means one retry (2 attempts total)
# Semaphore to limit concurrent GPU requests (2 at a time — matches Semaphore(2) below)
GPU_SEMAPHORE = threading.Semaphore(2)
# Plain stderr logging; worker threads log through the same root config.
logging.basicConfig(
    level=logging.INFO,
    format="%(levelname)s | %(message)s",
)
class TimeoutError(Exception):
    """Raised when the Ollama request exceeds the per-image timeout.

    NOTE(review): this shadows the builtin ``TimeoutError`` inside this
    module — every ``except TimeoutError`` below catches this class, not
    the builtin. Kept as-is because sibling handlers depend on the name.
    """
def _open_folder(path: Path) -> None:
    """Best-effort: reveal *path* in the platform's file manager.

    Failures are logged as warnings, never raised — opening a folder is a
    convenience, not a requirement.
    """
    try:
        os_name = platform.system()
        if os_name == "Windows":
            os.startfile(path)  # type: ignore[attr-defined]
        elif os_name == "Darwin":
            subprocess.Popen(["open", str(path)])
        else:  # Linux / BSD
            subprocess.Popen(["xdg-open", str(path)])
    except Exception as exc:
        logging.warning("Could not open folder: %s", exc)
| def _remove_duplicate_words(text: str) -> str: | |
| words = text.split() | |
| if not words: | |
| return text | |
| cleaned = [words[0]] | |
| for w in words[1:]: | |
| if w != cleaned[-1]: | |
| cleaned.append(w) | |
| return " ".join(cleaned) | |
| def _should_merge_line(line: str, *, min_words: int = 3, min_length: int = 15) -> bool: | |
| words = line.split() | |
| return len(words) <= min_words or len(line) <= min_length | |
class ParallelOCRProcessor:
    """Parallel OCR processor using Ollama HTTP API (Qwen 2.5‑VL).

    Images are OCR'd on a thread pool while the module-level
    ``GPU_SEMAPHORE`` caps simultaneous GPU-bound requests. Each image gets
    a cleaned ``<name>.<ext>.txt`` sidecar, and the image (plus sidecar) is
    moved into ``processed/``, ``failed/`` or ``timedout/`` by outcome.
    """

    # Extensions accepted by the batch scanner (compared lower-cased).
    SUPPORTED_IMG_EXT = (".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".webp")

    # Lines containing any of these phrases are model chatter, not OCR text.
    SKIP_PHRASES = (
        "i can see",
        "the image contains",
        "this image shows",
        "looking at",
        "based on the image",
        "in this image",
        "the text in the image",
        "here is the text",
        "here are the text elements",
    )

    def __init__(
        self,
        *,
        max_workers: int = DEFAULT_WORKERS,
        timeout_seconds: int = DEFAULT_TIMEOUT,
        retries: int = DEFAULT_RETRIES,
    ):
        """Create a processor.

        Args:
            max_workers: threads in the pool (GPU calls are still gated by
                ``GPU_SEMAPHORE``).
            timeout_seconds: per-image read timeout for the Ollama request.
            retries: extra attempts after a timeout (1 means 2 attempts total).
        """
        self.model_name = "qwen2.5vl"
        self.api_url = "http://localhost:11434/api/chat"
        self.max_workers = max_workers
        self.timeout_seconds = timeout_seconds
        self.retries = retries
        # One persistent Session so the HTTP connection to Ollama is reused.
        self.session = requests.Session()
        # Guards the counters below, which are mutated from worker threads.
        self.lock = threading.Lock()
        self.processed_count = 0
        self.failed_count = 0
        self.skipped_count = 0
        self.timeout_count = 0

    @staticmethod
    def _prompt() -> str:
        """Instruction sent to the model; asks for raw, prefix-free text."""
        return (
            "Extract *all* visible text from this image with perfect accuracy.\n\n"
            "CRITICAL REQUIREMENTS:\n"
            "- Transcribe text *exactly* as written (character‑for‑character).\n"
            "- Preserve original formatting, spacing, and line breaks.\n"
            "- Do **not** interpret, translate, or modify any text.\n"
            "- Do **not** add explanations or context.\n\n"
            "Output format: each distinct text element on its own line (no prefix)."
        )

    def _ocr_image(self, img_path: Path) -> str:
        """Base64-encode *img_path* and OCR it via the Ollama HTTP API.

        Retries up to ``self.retries`` times on timeout/HTTP error with
        exponential back-off.

        Raises:
            RuntimeError: unreadable image, persistent HTTP error, or
                unexpected JSON from the server.
            TimeoutError: every attempt exceeded ``timeout_seconds``.
        """
        try:
            img_b64 = base64.b64encode(img_path.read_bytes()).decode()
        except Exception as exc:
            raise RuntimeError(f"Unable to read image: {exc}") from exc
        payload = {
            "model": self.model_name,
            "messages": [
                {
                    "role": "user",
                    "content": self._prompt(),
                    "images": [img_b64],
                }
            ],
            "stream": False,
        }
        max_attempts = self.retries + 1  # first attempt + N retries
        # NOTE: the semaphore is also held during back-off sleeps, which
        # keeps the GPU idle between retries — intentional throttling.
        with GPU_SEMAPHORE:
            for attempt in range(1, max_attempts + 1):
                try:
                    r = self.session.post(
                        self.api_url,
                        json=payload,
                        timeout=(5, self.timeout_seconds),  # (connect, read)
                    )
                    r.raise_for_status()
                    break  # success!
                except requests.exceptions.Timeout as exc:
                    if attempt == max_attempts:
                        # BUGFIX: old f-string produced "retrys"/"retryies".
                        raise TimeoutError(
                            f"OCR request exceeded {self.timeout_seconds}s "
                            f"({self.retries} {'retry' if self.retries == 1 else 'retries'})"
                        ) from exc
                    time.sleep(2 ** attempt)  # simple back-off
                except requests.RequestException as exc:
                    if attempt == max_attempts:
                        raise RuntimeError(f"HTTP error: {exc}") from exc
                    time.sleep(2 ** attempt)
        try:
            data = r.json()
            return data["message"]["content"]  # type: ignore[index]
        except Exception as exc:
            raise RuntimeError("Unexpected JSON structure returned by Ollama") from exc

    def _finalize_text(
        self,
        raw: str,
        *,
        min_words: int = 3,
        min_length: int = 15,
    ) -> str:
        """Clean *raw* transcription and return the final sidecar text.

        Pipeline: drop boilerplate lines and stray "Text:" prefixes,
        collapse repeated words, de-dupe identical lines (case-insensitive),
        merge short stub lines, then re-add the ``Text:`` namespace prefix.
        """
        # 1) strip boilerplate & optional existing prefix
        cleaned_lines: List[str] = []
        for line in raw.strip().splitlines():
            line = line.strip()
            if not line:
                continue
            low = line.lower()
            if any(p in low for p in self.SKIP_PHRASES):
                continue
            if line.startswith("Text:"):
                line = line[5:].lstrip()  # remove accidental prefix + optional space
            cleaned_lines.append(line)
        # 2) remove duplicate words inside each line
        cleaned_lines = [_remove_duplicate_words(l) for l in cleaned_lines]
        # 3) de-dupe identical lines (case-insensitive) & merge short stubs
        seen: set[str] = set()
        processed: List[str] = []
        merge_buf: List[str] = []
        for line in cleaned_lines:
            key = line.lower()
            if key in seen:
                continue
            seen.add(key)
            if _should_merge_line(line, min_words=min_words, min_length=min_length):
                merge_buf.append(line)
                continue
            # flush any buffered short lines first
            if merge_buf:
                processed.append(" ".join(merge_buf))
                merge_buf.clear()
            processed.append(line)
        if merge_buf:
            processed.append(" ".join(merge_buf))
        # 4) re-add *namespace* prefix (**no** trailing space!)
        return "\n".join(f"Text:{l}" for l in processed) + "\n"

    @staticmethod
    def _ensure_dir(path: Path) -> None:
        """Create *path* (with parents) if it does not exist."""
        path.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _move_file(src: Path, dst_dir: Path) -> Path:
        """Move *src* into *dst_dir*, suffixing ``_N`` to avoid collisions.

        Returns the destination path, or *src* unchanged if the move failed
        (failure is logged, not raised — files stay where the user can see
        them).
        """
        ParallelOCRProcessor._ensure_dir(dst_dir)
        dst_path = dst_dir / src.name
        counter = 1
        while dst_path.exists():
            dst_path = dst_dir / f"{src.stem}_{counter}{src.suffix}"
            counter += 1
        try:
            return Path(shutil.move(str(src), dst_path))
        except Exception as exc:
            logging.error("Could not move %s → %s: %s", src, dst_path, exc)
            return src

    def _process_single(
        self,
        task: tuple[Path, Path, bool],
        *,
        processed_dir: Path,
        failed_dir: Path,
        timeout_dir: Path,
    ) -> None:
        """OCR one image and file it (plus its sidecar) by outcome."""
        img_path, sidecar_path, overwrite = task
        if sidecar_path.exists() and not overwrite:
            with self.lock:
                self.skipped_count += 1
            return
        fname = img_path.name
        logging.info("➡️ %s (Thread %s)", fname, threading.current_thread().name)
        try:
            raw = self._ocr_image(img_path)
            final_text = self._finalize_text(raw)
            # BUGFIX: was encoding="utf‑8" with a U+2011 non-breaking hyphen,
            # an invalid codec name that raised LookupError on every write.
            sidecar_path.write_text(final_text, encoding="utf-8")
            with self.lock:
                self.processed_count += 1
            logging.info("✅ %s – OCR complete", fname)
            self._move_file(img_path, processed_dir)
            self._move_file(sidecar_path, processed_dir)
        except TimeoutError as exc:
            with self.lock:
                self.timeout_count += 1
            logging.warning("⏰ TIMEOUT – %s: %s", fname, exc)
            self._move_file(img_path, timeout_dir)
        except Exception as exc:
            with self.lock:
                self.failed_count += 1
            logging.error("❌ ERROR – %s: %s", fname, exc)
            self._move_file(img_path, failed_dir)
            if sidecar_path.exists():
                self._move_file(sidecar_path, failed_dir)

    @staticmethod
    def _warn_high_load() -> None:
        """Log a warning when CPU/RAM (and GPU, when GPUtil exists) run hot."""
        cpu = psutil.cpu_percent(interval=1)
        ram = psutil.virtual_memory().percent
        msg = f"CPU {cpu:.0f}% | RAM {ram:.0f}%"
        if GPUtil:
            try:
                g = GPUtil.getGPUs()[0]
                msg += f" | GPU {g.load * 100:.0f}% | VRAM {g.memoryUtil * 100:.0f}%"
            except Exception:
                pass
        if cpu > 90 or ram > 85:
            logging.warning("High system load: %s", msg)

    def run_batch(self, folder: Path, *, overwrite: bool = False, test_count: Optional[int] = None) -> None:
        """OCR every supported image directly inside *folder* (non-recursive).

        Args:
            folder: directory to scan; ``processed/``, ``failed/`` and
                ``timedout/`` subfolders are created inside it.
            overwrite: re-OCR images whose sidecar already exists.
            test_count: if set, only the N smallest images are processed.

        Raises:
            FileNotFoundError: *folder* does not exist.
        """
        if not folder.exists():
            raise FileNotFoundError(folder)
        processed_dir = folder / "processed"
        failed_dir = folder / "failed"
        timeout_dir = folder / "timedout"
        for p in (processed_dir, failed_dir, timeout_dir):
            self._ensure_dir(p)
        # Build task list (smallest files first so quick wins come early)
        tasks = [
            (img, img.with_suffix(img.suffix + ".txt"), overwrite)
            for img in folder.iterdir()
            if img.suffix.lower() in self.SUPPORTED_IMG_EXT
        ]
        tasks.sort(key=lambda t: t[0].stat().st_size)  # smallest → largest
        if test_count:
            tasks = tasks[:test_count]
        total = len(tasks)
        if not total:
            logging.info("No images found – aborting.")
            return
        logging.info("Starting OCR batch: %s images | %s workers | %ss timeout", total, self.max_workers, self.timeout_seconds)
        t0 = time.time()
        # BUGFIX: pre-bind so the KeyboardInterrupt handler cannot hit an
        # unbound `futures` when the interrupt lands mid-submission.
        futures: dict = {}
        try:
            with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
                futures = {
                    pool.submit(
                        self._process_single,
                        task,
                        processed_dir=processed_dir,
                        failed_dir=failed_dir,
                        timeout_dir=timeout_dir,
                    ): task
                    for task in tasks
                }
                for i, fut in enumerate(as_completed(futures), 1):
                    try:
                        fut.result()  # surface exceptions
                    except Exception as exc:
                        logging.error("Unhandled exception: %s", exc)
                    if i % 20 == 0:
                        self._warn_high_load()
        except KeyboardInterrupt:
            logging.warning("Interrupted by user – shutting down threads …")
            # cancel remaining futures and return (skip summary)
            for fut in futures:
                fut.cancel()
            return
        elapsed = time.time() - t0
        logging.info("===== SUMMARY =====")
        logging.info("Elapsed : %.1f min (%.0f s)", elapsed / 60, elapsed)
        logging.info("Processed : %s", self.processed_count)
        logging.info("Skipped : %s", self.skipped_count)
        logging.info("Failed : %s", self.failed_count)
        logging.info("Timed‑out : %s", self.timeout_count)
        logging.info("===================")
        # optional retry pass for timed-out images
        if self.timeout_count:
            choice = input(
                f"{self.timeout_count} images timed out. Increase timeout by 15s and re‑run? (y/n): "
            ).strip().lower()
            if choice.startswith("y"):
                new_timeout = self.timeout_seconds + 15
                logging.info("Re‑running timed‑out images with %ss timeout …", new_timeout)
                retry = ParallelOCRProcessor(
                    max_workers=max(1, self.max_workers - (2 if self.max_workers >= 8 else 0)),
                    timeout_seconds=new_timeout,
                    retries=self.retries,  # FIX: carry the retry policy over
                )
                retry.run_batch(timeout_dir, overwrite=True)
def _prompt_int(msg: str, *, default: int, min_val: int = 1, max_val: int | None = None) -> int:
    """Prompt until the user enters a valid integer in range.

    Pressing Enter with no input accepts *default*.
    """
    while True:
        answer = input(msg).strip()
        if not answer:
            return default
        try:
            value = int(answer)
        except ValueError:
            print("Please enter a valid number.")
            continue
        if value >= min_val and (max_val is None or value <= max_val):
            return value
        print("Please enter a valid number.")
def get_cli_config() -> dict[str, object]:
    """Interactively gather batch settings from stdin and return them."""
    print("🖼️ OCR Batch Processor — refactored")
    print("=" * 47)
    # Keep asking until an existing folder is supplied.
    while True:
        raw_folder = input("\n📁 Folder containing images: ").strip().strip('"')
        folder = Path(raw_folder)
        if not folder.exists():
            print("❌ Folder not found, try again.")
            continue
        n_images = sum(
            1
            for entry in folder.iterdir()
            if entry.suffix.lower() in ParallelOCRProcessor.SUPPORTED_IMG_EXT
        )
        print(f"✅ Found {n_images} images in {folder}")
        break
    overwrite = input("Overwrite existing .txt sidecars? (y/N): ").strip().lower().startswith("y")
    workers = _prompt_int(f"Workers [default {DEFAULT_WORKERS}]: ", default=DEFAULT_WORKERS, max_val=32)
    timeout = _prompt_int(f"Timeout seconds [default {DEFAULT_TIMEOUT}]: ", default=DEFAULT_TIMEOUT)
    limit_raw = input("Process only first N images (press Enter for full batch): ").strip()
    limit: Optional[int] = None
    if limit_raw:
        try:
            limit = int(limit_raw)
        except ValueError:
            print("Invalid number – running full batch.")
    return {
        "folder": folder,
        "overwrite": overwrite,
        "workers": workers,
        "timeout": timeout,
        "test_n": limit,
    }
def main() -> None:
    """CLI entry point: collect config, confirm, then run the OCR batch."""
    cfg = get_cli_config()
    print("\nConfiguration:")
    print(f" Folder : {cfg['folder']}")
    print(f" Workers : {cfg['workers']}")
    print(f" Timeout : {cfg['timeout']}s")
    print(f" Overwrite: {'Yes' if cfg['overwrite'] else 'No'}")
    answer = input("\nStart processing? (Y/n): ").strip().lower()
    # Empty answer means "yes" (the default); anything not starting with "y" cancels.
    if answer and not answer.startswith("y"):
        print("Operation cancelled.")
        return
    runner = ParallelOCRProcessor(
        max_workers=cfg["workers"],
        timeout_seconds=cfg["timeout"],
    )
    runner.run_batch(cfg["folder"], overwrite=cfg["overwrite"], test_count=cfg["test_n"])
if __name__ == "__main__":
    # Top-level guard: swallow Ctrl-C so the user gets a clean one-line exit
    # message instead of a traceback.
    try:
        main()
    except KeyboardInterrupt:
        print("\nAbort")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment