Build an AI chatbot with Gemma 3 and Streamlit
""" | |
Ollama Chat | |
This is a simple chatbot that uses Ollama to generate responses. | |
Tech Stack: | |
- Ollama | |
- Streamlit | |
- Python | |
# Run using streamlit | |
ollama pull gemma3:12b | |
streamlit run ollama_chat.py | |
""" | |
import ollama
import streamlit as st
from PIL import Image
import pytesseract
import io
import fitz  # PyMuPDF for PDF processing
import base64
import json
import os
from datetime import datetime

# Constants
HISTORY_DIR = "chat_history"
MAX_HISTORY_FILES = 10  # Maximum number of history files to keep
MAX_TITLE_LENGTH = 50  # Maximum length for chat titles

def ensure_history_dir():
    """Ensure the chat history directory exists"""
    if not os.path.exists(HISTORY_DIR):
        os.makedirs(HISTORY_DIR)


def generate_chat_title(messages):
    """Generate a title from the first user message"""
    for message in messages:
        if message["role"] == "user":
            title = message["content"][:MAX_TITLE_LENGTH]
            # Truncate at the last complete word if necessary
            if len(message["content"]) > MAX_TITLE_LENGTH:
                title = title.rsplit(' ', 1)[0] + "..."
            return title
    return "New Chat"

def save_chat_history(messages, filename=None):
    """Save chat history to a JSON file"""
    ensure_history_dir()
    if filename is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"chat_history_{timestamp}.json"
    filepath = os.path.join(HISTORY_DIR, filename)

    # Clean up old history files if we exceed the maximum
    cleanup_old_history_files()

    # Save messages along with the title
    data = {
        "messages": messages,
        "title": generate_chat_title(messages)
    }
    with open(filepath, 'w') as f:
        json.dump(data, f)
    return filename

def load_chat_history(filename=None):
    """Load chat history from a specific file or the most recent one"""
    ensure_history_dir()
    try:
        if filename:
            filepath = os.path.join(HISTORY_DIR, filename)
            if os.path.exists(filepath):
                with open(filepath, 'r') as f:
                    data = json.load(f)
                # Handle both old and new format files
                if isinstance(data, dict) and "messages" in data:
                    return data["messages"]
                return data
        else:
            history_files = get_chat_history_files()
            if history_files:
                latest_file = os.path.join(HISTORY_DIR, history_files[0]["filename"])
                with open(latest_file, 'r') as f:
                    data = json.load(f)
                if isinstance(data, dict) and "messages" in data:
                    return data["messages"]
                return data
    except Exception as e:
        st.error(f"Error loading chat history: {str(e)}")
    return []


def delete_chat_history(filename):
    """Delete a specific chat history file"""
    try:
        filepath = os.path.join(HISTORY_DIR, filename)
        if os.path.exists(filepath):
            os.remove(filepath)
            return True
    except Exception as e:
        st.error(f"Error deleting chat history: {str(e)}")
    return False

def get_chat_history_files():
    """Get list of chat history files with their timestamps and titles"""
    ensure_history_dir()
    try:
        history_files = []
        for filename in os.listdir(HISTORY_DIR):
            if filename.startswith("chat_history_"):
                filepath = os.path.join(HISTORY_DIR, filename)
                # Extract timestamp from filename
                timestamp_str = filename.replace("chat_history_", "").replace(".json", "")
                try:
                    timestamp = datetime.strptime(timestamp_str, "%Y%m%d_%H%M%S")
                    formatted_time = timestamp.strftime("%Y-%m-%d %H:%M:%S")
                    # Load the title from the file
                    with open(filepath, 'r') as f:
                        data = json.load(f)
                    title = data.get("title", "Untitled Chat") if isinstance(data, dict) else "Untitled Chat"
                    history_files.append({
                        "filename": filename,
                        "timestamp": timestamp,
                        "display_name": formatted_time,
                        "title": title
                    })
                except (ValueError, json.JSONDecodeError, IOError):
                    continue
        # Sort by timestamp, newest first
        return sorted(history_files, key=lambda x: x["timestamp"], reverse=True)
    except Exception as e:
        st.error(f"Error listing chat histories: {str(e)}")
        return []


def cleanup_old_history_files():
    """Remove old history files if we exceed the maximum"""
    try:
        history_files = get_chat_history_files()
        if len(history_files) > MAX_HISTORY_FILES:
            for old_file in history_files[MAX_HISTORY_FILES:]:
                os.remove(os.path.join(HISTORY_DIR, old_file["filename"]))
    except Exception as e:
        st.error(f"Error cleaning up history files: {str(e)}")

system_prompt = """
You are a helpful assistant that can answer questions and help with tasks.
You can also analyze images and PDFs that users share with you.
"""

def extract_text_from_pdf(pdf_file):
    """Extract text from uploaded PDF file"""
    text = ""
    try:
        pdf_bytes = pdf_file.read()
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        for page in doc:
            text += page.get_text()
        return text
    except Exception as e:
        return f"Error extracting text from PDF: {str(e)}"


def process_image(image_file):
    """Process uploaded image and extract text using OCR"""
    try:
        # Read the image file
        image_bytes = image_file.read()
        image = Image.open(io.BytesIO(image_bytes))
        # Perform OCR
        text = pytesseract.image_to_string(image)
        # Convert image to base64 for display
        buffered = io.BytesIO()
        image.save(buffered, format="PNG")
        img_str = base64.b64encode(buffered.getvalue()).decode()
        return text.strip(), img_str
    except Exception as e:
        return f"Error processing image: {str(e)}", None

def chat(user_prompt, model="gemma3:12b", files_content="", message_history=None):
    """
    Chat with the model using conversation history for context
    """
    # Initialize messages with the system prompt (sent with the 'system' role)
    messages = [{'role': 'system', 'content': system_prompt}]

    # Add conversation history if provided
    if message_history:
        for msg in message_history:
            # Convert our message format to Ollama's format
            messages.append({
                'role': msg['role'],
                'content': msg['content']
            })

    # Add the current prompt, prefixed with any uploaded file content
    full_prompt = user_prompt
    if files_content:
        full_prompt = f"{files_content}\n\nUser question: {user_prompt}"
    messages.append({
        'role': 'user',
        'content': f"Model being used is {model}. {full_prompt}"
    })

    stream = ollama.chat(
        model=model,
        messages=messages,
        stream=True,
    )
    return stream

def stream_parser(stream):
    for chunk in stream:
        yield chunk["message"]["content"]


# Set the title of the chatbot
st.title("Ollama Chatbot")

# Initialize session state for deletion confirmations
if "show_delete_all_dialog" not in st.session_state:
    st.session_state.show_delete_all_dialog = False
if "delete_chat_id" not in st.session_state:
    st.session_state.delete_chat_id = None

# Sidebar for chat management
with st.sidebar:
    st.title("Chat Management")

    # New Chat button
    if st.button("New Chat", key="new_chat"):
        st.session_state.messages = []
        st.session_state.current_chat = None
        st.rerun()

    # Clear History button with confirmation
    if st.button("Clear All History"):
        st.session_state.show_delete_all_dialog = True
        st.rerun()

    # Delete-all confirmation dialog
    if st.session_state.show_delete_all_dialog:
        st.warning("Are you sure you want to delete all chat histories?")
        col1, col2 = st.columns(2)
        with col1:
            if st.button("Yes, delete all"):
                try:
                    for file in os.listdir(HISTORY_DIR):
                        os.remove(os.path.join(HISTORY_DIR, file))
                    st.session_state.messages = []
                    st.session_state.current_chat = None
                    st.session_state.show_delete_all_dialog = False
                    st.rerun()
                except Exception as e:
                    st.error(f"Error clearing history: {str(e)}")
        with col2:
            if st.button("Cancel"):
                st.session_state.show_delete_all_dialog = False
                st.rerun()

    # Chat history selection
    st.subheader("Chat History")
    chat_histories = get_chat_history_files()

    # Individual chat deletion confirmation dialog
    if st.session_state.delete_chat_id:
        history_to_delete = next((h for h in chat_histories if h["filename"] == st.session_state.delete_chat_id), None)
        if history_to_delete:
            st.warning(f"Delete chat '{history_to_delete['title']}'?")
            col1, col2 = st.columns(2)
            with col1:
                if st.button("Yes, delete"):
                    if delete_chat_history(st.session_state.delete_chat_id):
                        if st.session_state.current_chat == st.session_state.delete_chat_id:
                            st.session_state.messages = []
                            st.session_state.current_chat = None
                        st.session_state.delete_chat_id = None
                        st.rerun()
            with col2:
                if st.button("Cancel"):
                    st.session_state.delete_chat_id = None
                    st.rerun()

    # Display chat histories with delete buttons
    for history in chat_histories:
        col1, col2 = st.columns([4, 1])
        with col1:
            if st.button(f"📝 {history['title']}\n{history['display_name']}", key=history['filename']):
                st.session_state.messages = load_chat_history(history['filename'])
                st.session_state.current_chat = history['filename']
                st.rerun()
        with col2:
            if st.button("🗑️", key=f"delete_{history['filename']}"):
                st.session_state.delete_chat_id = history['filename']
                st.rerun()

# Initialize the chat history and current chat
if "messages" not in st.session_state:
    st.session_state.messages = []
if "current_chat" not in st.session_state:
    st.session_state.current_chat = None

# Display chat history on app re-run
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        if "image" in message:
            st.image(f"data:image/png;base64,{message['image']}")
        st.markdown(message["content"])

# File upload section
uploaded_files = st.file_uploader("Upload PDF or Image files", type=['pdf', 'png', 'jpg', 'jpeg'], accept_multiple_files=True)

# Process uploaded files
files_content = ""
uploaded_images = []
if uploaded_files:
    for file in uploaded_files:
        if file.type == "application/pdf":
            pdf_text = extract_text_from_pdf(file)
            files_content += f"\nContent from PDF '{file.name}':\n{pdf_text}\n"
        elif file.type.startswith('image/'):
            image_text, img_str = process_image(file)
            if img_str:
                uploaded_images.append(img_str)
            files_content += f"\nContent from Image '{file.name}':\n{image_text}\n"

# React to user input
if user_prompt := st.chat_input("What would you like to ask?"):
    # Display the user prompt and any uploaded images in a chat message widget
    with st.chat_message("user"):
        st.markdown(user_prompt)
        for img_str in uploaded_images:
            st.image(f"data:image/png;base64,{img_str}")

    # Add the user's prompt and images to session state
    message_data = {"role": "user", "content": user_prompt}
    if uploaded_images:
        message_data["image"] = uploaded_images[0]  # Store the first image for history
    st.session_state.messages.append(message_data)

    with st.spinner('Generating response...'):
        # Pass the existing messages as context, excluding the prompt that was
        # just appended above so it is not sent twice (chat() re-adds it along
        # with any uploaded file content)
        llm_stream = chat(
            user_prompt,
            files_content=files_content,
            message_history=st.session_state.messages[:-1]
        )

        # Stream the response back to the screen inside an assistant chat bubble
        with st.chat_message("assistant"):
            stream_output = st.write_stream(stream_parser(llm_stream))

    # Append the response to the message list
    st.session_state.messages.append({"role": "assistant", "content": stream_output})

    # Save chat history after each message
    if st.session_state.current_chat:
        save_chat_history(st.session_state.messages, st.session_state.current_chat)
    else:
        new_filename = save_chat_history(st.session_state.messages)
        st.session_state.current_chat = new_filename

requirements.txt

ollama==0.4.7
pillow==11.1.0
PyMuPDF==1.25.3
pytesseract==0.3.13
streamlit==1.41.1
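
A minimal setup sketch, assuming Ollama is already installed and on the PATH: pytesseract only wraps the system Tesseract OCR engine, so the Tesseract binary must be installed separately (the package names below are the usual Debian/Ubuntu and macOS ones and may differ on other platforms).

# Python dependencies
pip install -r requirements.txt

# Tesseract OCR engine, required by pytesseract
sudo apt-get install tesseract-ocr    # Debian/Ubuntu
brew install tesseract                # macOS

# Pull the model and start the app
ollama pull gemma3:12b
streamlit run ollama_chat.py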