Skip to content

Instantly share code, notes, and snippets.

@waka-wa
Last active August 5, 2025 05:22
Show Gist options
  • Select an option

  • Save waka-wa/7de8ead906423a0a50b9b99a986d6258 to your computer and use it in GitHub Desktop.

Select an option

Save waka-wa/7de8ead906423a0a50b9b99a986d6258 to your computer and use it in GitHub Desktop.
Parallel OCR batch script using Ollama + qwen2.5vl. Extracts text from images, saves cleaned sidecar *.ext.txt files, handles timeouts, and auto-retries slow cases. Creates processed/, failed/, timedout/ folders.
"""Parallel OCR batch processor (Ollama + Qwen 2.5-VL)

Refactor – 26 Jul 2025 (retry tweak + graceful Ctrl-C + size ordering)
====================================================================
* Persistent `requests.Session` (1 connection reused)
* Single-stage text cleanup via `_finalize_text()` – no second pass
* Unified `Text:` namespace prefix (NO trailing space)
* Prompt no longer instructs model to output a prefix
* **NEW:** configurable retry strategy – default is **one retry** (2 total attempts)
* **NEW:** graceful KeyboardInterrupt with optional folder open
* **NEW:** batch now queues **smallest files first** (by byte size)
"""
# NOTE: the docstring must come BEFORE `from __future__ import annotations`;
# the previous order made the string a dead expression statement instead of
# the module docstring (__future__ imports are still required to precede all
# other code, which this ordering satisfies).
from __future__ import annotations

import base64
import logging
import os
import platform
import shutil
import subprocess
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Optional

import psutil
import requests

try:
    import GPUtil  # type: ignore
except ImportError:  # pragma: no cover
    # GPU stats are optional; _warn_high_load degrades gracefully without them.
    GPUtil = None  # type: ignore
DEFAULT_WORKERS = 4 # Number of concurrent threads – 4 is a good default for most systems (Semaphore will still limit GPU calls)
DEFAULT_TIMEOUT = 24 # seconds - Don't recommend going below 20s unless the pictures are very small
DEFAULT_RETRIES = 1 # Number of retries on timeout – 0 means no retries, 1 means one retry (2 attempts total)
# Semaphore bounding concurrent in-flight Ollama/GPU requests across all worker threads.
# NOTE(review): the value is 2 (two concurrent requests), but an earlier comment
# claimed "1 at a time" — confirm which limit is actually intended.
GPU_SEMAPHORE = threading.Semaphore(2)
logging.basicConfig(
level=logging.INFO,
format="%(levelname)s | %(message)s",
)
class TimeoutError(Exception):
    """Signals that an Ollama OCR request ran past its per-image deadline.

    Note: this intentionally shadows the builtin ``TimeoutError`` within this
    module; handlers here catch this class specifically.
    """
def _open_folder(path: Path) -> None:
    """Best-effort: reveal *path* in the platform's file manager.

    Failures are logged as warnings and never propagate — opening a folder
    is purely a convenience for the user.
    """
    try:
        os_name = platform.system()
        if os_name == "Windows":
            os.startfile(path)  # type: ignore[attr-defined]
            return
        # macOS uses `open`; everything else (Linux / BSD) gets xdg-open.
        opener = "open" if os_name == "Darwin" else "xdg-open"
        subprocess.Popen([opener, str(path)])
    except Exception as exc:
        logging.warning("Could not open folder: %s", exc)
def _remove_duplicate_words(text: str) -> str:
words = text.split()
if not words:
return text
cleaned = [words[0]]
for w in words[1:]:
if w != cleaned[-1]:
cleaned.append(w)
return " ".join(cleaned)
def _should_merge_line(line: str, *, min_words: int = 3, min_length: int = 15) -> bool:
words = line.split()
return len(words) <= min_words or len(line) <= min_length
class ParallelOCRProcessor:
    """Parallel OCR processor using the Ollama HTTP API (Qwen 2.5-VL).

    Worker threads fan out over a folder of images; the module-level
    ``GPU_SEMAPHORE`` bounds the number of in-flight model requests
    regardless of thread count. Results are written as ``*.ext.txt``
    sidecar files and sorted into processed/failed/timedout folders.
    """

    # Image extensions accepted by run_batch (suffix compared lowercase).
    SUPPORTED_IMG_EXT = (".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".webp")
    # Any line containing one of these phrases is treated as model
    # boilerplate ("I can see...") and dropped from the transcription.
    SKIP_PHRASES = (
        "i can see",
        "the image contains",
        "this image shows",
        "looking at",
        "based on the image",
        "in this image",
        "the text in the image",
        "here is the text",
        "here are the text elements",
    )

    def __init__(
        self,
        *,
        max_workers: int = DEFAULT_WORKERS,
        timeout_seconds: int = DEFAULT_TIMEOUT,
        retries: int = DEFAULT_RETRIES,
    ):
        """Configure the processor.

        Args:
            max_workers: size of the thread pool (GPU calls are still
                serialized by ``GPU_SEMAPHORE``).
            timeout_seconds: per-image read timeout for the HTTP request.
            retries: extra attempts after a timeout (0 = single attempt).
        """
        self.model_name = "qwen2.5vl"
        self.api_url = "http://localhost:11434/api/chat"
        self.max_workers = max_workers
        self.timeout_seconds = timeout_seconds
        self.retries = retries
        # One persistent session so the TCP connection to Ollama is reused.
        self.session = requests.Session()
        # Protects the counters below, which are mutated from worker threads.
        self.lock = threading.Lock()
        self.processed_count = 0
        self.failed_count = 0
        self.skipped_count = 0
        self.timeout_count = 0

    @staticmethod
    def _prompt() -> str:
        """Return the instruction prompt sent with every image."""
        return (
            "Extract *all* visible text from this image with perfect accuracy.\n\n"
            "CRITICAL REQUIREMENTS:\n"
            "- Transcribe text *exactly* as written (character‑for‑character).\n"
            "- Preserve original formatting, spacing, and line breaks.\n"
            "- Do **not** interpret, translate, or modify any text.\n"
            "- Do **not** add explanations or context.\n\n"
            "Output format: each distinct text element on its own line (no prefix)."
        )

    def _ocr_image(self, img_path: Path) -> str:
        """Base64-encode *img_path*, POST it to Ollama, return the raw transcription.

        Raises:
            RuntimeError: unreadable image, HTTP failure after all attempts,
                or an unexpected JSON payload from Ollama.
            TimeoutError: every attempt exceeded the per-image timeout.
        """
        try:
            img_b64 = base64.b64encode(img_path.read_bytes()).decode()
        except Exception as exc:
            raise RuntimeError(f"Unable to read image: {exc}") from exc
        payload = {
            "model": self.model_name,
            "messages": [
                {
                    "role": "user",
                    "content": self._prompt(),
                    "images": [img_b64],
                }
            ],
            "stream": False,
        }
        max_attempts = self.retries + 1  # first attempt + N retries
        # NOTE: the semaphore is held across the back-off sleeps too, so a
        # retrying request blocks other threads from reaching the GPU.
        with GPU_SEMAPHORE:
            for attempt in range(1, max_attempts + 1):
                try:
                    r = self.session.post(
                        self.api_url,
                        json=payload,
                        timeout=(5, self.timeout_seconds),  # (connect, read)
                    )
                    r.raise_for_status()
                    break  # success!
                except requests.exceptions.Timeout as exc:
                    if attempt == max_attempts:
                        # FIX: pluralization was inverted ("1 retrys" / "2 retryies").
                        raise TimeoutError(
                            f"OCR request exceeded {self.timeout_seconds}s "
                            f"({self.retries} {'retry' if self.retries == 1 else 'retries'})"
                        ) from exc
                    time.sleep(2 ** attempt)  # simple back-off
                except requests.RequestException as exc:
                    if attempt == max_attempts:
                        raise RuntimeError(f"HTTP error: {exc}") from exc
                    time.sleep(2 ** attempt)
        try:
            data = r.json()
            return data["message"]["content"]  # type: ignore[index]
        except Exception as exc:
            raise RuntimeError("Unexpected JSON structure returned by Ollama") from exc

    def _finalize_text(
        self,
        raw: str,
        *,
        min_words: int = 3,
        min_length: int = 15,
    ) -> str:
        """Clean *raw* transcription and return the final sidecar text.

        Pipeline: strip boilerplate/prefixes → collapse repeated words →
        de-dupe identical lines (case-insensitive) and merge short stubs →
        re-add the ``Text:`` namespace prefix (no trailing space).
        """
        # 1) strip boilerplate & any accidental existing "Text:" prefix
        cleaned_lines: List[str] = []
        for line in raw.strip().splitlines():
            line = line.strip()
            if not line:
                continue
            low = line.lower()
            if any(p in low for p in self.SKIP_PHRASES):
                continue
            if line.startswith("Text:"):
                line = line[5:].lstrip()  # remove accidental prefix + optional space
            cleaned_lines.append(line)
        # 2) remove consecutive duplicate words inside each line
        cleaned_lines = [_remove_duplicate_words(l) for l in cleaned_lines]
        # 3) de-dupe identical lines (case-insensitive) & merge short stubs;
        #    buffered stubs are flushed as one line before the next long line.
        seen: set[str] = set()
        processed: List[str] = []
        merge_buf: List[str] = []
        for line in cleaned_lines:
            key = line.lower()
            if key in seen:
                continue
            seen.add(key)
            if _should_merge_line(line, min_words=min_words, min_length=min_length):
                merge_buf.append(line)
                continue
            # flush any buffered short lines first
            if merge_buf:
                processed.append(" ".join(merge_buf))
                merge_buf.clear()
            processed.append(line)
        if merge_buf:
            processed.append(" ".join(merge_buf))
        # 4) re-add *namespace* prefix (**no** trailing space!)
        return "\n".join(f"Text:{l}" for l in processed) + "\n"

    @staticmethod
    def _ensure_dir(path: Path) -> None:
        """Create *path* (and parents) if it does not already exist."""
        path.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _move_file(src: Path, dst_dir: Path) -> Path:
        """Move *src* into *dst_dir*, suffixing ``_1``, ``_2``… on name collisions.

        Returns the destination path, or *src* unchanged if the move failed.
        """
        ParallelOCRProcessor._ensure_dir(dst_dir)
        dst_path = dst_dir / src.name
        counter = 1
        while dst_path.exists():
            dst_path = dst_dir / f"{src.stem}_{counter}{src.suffix}"
            counter += 1
        try:
            return Path(shutil.move(str(src), dst_path))
        except Exception as exc:
            logging.error("Could not move %s → %s: %s", src, dst_path, exc)
            return src

    def _process_single(
        self,
        task: tuple[Path, Path, bool],
        *,
        processed_dir: Path,
        failed_dir: Path,
        timeout_dir: Path,
    ) -> None:
        """OCR one image, write its sidecar, and file both into the outcome folder.

        *task* is ``(image_path, sidecar_path, overwrite)``. Existing sidecars
        are skipped unless *overwrite* is set. Never raises: failures and
        timeouts are counted and the image is moved to the matching folder.
        """
        img_path, sidecar_path, overwrite = task
        if sidecar_path.exists() and not overwrite:
            with self.lock:
                self.skipped_count += 1
            return
        fname = img_path.name
        logging.info("➡️ %s (Thread %s)", fname, threading.current_thread().name)
        try:
            raw = self._ocr_image(img_path)
            final_text = self._finalize_text(raw)
            # FIX: encoding was "utf‑8" with a non-breaking hyphen (U+2011),
            # which raises LookupError on every write — now a real "utf-8".
            sidecar_path.write_text(final_text, encoding="utf-8")
            with self.lock:
                self.processed_count += 1
            logging.info("✅ %s – OCR complete", fname)
            self._move_file(img_path, processed_dir)
            self._move_file(sidecar_path, processed_dir)
        except TimeoutError as exc:
            with self.lock:
                self.timeout_count += 1
            logging.warning("⏰ TIMEOUT – %s: %s", fname, exc)
            self._move_file(img_path, timeout_dir)
        except Exception as exc:
            with self.lock:
                self.failed_count += 1
            logging.error("❌ ERROR – %s: %s", fname, exc)
            self._move_file(img_path, failed_dir)
            if sidecar_path.exists():
                self._move_file(sidecar_path, failed_dir)

    @staticmethod
    def _warn_high_load() -> None:
        """Log a warning when CPU > 90% or RAM > 85% (GPU stats if GPUtil present)."""
        cpu = psutil.cpu_percent(interval=1)
        ram = psutil.virtual_memory().percent
        msg = f"CPU {cpu:.0f}% | RAM {ram:.0f}%"
        if GPUtil:
            try:
                g = GPUtil.getGPUs()[0]
                msg += f" | GPU {g.load * 100:.0f}% | VRAM {g.memoryUtil * 100:.0f}%"
            except Exception:
                pass  # best-effort: missing/odd GPU info must not break the batch
        if cpu > 90 or ram > 85:
            logging.warning("High system load: %s", msg)

    def run_batch(self, folder: Path, *, overwrite: bool = False, test_count: Optional[int] = None):
        """Process every supported image in *folder* (smallest files first).

        Args:
            folder: directory containing the images.
            overwrite: re-OCR images whose sidecar already exists.
            test_count: when set, only the first N (smallest) images.

        Raises:
            FileNotFoundError: *folder* does not exist.
        """
        if not folder.exists():
            raise FileNotFoundError(folder)
        processed_dir = folder / "processed"
        failed_dir = folder / "failed"
        timeout_dir = folder / "timedout"
        for p in (processed_dir, failed_dir, timeout_dir):
            self._ensure_dir(p)
        # Build task list (smallest files first) — quick wins surface early.
        tasks = [
            (img, img.with_suffix(img.suffix + ".txt"), overwrite)
            for img in folder.iterdir()
            if img.suffix.lower() in self.SUPPORTED_IMG_EXT
        ]
        tasks.sort(key=lambda t: t[0].stat().st_size)  # smallest → largest
        if test_count:
            tasks = tasks[:test_count]
        total = len(tasks)
        if not total:
            logging.info("No images found – aborting.")
            return
        logging.info("Starting OCR batch: %s images | %s workers | %ss timeout", total, self.max_workers, self.timeout_seconds)
        t0 = time.time()
        try:
            with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
                futures = {
                    pool.submit(
                        self._process_single,
                        task,
                        processed_dir=processed_dir,
                        failed_dir=failed_dir,
                        timeout_dir=timeout_dir,
                    ): task
                    for task in tasks
                }
                for i, fut in enumerate(as_completed(futures), 1):
                    try:
                        fut.result()  # surface exceptions
                    except Exception as exc:
                        logging.error("Unhandled exception: %s", exc)
                    if i % 20 == 0:
                        self._warn_high_load()
        except KeyboardInterrupt:
            logging.warning("Interrupted by user – shutting down threads …")
            # cancel remaining futures and return (skip summary)
            for fut in futures:
                fut.cancel()
            return
        elapsed = time.time() - t0
        logging.info("===== SUMMARY =====")
        logging.info("Elapsed : %.1f min (%.0f s)", elapsed / 60, elapsed)
        logging.info("Processed : %s", self.processed_count)
        logging.info("Skipped : %s", self.skipped_count)
        logging.info("Failed : %s", self.failed_count)
        logging.info("Timed‑out : %s", self.timeout_count)
        logging.info("===================")
        # optional retry pass for timed-out images with a longer timeout
        if self.timeout_count:
            choice = input(
                f"{self.timeout_count} images timed out. Increase timeout by 15s and re‑run? (y/n): "
            ).strip().lower()
            if choice.startswith("y"):
                new_timeout = self.timeout_seconds + 15
                logging.info("Re‑running timed‑out images with %ss timeout …", new_timeout)
                retry = ParallelOCRProcessor(
                    # fewer workers on large pools to ease GPU pressure
                    max_workers=max(1, self.max_workers - (2 if self.max_workers >= 8 else 0)),
                    timeout_seconds=new_timeout,
                    retries=self.retries,  # propagate retry policy (was reset to default)
                )
                retry.run_batch(timeout_dir, overwrite=True)
def _prompt_int(msg: str, *, default: int, min_val: int = 1, max_val: int | None = None) -> int:
    """Prompt until the user enters a valid integer in range; blank → *default*."""
    while True:
        answer = input(msg).strip()
        if not answer:
            return default
        try:
            number = int(answer)
        except ValueError:
            print("Please enter a valid number.")
            continue
        # Range check: reject values below min_val or above max_val (if set).
        if number < min_val or (max_val is not None and number > max_val):
            print("Please enter a valid number.")
            continue
        return number
def get_cli_config() -> dict[str, object]:
    """Interactively collect batch settings and return them as a dict.

    Keys: ``folder`` (Path), ``overwrite`` (bool), ``workers`` (int),
    ``timeout`` (int), ``test_n`` (int | None).
    """
    print("🖼️ OCR Batch Processor — refactored")
    print("=" * 47)
    # Keep asking until the user supplies an existing folder.
    while True:
        raw_folder = input("\n📁 Folder containing images: ").strip().strip('"')
        folder_path = Path(raw_folder)
        if not folder_path.exists():
            print("❌ Folder not found, try again.")
            continue
        img_count = sum(
            1
            for entry in folder_path.iterdir()
            if entry.suffix.lower() in ParallelOCRProcessor.SUPPORTED_IMG_EXT
        )
        print(f"✅ Found {img_count} images in {folder_path}")
        break
    overwrite = input("Overwrite existing .txt sidecars? (y/N): ").strip().lower().startswith("y")
    workers = _prompt_int(f"Workers [default {DEFAULT_WORKERS}]: ", default=DEFAULT_WORKERS, max_val=32)
    timeout = _prompt_int(f"Timeout seconds [default {DEFAULT_TIMEOUT}]: ", default=DEFAULT_TIMEOUT)
    test_raw = input("Process only first N images (press Enter for full batch): ").strip()
    test_n: Optional[int] = None
    if test_raw:
        try:
            test_n = int(test_raw)
        except ValueError:
            print("Invalid number – running full batch.")
    return {
        "folder": folder_path,
        "overwrite": overwrite,
        "workers": workers,
        "timeout": timeout,
        "test_n": test_n,
    }
def main() -> None:
    """CLI entry point: gather settings, confirm, then run the OCR batch."""
    cfg = get_cli_config()
    print("\nConfiguration:")
    print(f" Folder : {cfg['folder']}")
    print(f" Workers : {cfg['workers']}")
    print(f" Timeout : {cfg['timeout']}s")
    print(f" Overwrite: {'Yes' if cfg['overwrite'] else 'No'}")
    answer = input("\nStart processing? (Y/n): ").strip().lower()
    # An empty answer counts as "yes"; anything not starting with "y" cancels.
    if answer and not answer.startswith("y"):
        print("Operation cancelled.")
        return
    runner = ParallelOCRProcessor(
        max_workers=cfg["workers"],
        timeout_seconds=cfg["timeout"],
    )
    runner.run_batch(cfg["folder"], overwrite=cfg["overwrite"], test_count=cfg["test_n"])
if __name__ == "__main__":
    # Script entry point — swallow Ctrl-C so an interrupted run exits quietly.
    try:
        main()
    except KeyboardInterrupt:
        print("\nAbort")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment