Skip to content

Instantly share code, notes, and snippets.

@spdin
Created July 1, 2025 02:31
Show Gist options
  • Save spdin/32e7d683e6b00bc754dd13f93b45ce52 to your computer and use it in GitHub Desktop.
import base64
import requests

# Datasaur deployment endpoint used for chat-completion style requests.
# NOTE(review): the bearer token below is a placeholder; consider loading a
# real key from an environment variable rather than committing it to source.
API_URL = "https://deployment.datasaur.ai/api/deployment/8/1937/chat/completions"
headers = {
    'Authorization': 'Bearer <api-key>',
    'Content-Type': 'application/json'
}
def encode_image(image_path):
    """Return the base64-encoded contents of *image_path*, or None on failure.

    Failures (missing file or any other read error) are reported to stdout
    rather than raised, so callers only need to check for None.
    """
    try:
        with open(image_path, "rb") as fh:
            encoded = base64.b64encode(fh.read())
        return encoded.decode('utf-8')
    except FileNotFoundError:
        print(f"Error: The file {image_path} was not found.")
        return None
    except Exception as exc:
        print(f"Error: {exc}")
        return None
def extract_text_from_image(image_path):
    """Send the file at *image_path* to the Datasaur deployment and return the
    extracted text plus token usage.

    Returns:
        dict with keys "text", "prompt_tokens" and "completion_tokens",
        or None when encoding or the API call fails.
    """
    base64_image = encode_image(image_path)
    if base64_image is None:
        return None
    try:
        # NOTE(review): the payload labels the image bytes as "data:text/html".
        # Confirm against the Datasaur API docs whether an image MIME type
        # (e.g. data:image/png) is expected here instead.
        payload = {
            "messages": [{
                "role": "user",
                "content": [
                    {
                        "type": "url",
                        "url": f"data:text/html;base64,{base64_image}",
                        "options": {
                            "include_page_screenshot_as_image": True
                        }
                    },
                ]
            }]
        }
        # Fix: the original call had no timeout and could hang forever on a
        # stalled connection.  A timeout error is caught below and surfaces
        # as None, consistent with this module's other failure paths.
        response = requests.post(API_URL, headers=headers, json=payload, timeout=120)
        response_data = response.json()
        # Usage information may be absent from the response; default to 0.
        prompt_tokens = response_data.get("usage", {}).get("prompt_tokens", 0)
        completion_tokens = response_data.get("usage", {}).get("completion_tokens", 0)
        return {
            "text": response_data["choices"][0]["message"]["content"],
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens
        }
    except Exception as e:
        print(f"Error processing image {image_path}: {e}")
        return None
#!/usr/bin/env python3
import asyncio
import os
import sys
import tempfile
from pathlib import Path
from typing import Any, Sequence, List, Dict
import logging
from mcp.server.models import InitializationOptions
from mcp.server import NotificationOptions, Server
from mcp.server.stdio import stdio_server
from mcp.types import (
CallToolRequest,
CallToolResult,
ListToolsRequest,
TextContent,
Tool,
)
# Import the existing parser function
from llm_pdf_parser import extract_text_from_image
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("parser-server")

# Create the MCP server instance
app = Server("parser-server")

# File extensions this server accepts; PDFs are converted to page images
# before parsing (see convert_pdf_to_images below).
SUPPORTED_IMAGE_FORMATS = {'.jpg', '.jpeg', '.png'}
SUPPORTED_PDF_FORMAT = {'.pdf'}
ALL_SUPPORTED_FORMATS = SUPPORTED_IMAGE_FORMATS | SUPPORTED_PDF_FORMAT
def is_supported_file(file_path: str) -> bool:
    """Return True when *file_path* has one of the supported extensions."""
    suffix = Path(file_path).suffix.lower()
    return suffix in ALL_SUPPORTED_FORMATS
def is_pdf_file(file_path: str) -> bool:
    """Return True when *file_path* carries a .pdf extension (case-insensitive)."""
    suffix = Path(file_path).suffix.lower()
    return suffix == '.pdf'
def is_image_file(file_path: str) -> bool:
    """Return True when *file_path* has one of the supported image extensions."""
    suffix = Path(file_path).suffix.lower()
    return suffix in SUPPORTED_IMAGE_FORMATS
def convert_pdf_to_images(pdf_path: str, output_dir: str = None) -> List[str]:
    """Rasterize each page of *pdf_path* to a PNG file.

    Args:
        pdf_path: Path to the source PDF.
        output_dir: Directory to write PNGs into; a fresh temporary
            directory is created when None.

    Returns:
        Paths of the generated images, one per page, in page order.

    Raises:
        Exception: when pdf2image is not installed or conversion fails.
    """
    try:
        # Imported lazily so the server can start without pdf2image installed.
        from pdf2image import convert_from_path

        if output_dir is None:
            output_dir = tempfile.mkdtemp()

        images = convert_from_path(pdf_path)
        pdf_name = Path(pdf_path).stem
        image_paths = []
        for i, image in enumerate(images):
            image_path = os.path.join(output_dir, f"{pdf_name}_page_{i+1}.png")
            image.save(image_path, 'PNG')
            image_paths.append(image_path)
            logger.info(f"Converted page {i+1} to {image_path}")
        return image_paths
    except ImportError:
        logger.error("pdf2image library is not installed. Please install it with: pip install pdf2image")
        raise Exception("pdf2image library is required for PDF processing")
    except Exception as e:
        logger.error(f"Error converting PDF to images: {e}")
        # Fix: bare `raise` preserves the original traceback (was `raise e`).
        raise
async def parse_single_image(image_path: str) -> Dict[str, Any]:
    """Run the LLM parser on one image and normalize the outcome into a dict.

    The returned dict always carries "success" and "file_path"; on success it
    adds the extracted text and token counts, on failure an "error" message.
    """
    try:
        logger.info(f"Parsing image: {image_path}")
        parsed = extract_text_from_image(image_path)
        if parsed is None:
            return {
                "success": False,
                "error": f"Failed to process image: {image_path}",
                "file_path": image_path
            }
        prompt = parsed.get("prompt_tokens", 0)
        completion = parsed.get("completion_tokens", 0)
        return {
            "success": True,
            "file_path": image_path,
            "text": parsed.get("text", ""),
            "prompt_tokens": prompt,
            "completion_tokens": completion,
            "total_tokens": prompt + completion
        }
    except Exception as exc:
        logger.error(f"Error parsing image {image_path}: {exc}")
        return {
            "success": False,
            "error": str(exc),
            "file_path": image_path
        }
async def parse_pdf_file(pdf_path: str) -> Dict[str, Any]:
    """Convert *pdf_path* to per-page images, parse each page, merge results.

    Returns a dict summarizing the run: page counts, the concatenated text of
    all successful pages, per-page details, and aggregated token usage.  On
    conversion failure a {"success": False, "error": ...} dict is returned.
    """
    temp_dir = None
    try:
        # Page images live in a scratch directory removed in the finally block.
        temp_dir = tempfile.mkdtemp()
        logger.info(f"Converting PDF to images: {pdf_path}")
        image_paths = convert_pdf_to_images(pdf_path, temp_dir)
        if not image_paths:
            return {
                "success": False,
                "error": "No pages found in PDF",
                "file_path": pdf_path
            }

        pages_results = []
        total_prompt_tokens = 0
        total_completion_tokens = 0
        for page_number, image_path in enumerate(image_paths, start=1):
            page_result = await parse_single_image(image_path)
            page_result["page_number"] = page_number
            pages_results.append(page_result)
            if page_result["success"]:
                total_prompt_tokens += page_result.get("prompt_tokens", 0)
                total_completion_tokens += page_result.get("completion_tokens", 0)

        # Stitch successful pages together; remember which pages failed.
        text_chunks = []
        failed_pages = []
        for page_result in pages_results:
            if page_result["success"]:
                text_chunks.append(f"\n--- Page {page_result['page_number']} ---\n")
                text_chunks.append(page_result["text"])
            else:
                failed_pages.append(page_result["page_number"])
        successful_pages = len(pages_results) - len(failed_pages)
        combined_text = "".join(text_chunks)

        return {
            "success": successful_pages > 0,
            "file_path": pdf_path,
            "total_pages": len(image_paths),
            "successful_pages": successful_pages,
            "failed_pages": failed_pages,
            "combined_text": combined_text.strip(),
            "pages_details": pages_results,
            "total_prompt_tokens": total_prompt_tokens,
            "total_completion_tokens": total_completion_tokens,
            "total_tokens": total_prompt_tokens + total_completion_tokens
        }
    except Exception as e:
        logger.error(f"Error parsing PDF {pdf_path}: {e}")
        return {
            "success": False,
            "error": str(e),
            "file_path": pdf_path
        }
    finally:
        # Best-effort cleanup of the scratch directory holding page images.
        if temp_dir and os.path.exists(temp_dir):
            try:
                import shutil
                shutil.rmtree(temp_dir)
                logger.info(f"Cleaned up temporary directory: {temp_dir}")
            except Exception as e:
                logger.warning(f"Failed to clean up temporary directory {temp_dir}: {e}")
@app.list_tools()
async def handle_list_tools() -> list[Tool]:
    """Advertise the tools this server exposes to MCP clients."""
    parse_file_tool = Tool(
        name="parse_file",
        description="Parse text from PDF or image files (jpg, jpeg, png). For PDFs, converts to images first and processes each page.",
        inputSchema={
            "type": "object",
            "properties": {
                "file_path": {
                    "type": "string",
                    "description": "Path to the PDF or image file to parse"
                }
            },
            "required": ["file_path"]
        }
    )
    parse_many_tool = Tool(
        name="parse_multiple_files",
        description="Parse text from multiple PDF or image files",
        inputSchema={
            "type": "object",
            "properties": {
                "file_paths": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "List of paths to PDF or image files to parse"
                }
            },
            "required": ["file_paths"]
        }
    )
    formats_tool = Tool(
        name="get_supported_formats",
        description="Get list of supported file formats",
        inputSchema={
            "type": "object",
            "properties": {},
            "additionalProperties": False
        }
    )
    return [parse_file_tool, parse_many_tool, formats_tool]
