Created
July 1, 2025 02:31
-
-
Save spdin/32e7d683e6b00bc754dd13f93b45ce52 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64
import os

import requests
# Datasaur deployment endpoint that receives the chat-completion requests.
API_URL = "https://deployment.datasaur.ai/api/deployment/8/1937/chat/completions"

# Default HTTP headers sent with every request. The bearer token is taken from
# the DATASAUR_API_KEY environment variable so the secret does not have to live
# in source control; when the variable is unset the original "<api-key>"
# placeholder is used, which yields exactly the previous header value.
headers = {
    'Authorization': f"Bearer {os.environ.get('DATASAUR_API_KEY', '<api-key>')}",
    'Content-Type': 'application/json',
}
def encode_image(image_path):
    """Read a file and return its contents as a base64-encoded string.

    Args:
        image_path: Path of the file to read; it is opened in binary mode.

    Returns:
        The base64 encoding of the file's bytes as a ``str``, or ``None`` if
        the file does not exist or any other error occurs while reading or
        encoding it.
    """
    # Function-scope import keeps this block self-contained. Diagnostics are
    # written to stderr instead of stdout because stdout may be reserved as a
    # protocol channel when this module is used from a stdio-based server.
    import sys

    try:
        with open(image_path, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode('utf-8')
    except FileNotFoundError:
        print(f"Error: The file {image_path} was not found.", file=sys.stderr)
        return None
    except Exception as e:
        # Deliberate catch-all: callers rely on a None result, never a raise.
        print(f"Error: {e}", file=sys.stderr)
        return None
def extract_text_from_image(image_path, timeout=300.0):
    """Send a file to the Datasaur deployment and return the model's reply.

    The file is base64-encoded with ``encode_image`` and posted to ``API_URL``
    as a single user message.

    Args:
        image_path: Path of the file to send.
        timeout: Seconds to wait for the HTTP request before giving up. Without
            a timeout a stalled connection would block the caller forever.

    Returns:
        A dict with the keys ``"text"`` (content of the first choice),
        ``"prompt_tokens"`` and ``"completion_tokens"`` (0 when the response
        carries no usage data), or ``None`` if the file could not be read or
        the request/response handling failed.
    """
    # Function-scope import; errors go to stderr because stdout may be in use
    # as a protocol channel when this is called from a stdio-based server.
    import sys

    base64_image = encode_image(image_path)
    if base64_image is None:
        return None

    # NOTE(review): the data URL declares text/html although the payload is the
    # raw file bytes; kept exactly as-is — confirm against the deployment API.
    payload = {
        "messages": [{
            "role": "user",
            "content": [
                {
                    "type": "url",
                    "url": f"data:text/html;base64,{base64_image}",
                    "options": {
                        "include_page_screenshot_as_image": True
                    }
                },
            ]
        }]
    }

    try:
        response = requests.post(API_URL, headers=headers, json=payload, timeout=timeout)
        # Surface HTTP errors with their status instead of a confusing
        # JSON/KeyError failure further down.
        response.raise_for_status()
        response_data = response.json()

        # "usage" may be absent or explicitly null; treat both as "no data"
        # so a missing token count never discards the extracted text.
        usage = response_data.get("usage") or {}
        return {
            "text": response_data["choices"][0]["message"]["content"],
            "prompt_tokens": usage.get("prompt_tokens", 0),
            "completion_tokens": usage.get("completion_tokens", 0)
        }
    except Exception as e:
        # Boundary catch-all: callers treat None as "processing failed".
        print(f"Error processing image {image_path}: {e}", file=sys.stderr)
        return None
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import asyncio | |
import os | |
import sys | |
import tempfile | |
from pathlib import Path | |
from typing import Any, Sequence, List, Dict | |
import logging | |
from mcp.server.models import InitializationOptions | |
from mcp.server import NotificationOptions, Server | |
from mcp.server.stdio import stdio_server | |
from mcp.types import ( | |
CallToolRequest, | |
CallToolResult, | |
ListToolsRequest, | |
TextContent, | |
Tool, | |
) | |
# Import the existing parser function | |
from llm_pdf_parser import extract_text_from_image | |
# Route INFO-and-above log records through the root logger configuration.
logging.basicConfig(level=logging.INFO)

# MCP server object; the tool handlers in this module are registered on it.
app = Server("parser-server")

# Logger shared by the helpers and handlers in this module.
logger = logging.getLogger("parser-server")
# Extensions (lower-case, including the leading dot) accepted by the server,
# grouped by kind and then merged into one lookup set.
SUPPORTED_IMAGE_FORMATS = {'.jpg', '.jpeg', '.png'}
SUPPORTED_PDF_FORMAT = {'.pdf'}
ALL_SUPPORTED_FORMATS = SUPPORTED_IMAGE_FORMATS.union(SUPPORTED_PDF_FORMAT)
def is_supported_file(file_path: str) -> bool:
    """Return True when the path's extension, ignoring case, is an accepted format."""
    extension = Path(file_path).suffix.lower()
    return extension in ALL_SUPPORTED_FORMATS
def is_pdf_file(file_path: str) -> bool:
    """Return True when the path ends in a ``.pdf`` extension, ignoring case."""
    extension = Path(file_path).suffix
    return extension.lower() == '.pdf'
def is_image_file(file_path: str) -> bool:
    """Return True when the path's extension, ignoring case, is an accepted image format."""
    extension = Path(file_path).suffix.lower()
    return extension in SUPPORTED_IMAGE_FORMATS
def convert_pdf_to_images(pdf_path: str, output_dir: str = None) -> List[str]:
    """Render each page of a PDF to a PNG file and return the file paths.

    Args:
        pdf_path: Path of the PDF to convert.
        output_dir: Directory that receives the PNG files. When ``None`` a new
            directory is created with ``tempfile.mkdtemp()``; this function
            does not remove it, so the caller owns the cleanup.

    Returns:
        Paths of the written files, in page order, named
        ``<pdf stem>_page_<n>.png`` with ``n`` starting at 1.

    Raises:
        Exception: If ``pdf2image`` cannot be imported (chained to the original
            ``ImportError``), or re-raised unchanged for any failure during
            conversion or saving.
    """
    # Import lazily so the rest of the module loads without pdf2image. Only the
    # import itself is guarded, so an ImportError raised later (during the
    # conversion) is not misreported as a missing pdf2image installation.
    try:
        from pdf2image import convert_from_path
    except ImportError as err:
        logger.error("pdf2image library is not installed. Please install it with: pip install pdf2image")
        raise Exception("pdf2image library is required for PDF processing") from err

    try:
        if output_dir is None:
            output_dir = tempfile.mkdtemp()

        images = convert_from_path(pdf_path)
        pdf_name = Path(pdf_path).stem

        image_paths = []
        for page_number, image in enumerate(images, start=1):
            image_path = os.path.join(output_dir, f"{pdf_name}_page_{page_number}.png")
            image.save(image_path, 'PNG')
            image_paths.append(image_path)
            logger.info(f"Converted page {page_number} to {image_path}")
        return image_paths
    except Exception as e:
        logger.error(f"Error converting PDF to images: {e}")
        # Bare raise keeps the original traceback intact.
        raise
async def parse_single_image(image_path: str) -> Dict[str, Any]:
    """Extract text from one image file and wrap the outcome in a result dict.

    Args:
        image_path: Path of the image to send to ``extract_text_from_image``.

    Returns:
        On success: ``{"success": True, "file_path", "text", "prompt_tokens",
        "completion_tokens", "total_tokens"}`` where ``total_tokens`` is the
        sum of the two token counts. On failure (the extractor returned
        ``None`` or raised): ``{"success": False, "error", "file_path"}``.
        This coroutine does not raise for extraction errors.
    """
    try:
        logger.info(f"Parsing image: {image_path}")
        # extract_text_from_image is a plain synchronous function; running it
        # in a worker thread keeps the event loop free to serve other
        # messages while the extraction is in progress.
        result = await asyncio.to_thread(extract_text_from_image, image_path)
        if result is None:
            return {
                "success": False,
                "error": f"Failed to process image: {image_path}",
                "file_path": image_path
            }

        prompt_tokens = result.get("prompt_tokens", 0)
        completion_tokens = result.get("completion_tokens", 0)
        return {
            "success": True,
            "file_path": image_path,
            "text": result.get("text", ""),
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens
        }
    except Exception as e:
        # Boundary catch-all: every failure is reported through the result dict.
        logger.error(f"Error parsing image {image_path}: {e}")
        return {
            "success": False,
            "error": str(e),
            "file_path": image_path
        }
async def parse_pdf_file(pdf_path: str) -> Dict[str, Any]:
    """Extract text from a PDF by rendering its pages to images first.

    The pages are written to a temporary directory, parsed one after another
    with ``parse_single_image``, and the directory is removed before returning.

    Args:
        pdf_path: Path of the PDF to parse.

    Returns:
        On a completed run: a dict with ``success`` (True when at least one
        page succeeded), ``file_path``, ``total_pages``, ``successful_pages``,
        ``failed_pages`` (1-based page numbers), ``combined_text`` (each page
        prefixed by a ``--- Page n ---`` marker), ``pages_details`` (the
        per-page result dicts, whose ``file_path`` entries point into the
        already-removed temporary directory), and the token totals.
        If conversion yields no pages or any exception occurs:
        ``{"success": False, "error", "file_path"}``.
    """
    # Function-scope import: shutil is only needed for the cleanup below.
    import shutil

    temp_dir = None
    try:
        temp_dir = tempfile.mkdtemp()
        logger.info(f"Converting PDF to images: {pdf_path}")

        # Rendering is synchronous work; run it in a worker thread so the
        # event loop is not blocked for the duration of the conversion.
        image_paths = await asyncio.to_thread(convert_pdf_to_images, pdf_path, temp_dir)
        if not image_paths:
            return {
                "success": False,
                "error": "No pages found in PDF",
                "file_path": pdf_path
            }

        pages_results = []
        text_parts = []
        failed_pages = []
        successful_pages = 0
        total_prompt_tokens = 0
        total_completion_tokens = 0

        # Single pass: parse each page and fold it into the aggregates.
        for page_number, image_path in enumerate(image_paths, start=1):
            page_result = await parse_single_image(image_path)
            page_result["page_number"] = page_number
            pages_results.append(page_result)

            if not page_result["success"]:
                failed_pages.append(page_number)
                continue

            successful_pages += 1
            total_prompt_tokens += page_result.get("prompt_tokens", 0)
            total_completion_tokens += page_result.get("completion_tokens", 0)
            text_parts.append(f"\n--- Page {page_number} ---\n")
            # A page whose text is None must not fail the whole document.
            text_parts.append(page_result["text"] or "")

        return {
            "success": successful_pages > 0,
            "file_path": pdf_path,
            "total_pages": len(image_paths),
            "successful_pages": successful_pages,
            "failed_pages": failed_pages,
            "combined_text": "".join(text_parts).strip(),
            "pages_details": pages_results,
            "total_prompt_tokens": total_prompt_tokens,
            "total_completion_tokens": total_completion_tokens,
            "total_tokens": total_prompt_tokens + total_completion_tokens
        }
    except Exception as e:
        logger.error(f"Error parsing PDF {pdf_path}: {e}")
        return {
            "success": False,
            "error": str(e),
            "file_path": pdf_path
        }
    finally:
        # Best-effort cleanup: a failure here is logged, never raised.
        if temp_dir and os.path.exists(temp_dir):
            try:
                shutil.rmtree(temp_dir)
                logger.info(f"Cleaned up temporary directory: {temp_dir}")
            except Exception as e:
                logger.warning(f"Failed to clean up temporary directory {temp_dir}: {e}")
@app.list_tools()
async def handle_list_tools() -> list[Tool]:
    """Return the descriptors of the three tools this server offers."""
    # parse_file: one required string argument, the path to parse.
    parse_file_schema = {
        "type": "object",
        "properties": {
            "file_path": {
                "type": "string",
                "description": "Path to the PDF or image file to parse"
            }
        },
        "required": ["file_path"]
    }
    parse_file_tool = Tool(
        name="parse_file",
        description="Parse text from PDF or image files (jpg, jpeg, png). For PDFs, converts to images first and processes each page.",
        inputSchema=parse_file_schema,
    )

    # parse_multiple_files: one required array-of-strings argument.
    parse_multiple_schema = {
        "type": "object",
        "properties": {
            "file_paths": {
                "type": "array",
                "items": {
                    "type": "string"
                },
                "description": "List of paths to PDF or image files to parse"
            }
        },
        "required": ["file_paths"]
    }
    parse_multiple_tool = Tool(
        name="parse_multiple_files",
        description="Parse text from multiple PDF or image files",
        inputSchema=parse_multiple_schema,
    )

    # get_supported_formats: takes no arguments at all.
    supported_formats_schema = {
        "type": "object",
        "properties": {},
        "additionalProperties": False
    }
    supported_formats_tool = Tool(
        name="get_supported_formats",
        description="Get list of supported file formats",
        inputSchema=supported_formats_schema,
    )

    return [parse_file_tool, parse_multiple_tool, supported_formats_tool]
def _text_reply(text: str) -> list[TextContent]:
    """Wrap a string in the single-item text content list a tool call returns."""
    return [TextContent(type="text", text=text)]


async def _parse_supported_file(file_path: str) -> Dict[str, Any]:
    """Parse one supported file, using the PDF or the image parser as appropriate."""
    if is_pdf_file(file_path):
        return await parse_pdf_file(file_path)
    return await parse_single_image(file_path)


def _extracted_text(result: Dict[str, Any]) -> str:
    """Return the text of a successful parse result ("" when missing or None)."""
    key = "combined_text" if is_pdf_file(result["file_path"]) else "text"
    return result.get(key) or ""


async def _handle_parse_file(arguments: dict) -> list[TextContent]:
    """Serve the ``parse_file`` tool: validate the path, parse it, return its text."""
    file_path = arguments.get("file_path")
    if not file_path:
        return _text_reply("❌ Error: file_path is required")
    # Reject non-string values up front: os.path.exists() would treat an int
    # as an open file descriptor, and Path() raises on other types.
    if not isinstance(file_path, str):
        return _text_reply("❌ Error: file_path must be a string")
    if not os.path.exists(file_path):
        return _text_reply(f"❌ Error: File not found: {file_path}")
    if not is_supported_file(file_path):
        # Sorted so the message is stable regardless of set iteration order.
        supported_formats = ", ".join(sorted(ALL_SUPPORTED_FORMATS))
        return _text_reply(f"❌ Error: Unsupported file format. Supported formats: {supported_formats}")

    try:
        result = await _parse_supported_file(file_path)
        if result["success"]:
            return _text_reply(_extracted_text(result))
        return _text_reply(f"❌ Failed to parse file: {result.get('error', 'Unknown error')}")
    except Exception as e:
        logger.error(f"Error in parse_file: {e}")
        return _text_reply(f"❌ Error processing file: {str(e)}")


async def _handle_parse_multiple_files(arguments: dict) -> list[TextContent]:
    """Serve the ``parse_multiple_files`` tool: parse each path and build a batch report."""
    file_paths = arguments.get("file_paths", [])
    if not file_paths:
        return _text_reply("❌ Error: file_paths list is required")
    if not isinstance(file_paths, list):
        return _text_reply("❌ Error: file_paths must be a list")

    results = []
    failed_files = []
    total_tokens = 0

    for file_path in file_paths:
        # A non-string entry must fail on its own without aborting the batch.
        if not isinstance(file_path, str):
            failed_files.append(f"{file_path!r} (invalid path: expected a string)")
            continue
        if not os.path.exists(file_path):
            failed_files.append(f"{file_path} (not found)")
            continue
        if not is_supported_file(file_path):
            failed_files.append(f"{file_path} (unsupported format)")
            continue

        try:
            result = await _parse_supported_file(file_path)
            if result["success"]:
                total_tokens += result.get("total_tokens", 0)
                results.append(result)
            else:
                failed_files.append(f"{file_path} ({result.get('error', 'Unknown error')})")
        except Exception as e:
            failed_files.append(f"{file_path} ({str(e)})")

    # Collect the report pieces and join once instead of repeated "+=".
    parts = [
        "📊 Batch Processing Results:\n",
        f"✅ Successfully processed: {len(results)} files\n",
        f"❌ Failed: {len(failed_files)} files\n",
        f"🔢 Total tokens used: {total_tokens}\n\n",
    ]

    if failed_files:
        parts.append("❌ Failed files:\n")
        parts.extend(f" - {failed_file}\n" for failed_file in failed_files)
        parts.append("\n")

    if results:
        parts.append("📝 Extracted Text from all files:\n")
        parts.append("=" * 50 + "\n")
        for result in results:
            parts.append(f"\n📄 File: {result['file_path']}\n")
            parts.append("-" * 30 + "\n")
            parts.append(_extracted_text(result))
            parts.append("\n" + "=" * 50 + "\n")

    return _text_reply("".join(parts))


def _handle_get_supported_formats() -> list[TextContent]:
    """Serve the ``get_supported_formats`` tool: list the accepted extensions by category."""
    # Sorted so the listing is stable regardless of set iteration order.
    formats_info = {
        "Images": sorted(SUPPORTED_IMAGE_FORMATS),
        "Documents": sorted(SUPPORTED_PDF_FORMAT)
    }

    parts = ["📋 Supported File Formats:\n\n"]
    for category, formats in formats_info.items():
        parts.append(f"📂 {category}:\n")
        parts.extend(f" - {fmt}\n" for fmt in formats)
        parts.append("\n")
    parts.append("ℹ️ Note: PDF files are converted to images first, then processed page by page.")

    return _text_reply("".join(parts))


@app.call_tool()
async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Dispatch a tool call to its handler and return the reply as text content.

    Args:
        name: Tool name; one of ``parse_file``, ``parse_multiple_files`` or
            ``get_supported_formats``.
        arguments: Tool arguments; ``file_path`` (str) for ``parse_file``,
            ``file_paths`` (list of str) for ``parse_multiple_files``.

    Returns:
        A one-element list holding a text item: the extracted text or batch
        report on success, otherwise a message starting with "❌". Validation
        and parsing failures are reported in the text rather than raised.
    """
    if name == "parse_file":
        return await _handle_parse_file(arguments)
    if name == "parse_multiple_files":
        return await _handle_parse_multiple_files(arguments)
    if name == "get_supported_formats":
        return _handle_get_supported_formats()
    return _text_reply(f"❌ Unknown tool: {name}")
async def main():
    """Start the parser server and serve requests over the stdio streams."""
    async with stdio_server() as streams:
        read_stream, write_stream = streams

        # Assemble the initialization options step by step before running.
        capabilities = app.get_capabilities(
            notification_options=NotificationOptions(),
            experimental_capabilities={},
        )
        init_options = InitializationOptions(
            server_name="parser-server",
            server_version="1.0.0",
            capabilities=capabilities,
        )

        await app.run(read_stream, write_stream, init_options)


if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment