Skip to content

Instantly share code, notes, and snippets.

@rom1504
Last active April 27, 2025 21:02
Show Gist options
  • Save rom1504/9400a2213dd5459def72cb030c0e9d28 to your computer and use it in GitHub Desktop.
Save rom1504/9400a2213dd5459def72cb030c0e9d28 to your computer and use it in GitHub Desktop.
Transcribe a PDF using the OpenAI Batch API
import os
import time
import json
import base64
from openai import OpenAI
from pdf2image import convert_from_path
from tqdm import tqdm
# --- Settings ---
# Input document, working locations and model choice.
PDF_FILE = "xian.pdf"
KEY_FILE = os.path.expanduser("~/chinese_pdf_key")
OUTPUT_FOLDER = "transcripts"
OUTPUT_IMAGE_DIR = "pdf_images"
TEMP_BATCH_FILE = "batch_input.jsonl"
MODEL = "gpt-4o"

# Inclusive page range to process, and how many pages go into each batch.
START_PAGE, END_PAGE = 1, 383
PAGES_PER_BATCH = 50

# --- API client ---
# The key is the content of KEY_FILE with surrounding whitespace stripped.
with open(KEY_FILE) as key_file:
    OPENAI_API_KEY = key_file.read().strip()
client = OpenAI(api_key=OPENAI_API_KEY)

# --- Working directories ---
# Create both folders up front; existing folders are left untouched.
for folder in (OUTPUT_FOLDER, OUTPUT_IMAGE_DIR):
    os.makedirs(folder, exist_ok=True)
# --- Helpers ---
def encode_image_to_base64(image_path):
    """Return the contents of the file at ``image_path`` as base64 text.

    The file is read in binary mode, base64-encoded, and the encoded bytes
    are decoded to ``str`` as UTF-8.
    """
    with open(image_path, "rb") as image_file:
        raw_bytes = image_file.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")
def wait_for_batch_completion(batch_id, poll_interval=10, timeout=None):
    """Poll a batch until it reaches a terminal status and return it.

    The batch is retrieved through the module-level ``client`` and its status
    is printed on every poll.

    Args:
        batch_id: Identifier of the batch to poll.
        poll_interval: Seconds to sleep between polls (default 10, the value
            that used to be hard-coded).
        timeout: Optional maximum number of seconds to wait. ``None`` (the
            default) waits indefinitely, as before.

    Returns:
        The batch object as last retrieved, whose status is one of
        ``"completed"``, ``"failed"``, ``"cancelled"`` or ``"expired"``.

    Raises:
        TimeoutError: If ``timeout`` is given and elapses before the batch
            reaches a terminal status.
    """
    terminal_statuses = ("completed", "failed", "cancelled", "expired")
    # Use a monotonic clock so wall-clock adjustments cannot affect the deadline.
    deadline = None if timeout is None else time.monotonic() + timeout

    print(f"⏳ Waiting for batch {batch_id} to complete...")
    while True:
        batch = client.batches.retrieve(batch_id)
        status = batch.status
        print(f"Batch status: {status}")
        if status in terminal_statuses:
            return batch
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Batch {batch_id} still has status {status!r} after {timeout} seconds"
            )
        time.sleep(poll_interval)
def _parse_batch_result(record):
    """Return ``(page_num, text)`` for one decoded result record, or ``None``.

    ``None`` is returned when the record has no ``page-<n>`` style custom id
    or no text at ``response.body.choices[0].message.content``.
    """
    custom_id = record.get("custom_id")
    try:
        page_num = int(custom_id.split("-")[1])
        text = record["response"]["body"]["choices"][0]["message"]["content"]
    except (AttributeError, IndexError, KeyError, TypeError, ValueError):
        return None
    if not isinstance(text, str):
        return None
    return page_num, text


def save_batch_output(output_file_id, page_offset):
    """Write each transcribed page of a batch output file to ``OUTPUT_FOLDER``.

    The output file is fetched through the module-level ``client`` and read
    line by line as JSON. For every record, the page number is taken from the
    second ``-``-separated field of ``custom_id`` and the message content is
    written to ``page_<NNN>.txt`` (UTF-8).

    Records that cannot be used (missing/invalid ``custom_id``, missing
    response body or choices, or non-text content) are skipped with a warning
    so that one bad record does not discard the remaining pages.
    NOTE(review): which of these shapes the API can actually emit should be
    confirmed against the Batch API output format.

    Args:
        output_file_id: Identifier of the batch output file to download.
        page_offset: Currently unused; kept so existing callers keep working.
    """
    output = client.files.content(output_file_id)
    for line in output.iter_lines():
        if not line.strip():
            continue
        data = json.loads(line)
        parsed = _parse_batch_result(data)
        if parsed is None:
            print(f"⚠️ Skipping batch result without usable text: {data.get('custom_id')!r}")
            continue
        page_num, text = parsed
        output_file = os.path.join(OUTPUT_FOLDER, f"page_{page_num:03}.txt")
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(text)
# --- Main Loop ---
# Process the PDF in consecutive page ranges; each range becomes one batch job.
# The loop stops at the first batch that does not complete successfully.
for batch_start in range(START_PAGE, END_PAGE + 1, PAGES_PER_BATCH):
    batch_end = min(batch_start + PAGES_PER_BATCH - 1, END_PAGE)
    print(f"\n🚀 Processing pages {batch_start} to {batch_end}...")

    # Step 1: Convert pages to base64
    images = convert_from_path(PDF_FILE, dpi=300, first_page=batch_start, last_page=batch_end)
    batch_requests = []
    for page_number, image in enumerate(tqdm(images, desc="Encoding images"), start=batch_start):
        # Each page is written as a temporary PNG, encoded, then removed again.
        image_path = os.path.join(OUTPUT_IMAGE_DIR, f"page_{page_number:03}.png")
        image.save(image_path, "PNG")
        base64_img = encode_image_to_base64(image_path)
        os.remove(image_path)
        batch_requests.append({
            # The page number is recovered from this id when results are saved.
            "custom_id": f"page-{page_number}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": MODEL,
                "messages": [
                    {"role": "system", "content": "You are a professional OCR agent. Accurately transcribe the Chinese text from this dictionary page."},
                    {"role": "user", "content": [
                        {"type": "text", "text": "Please transcribe the following page."},
                        {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_img}"}}
                    ]}
                ],
                "temperature": 0.2
            }
        })

    # Step 2: Write batch input file (one JSON request per line)
    with open(TEMP_BATCH_FILE, "w", encoding="utf-8") as f:
        for req in batch_requests:
            f.write(json.dumps(req, ensure_ascii=False) + "\n")

    # Step 3: Upload input file
    # Open the file in a context manager so the handle is closed after upload.
    with open(TEMP_BATCH_FILE, "rb") as batch_file:
        batch_input_file = client.files.create(file=batch_file, purpose="batch")
    print(f"✅ Uploaded input file: {batch_input_file.id}")

    # Step 4: Create batch
    batch = client.batches.create(
        input_file_id=batch_input_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
        metadata={"description": f"Chinese OCR pages {batch_start}-{batch_end}"}
    )
    print(f"✅ Created batch: {batch.id}")

    # Step 5: Wait for batch to complete
    batch = wait_for_batch_completion(batch.id)
    if batch.status == "completed":
        print("✅ Batch completed successfully. Saving output...")
        save_batch_output(batch.output_file_id, batch_start)
    else:
        print(f"❌ Batch failed with status: {batch.status}")
        break
else:
    # Only reached when no batch broke out of the loop, so the success
    # message is not printed after a failure.
    print("\n🎯 All done!")
import re
import sys
# Patterns and prefixes used for every line; built once instead of per line.
_PAGE_HEADER_RE = re.compile(r'page_(\d+)\.txt')
_WHITESPACE_RUN_RE = re.compile(r'\s+')
_SKIPPED_PREFIXES = ("Sure", "```")


def _clean_lines(lines, max_circle_count, max_repeats):
    """Yield the cleaned output chunks for an iterable of raw input lines."""
    previous_content = None
    repeat_count = 0
    for line in lines:
        # Drop lines dominated by the '○' character.
        if line.count('○') > max_circle_count:
            continue
        stripped = line.strip()
        # Drop lines starting with "Sure" and code-fence delimiters.
        if stripped.startswith(_SKIPPED_PREFIXES):
            continue
        page_match = _PAGE_HEADER_RE.match(stripped)
        if page_match:
            # Page marker: emit "Page N" with an empty line before and after,
            # and restart repetition tracking.
            yield f"\nPage {int(page_match.group(1))}\n\n"
            previous_content, repeat_count = None, 0
            continue
        content = _WHITESPACE_RUN_RE.sub(' ', line).strip()
        if not content:
            # An empty line is not written and breaks a run of repeats.
            previous_content, repeat_count = None, 0
            continue
        if content == previous_content:
            repeat_count += 1
        else:
            previous_content, repeat_count = content, 1
        if repeat_count <= max_repeats:
            yield content + "\n"


def clean_text_file(input_filename, output_filename, *, max_circle_count=30, max_repeats=3):
    """Clean a transcript file and write the result to another file.

    For each input line, in this order:

    * lines containing '○' more than ``max_circle_count`` times are dropped;
    * lines that (after stripping) start with ``Sure`` or with a code-fence
      delimiter are dropped;
    * lines that (after stripping) start with ``page_<digits>.txt`` are
      replaced by ``Page <n>`` surrounded by empty lines;
    * otherwise runs of whitespace are collapsed to one space and the line is
      stripped; empty results are dropped, and a line identical to the
      previously kept content line is written at most ``max_repeats`` times
      in a row. Page markers and empty lines end such a run.

    Args:
        input_filename (str): The name of the input text file (UTF-8).
        output_filename (str): The name of the output text file (UTF-8).
        max_circle_count (int): Largest number of '○' characters a line may
            contain and still be kept.
        max_repeats (int): Largest number of consecutive identical content
            lines that are written.

    Raises:
        ValueError: If both names resolve to the same path. Opening that path
            for writing would truncate the input before it is read.
    """
    import os  # local import: this script's own import block does not provide os

    same_path = (
        os.path.normcase(os.path.realpath(input_filename))
        == os.path.normcase(os.path.realpath(output_filename))
    )
    if same_path:
        raise ValueError("input_filename and output_filename must be different files")

    with open(input_filename, 'r', encoding='utf-8') as infile, \
            open(output_filename, 'w', encoding='utf-8') as outfile:
        outfile.writelines(_clean_lines(infile, max_circle_count, max_repeats))
if __name__ == "__main__":
    # Expect exactly two arguments after the script name: input and output file.
    if len(sys.argv) != 3:
        # Passing a string to sys.exit prints it to stderr and exits with
        # status 1, so a usage error is visible to calling shells and scripts.
        sys.exit("Usage: python clean_script.py <input_filename> <output_filename>")
    clean_text_file(sys.argv[1], sys.argv[2])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment