Skip to content

Instantly share code, notes, and snippets.

@arya-oss
Last active March 14, 2025 06:07
Show Gist options
  • Save arya-oss/4a5309fe8e78117a8d910c7255cd80b4 to your computer and use it in GitHub Desktop.
Save arya-oss/4a5309fe8e78117a8d910c7255cd80b4 to your computer and use it in GitHub Desktop.
Build an AI ChatBot with Gemma3 and Streamlit
"""
Ollama Chat

This is a simple chatbot that uses Ollama to generate responses.

Tech Stack:
- Ollama
- Streamlit
- Python

Usage:
    ollama pull gemma3:12b
    streamlit run ollama_chat.py
"""
import ollama
import streamlit as st
from PIL import Image
import pytesseract
import io
import fitz # PyMuPDF for PDF processing
import base64
import json
import os
from datetime import datetime
# Constants
HISTORY_DIR = "chat_history"  # Directory where chat transcripts are stored as JSON files
MAX_HISTORY_FILES = 10  # Maximum number of history files to keep
MAX_TITLE_LENGTH = 50  # Maximum length for chat titles
def ensure_history_dir():
    """Create the chat history directory if it does not already exist.

    Uses ``exist_ok=True`` instead of the original exists-then-makedirs
    pair, which could raise ``FileExistsError`` if two sessions raced to
    create the directory at the same time.
    """
    os.makedirs(HISTORY_DIR, exist_ok=True)
def generate_chat_title(messages):
    """Derive a short display title from the first user message.

    Returns "New Chat" when no user message exists. Messages longer than
    MAX_TITLE_LENGTH are clipped, cut back to the last whole word, and
    suffixed with an ellipsis.
    """
    first_user = next((m for m in messages if m["role"] == "user"), None)
    if first_user is None:
        return "New Chat"
    content = first_user["content"]
    clipped = content[:MAX_TITLE_LENGTH]
    if len(content) <= MAX_TITLE_LENGTH:
        return clipped
    # Drop the (possibly partial) last word before adding the ellipsis.
    return clipped.rsplit(' ', 1)[0] + "..."
def save_chat_history(messages, filename=None):
    """Persist *messages* to a JSON file in HISTORY_DIR.

    Parameters:
        messages -- list of {"role", "content", ...} dicts.
        filename -- existing file name to overwrite, or None to create a
            new timestamped file.
    Returns the file name (not the full path) so callers can track the
    current chat.
    """
    ensure_history_dir()
    if filename is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"chat_history_{timestamp}.json"
    filepath = os.path.join(HISTORY_DIR, filename)
    # Store the title alongside the messages so the sidebar can display it
    # without re-deriving it. (The original computed the title twice and
    # left one result in an unused local.)
    data = {
        "messages": messages,
        "title": generate_chat_title(messages),
    }
    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(data, f)
    # Prune AFTER saving so the just-written file is counted in the
    # retention limit; pruning first briefly allowed MAX_HISTORY_FILES + 1
    # files on disk.
    cleanup_old_history_files()
    return filename
def load_chat_history(filename=None):
    """Return the message list stored in *filename*, or in the most recent
    history file when no name is given. Returns [] on any failure."""
    ensure_history_dir()

    def _read_messages(path):
        # Newer files wrap the messages in a dict with a "title" key;
        # older files stored the bare message list.
        with open(path, 'r') as fh:
            payload = json.load(fh)
        if isinstance(payload, dict) and "messages" in payload:
            return payload["messages"]
        return payload

    try:
        if filename:
            target = os.path.join(HISTORY_DIR, filename)
            if os.path.exists(target):
                return _read_messages(target)
        else:
            available = get_chat_history_files()
            if available:
                newest = os.path.join(HISTORY_DIR, available[0]["filename"])
                return _read_messages(newest)
    except Exception as e:
        st.error(f"Error loading chat history: {str(e)}")
    return []
def delete_chat_history(filename):
    """Remove a single saved-chat file.

    Returns True when the file existed and was removed, False otherwise
    (missing file or OS error; errors are surfaced via st.error).
    """
    try:
        target = os.path.join(HISTORY_DIR, filename)
        if os.path.exists(target):
            os.remove(target)
            return True
    except Exception as e:
        st.error(f"Error deleting chat history: {str(e)}")
    return False
def get_chat_history_files():
    """List saved chats, newest first.

    Each entry is a dict with "filename", "timestamp" (datetime),
    "display_name" (formatted timestamp) and "title". Files whose name or
    contents cannot be parsed are silently skipped.
    """
    ensure_history_dir()
    try:
        entries = []
        for name in os.listdir(HISTORY_DIR):
            if not name.startswith("chat_history_"):
                continue
            path = os.path.join(HISTORY_DIR, name)
            # The timestamp is embedded in the file name itself.
            stamp_text = name.replace("chat_history_", "").replace(".json", "")
            try:
                stamp = datetime.strptime(stamp_text, "%Y%m%d_%H%M%S")
                # Read the stored title; legacy files hold a bare list.
                with open(path, 'r') as fh:
                    payload = json.load(fh)
                label = payload.get("title", "Untitled Chat") if isinstance(payload, dict) else "Untitled Chat"
                entries.append({
                    "filename": name,
                    "timestamp": stamp,
                    "display_name": stamp.strftime("%Y-%m-%d %H:%M:%S"),
                    "title": label,
                })
            except (ValueError, json.JSONDecodeError, IOError):
                continue
        entries.sort(key=lambda item: item["timestamp"], reverse=True)
        return entries
    except Exception as e:
        st.error(f"Error listing chat histories: {str(e)}")
        return []
def cleanup_old_history_files():
    """Delete the oldest saved chats so at most MAX_HISTORY_FILES remain."""
    try:
        # get_chat_history_files() returns newest first, so everything past
        # the cap is the oldest and safe to drop.
        for stale in get_chat_history_files()[MAX_HISTORY_FILES:]:
            os.remove(os.path.join(HISTORY_DIR, stale["filename"]))
    except Exception as e:
        st.error(f"Error cleaning up history files: {str(e)}")
# System instructions prepended to every conversation sent to the model
# (see chat()).
system_prompt = """
You are a helpful assistant that can answer questions and help with tasks.
You can also analyze images and PDFs that users share with you.
"""
def extract_text_from_pdf(pdf_file):
    """Extract all page text from an uploaded PDF.

    Parameters:
        pdf_file -- a readable file-like object (Streamlit UploadedFile).
    Returns the concatenated text of every page, or an error string on
    failure (callers embed the result directly in the prompt either way).
    """
    try:
        pdf_bytes = pdf_file.read()
        # Context manager closes the document; the original leaked the
        # fitz.Document handle on every call.
        with fitz.open(stream=pdf_bytes, filetype="pdf") as doc:
            return "".join(page.get_text() for page in doc)
    except Exception as e:
        return f"Error extracting text from PDF: {str(e)}"
def process_image(image_file):
    """OCR an uploaded image.

    Returns (extracted_text, base64_png) on success, or
    (error_message, None) when reading or OCR fails.
    """
    try:
        raw = image_file.read()
        picture = Image.open(io.BytesIO(raw))
        ocr_text = pytesseract.image_to_string(picture)
        # Re-encode as PNG so the UI can render it via a data URI.
        png_buffer = io.BytesIO()
        picture.save(png_buffer, format="PNG")
        encoded = base64.b64encode(png_buffer.getvalue()).decode()
        return ocr_text.strip(), encoded
    except Exception as e:
        return f"Error processing image: {str(e)}", None
def chat(user_prompt, model="gemma3:12b", files_content="", message_history=None):
    """Stream a chat completion from Ollama with conversation context.

    Parameters:
        user_prompt -- the user's question for this turn.
        model -- Ollama model tag to run.
        files_content -- extracted text from uploaded files; when non-empty
            it is prepended to the prompt for this turn.
        message_history -- prior turns as [{"role", "content", ...}, ...].
    Returns the streaming iterator produced by ollama.chat().
    """
    # Fix: the system prompt must be sent with the 'system' role; the
    # original used 'assistant', so the model never received it as an
    # instruction.
    messages = [{'role': 'system', 'content': system_prompt}]
    if message_history:
        for msg in message_history:
            # Keep only the fields Ollama understands (drops e.g. "image").
            messages.append({
                'role': msg['role'],
                'content': msg['content']
            })
    full_prompt = user_prompt
    if files_content:
        full_prompt = f"{files_content}\n\nUser question: {user_prompt}"
    messages.append({
        'role': 'user',
        'content': f"Model being used is {model}. {full_prompt}"
    })
    return ollama.chat(
        model=model,
        messages=messages,
        stream=True,
    )
def stream_parser(stream):
    """Yield the text of each message chunk from an Ollama response stream."""
    yield from (piece["message"]["content"] for piece in stream)
# Set the title of the chatbot
st.title("Ollama Chatbot")

# Initialize session state flags that drive the two deletion confirmation
# dialogs in the sidebar.
if "show_delete_all_dialog" not in st.session_state:
    st.session_state.show_delete_all_dialog = False
if "delete_chat_id" not in st.session_state:
    st.session_state.delete_chat_id = None
# Sidebar for chat management: new chat, clear-all, and per-chat
# load/delete controls. st.rerun() is used after each state change so the
# UI reflects the update immediately.
with st.sidebar:
    st.title("Chat Management")
    # New Chat button
    if st.button("New Chat", key="new_chat"):
        st.session_state.messages = []
        st.session_state.current_chat = None
        st.rerun()
    # Clear History button with confirmation
    if st.button("Clear All History"):
        st.session_state.show_delete_all_dialog = True
        st.rerun()
    # Delete-all confirmation dialog
    if st.session_state.show_delete_all_dialog:
        st.warning("Are you sure you want to delete all chat histories?")
        col1, col2 = st.columns(2)
        with col1:
            if st.button("Yes, delete all"):
                try:
                    # Removes every file in the history directory, not just
                    # chat_history_* files.
                    for file in os.listdir(HISTORY_DIR):
                        os.remove(os.path.join(HISTORY_DIR, file))
                    st.session_state.messages = []
                    st.session_state.current_chat = None
                    st.session_state.show_delete_all_dialog = False
                    st.rerun()
                except Exception as e:
                    st.error(f"Error clearing history: {str(e)}")
        with col2:
            if st.button("Cancel"):
                st.session_state.show_delete_all_dialog = False
                st.rerun()
    # Chat history selection
    st.subheader("Chat History")
    chat_histories = get_chat_history_files()
    # Individual chat deletion confirmation dialog
    if st.session_state.delete_chat_id:
        history_to_delete = next((h for h in chat_histories if h["filename"] == st.session_state.delete_chat_id), None)
        if history_to_delete:
            st.warning(f"Delete chat '{history_to_delete['title']}'?")
            col1, col2 = st.columns(2)
            with col1:
                if st.button("Yes, delete"):
                    if delete_chat_history(st.session_state.delete_chat_id):
                        # If the deleted chat is the one being viewed,
                        # reset the conversation as well.
                        if st.session_state.current_chat == st.session_state.delete_chat_id:
                            st.session_state.messages = []
                            st.session_state.current_chat = None
                        st.session_state.delete_chat_id = None
                        st.rerun()
            with col2:
                if st.button("Cancel"):
                    st.session_state.delete_chat_id = None
                    st.rerun()
    # Display chat histories with delete buttons
    for history in chat_histories:
        col1, col2 = st.columns([4, 1])
        with col1:
            if st.button(f"📝 {history['title']}\n{history['display_name']}", key=history['filename']):
                st.session_state.messages = load_chat_history(history['filename'])
                st.session_state.current_chat = history['filename']
                st.rerun()
        with col2:
            if st.button("🗑️", key=f"delete_{history['filename']}"):
                st.session_state.delete_chat_id = history['filename']
                st.rerun()
# Initialize the chat history and current chat
if "messages" not in st.session_state:
    st.session_state.messages = []
if "current_chat" not in st.session_state:
    st.session_state.current_chat = None

# Display chat history on app re-run
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        # Only the first uploaded image of a turn is persisted, stored as
        # base64 PNG under the "image" key (see the input handler below).
        if "image" in message:
            st.image(f"data:image/png;base64,{message['image']}")
        st.markdown(message["content"])
# File upload section
uploaded_files = st.file_uploader("Upload PDF or Image files", type=['pdf', 'png', 'jpg', 'jpeg'], accept_multiple_files=True)

# Process uploaded files: accumulate extracted text (PDF text or image
# OCR) into files_content for the prompt, and keep base64 PNG previews of
# any images for display.
files_content = ""
uploaded_images = []
if uploaded_files:
    for file in uploaded_files:
        if file.type == "application/pdf":
            pdf_text = extract_text_from_pdf(file)
            files_content += f"\nContent from PDF '{file.name}':\n{pdf_text}\n"
        elif file.type.startswith('image/'):
            image_text, img_str = process_image(file)
            # img_str is None when OCR/decoding failed; the error text is
            # still appended to files_content below.
            if img_str:
                uploaded_images.append(img_str)
            files_content += f"\nContent from Image '{file.name}':\n{image_text}\n"
# React to user input
if user_prompt := st.chat_input("What would you like to ask?"):
    # Display user prompt and any uploaded images in chat message widget
    with st.chat_message("user"):
        st.markdown(user_prompt)
        for img_str in uploaded_images:
            st.image(f"data:image/png;base64,{img_str}")
    # Add user's prompt and images to session state
    message_data = {"role": "user", "content": user_prompt}
    if uploaded_images:
        message_data["image"] = uploaded_images[0]  # Store the first image for history
    st.session_state.messages.append(message_data)
    with st.spinner('Generating response...'):
        # Fix: pass the history *excluding* the turn just appended above.
        # chat() appends the current prompt itself, so passing the full
        # list sent the user's message to the model twice per turn.
        llm_stream = chat(
            user_prompt,
            files_content=files_content,
            message_history=st.session_state.messages[:-1]
        )
        # Streams the response back to the screen
        stream_output = st.write_stream(stream_parser(llm_stream))
    # Append the assistant response to the conversation
    st.session_state.messages.append({"role": "assistant", "content": stream_output})
    # Save chat history after each message
    if st.session_state.current_chat:
        save_chat_history(st.session_state.messages, st.session_state.current_chat)
    else:
        new_filename = save_chat_history(st.session_state.messages)
        st.session_state.current_chat = new_filename
ollama==0.4.7
pillow==11.1.0
PyMuPDF==1.25.3
pytesseract==0.3.13
streamlit==1.41.1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment