Skip to content

Instantly share code, notes, and snippets.

@DCCoder90
Last active November 12, 2025 16:07
Show Gist options
  • Select an option

  • Save DCCoder90/989ddb34e85daf56b89d52b8b4436a39 to your computer and use it in GitHub Desktop.

Select an option

Save DCCoder90/989ddb34e85daf56b89d52b8b4436a39 to your computer and use it in GitHub Desktop.
Small multimodal API for storing data in ChromaDB for AI applications. https://docs.trychroma.com/docs/overview/introduction
import uvicorn
import chromadb
import uuid
import json
import io
from pypdf import PdfReader
from PIL import Image
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from typing import Optional
from chromadb.utils import embedding_functions
# FastAPI application object; the route decorators further down register on it.
app = FastAPI(
    title="Multimodal File API",
    description="API to upload/update/delete files in ChromaDB.",
    version="1.0.0",
)

# One-time module-level setup: load the embedding model and open the Chroma
# collection that every endpoint below reads and writes. Any failure here is
# fatal, so the process reports the problem and terminates with status 1.
try:
    clip_model_name = "clip-ViT-B-32"  # Using CLIP for text and images
    # NOTE(review): confirm this embedding function accepts the image inputs
    # that the upload/update endpoints pass via `images=` -- TODO verify.
    clip_embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
        model_name=clip_model_name
    )
    # NOTE(review): presumably chromadb.Client() is a non-persistent client, so
    # stored entries would not survive a restart -- confirm before relying on it.
    client = chromadb.Client()
    collection = client.get_or_create_collection(
        name="multimodal_store",  # Current collection name, may want to change this
        embedding_function=clip_embedding_function,
    )
except Exception as e:
    print(f"Error initializing ChromaDB or loading model: {e}")
    # `exit()` is an interactive helper injected by the `site` module and is not
    # guaranteed to exist (e.g. `python -S`, frozen apps); raise SystemExit instead.
    raise SystemExit(1) from e
def parse_metadata(metadata_str: Optional[str]) -> dict:
    """Parse the optional JSON metadata string into a dictionary.

    Args:
        metadata_str: Raw JSON text supplied by the client, or ``None`` when
            no metadata was sent.

    Returns:
        The decoded JSON object, or an empty dict when ``metadata_str`` is
        ``None``.

    Raises:
        HTTPException: 400 when the text is not valid JSON, or when it decodes
            to anything other than a JSON object.
    """
    if metadata_str is None:
        return {}
    try:
        parsed = json.loads(metadata_str)
    except json.JSONDecodeError as e:
        raise HTTPException(
            status_code=400,
            detail="Invalid JSON format for metadata."
        ) from e
    # Valid JSON may still be a list, string, number, ... Callers assign keys on
    # the result, so reject non-objects here as a client error instead of
    # letting them fail later with an unrelated exception.
    if not isinstance(parsed, dict):
        raise HTTPException(
            status_code=400,
            detail="Metadata must be a JSON object."
        )
    return parsed
async def process_file_for_chroma(file: UploadFile):
    """Read an uploaded file and convert it into a value ready for storage.

    The declared content type selects the handling:

    * ``image/*``          -> opened with PIL
    * ``application/pdf``  -> text of every page, joined with newlines
    * anything else        -> decoded as UTF-8 text

    Args:
        file: The upload to read; its whole body is read into memory.

    Returns:
        A ``(kind, data)`` tuple: ``("image", <PIL image>)`` for images and
        ``("document", <str>)`` for PDFs and plain text.

    Raises:
        HTTPException: 400 when the image or PDF cannot be parsed, or when a
            non-image, non-PDF upload is not valid UTF-8.
    """
    content = await file.read()
    # The content type may be absent on an upload; treat that as "unknown" so
    # the request falls through to the UTF-8 text branch instead of crashing on
    # `None.startswith`. MIME types are case-insensitive, hence the lower().
    content_type = (file.content_type or "").lower()

    if content_type.startswith("image/"):
        try:
            return "image", Image.open(io.BytesIO(content))
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Error parsing image: {e}") from e

    if content_type == "application/pdf":
        try:
            reader = PdfReader(io.BytesIO(content))
            # Guard against a page yielding no text so the join cannot fail.
            text_parts = [page.extract_text() or "" for page in reader.pages]
            return "document", "\n".join(text_parts)
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Error parsing PDF: {e}") from e

    try:
        return "document", content.decode('utf-8')
    except UnicodeDecodeError as e:
        raise HTTPException(
            status_code=400,
            detail="File is not a valid image, PDF, or UTF-8 text file."
        ) from e
@app.post("/upload/", summary="Add new file (TXT, PDF, or Image)")
async def create_file_entry(
    file: UploadFile = File(...),
    metadata: Optional[str] = Form(None),
):
    # Stores one uploaded file in the collection under a freshly generated UUID.
    # The optional `metadata` form field is a JSON string; the upload's filename
    # is always written into it under the "filename" key.
    try:
        new_id = str(uuid.uuid4())
        entry_metadata = parse_metadata(metadata)
        entry_metadata["filename"] = file.filename
        kind, payload = await process_file_for_chroma(file)

        # Images and text are handed to the collection through different
        # keyword arguments; everything else about the call is the same.
        payload_field = "images" if kind == "image" else "documents"
        collection.add(
            **{payload_field: [payload]},
            metadatas=[entry_metadata],
            ids=[new_id],
        )
    except HTTPException:
        # Already a well-formed HTTP error (e.g. a 400 from parsing): pass it on.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred: {e}")
    return {"message": "File added successfully", "id": new_id, "type": kind}
@app.put("/update/{doc_id}", summary="Update existing file")
async def update_file_entry(
    doc_id: str,
    file: UploadFile = File(...),
    metadata: Optional[str] = Form(None),
):
    # Replaces the content and metadata stored under `doc_id` with a new upload.
    # Responds 404 when the ID is unknown; the upload's filename is always
    # written into the metadata under the "filename" key.
    try:
        current = collection.get(ids=[doc_id])
        if not current["ids"]:
            raise HTTPException(status_code=404, detail=f"File with id '{doc_id}' not found.")

        entry_metadata = parse_metadata(metadata)
        entry_metadata["filename"] = file.filename
        kind, payload = await process_file_for_chroma(file)

        # Shared arguments first, then the one that depends on the file kind.
        update_kwargs = {"metadatas": [entry_metadata], "ids": [doc_id]}
        if kind == "image":
            update_kwargs["images"] = [payload]
        else:
            update_kwargs["documents"] = [payload]
        collection.update(**update_kwargs)
    except HTTPException:
        # Keep deliberate HTTP errors (404 / 400) as they are.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred: {e}")
    return {"message": "File updated successfully", "id": doc_id, "type": kind}
@app.delete("/delete/{doc_id}", summary="Delete a file")
async def delete_file_entry(doc_id: str):
    # Removes the entry stored under `doc_id`; responds 404 when no such entry
    # exists and 500 on any unexpected failure.
    try:
        lookup = collection.get(ids=[doc_id])
        if lookup["ids"]:
            collection.delete(ids=[doc_id])
            return {"message": "File deleted successfully", "id": doc_id}
        raise HTTPException(status_code=404, detail=f"File with id '{doc_id}' not found.")
    except HTTPException:
        # The 404 above must reach the client unchanged.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred: {e}")
@app.get("/get/{doc_id}", summary="Get file details (metadata only)")
async def get_file_entry(doc_id: str):
    """
    Retrieve the stored metadata, plus any extracted text, for a given ID.
    """
    # Responds 404 when the ID is unknown and 500 on any unexpected failure.
    try:
        entry = collection.get(ids=[doc_id], include=["metadatas", "documents"])
        if not entry['ids']:
            raise HTTPException(status_code=404, detail=f"File with id '{doc_id}' not found.")

        # The documents list can be non-empty while its only element is None
        # (an entry stored without text, i.e. an image), so the fallback must
        # look at the element itself rather than at the list's truthiness.
        documents = entry["documents"] or []
        stored_text = documents[0] if documents else None
        return {
            "id": entry["ids"][0],
            "metadata": entry["metadatas"][0],
            "extracted_text": stored_text if stored_text is not None else "N/A (was an image)"
        }
    except HTTPException:
        # Keep the 404 above intact instead of converting it to a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred: {e}") from e
if __name__ == "__main__":
    # Script entry point: print a short startup banner, then serve the app.
    for banner_line in (
        f"Starting API server with '{clip_model_name}' model...",
        "View API docs at http://127.0.0.1:8000/docs",
    ):
        print(banner_line)
    # Currently only binding to 127.0.0.1, may want to update this
    uvicorn.run(app, host="127.0.0.1", port=8000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment