This script transcribes a large PDF file into text files using the OpenAI Batch API.
Here it is applied to 西安方言词典 (the dictionary of the Xi'an dialect).
import base64
import json
import os
import time

from openai import OpenAI
from pdf2image import convert_from_path
from tqdm import tqdm
# --- Configuration ---
KEY_FILE = os.path.expanduser("~/chinese_pdf_key")  # plain-text file holding the API key
PDF_FILE = "xian.pdf"                               # source dictionary scan
OUTPUT_FOLDER = "transcripts"                       # one .txt per transcribed page
TEMP_BATCH_FILE = "batch_input.jsonl"               # scratch JSONL uploaded for each batch
OUTPUT_IMAGE_DIR = "pdf_images"                     # scratch PNGs while encoding pages
MODEL = "gpt-4o"
START_PAGE = 1
END_PAGE = 383
PAGES_PER_BATCH = 50

# --- Read the API key and build the client ---
with open(KEY_FILE, "r") as f:
    OPENAI_API_KEY = f.read().strip()
client = OpenAI(api_key=OPENAI_API_KEY)

# --- Make sure the scratch/output folders exist ---
for _folder in (OUTPUT_FOLDER, OUTPUT_IMAGE_DIR):
    os.makedirs(_folder, exist_ok=True)
# --- Helpers ---
def encode_image_to_base64(image_path):
    """Read the file at *image_path* and return its bytes as a base64 string."""
    with open(image_path, "rb") as handle:
        payload = handle.read()
    return base64.b64encode(payload).decode("utf-8")
def wait_for_batch_completion(batch_id):
    """Poll the Batch API until *batch_id* reaches a terminal state.

    Re-checks the batch status every 10 seconds and returns the final batch
    object once its status is completed, failed, cancelled, or expired.
    """
    terminal_states = {"completed", "failed", "cancelled", "expired"}
    print(f"⏳ Waiting for batch {batch_id} to complete...")
    while True:
        snapshot = client.batches.retrieve(batch_id)
        print(f"Batch status: {snapshot.status}")
        if snapshot.status in terminal_states:
            return snapshot
        time.sleep(10)
def save_batch_output(output_file_id, page_offset):
    """Download a completed batch's output file and write one transcript per page.

    Each JSONL line carries a custom_id of the form "page-<N>"; the assistant
    message content is written to OUTPUT_FOLDER/page_<N>.txt (zero-padded to
    three digits).

    Args:
        output_file_id: ID of the batch's output file on the Files API.
        page_offset: Unused; kept for backward compatibility with existing
            callers (the page number is recovered from each custom_id).
    """
    output = client.files.content(output_file_id)
    for line in output.iter_lines():
        if not line.strip():
            continue
        data = json.loads(line)
        custom_id = data.get("custom_id", "")
        # Failed requests carry a non-null "error" and no usable response
        # body; skip them instead of crashing on a missing key.
        if data.get("error") or not data.get("response"):
            print(f"⚠️ Skipping failed request {custom_id}")
            continue
        page_num = int(custom_id.split("-")[1])
        text = data["response"]["body"]["choices"][0]["message"]["content"]
        output_file = os.path.join(OUTPUT_FOLDER, f"page_{page_num:03}.txt")
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(text)
# --- Main Loop ---
# Process the PDF in chunks of PAGES_PER_BATCH pages; each chunk becomes one
# Batch API job that is created, awaited, and saved before the next begins.
for batch_start in range(START_PAGE, END_PAGE + 1, PAGES_PER_BATCH):
    batch_end = min(batch_start + PAGES_PER_BATCH - 1, END_PAGE)
    print(f"\n🚀 Processing pages {batch_start} to {batch_end}...")

    # Step 1: Render each page to PNG, encode it as base64, and build the
    # per-page chat-completion request for the batch input file.
    images = convert_from_path(PDF_FILE, dpi=300, first_page=batch_start, last_page=batch_end)
    requests = []
    for idx, image in enumerate(tqdm(images, desc="Encoding images")):
        page_number = batch_start + idx
        image_path = os.path.join(OUTPUT_IMAGE_DIR, f"page_{page_number:03}.png")
        image.save(image_path, "PNG")
        base64_img = encode_image_to_base64(image_path)
        os.remove(image_path)  # the PNG is only needed long enough to encode it
        requests.append({
            "custom_id": f"page-{page_number}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": MODEL,
                "messages": [
                    {"role": "system", "content": "You are a professional OCR agent. Accurately transcribe the Chinese text from this dictionary page."},
                    {"role": "user", "content": [
                        {"type": "text", "text": "Please transcribe the following page."},
                        {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_img}"}}
                    ]}
                ],
                "temperature": 0.2
            }
        })

    # Step 2: Write the batch input file (one JSON request per line).
    with open(TEMP_BATCH_FILE, "w", encoding="utf-8") as f:
        for req in requests:
            f.write(json.dumps(req, ensure_ascii=False) + "\n")

    # Step 3: Upload the input file. Use a context manager so the handle is
    # closed deterministically (the original bare open() call leaked it).
    with open(TEMP_BATCH_FILE, "rb") as batch_file:
        batch_input_file = client.files.create(file=batch_file, purpose="batch")
    print(f"✅ Uploaded input file: {batch_input_file.id}")

    # Step 4: Create the batch job against the chat-completions endpoint.
    batch = client.batches.create(
        input_file_id=batch_input_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
        metadata={"description": f"Chinese OCR pages {batch_start}-{batch_end}"}
    )
    print(f"✅ Created batch: {batch.id}")

    # Step 5: Block until the batch reaches a terminal state, then persist
    # the transcripts; abort the whole run on any non-success status.
    batch = wait_for_batch_completion(batch.id)
    if batch.status == "completed":
        print("✅ Batch completed successfully. Saving output...")
        save_batch_output(batch.output_file_id, batch_start)
    else:
        print(f"❌ Batch failed with status: {batch.status}")
        break

print("\n🎯 All done!")
import re
import sys


def clean_text_file(input_filename, output_filename):
    """Normalize an OCR transcript dump into a single clean text file.

    Transformations applied, in order, per input line:
      * drop lines containing the '○' glyph more than 30 times (page filler),
      * drop chat-model preambles ("Sure...") and Markdown fences ("```..."),
      * rewrite "page_<N>.txt" markers as "Page <N>" framed by blank lines,
      * collapse internal whitespace runs to single spaces and strip the ends,
      * suppress any content line repeated more than 3 times consecutively.

    Args:
        input_filename (str): Path of the raw transcript to read.
        output_filename (str): Path the cleaned text is written to.
    """
    page_marker = re.compile(r'page_(\d+)\.txt')
    with open(input_filename, 'r', encoding='utf-8') as src, \
            open(output_filename, 'w', encoding='utf-8') as dst:
        prev_line = None
        run_length = 0
        for raw in src:
            # Decorative filler rows are dominated by the '○' character.
            if raw.count('○') > 30:
                continue
            stripped = raw.strip()
            # Skip chatty model preambles and code-fence delimiters.
            if stripped.startswith(("Sure", "```")):
                continue
            marker = page_marker.match(stripped)
            if marker:
                # Emit the page header padded with a blank line on each side,
                # and restart repeat-tracking for the new page.
                dst.write(f"\nPage {int(marker.group(1))}\n\n")
                prev_line = None
                run_length = 0
                continue
            # Squash whitespace; an empty result ends any repeat run.
            collapsed = re.sub(r'\s+', ' ', raw).strip()
            if not collapsed:
                prev_line = None
                run_length = 0
                continue
            run_length = run_length + 1 if collapsed == prev_line else 1
            prev_line = collapsed
            if run_length <= 3:
                dst.write(collapsed + "\n")
if __name__ == "__main__":
    # Expect exactly two positional arguments: input and output filenames.
    if len(sys.argv) == 3:
        clean_text_file(sys.argv[1], sys.argv[2])
    else:
        print("Usage: python clean_script.py <input_filename> <output_filename>")