This transcribes a large PDF into per-page text files using the OpenAI Batch API.
Here it is applied to 西安方言词典, the dictionary of the Xi'an dialect.
It relies on the openai, pdf2image, and tqdm packages and reads the API key from ~/chinese_pdf_key.
import os
import time
import json
import base64

from openai import OpenAI
from pdf2image import convert_from_path
from tqdm import tqdm
# --- Configuration ---
KEY_FILE = os.path.expanduser("~/chinese_pdf_key")
PDF_FILE = "xian.pdf"
OUTPUT_FOLDER = "transcripts"
TEMP_BATCH_FILE = "batch_input.jsonl"
OUTPUT_IMAGE_DIR = "pdf_images"
MODEL = "gpt-4o"
START_PAGE = 1
END_PAGE = 383
PAGES_PER_BATCH = 50
# --- Read API Key ---
with open(KEY_FILE, "r") as f:
    OPENAI_API_KEY = f.read().strip()

client = OpenAI(api_key=OPENAI_API_KEY)

# --- Ensure output folders exist ---
os.makedirs(OUTPUT_FOLDER, exist_ok=True)
os.makedirs(OUTPUT_IMAGE_DIR, exist_ok=True)
# --- Helpers ---
def encode_image_to_base64(image_path):
    with open(image_path, "rb") as f:
        return base64.b64encode(f.read()).decode("utf-8")


def wait_for_batch_completion(batch_id):
    print(f"⏳ Waiting for batch {batch_id} to complete...")
    while True:
        batch = client.batches.retrieve(batch_id)
        status = batch.status
        print(f"Batch status: {status}")
        if status in ["completed", "failed", "cancelled", "expired"]:
            return batch
        time.sleep(10)


def save_batch_output(output_file_id):
    output = client.files.content(output_file_id)
    for line in output.iter_lines():
        if not line.strip():
            continue
        data = json.loads(line)
        custom_id = data.get("custom_id")
        page_num = int(custom_id.split("-")[1])
        text = data["response"]["body"]["choices"][0]["message"]["content"]
        output_file = os.path.join(OUTPUT_FOLDER, f"page_{page_num:03}.txt")
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(text)
# --- Main Loop ---
for batch_start in range(START_PAGE, END_PAGE + 1, PAGES_PER_BATCH):
    batch_end = min(batch_start + PAGES_PER_BATCH - 1, END_PAGE)
    print(f"\n🚀 Processing pages {batch_start} to {batch_end}...")

    # Step 1: Convert pages to base64
    images = convert_from_path(PDF_FILE, dpi=300, first_page=batch_start, last_page=batch_end)
    requests = []
    for idx, image in enumerate(tqdm(images, desc="Encoding images")):
        page_number = batch_start + idx
        image_path = os.path.join(OUTPUT_IMAGE_DIR, f"page_{page_number:03}.png")
        image.save(image_path, "PNG")
        base64_img = encode_image_to_base64(image_path)
        os.remove(image_path)
        requests.append({
            "custom_id": f"page-{page_number}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": MODEL,
                "messages": [
                    {"role": "system", "content": "You are a professional OCR agent. Accurately transcribe the Chinese text from this dictionary page."},
                    {"role": "user", "content": [
                        {"type": "text", "text": "Please transcribe the following page."},
                        {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_img}"}}
                    ]}
                ],
                "temperature": 0.2
            }
        })

    # Step 2: Write batch input file
    with open(TEMP_BATCH_FILE, "w", encoding="utf-8") as f:
        for req in requests:
            f.write(json.dumps(req, ensure_ascii=False) + "\n")

    # Step 3: Upload input file
    batch_input_file = client.files.create(file=open(TEMP_BATCH_FILE, "rb"), purpose="batch")
    print(f"✅ Uploaded input file: {batch_input_file.id}")

    # Step 4: Create batch
    batch = client.batches.create(
        input_file_id=batch_input_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
        metadata={"description": f"Chinese OCR pages {batch_start}-{batch_end}"}
    )
    print(f"✅ Created batch: {batch.id}")

    # Step 5: Wait for batch to complete
    batch = wait_for_batch_completion(batch.id)
    if batch.status == "completed":
        print("✅ Batch completed successfully. Saving output...")
        save_batch_output(batch.output_file_id)
    else:
        print(f"❌ Batch failed with status: {batch.status}")
        break

print("\n🎯 All done!")
import re
import sys


def clean_text_file(input_filename, output_filename):
    """
    Cleans a text file by removing specific patterns, reformatting page numbers
    with empty lines around them, keeping source indicators, removing lines that
    contain the '○' character more than 30 times, collapsing multiple spaces
    into a single space, and removing lines repeated more than 3 consecutive
    times (excluding page numbers).

    Args:
        input_filename (str): The name of the input text file.
        output_filename (str): The name of the output text file.
    """
    with open(input_filename, 'r', encoding='utf-8') as infile, open(output_filename, 'w', encoding='utf-8') as outfile:
        previous_content_line = None
        consecutive_count = 0
        for line in infile:
            # Remove lines if they contain the '○' character more than 30 times
            if line.count('○') > 30:
                continue
            # Remove common English transcription headers and code block delimiters, and any line starting with "Sure"
            cleaned_line = line.strip()
            if cleaned_line.startswith("Sure") or cleaned_line.startswith("```"):
                continue
            # Identify and reformat page numbers, adding empty lines around them
            page_match = re.match(r'page_(\d+)\.txt', cleaned_line)
            if page_match:
                page_number = int(page_match.group(1))
                outfile.write("\n")                     # empty line before
                outfile.write(f"Page {page_number}\n")  # the page number line
                outfile.write("\n")                     # empty line after
                # Reset consecutive-line tracking when a page number is encountered
                previous_content_line = None
                consecutive_count = 0
                continue
            # Collapse multiple spaces into a single space and strip leading/trailing whitespace
            processed_line = re.sub(r'\s+', ' ', line).strip()
            # Process content lines (non-empty after cleaning)
            if processed_line:
                # Check for consecutive repetitions of content lines
                if processed_line == previous_content_line:
                    consecutive_count += 1
                else:
                    previous_content_line = processed_line
                    consecutive_count = 1
                # Write the line only if it has not been repeated more than 3 times consecutively
                if consecutive_count <= 3:
                    outfile.write(processed_line + "\n")
            else:
                # A line that becomes empty after processing also breaks the consecutive sequence
                previous_content_line = None
                consecutive_count = 0


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: python clean_script.py <input_filename> <output_filename>")
    else:
        input_file = sys.argv[1]
        output_file = sys.argv[2]
        clean_text_file(input_file, output_file)
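clean_text_file expects a single input file in which each page's text is preceded by a marker line such as page_001.txt; that is what the re.match on page_(\d+)\.txt keys on. The transcription script, however, writes one file per page into transcripts/, so a small concatenation step is needed in between. A minimal sketch, assuming the combined file is named combined.txt:

import os

TRANSCRIPTS_DIR = "transcripts"
COMBINED_FILE = "combined.txt"  # assumed name for the concatenated transcript

with open(COMBINED_FILE, "w", encoding="utf-8") as out:
    for name in sorted(os.listdir(TRANSCRIPTS_DIR)):
        if not name.endswith(".txt"):
            continue
        out.write(name + "\n")  # marker line the cleaner rewrites as "Page N"
        with open(os.path.join(TRANSCRIPTS_DIR, name), encoding="utf-8") as page:
            out.write(page.read() + "\n")

The combined file can then be cleaned with, for example: python clean_script.py combined.txt xian_clean.txt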