@app.call_tool()
async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Dispatch an MCP tool call to the matching parser operation.

    Args:
        name: Tool name ("parse_file", "parse_multiple_files" or
            "get_supported_formats").
        arguments: Tool arguments matching the schemas in handle_list_tools.

    Returns:
        A single-element list of TextContent with either the extracted text
        or a human-readable error message.
    """
    if name == "parse_file":
        file_path = arguments.get("file_path")
        # Validate the argument before touching the filesystem.
        if not file_path:
            return [TextContent(type="text", text="❌ Error: file_path is required")]
        if not os.path.exists(file_path):
            return [TextContent(type="text", text=f"❌ Error: File not found: {file_path}")]
        if not is_supported_file(file_path):
            supported_formats = ", ".join(ALL_SUPPORTED_FORMATS)
            return [TextContent(type="text", text=f"❌ Error: Unsupported file format. Supported formats: {supported_formats}")]
        try:
            if is_pdf_file(file_path):
                result = await parse_pdf_file(file_path)
            else:
                result = await parse_single_image(file_path)
            if result["success"]:
                # Return only the extracted text; token/usage details remain
                # available in the result dict.  (Fix: removed large blocks of
                # commented-out response-formatting code that lived here.)
                if is_pdf_file(file_path):
                    response = result["combined_text"]
                else:
                    response = result["text"]
                return [TextContent(type="text", text=response)]
            else:
                return [TextContent(type="text", text=f"❌ Failed to parse file: {result.get('error', 'Unknown error')}")]
        except Exception as e:
            logger.error(f"Error in parse_file: {e}")
            return [TextContent(type="text", text=f"❌ Error processing file: {str(e)}")]
    elif name == "parse_multiple_files":
        file_paths = arguments.get("file_paths", [])
        if not file_paths:
            return [TextContent(type="text", text="❌ Error: file_paths list is required")]
        if not isinstance(file_paths, list):
            return [TextContent(type="text", text="❌ Error: file_paths must be a list")]
        results = []
        total_tokens = 0
        successful_files = 0
        failed_files = []
        for file_path in file_paths:
            # Skip (and record) files that are missing or unsupported so one
            # bad path does not abort the whole batch.
            if not os.path.exists(file_path):
                failed_files.append(f"{file_path} (not found)")
                continue
            if not is_supported_file(file_path):
                failed_files.append(f"{file_path} (unsupported format)")
                continue
            try:
                if is_pdf_file(file_path):
                    result = await parse_pdf_file(file_path)
                else:
                    result = await parse_single_image(file_path)
                if result["success"]:
                    successful_files += 1
                    total_tokens += result.get("total_tokens", 0)
                    results.append(result)
                else:
                    failed_files.append(f"{file_path} ({result.get('error', 'Unknown error')})")
            except Exception as e:
                failed_files.append(f"{file_path} ({str(e)})")
        # Format the batch summary followed by each file's extracted text.
        response = f"📊 Batch Processing Results:\n"
        response += f"✅ Successfully processed: {successful_files} files\n"
        response += f"❌ Failed: {len(failed_files)} files\n"
        response += f"🔢 Total tokens used: {total_tokens}\n\n"
        if failed_files:
            response += "❌ Failed files:\n"
            for failed_file in failed_files:
                response += f" - {failed_file}\n"
            response += "\n"
        if results:
            response += "📝 Extracted Text from all files:\n"
            response += "=" * 50 + "\n"
            for result in results:
                response += f"\n📄 File: {result['file_path']}\n"
                response += "-" * 30 + "\n"
                if is_pdf_file(result['file_path']):
                    response += result.get("combined_text", "")
                else:
                    response += result.get("text", "")
                response += "\n" + "=" * 50 + "\n"
        return [TextContent(type="text", text=response)]
    elif name == "get_supported_formats":
        formats_info = {
            "Images": list(SUPPORTED_IMAGE_FORMATS),
            "Documents": list(SUPPORTED_PDF_FORMAT)
        }
        response = "📋 Supported File Formats:\n\n"
        for category, formats in formats_info.items():
            response += f"📂 {category}:\n"
            for fmt in formats:
                response += f" - {fmt}\n"
            response += "\n"
        response += "ℹ️ Note: PDF files are converted to images first, then processed page by page."
        return [TextContent(type="text", text=response)]
    else:
        return [TextContent(type="text", text=f"❌ Unknown tool: {name}")]
async def main():
    """Run the MCP parser server over stdio until the client disconnects."""
    # Run the server using stdin/stdout streams provided by the MCP transport.
    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="parser-server",
                server_version="1.0.0",
                capabilities=app.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )


if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment