This gist contains the code and directions for an AI teaching/learning assistant. It is extremely effective and makes learning fun for any topic.
Guide: Generating and Using a Topic-Specific Learning Assistant Backend
https://github.com/JupyterJones/frontend
This document outlines how to use the Flask-based App Generator to create new topic-specific FastAPI backend applications for your learning assistant project.

1. Overview

The system consists of two main Python scripts:

Flask App Generator (app_builder.py - your second script):
- A Flask web application that provides a user interface to input a topic name and a structured prompt.
- It uses a master template (template_app_v2.py) to generate a new, customized FastAPI backend application tailored to the specified topic.
- It also creates the necessary directories and the topic-specific structured prompt file.

FastAPI Backend Template (template_app_v2.py - your first script):
- A comprehensive FastAPI application template designed to serve as a backend for a learning assistant.
- It includes features like Q&A with Gemini, conversation history, file management, vector search with ChromaDB, context-enhanced responses, conversation generation, and essay section generation.
- This template contains placeholders (%%TOPIC_LOWER%% and %%TOPIC_CAPITALIZED%%) that the App Generator replaces.

Workflow: Use the Flask App Generator to create a new Python file (e.g., nutrition.py) and associated files from the template. Then, run this newly generated Python file as a standalone FastAPI service.

2. Prerequisites

Python: Version 3.9 or higher is recommended (as indicated by the shebang #!/mnt/HDD500/reactjs-learning-assistant/backend/venv/bin/python3.9 in your builder script).
PIP: Python package installer.
Gemini API Key: You need a valid API key for Google's Gemini models.
Virtual Environment (Recommended): To manage dependencies cleanly.

python3 -m venv venv
source venv/bin/activate  # On Linux/macOS
# venv\Scripts\activate   # On Windows
requirements.txt

# Flask and related utilities for the app generator
Flask>=2.0
icecream>=2.0

# FastAPI and related utilities for the generated applications
fastapi>=0.90.0
uvicorn[standard]>=0.20.0  # For running FastAPI; [standard] includes performance extras
pydantic>=1.10.0,<2.0.0  # Version 1.x is widely used with FastAPI; ensure compatibility or update the template for V2

# HTTP client for making API calls (e.g., to Gemini)
requests>=2.25.0

# Vector database and sentence embeddings
chromadb>=0.4.0
sentence-transformers>=2.2.0

# For managing environment variables (e.g., API keys)
python-dotenv>=0.20.0

# Google Generative AI SDK (official Python client for Gemini)
# Note: your current template uses 'requests' for direct REST API calls to Gemini.
# This library is included as it's the standard SDK and often used in Gemini projects,
# providing more features than direct REST calls. If you strictly want to stick to
# the template's direct REST calls, this could be optional.
google-generativeai>=0.5.0
------------------
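If you save the list above as requirements.txt in your project root, everything can be installed in one step:

pip install -r requirements.txt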
Required Python Libraries: Install the following using pip:

pip install Flask icecream fastapi uvicorn[standard] requests pydantic chromadb sentence-transformers python-dotenv

(Note: python-dotenv is good practice for managing environment variables, though your template also uses Gemini_key.py.)

3. File Structure

Organize your project as follows for clarity:

your_project_directory/
├── app_builder.py               # The Flask App Generator script
├── TEXT/
│   └── template_app_v2.py       # The FastAPI Backend Template
│
├── Gemini_key.py                # Optional: if you store your API key here
├── .env                         # Optional: if you use python-dotenv for the API key
│
└── generated_apps/              # Created by app_builder.py if OUTPUT_BASE_DIR is set to './generated_apps'
    ├── <topic_lower>.py         # Generated FastAPI app for a topic
    ├── <topic_lower>_files/
    │   ├── structured_prompt_<topic_lower>.txt
    │   └── (other files for seeding or generated by the app)
    ├── <topic_lower>_chroma_db/
    │   └── (ChromaDB persistence files)
    ├── <topic_lower>_history.db
    └── <topic_lower>_conversation.db
Important:
- In app_builder.py, ensure TEMPLATE_FILENAME points to the correct path of your template_app_v2.py. Currently it is TEXT/template_app_v2.py.
- The OUTPUT_BASE_DIR in app_builder.py determines where the generated applications and their associated files/directories will be created. If it is . (as in the script), they will be created in the same directory as app_builder.py. You might want to change this to a subdirectory like './generated_apps/' for better organization.

4. Setup Instructions

1. Create Project Directory: Set up a directory for your project.
2. Place Scripts: Copy app_builder.py into your project directory. Create a subdirectory named TEXT (if it doesn't exist) and place template_app_v2.py inside it.
3. Install Dependencies: Open your terminal, navigate to the project directory, activate your virtual environment (if using one), and run:

pip install Flask icecream fastapi uvicorn[standard] requests pydantic chromadb sentence-transformers python-dotenv
4. Set up Gemini API Key:

Option A (Environment Variable - Recommended): Create a file named .env in your project root and add your API key:

GEMINI_API_KEY="YOUR_ACTUAL_GEMINI_API_KEY"
The template (template_app_v2.py) already has a fallback to os.getenv("GEMINI_API_KEY").

Option B (Gemini_key.py): Create a file named Gemini_key.py in the same directory where the generated FastAPI app will run (or in a location accessible via Python's module search path) with the content:

API_KEY = "YOUR_ACTUAL_GEMINI_API_KEY"

The template attempts to import API_KEY from this file.
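Either way, you can verify that the key is visible to Python before starting a generated app. A minimal check (load_dotenv applies to Option A only; python-dotenv is in requirements.txt, but note the template itself only falls back to os.getenv and does not call load_dotenv):

import os
from dotenv import load_dotenv

load_dotenv()  # Option A: reads .env from the current working directory
key = os.getenv("GEMINI_API_KEY")
print("Key loaded" if key else "GEMINI_API_KEY is NOT set")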
5. Using the Flask App Generator (app_builder.py)

Run the Generator: Open your terminal, navigate to your project directory, and run:

python app_builder.py
You should see output like:

* Serving Flask app 'app_builder'
* Debug mode: on
WARNING: This is a development server. Do not use it in a production deployment.
* Running on http://127.0.0.1:5400
Access the Web Interface: Open your web browser and go to http://127.0.0.1:5400.

Fill out the Form:
- Topic Name: Enter the desired topic for your new learning assistant (e.g., "Nutrition Science", "Ancient History", "Python Programming").
- Structured Prompt Text: Paste the base prompt structure that will be used by the generated FastAPI app, particularly for conversation generation. This prompt should ideally include placeholders like {start_prompt} (for the user's initial query/statement) and {context} (for information retrieved from ChromaDB).

Example for a "Nutrition Science" topic:

Generate a natural-sounding, helpful, and substantial conversation or dialogue about nutrition between two characters named Alex (who is learning) and Dr. Nutri (an expert).
Instructions:
- Alex should initiate the conversation, asking a question or making a statement based on the "User's Starting Point" below.
- Dr. Nutri should respond to Alex, primarily using information found in the "Relevant Context" provided. If no context is found, Dr. Nutri should state that they don't have specific information but can discuss the topic generally.
- They should continue the conversation naturally for several exchanges, thoroughly exploring the topic and potentially related aspects based on the context.
- Clearly label who is speaking on each line using "Alex: " or "Dr. Nutri: ".
- Maintain a friendly and informative tone throughout the dialogue.
- Base Dr. Nutri's replies on the context, but paraphrase and integrate naturally. Ensure responses add value and depth. If the context doesn't cover a point Alex raises, Dr. Nutri should acknowledge that.

User's Starting Point: "{start_prompt}"

Relevant Context from Knowledge Base:
---
{context}
---

Generated Conversation:
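The generated backend fills the {start_prompt} and {context} placeholders before sending this prompt to Gemini. Conceptually, that amounts to something like the following sketch (the actual code lives in the template's conversation endpoint, which is not reproduced in full in this excerpt; str.format is used here purely for illustration):

base_prompt = open("nutrition_science_files/structured_prompt_nutrition_science.txt").read()
final_prompt = base_prompt.format(
    start_prompt="Why do we need dietary fiber?",  # the user's starting point
    context="Fiber aids digestion...\n---\n...",   # snippets retrieved from ChromaDB
)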
Submit the Form: Click the "Generate Topic App" button.

Check Output: If successful, you'll see a success message on the web page. In your OUTPUT_BASE_DIR (e.g., your project directory or ./generated_apps/), you will find:
- A new Python file: e.g., nutrition_science.py.
- A new directory for files: e.g., nutrition_science_files/. Inside it, structured_prompt_nutrition_science.txt containing the prompt you pasted.
- A new directory for ChromaDB: e.g., nutrition_science_chroma_db/.
- SQLite database files will also be created here once the generated app runs (e.g., nutrition_science_history.db).

6. Running the Generated FastAPI Backend

Navigate (if needed): If your OUTPUT_BASE_DIR in app_builder.py was set to a subdirectory (e.g., generated_apps), navigate into that directory in your terminal. If it was ., the generated <topic_lower>.py file is in the main project directory.

Run the FastAPI App: Execute the generated Python file using Uvicorn. For example, if you generated an app for "Nutrition Science" (which created nutrition_science.py):

uvicorn nutrition_science:app --reload --port 8000
- nutrition_science:app: Tells Uvicorn to look for an object named app inside the nutrition_science.py file.
- --reload: Enables auto-reloading for development (optional).
- --port 8000: Specifies the port (you can choose another if 8000 is in use).

Access the API:
- The API will be running (e.g., at http://127.0.0.1:8000).
- Interactive API documentation (Swagger UI) will be available at http://127.0.0.1:8000/docs.
- Alternative API documentation (ReDoc) will be available at http://127.0.0.1:8000/redoc.

7. Key Features of the Generated Backend

Once running, your new topic-specific FastAPI backend provides several endpoints, including:
- /ask/gemini: Direct Q&A with Gemini, with results saved to ChromaDB and a local file.
- /api/ask: Q&A with Gemini, with results saved to the topic's history SQLite DB and a local file (see the example call below).
- /api/search_and_enhance: Searches ChromaDB for context and uses Gemini to synthesize an answer.
- /api/generate_conversation: Generates a dialogue using the topic's structured prompt and ChromaDB context.
- /api/generate_essay: Generates an essay section based on a topic and ChromaDB context.
- /api/history: CRUD operations for Q&A history.
- /api/files: CRUD operations for text files in the topic's _files directory.
- /api/seed_data: Populates ChromaDB from text files.
- (And others as defined in the template.)
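For example, once the nutrition_science backend is running on port 8000, you can call /api/ask from Python. The "question" and "answer" field names come from the template's AskRequest and AskResponse models; the URL and port are simply the values used in this guide:

import requests

resp = requests.post(
    "http://127.0.0.1:8000/api/ask",
    json={"question": "What role does vitamin D play in calcium absorption?"},
    timeout=120,
)
resp.raise_for_status()
print(resp.json()["answer"])  # AskResponse contains a single "answer" field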
"filenames": [ | |
"vitamins_overview.txt", | |
"minerals_deep_dive.txt" | |
] | |
} | |
Execute the request. The content of these files will be processed, embedded, and stored in the topic's ChromaDB collection.

9. Troubleshooting

- app_builder.py - Template File Not Found: Ensure TEMPLATE_FILENAME in app_builder.py correctly points to template_app_v2.py.
- Generated App - Gemini_key.py or API Key Error: Verify your GEMINI_API_KEY is correctly set up (either via .env or a Gemini_key.py accessible to the generated script).
- Generated App - ChromaDB Version Mismatch: The template includes detailed startup error messages for this. It usually means the Python environment used to run the generated app has a different chromadb library version than when the ChromaDB files were first created. The solution is often to delete the topic's _chroma_db directory and re-seed the data.
- Generated App - Module Not Found: Ensure all dependencies listed in Prerequisites are installed in the Python environment you're using to run the generated FastAPI app.
- Permissions: Ensure the script has write permissions to OUTPUT_BASE_DIR to create files and directories.

10. Workflow Summary

1. Setup: Install the prerequisites, set the API key, and place the template and builder scripts.
2. Generate: Run app_builder.py and fill out the form on http://127.0.0.1:5400 to generate a new topic backend.
3. Run Generated App: Use uvicorn <topic_lower>:app --reload --port <your_port> to start the new FastAPI service.
4. Seed Data: Prepare .txt files with topic information and use the /api/seed_data endpoint of the running generated app to populate its knowledge base.
5. Interact: Use the various API endpoints of your newly generated and seeded topic assistant (an example follows below).

This comprehensive setup allows for rapid development and deployment of multiple, specialized learning assistant backends.
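As a concrete illustration of the "Interact" step above, the essay endpoint can be called like this (a sketch; the field names are taken from the template's EssayRequest and EssayResponse models, and the URL assumes the generated app is running locally on port 8000):

import requests

payload = {
    "essay_topic": "The role of micronutrients in immune function",
    "num_context_results": 7,  # how many ChromaDB snippets to retrieve
    "temperature": 0.7,        # generation temperature passed to Gemini
}
resp = requests.post("http://127.0.0.1:8000/api/generate_essay", json=payload, timeout=180)
resp.raise_for_status()
data = resp.json()
print(f"Used {data['retrieved_context_count']} context snippets")
print(data["generated_essay_section"])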
---------------- app_generator_v2.py ----------------
#!/mnt/HDD500/reactjs-learning-assistant/backend/venv/bin/python3.9
'''
DOCUMENTATION
Flask Application Generator (app_generator_v2.py)

Analysis: This Flask script serves as a powerful utility to bootstrap new
topic-specific FastAPI backends based on template_app_v2.py.

Core Functionality:
- Web Interface: Provides a simple HTML form to input:
  - Topic Name: The subject for the new learning assistant (e.g., "Quantum Physics").
  - Structured Prompt Text: The base prompt specific to this topic, used for
    features like conversation generation in the FastAPI app.
- Input Processing & Sanitization: Retrieves topic_name and structured_prompt
  from the form and performs basic validation to ensure both are provided.
  Sanitizes the topic_name with sanitize_for_path() to create a lowercase,
  underscore-separated version suitable for file and directory names
  (e.g., "quantum_physics"), and creates a capitalized version with
  topic_name.title() (e.g., "Quantum Physics") for display purposes within
  the generated code.
- Path Generation: Dynamically defines paths for the output Python file for
  the new FastAPI app (e.g., quantum_physics.py), a directory for
  topic-specific files (e.g., quantum_physics_files/), a directory for
  ChromaDB persistence (e.g., quantum_physics_chroma_db/), and the structured
  prompt text file (e.g., quantum_physics_files/structured_prompt_quantum_physics.txt).
- Template Customization: Reads the content of TEMPLATE_FILENAME
  (TEXT/template_app_v2.py) and replaces the placeholders %%TOPIC_LOWER%% and
  %%TOPIC_CAPITALIZED%% with the derived topic names.
- File and Directory Creation: Creates the necessary output directories
  (_files, _chroma_db) if they don't already exist, writes the modified
  template content to the new topic-specific Python file, and saves the
  provided structured prompt into its designated text file within the
  _files directory.
- Feedback & Error Handling: Displays success or error messages on the web
  page, checks whether the main Python file for the topic already exists to
  prevent accidental overwrites, handles FileNotFoundError if the main
  template is missing, includes a general try-except block to catch other
  unexpected errors during generation, and uses icecream (ic()) for helpful
  debugging output to the console.

Strengths of the Flask Builder:
- Automation & Efficiency: Drastically reduces the manual effort and the
  potential for errors when setting up a new topic backend.
- Consistency: Ensures that all generated topic backends adhere to the same
  structure and conventions defined in the master template.
- User-Friendly (Developer UX): The web form is more convenient than a
  command-line script, especially for pasting multi-line structured prompts.
- Clear Structure: The generated output (Python file, directories, prompt
  file) is well organized.
- Good Basic Validation: Prevents common issues like missing inputs or
  overwriting existing applications.
- Helpful Debugging: The use of icecream is good practice for development
  and troubleshooting.
- Self-Contained HTML: Embedding the HTML template within the Python script
  keeps the builder simple and portable.

Minor Considerations / Potential Enhancements:
- Advanced Capitalization: topic_name.title() is good for most cases. If you
  encounter topics with acronyms (e.g., "AI") or specific casing preferences
  that title() doesn't handle (it would turn "AI" into "Ai"), consider a more
  sophisticated capitalization strategy, or allow manual input of the
  capitalized version if strictness is required in generated class names or
  UI titles. For the current template's usage (mostly display strings and
  tags), title() is likely sufficient.
- Configuration Flexibility: OUTPUT_BASE_DIR and TEMPLATE_FILENAME are
  hardcoded. For greater flexibility, these could be made configurable via
  environment variables or additional form fields.
- Security Note: While render_template_string is used, the message variable
  is server-generated, mitigating XSS risks. This is generally fine for an
  internal development tool.

Interaction with the FastAPI Template (template_app_v2.py):
The builder is designed to work seamlessly with the FastAPI template. The
placeholders %%TOPIC_LOWER%% and %%TOPIC_CAPITALIZED%% in the template are
correctly targeted and replaced, ensuring that each generated FastAPI
application is properly configured with:
- Topic-specific database file paths (SQLite history, conversation).
- Topic-specific directory paths (_files, _chroma_db).
- A unique ChromaDB collection name (e.g., quantum_physics_qa).
- Topic-aware logging/print statements.
- Contextualized prompts for Gemini (e.g., "In the context of Quantum Physics...").
- A correctly named structured_prompt_<topic_lower>.txt file, which the
  generated FastAPI app will then load.

The FastAPI template itself is robust, feature-rich, and uses modern
practices; the builder complements it by automating its instantiation for
various topics.

Overall: This builder script is a very effective tool for the project. It is
well written, handles common cases gracefully, and significantly enhances
your ability to scale the learning assistant to new topics by leveraging the
FastAPI template. The combination of the Flask builder and the FastAPI
template creates a solid foundation for the application.
'''
import os
import traceback
from flask import Flask, request, render_template_string
from icecream import ic  # Using icecream for debugging

app = Flask(__name__)
app.secret_key = 'your_secret_key'  # Consider using environment variables for secrets

# --- Configuration ---
OUTPUT_BASE_DIR = '.'  # Base directory for generated files/folders
# *** IMPORTANT: Update this to the path of your NEW template file ***
TEMPLATE_FILENAME = 'TEXT/template_app_v2.py'  # Assumes the new template is here

# Ensure the base output directory exists
os.makedirs(OUTPUT_BASE_DIR, exist_ok=True)

def sanitize_for_path(name):
    """Sanitizes a string to be suitable for use in file/directory names."""
    # Keep alphanumeric characters, replace others with underscores, ensure lowercase
    return ''.join(c if c.isalnum() else '_' for c in name.strip()).lower()
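# Example (hypothetical inputs):
#   sanitize_for_path("Quantum Physics") -> "quantum_physics"
#   sanitize_for_path("C++ Basics")      -> "c___basics"  (every non-alphanumeric character becomes "_")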
#--------------------
# Main Flask route for the generator form
@app.route('/', methods=['GET', 'POST'])
def index():
    message = None
    success = False
    if request.method == 'POST':
        # Get data from the submitted form
        topic_name = request.form.get('topic_name')
        structured_prompt = request.form.get('structured_prompt')
        # Use ic for debugging form inputs (guard the slice in case the field is missing)
        ic(f"Received Topic Name: {topic_name}")
        ic(f"Received Structured Prompt (first 100 chars): {structured_prompt[:100] if structured_prompt else None}")
        # Basic validation
        if not topic_name or not structured_prompt:
            message = 'Topic name and structured prompt are required.'
            ic(f"Validation Error: {message}")
        else:
            try:
                # Prepare topic names (lowercase for paths/ids, title case for display)
                topic_lower = sanitize_for_path(topic_name)
                # Use title() for simple capitalization; adjust if needed for cases like AI -> Ai
                topic_capitalized = topic_name.title()
                # Define output paths based on the sanitized topic name
                output_py_file = os.path.join(OUTPUT_BASE_DIR, f"{topic_lower}.py")
                output_files_dir = os.path.join(OUTPUT_BASE_DIR, f"{topic_lower}_files")
                output_chroma_dir = os.path.join(OUTPUT_BASE_DIR, f"{topic_lower}_chroma_db")  # For ChromaDB persistence
                output_prompt_file = os.path.join(output_files_dir, f"structured_prompt_{topic_lower}.txt")
                # Log the generated paths for debugging
                ic(f"Generated Python File Path: {output_py_file}")
                ic(f"Generated Files Directory Path: {output_files_dir}")
                ic(f"Generated ChromaDB Directory Path: {output_chroma_dir}")
                ic(f"Generated Prompt File Path: {output_prompt_file}")
                # Check if the main Python file already exists to prevent overwriting
                if os.path.exists(output_py_file):
                    message = (f"Error: Backend file '{os.path.basename(output_py_file)}' already exists for topic "
                               f"'{topic_name}'. Please choose a different topic name or delete the existing file.")
                    ic(message)
                else:
                    # Read the template file content
                    try:
                        with open(TEMPLATE_FILENAME, 'r', encoding='utf-8') as f_template:
                            template_content = f_template.read()
                        ic(f"Template file '{TEMPLATE_FILENAME}' read successfully.")
                    except FileNotFoundError:
                        message = f"FATAL Error: Template file '{TEMPLATE_FILENAME}' not found. Cannot generate application."
                        ic(message)
                        # Return immediately if the template is missing
                        return render_template_string(HTML_TEMPLATE, message=message, success=False)
                    except Exception as read_err:
                        message = f"FATAL Error: Could not read template file '{TEMPLATE_FILENAME}': {read_err}"
                        ic(message)
                        ic(traceback.format_exc())
                        return render_template_string(HTML_TEMPLATE, message=message, success=False)
                    # --- *** MODIFIED PLACEHOLDER REPLACEMENT *** ---
                    # Replace placeholders with the derived topic names
                    content = template_content.replace("%%TOPIC_LOWER%%", topic_lower)
                    content = content.replace("%%TOPIC_CAPITALIZED%%", topic_capitalized)
                    ic("Placeholders replaced in template content.")
                    # --- *** END OF MODIFICATION *** ---
                    # Create necessary output directories
                    # exist_ok=True prevents errors if the directories already exist
                    os.makedirs(output_files_dir, exist_ok=True)
                    # The ChromaDB directory is also needed, though the client might create it too
                    os.makedirs(output_chroma_dir, exist_ok=True)
                    ic(f"Ensured directories exist: {output_files_dir}, {output_chroma_dir}")
                    # Write the modified content to the new Python file
                    with open(output_py_file, 'w', encoding='utf-8') as f_out:
                        f_out.write(content)
                    ic(f"Generated Python file written: {output_py_file}")
                    # Write the provided structured prompt to its file
                    with open(output_prompt_file, 'w', encoding='utf-8') as f_prompt:
                        f_prompt.write(structured_prompt)
                    ic(f"Structured prompt file written: {output_prompt_file}")
                    # Set the success message
                    message = (f"Successfully generated backend application '{os.path.basename(output_py_file)}' for topic "
                               f"'{topic_name}'. Files are located in the '{OUTPUT_BASE_DIR}' directory.")
                    ic(message)
                    success = True
            except Exception as e:
                # Catch any unexpected errors during the process
                ic("An unexpected exception occurred during generation.")
                ic(e)
                ic(traceback.format_exc())
                message = f"An unexpected error occurred during generation: {e}"
                success = False  # Ensure success is False on error
    # Render the HTML page, passing the message and success status
    return render_template_string(HTML_TEMPLATE, message=message, success=success)
#--------------------
# HTML template for the web form (remains the same)
HTML_TEMPLATE = """
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Generate Topic Assistant App</title>
    <style>
        body { font-family: sans-serif; margin: 2em; background-color: #f4f4f4; color: #333; }
        h1 { color: #0056b3; }
        form { background-color: #fff; padding: 20px; border-radius: 8px; box-shadow: 0 2px 5px rgba(0,0,0,0.1); max-width: 600px; margin: 20px auto; }
        label { display: block; margin-bottom: 8px; font-weight: bold; color: #555; }
        input[type="text"], textarea {
            display: block;
            margin-bottom: 15px;
            width: 95%; /* Slightly less than 100% for padding */
            padding: 10px;
            border: 1px solid #ccc;
            border-radius: 4px;
            font-size: 1rem;
        }
        textarea { height: 200px; font-family: monospace; }
        button {
            padding: 10px 20px;
            background-color: #007bff;
            color: white;
            border: none;
            border-radius: 4px;
            cursor: pointer;
            font-size: 1rem;
            transition: background-color 0.2s ease;
        }
        button:hover { background-color: #0056b3; }
        .message { margin: 20px auto; padding: 15px; border-radius: 5px; max-width: 600px; text-align: center; }
        .success { background-color: #d4edda; color: #155724; border: 1px solid #c3e6cb; }
        .error { background-color: #f8d7da; color: #721c24; border: 1px solid #f5c6cb; }
    </style>
</head>
<body>
    <h1>Generate New Topic Assistant Backend</h1>
    {% if message %}
        <div class="message {{ 'success' if success else 'error' }}">{{ message }}</div>
    {% endif %}
    <form method="POST">
        <label for="topic_name">Topic Name:</label>
        <input type="text" id="topic_name" name="topic_name" required placeholder="e.g., Consciousness, Quantum Physics">
        <label for="structured_prompt">Structured Prompt Text:</label>
        <textarea id="structured_prompt" name="structured_prompt" required placeholder="Paste the base prompt structure for this topic here... Use {start_prompt} and {context} as placeholders."></textarea>
        <button type="submit">Generate Topic App</button>
    </form>
</body>
</html>
"""
#--------------------
# Main execution block to run the Flask development server
if __name__ == '__main__':
    ic("Starting Flask app generator...")
    # Runs on port 5400; debug=True enables auto-reload and more detailed errors.
    # Set debug=False for production environments.
    app.run(debug=True, port=5400)
------------------ template_app_v2.py ------------------
#!/usr/bin/env python3
# Template v2 for topic-specific learning assistant backend
# Includes Q&A, History, Files, Search, Conversation, and Essay Generation
import os
import sys
import requests
import json
import sqlite3
import uuid
import re
import traceback
from datetime import datetime
from typing import List, Optional

# --- !! IMPORTANT !! ---
# This is a template file. A generator script (like app_builder.py) should replace
# `%%TOPIC_LOWER%%` with the actual topic name (e.g., "finance", "consciousness") in lowercase
# and `%%TOPIC_CAPITALIZED%%` with the capitalized version (e.g., "Finance", "Consciousness")
# before saving as `<topic_name>.py`.

# --- ** TOPIC-SPECIFIC CONSTANTS (To be replaced by generator) ** ---
TOPIC_NAME_LOWER = "%%TOPIC_LOWER%%"
TOPIC_NAME_CAPITALIZED = "%%TOPIC_CAPITALIZED%%"

# --- Configuration ---
# Assuming Gemini_key.py is available in the execution environment
try:
    from Gemini_key import API_KEY
except ImportError:
    API_KEY = os.getenv("GEMINI_API_KEY")  # Fallback to environment variable

# Check if the API key was successfully loaded
if not API_KEY:
    print("FATAL ERROR: GEMINI_API_KEY is not set in Gemini_key.py or as an environment variable.")
    sys.exit(1)

GEMINI_API_KEY = API_KEY
GEMINI_API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent?key={GEMINI_API_KEY}"  # Using Gemini Flash by default

# Topic-specific file paths derived from TOPIC_NAME_LOWER
BASE_DIR = os.path.dirname(os.path.abspath(__file__))  # Directory where the script runs
HISTORY_DB_FILE = os.path.join(BASE_DIR, f"{TOPIC_NAME_LOWER}_history.db")
CONVERSATION_DB_FILE = os.path.join(BASE_DIR, f"{TOPIC_NAME_LOWER}_conversation.db")
FILES_DIR = os.path.join(BASE_DIR, f"{TOPIC_NAME_LOWER}_files")
CHROMA_DB_PATH = os.path.join(BASE_DIR, f"{TOPIC_NAME_LOWER}_chroma_db")
STRUCTURED_PROMPT_FILE = os.path.join(FILES_DIR, f"structured_prompt_{TOPIC_NAME_LOWER}.txt")

# Topic-specific ChromaDB collection name
CHROMA_COLLECTION_NAME = f"{TOPIC_NAME_LOWER}_qa"  # Convention: topic name + "_qa"

# Shared database file (independent of topic)
STOCK_DB_FILE = os.path.join(BASE_DIR, '30_day_data.db')  # Example shared DB

# --- Initialize directories ---
# Ensure necessary directories exist before proceeding
try:
    os.makedirs(FILES_DIR, exist_ok=True)
    # The ChromaDB path directory is handled by the client upon initialization
except OSError as e:
    print(f"Warning: Could not create directories needed for {TOPIC_NAME_CAPITALIZED}: {e}")

# --- Dependencies ---
try:
    from fastapi import FastAPI, HTTPException, Request, status, Path
    from fastapi.middleware.cors import CORSMiddleware
    from pydantic import BaseModel
    import chromadb
    from sentence_transformers import SentenceTransformer
except ImportError as e:
    print(f"FATAL ERROR: Missing required libraries. Please install them. {e}")
    print("Try: pip install fastapi uvicorn[standard] requests pydantic python-dotenv chromadb sentence-transformers")
    sys.exit(1)
# --- Pydantic Models ---
# Define data structures for API requests and responses
class AskRequest(BaseModel):
    question: str

class AskResponse(BaseModel):
    answer: str

class HistoryItem(BaseModel):
    id: int
    timestamp: str
    question: str
    answer: str

class HistoryItemUpdate(BaseModel):
    question: str
    answer: str

class FileCreateRequest(BaseModel):
    filename: str  # Relative to the topic's FILES_DIR
    content: str

class FileUpdateRequest(BaseModel):
    content: str

class FileResponse(BaseModel):
    filename: str  # Relative to the topic's FILES_DIR
    content: str

class MessageResponse(BaseModel):
    message: str

class FileInfo(BaseModel):
    filename: str  # Relative to the topic's FILES_DIR

class VideoIdResponse(BaseModel):
    video_id: str  # Example static response model

class GeminiViewRequest(BaseModel):
    prompt: str

class GeminiViewResponse(BaseModel):
    response: str

class SearchRequest(BaseModel):
    query: str
    num_results: int = 3

class SearchResult(BaseModel):
    text: str
    source: Optional[str] = None

class EnhancedResponse(BaseModel):
    results: List[SearchResult]
    enhanced_summary: Optional[str] = None

class ConversationRequest(BaseModel):
    start_prompt: str
    num_context_results: int = 8

class ConversationResponse(BaseModel):
    conversation: str

class StockData(BaseModel):  # Example model for shared data
    ticker: str
    date: str
    open: float
    high: float
    low: float
    close: float
    volume: int

class SeedRequest(BaseModel):
    filenames: List[str]  # Relative to the topic's FILES_DIR

# Models for Essay Generation
class EssayRequest(BaseModel):
    essay_topic: str
    num_context_results: int = 7  # Number of context snippets to retrieve
    temperature: float = 0.7      # Generation temperature

class EssayResponse(BaseModel):
    generated_essay_section: str
    retrieved_context_count: int  # Info about how much context was used
# --- Global Variables for Initialized Resources ---
# These will be initialized in the startup lifespan handler and shared across requests
chroma_collection: Optional[chromadb.Collection] = None
embedding_model: Optional[SentenceTransformer] = None
base_conversation_prompt_template: Optional[str] = None

# --- FastAPI App Initialization ---
# The app title and description use the capitalized topic name placeholder
app = FastAPI(
    title=f"Gemini 1.5 %%TOPIC_CAPITALIZED%% Educational Assistant API",
    description=f"<p>API for the %%TOPIC_CAPITALIZED%% Educational Assistant. Provides endpoints for Q&A, history, file management, search, conversation generation, and essay generation based on a ChromaDB knowledge base.</p>",
    version="1.1.0"  # Version including the essay feature
)

# --- CORS Configuration ---
# Define allowed origins for frontend access
origins = [
    "http://localhost:3000",  # Example React dev server
    "http://127.0.0.1:3000",
    # Add other origins as needed, e.g., the production frontend URL or LAN IPs
    "http://192.168.1.100:3000",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],  # Allow all standard methods
    allow_headers=["*"],  # Allow all headers
)
# --- Database Helper Functions (SQLite - History, Conversation, Stock) ---
# These functions handle interactions with the topic-specific SQLite databases
# and the shared stock database.

# History DB (%%TOPIC_LOWER%%_history.db)
def create_history_connection():
    """Establishes a connection to the topic's history SQLite DB."""
    conn = None
    try:
        conn = sqlite3.connect(HISTORY_DB_FILE)
        conn.row_factory = sqlite3.Row  # Return rows as dict-like objects
        return conn
    except sqlite3.Error as e:
        print(f"Error connecting to history DB '{HISTORY_DB_FILE}': {e}")
        return conn

def create_history_table():
    """Creates the history table if it doesn't exist in the topic's history DB."""
    sql_create_table = """
    CREATE TABLE IF NOT EXISTS history (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
        question TEXT NOT NULL,
        answer TEXT NOT NULL
    );
    """
    conn = create_history_connection()
    if conn is not None:
        try:
            cursor = conn.cursor()
            cursor.execute(sql_create_table)
            conn.commit()
            print(f"History table in '{HISTORY_DB_FILE}' checked/created successfully.")
        except sqlite3.Error as e:
            print(f"Error creating history table in '{HISTORY_DB_FILE}': {e}")
        finally:
            conn.close()
    else:
        print(f"Error! Cannot create DB connection for history table '{HISTORY_DB_FILE}'.")

def save_history_item(question: str, answer: str):
    """Saves a question and answer pair to the topic's history DB."""
    sql = "INSERT INTO history (question, answer) VALUES (?, ?)"
    conn = create_history_connection()
    if conn is not None:
        try:
            cursor = conn.cursor()
            cursor.execute(sql, (question, answer))
            conn.commit()
            print(f"Saved Q&A to history DB '{HISTORY_DB_FILE}'. Q: {question[:50]}...")
        except sqlite3.Error as e:
            print(f"Error saving to history DB '{HISTORY_DB_FILE}': {e}")
            print(traceback.format_exc())
        finally:
            conn.close()
    else:
        print(f"Error! Cannot create DB connection for saving history '{HISTORY_DB_FILE}'.")

def fetch_history() -> List[HistoryItem]:
    """Fetches all history items from the topic's history DB."""
    sql = "SELECT id, timestamp, question, answer FROM history ORDER BY timestamp DESC"
    history_items = []
    conn = create_history_connection()
    if conn is not None:
        try:
            cursor = conn.cursor()
            cursor.execute(sql)
            rows = cursor.fetchall()
            # Convert rows to Pydantic models
            history_items = [HistoryItem(id=row["id"], timestamp=row["timestamp"], question=row["question"], answer=row["answer"]) for row in rows]
        except sqlite3.Error as e:
            print(f"Error fetching history from '{HISTORY_DB_FILE}': {e}")
            print(traceback.format_exc())
        finally:
            conn.close()
    else:
        print(f"Error! Cannot create DB connection for fetching history '{HISTORY_DB_FILE}'.")
    return history_items

# Conversation DB (%%TOPIC_LOWER%%_conversation.db)
def create_conversation_connection():
    """Establishes a connection to the topic's conversation SQLite DB."""
    conn = None
    try:
        conn = sqlite3.connect(CONVERSATION_DB_FILE)
        conn.row_factory = sqlite3.Row
        return conn
    except sqlite3.Error as e:
        print(f"Error connecting to conversation DB '{CONVERSATION_DB_FILE}': {e}")
        return conn

def create_conversation_table():
    """Creates the conversations table if it doesn't exist in the topic's conversation DB."""
    sql_create_table = """
    CREATE TABLE IF NOT EXISTS conversations (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
        start_prompt TEXT NOT NULL,
        generated_conversation TEXT NOT NULL
    );
    """
    conn = create_conversation_connection()
    if conn is not None:
        try:
            cursor = conn.cursor()
            cursor.execute(sql_create_table)
            conn.commit()
            print(f"Conversation table in '{CONVERSATION_DB_FILE}' checked/created successfully.")
        except sqlite3.Error as e:
            print(f"Error creating conversation table in '{CONVERSATION_DB_FILE}': {e}")
        finally:
            conn.close()
    else:
        print(f"Error! Cannot create DB connection for conversation table '{CONVERSATION_DB_FILE}'.")

def save_conversation_to_db(start_prompt: str, conversation_text: str):
    """Saves a generated conversation to the topic's conversation DB."""
    sql = "INSERT INTO conversations (start_prompt, generated_conversation) VALUES (?, ?)"
    conn = create_conversation_connection()
    if conn is not None:
        try:
            cursor = conn.cursor()
            cursor.execute(sql, (start_prompt, conversation_text))
            conn.commit()
            print(f"Conversation saved to DB '{CONVERSATION_DB_FILE}' successfully.")
        except sqlite3.Error as e:
            print(f"Error saving conversation to DB '{CONVERSATION_DB_FILE}': {e}")
            print(traceback.format_exc())
        finally:
            conn.close()
    else:
        print(f"Error! Cannot create DB connection for saving conversation '{CONVERSATION_DB_FILE}'.")

# Stock DB (shared - 30_day_data.db)
def create_stock_connection():
    """Establishes a connection to the shared stock SQLite DB."""
    conn = None
    try:
        conn = sqlite3.connect(STOCK_DB_FILE)
        conn.row_factory = sqlite3.Row
        return conn
    except sqlite3.Error as e:
        print(f"Error connecting to stock DB ({STOCK_DB_FILE}): {e}")
        return conn

def create_stock_table():
    """Creates the shared stock_prices table if it doesn't exist."""
    sql_create_table = """
    CREATE TABLE IF NOT EXISTS stock_prices (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        ticker TEXT NOT NULL,
        date TEXT NOT NULL,
        open REAL, high REAL, low REAL, close REAL, volume INTEGER,
        UNIQUE(ticker, date)
    );
    """
    conn = None
    try:
        conn = sqlite3.connect(STOCK_DB_FILE)
        cursor = conn.cursor()
        cursor.execute(sql_create_table)
        conn.commit()
        print(f"Stock prices table in '{STOCK_DB_FILE}' checked/created successfully.")
    except sqlite3.Error as e:
        print(f"Error creating stock table in '{STOCK_DB_FILE}': {e}")
    finally:
        if conn: conn.close()
# --- Utility Functions ---
def sanitize_filename(input_str: str, max_length: int = 50) -> str:
    """Creates a safe filename component from a string, suitable for use within the topic's FILES_DIR."""
    # Remove characters potentially unsafe for filenames across different OSes
    sanitized = re.sub(r'[\\/*?:"<>|]', "", input_str)
    # Replace whitespace with underscores
    sanitized = re.sub(r'\s+', '_', sanitized)
    # Collapse multiple underscores
    sanitized = re.sub(r'_+', '_', sanitized)
    # Truncate to the maximum length
    sanitized = sanitized[:max_length]
    # Remove leading/trailing underscores that might result from the replacements
    sanitized = sanitized.strip('_')
    # Ensure lowercase and add a .txt extension if a name remains; otherwise use a default
    return sanitized.lower() + ".txt" if sanitized else "default.txt"
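# Example (hypothetical input):
#   sanitize_filename("What is Vitamin D?") -> "what_is_vitamin_d.txt"
# The "?" is stripped as unsafe, whitespace becomes underscores, and ".txt" is appended.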
def load_structured_prompt() -> Optional[str]:
    """Loads the base structured prompt for conversation generation from the topic's file."""
    try:
        with open(STRUCTURED_PROMPT_FILE, "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        # It's okay if the file doesn't exist; we have a fallback
        print(f"INFO: Structured prompt file not found at {STRUCTURED_PROMPT_FILE}. Using default fallback.")
        return None
    except Exception as e:
        print(f"ERROR: Failed to read structured prompt file {STRUCTURED_PROMPT_FILE}: {e}")
        return None  # Treat as if not found on error

# --- Essay Generation Helper Functions ---
# These functions are specific to the essay generation endpoint
def get_essay_context(topic_query: str, n_results: int) -> List[str]:
    """
    Queries the globally initialized ChromaDB collection for context relevant to the essay topic query.
    Uses the global `embedding_model` and `chroma_collection`.
    """
    global embedding_model, chroma_collection  # Access globals initialized at startup
    print(f"\n--- Retrieving context for essay query: '{topic_query[:100]}...' ---")
    # Check that the necessary resources are initialized
    if embedding_model is None or chroma_collection is None:
        print("Error: Embedding model or ChromaDB collection not initialized during essay context retrieval.")
        # Raise an internal error or return empty? Returning empty is safer for API stability.
        return []
    if n_results <= 0:
        print("Number of results requested is zero or less; skipping context retrieval.")
        return []
    try:
        # Generate an embedding for the essay topic/query
        query_embedding = embedding_model.encode(topic_query).tolist()
        # Query the specific collection for relevant documents
        results = chroma_collection.query(
            query_embeddings=[query_embedding],
            n_results=n_results,
            include=["documents"]  # We only need the text content for the context
        )
        context_texts = []
        # Check whether results were found and extract the document text
        if results and results.get("documents") and results["documents"][0]:
            docs = results["documents"][0]
            print(f"Found {len(docs)} potential context snippets for the essay.")
            for doc_string in docs:
                # Attempt to extract text, handling the potential JSON structure used during seeding
                actual_text = doc_string
                try:
                    # Documents may have been stored as JSON strings {"text": "...", "source": "..."}
                    doc_data = json.loads(doc_string)
                    if isinstance(doc_data, dict) and "text" in doc_data:
                        actual_text = doc_data["text"]
                except json.JSONDecodeError:
                    pass  # It's just plain text
                # Add non-empty text to the context list
                if actual_text and actual_text.strip():
                    context_texts.append(actual_text.strip())
            print(f"Extracted {len(context_texts)} relevant context snippets for the essay.")
        else:
            print("No relevant context found in ChromaDB for this essay query.")
        return context_texts
    except Exception as e:
        print(f"Error querying ChromaDB for essay context: {e}")
        print(traceback.format_exc())
        return []  # Return an empty list on error
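# For reference, a seeded document may be stored either as plain text or as a
# JSON string of the following (assumed) shape, which the parsing above unwraps:
#   {"text": "Vitamin D aids calcium absorption.", "source": "vitamins_overview.txt"}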
def call_gemini_for_essay(essay_topic: str, context: List[str], model_temperature: float) -> str:
    """
    Constructs a prompt including the essay topic and retrieved context,
    calls the Gemini API, and returns the generated essay section text.
    Uses the global `GEMINI_API_URL` and `GEMINI_API_KEY`.
    """
    global GEMINI_API_URL, GEMINI_API_KEY  # Access globals
    print("\n--- Generating essay section with Gemini ---")
    if not GEMINI_API_KEY:  # Should be caught earlier, but double-check
        return "Error: Gemini API Key is missing."
    # Format the retrieved context for the prompt
    if not context:
        print("Warning: No context retrieved from the database for the essay. Generation will rely solely on the model's knowledge.")
        context_str = "No specific context was retrieved from the knowledge base."
    else:
        context_str = "\n---\n".join(context)
        print(f"Providing {len(context)} context snippets to the model for the essay.")
    # Define the prompt structure for Gemini essay generation.
    # This prompt guides the AI to use the context effectively.
    essay_prompt = f"""
Write a well-structured and coherent section for an essay on the following topic.
Base your writing PRIMARILY on the provided 'Relevant Context'.
Synthesize the information from the context naturally into paragraphs.
Do not simply list the context points; explain and elaborate on them in an essay format.
If the context is insufficient for a comprehensive answer, focus on what is available in the context but aim for a helpful response.

Essay Topic: "{essay_topic}"

Relevant Context from Knowledge Base:
---
{context_str}
---

Generated Essay Section (based primarily on the context):
"""
    print(f"Sending prompt to Gemini API for essay (Topic: {essay_topic[:100]}...).")
    # Prepare the headers and payload for the API request
    headers = {'Content-Type': 'application/json'}
    data = {
        "contents": [{"parts": [{"text": essay_prompt}]}],
        "generationConfig": {"temperature": model_temperature}
    }
    try:
        # Make the POST request to the Gemini API.
        # Use a longer timeout, as essay generation can take more time.
        response = requests.post(GEMINI_API_URL, headers=headers, json=data, timeout=180)
        response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
        response_data = response.json()
        # --- Robust Gemini response parsing ---
        # (Adapted from the other endpoints for consistency)
        generated_text = "Error: Could not parse essay section from Gemini response."
        candidates = response_data.get('candidates')
        if candidates and len(candidates) > 0:
            candidate = candidates[0]  # Take the first candidate
            content = candidate.get('content')
            if content and 'parts' in content:
                # Concatenate the text from all parts
                generated_text = "".join(part.get("text", "") for part in content['parts']).strip()
            else:
                # Check the finish reason if there are no content parts
                finish_reason = candidate.get('finishReason', 'UNKNOWN')
                generated_text = f"Error: Generation stopped unexpectedly. Reason: {finish_reason}."
                # Check the safety ratings for more detail if blocked
                safety_ratings = candidate.get('safetyRatings')
                if safety_ratings:
                    blocked_categories = [r['category'] for r in safety_ratings if r.get('probability') in ['MEDIUM', 'HIGH']]
                    if blocked_categories:
                        generated_text += f" Potentially blocked due to categories: {', '.join(blocked_categories)}."
        elif 'promptFeedback' in response_data:
            # Handle cases where the prompt itself was blocked
            block_reason = response_data['promptFeedback'].get('blockReason')
            if block_reason:
                generated_text = f"Error: Prompt blocked by Gemini. Reason: {block_reason}."
            else:
                generated_text = "Error: Received prompt feedback from Gemini without generated content or a block reason."
        elif 'error' in response_data:
            # Handle explicit API errors returned by Gemini
            error_msg = response_data.get("error", {}).get("message", "Unknown Gemini API Error")
            generated_text = f"Error: Gemini API Error reported: {error_msg}"
        # Final check that the extracted text is meaningful
        if not generated_text or generated_text.startswith("Error:"):
            print("Gemini essay generation failed or returned an error message.")
            # Return the error message itself for clarity
            return generated_text if generated_text else "Error: Empty response received from Gemini."
        else:
            print("Essay section generated successfully by Gemini.")
            return generated_text.strip()  # Return the clean generated text
    except requests.exceptions.Timeout:
        print("Error: The request to the Gemini API timed out.")
        return "Error: Request timed out while contacting Gemini API."
    except requests.exceptions.RequestException as e:
        # Handle network errors, invalid responses, etc.
        print(f"Error calling Gemini API for essay: {e}")
        return f"Error: Failed to communicate with Gemini API - {e}"
    except Exception as e:
        # Catch any other unexpected errors during generation
        print(f"Unexpected error during Gemini essay generation: {e}")
        print(traceback.format_exc())
        return f"Error: An unexpected error occurred during essay generation - {e}"
# --- FastAPI Startup/Shutdown Handling ---
# The deprecated @app.on_event("startup") hook is replaced by the
# recommended lifespan context manager approach:
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Code to run on startup
    global chroma_client, chroma_collection, embedding_model, base_conversation_prompt_template
    print(f"--- Running startup tasks for topic: {TOPIC_NAME_CAPITALIZED} ---")
    # Initialize the SQLite DBs/tables
    create_history_table()
    create_conversation_table()
    create_stock_table()  # For shared stock data
    # Initialize the ChromaDB client, collection, and embedding model
    initialization_successful = False
    try:
        print(f"Initializing ChromaDB client at: {CHROMA_DB_PATH}")
        chroma_client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
        print(f"Getting or creating ChromaDB collection: '{CHROMA_COLLECTION_NAME}'")
        # Use get_or_create_collection for the main app startup.
        # ** CRITICAL **: This can fail if a schema mismatch exists.
        chroma_collection = chroma_client.get_or_create_collection(name=CHROMA_COLLECTION_NAME)
        print(f"Successfully accessed ChromaDB collection: '{CHROMA_COLLECTION_NAME}'.")
        # Initialize the embedding model
        print("Loading SentenceTransformer model: 'all-mpnet-base-v2'...")
        embedding_model = SentenceTransformer('all-mpnet-base-v2')
        print("Embedding model loaded successfully.")
        initialization_successful = True  # Mark success
    except Exception as e:
        # Handle the specific version mismatch error clearly
        error_str = str(e).lower()
        if "no such column" in error_str and "collections.topic" in error_str:
            print("\n" + "*" * 60)
            print("FATAL STARTUP ERROR: ChromaDB Version Mismatch Detected!")
            print(f"Error details: {e}")
            print("The 'chromadb' library version is incompatible with the database")
            print(f"files found in '{CHROMA_DB_PATH}'.")
            print("\nRECOMMENDED FIX:")
            print("  1. Ensure this application is running in the EXACT same Python environment")
            print(f"     used when the '{CHROMA_DB_PATH}' directory was first created/seeded.")
            print("  2. If the environment is correct, the database file is outdated.")
            print("     You likely need to:")
            print("       a) Stop this application.")
            print(f"       b) Delete the directory: '{CHROMA_DB_PATH}'")
            print("       c) Re-seed the data using this application's /api/seed_data endpoint.")
            print("       d) Restart this application.")
            print("*** Deleting the directory will remove all seeded data for this topic! ***")
            print("*" * 60 + "\n")
        else:
            # Handle other initialization errors
            print(f"\nFATAL STARTUP ERROR: Could not initialize ChromaDB or the SentenceTransformer model: {e}")
            print(traceback.format_exc())
        # Optional: rethrow or handle differently if needed, but logging is crucial.
        # For now we just print the error; the app may still start, but endpoints using these will fail.
        # Consider sys.exit(1) here if these are absolutely critical for any operation.
        print("CRITICAL ERROR DURING STARTUP - Some functionalities might fail.")
    # Load the structured prompt for conversations (only if initialization succeeded)
    if initialization_successful:
        base_conversation_prompt_template = load_structured_prompt()
        if base_conversation_prompt_template is None:
            print("WARNING: Structured prompt file missing or unreadable; using the built-in default conversation prompt.")
            # Define a fallback prompt if the file is missing or unreadable
            base_conversation_prompt_template = """
Generate a natural-sounding, helpful, and substantial conversation or dialogue between two characters named Jack and Esperanza.

Instructions:
- Jack should initiate the conversation, asking a question or making a statement based on the "User's Starting Point" below.
- Esperanza should respond to Jack, primarily using information found in the "Relevant Context" provided. If no context is found, she should state that she doesn't have specific information but can discuss the topic generally.
- They should continue the conversation naturally for a substantial dialogue (around 20 exchanges total between them), thoroughly exploring the topic and potentially related aspects based on the context.
- Clearly label who is speaking on each line using "Jack: " or "Esperanza: ".
- Maintain a friendly and informative tone throughout the dialogue.
- Base Esperanza's replies on the context, but paraphrase and integrate naturally; don't just copy chunks. Ensure her responses add value and depth where possible using the context. If the context doesn't cover a point Jack raises, Esperanza should acknowledge that.

User's Starting Point: "{start_prompt}"

Relevant Context from Knowledge Base:
---
{context}
---

Generated Conversation (around 20 exchanges):
"""
        else:
            print(f"Successfully loaded the base conversation prompt from {STRUCTURED_PROMPT_FILE}")
    else:
        base_conversation_prompt_template = None  # Ensure it is None if initialization failed
        print("Skipping structured prompt loading due to earlier initialization errors.")
    print(f"--- Startup tasks for {TOPIC_NAME_CAPITALIZED} completed. ---")
    yield  # The application runs here
    # Code to run on shutdown (optional)
    print(f"--- Running shutdown tasks for topic: {TOPIC_NAME_CAPITALIZED} ---")
    # Clean up resources if needed (e.g., close persistent connections).
    # Note: ChromaDB's PersistentClient doesn't typically require an explicit close.

# Assign the lifespan context manager to the app
app.router.lifespan_context = lifespan
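# Note: on recent FastAPI versions (0.93+), the conventional way to register the
# handler is at construction time, i.e. app = FastAPI(lifespan=lifespan). Assigning
# app.router.lifespan_context directly, as above, has the same effect and is used
# here because `app` was created earlier in the file, before `lifespan` was defined.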
# --- API Endpoints --- | |
@app.get("/") | |
async def root(): | |
""" Root endpoint providing a welcome message for the specific topic. """ | |
return {"message": f"Welcome to the {TOPIC_NAME_CAPITALIZED} Learning Assistant API!"} | |
# --- Gemini Interaction Endpoints --- | |
# Uses globally initialized resources (embedding_model, chroma_collection) | |
@app.post("/ask/gemini", response_model=GeminiViewResponse, tags=["Gemini", TOPIC_NAME_CAPITALIZED]) | |
async def ask_gemini_for_viewing(request: GeminiViewRequest): | |
""" | |
Asks Gemini a question, saves Q&A to ChromaDB and a file, returns response. | |
Uses globally initialized resources. | |
""" | |
global embedding_model, chroma_collection # Ensure access to initialized globals | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Received question for viewing: {request.prompt[:100]}...") | |
if not GEMINI_API_KEY: | |
raise HTTPException(status_code=500, detail="Server configuration error: Gemini API key missing.") | |
# Check if resources needed are available (initialized at startup) | |
if embedding_model is None or chroma_collection is None: | |
print("Error in /ask/gemini: ChromaDB or embedding model not initialized.") | |
raise HTTPException(status_code=503, detail="Service Unavailable: Knowledge base components not ready.") | |
headers = {'Content-Type': 'application/json'} | |
prompt = f"In the context of {TOPIC_NAME_CAPITALIZED}, please explain the following clearly and concisely. Provide examples where appropriate:\n\n{request.prompt}" | |
data = { | |
"contents": [{"parts": [{"text": prompt}]}], | |
"generationConfig": { "temperature": 0.7 } | |
} | |
try: | |
# Call Gemini API | |
response = requests.post(GEMINI_API_URL, headers=headers, json=data, timeout=90) | |
response.raise_for_status() | |
response_data = response.json() | |
# Parse Gemini Response (Robust parsing) | |
extracted_text = "Error: Could not parse answer from Gemini." | |
# ... (Include full robust parsing logic here as before) ... | |
candidates = response_data.get('candidates') | |
if candidates and len(candidates) > 0: | |
candidate = candidates[0] | |
content = candidate.get('content') | |
if content and 'parts' in content: | |
extracted_text = "".join(part.get("text", "") for part in content['parts']).strip() | |
else: | |
finish_reason = candidate.get('finishReason', 'UNKNOWN') | |
extracted_text = f"Error: Generation stopped unexpectedly. Reason: {finish_reason}." | |
# Optional: Check safety ratings | |
elif 'promptFeedback' in response_data: | |
block_reason = response_data['promptFeedback'].get('blockReason') | |
extracted_text = f"Error: Prompt blocked by Gemini. Reason: {block_reason}." if block_reason else "Error: Received prompt feedback without content." | |
elif 'error' in response_data: | |
error_msg = response_data.get("error", {}).get("message", "Unknown Gemini API Error") | |
extracted_text = f"Error: Gemini API Error reported: {error_msg}" | |
if not extracted_text.strip() or extracted_text.startswith("Error:"): | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Gemini viewing generation failed: {extracted_text}") | |
else: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Extracted answer for viewing: {extracted_text[:100]}...") | |
# --- Save question and answer to TOPIC'S ChromaDB --- | |
if not extracted_text.startswith("Error:"): | |
try: | |
question_embedding = embedding_model.encode(request.prompt).tolist() | |
answer_embedding = embedding_model.encode(extracted_text).tolist() | |
q_id = f"{TOPIC_NAME_LOWER}_q_{uuid.uuid4()}" | |
a_id = f"{TOPIC_NAME_LOWER}_a_{uuid.uuid4()}" | |
q_metadata = {"type": "question", "source": "ask_gemini_viewing", "topic": TOPIC_NAME_LOWER, "timestamp": datetime.now().isoformat()} | |
a_metadata = {"type": "answer", "source": "ask_gemini_viewing", "question_id": q_id, "topic": TOPIC_NAME_LOWER, "timestamp": datetime.now().isoformat()} | |
chroma_collection.add( | |
embeddings=[question_embedding, answer_embedding], | |
documents=[request.prompt, extracted_text], | |
ids=[q_id, a_id], | |
metadatas=[q_metadata, a_metadata] | |
) | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Successfully saved Q&A ({q_id}) to ChromaDB.") | |
except Exception as e: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Warning: Error saving Q&A to ChromaDB: {e}") | |
print(traceback.format_exc()) | |
else: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Skipping save to ChromaDB due to generation error.") | |
# --- Save question and answer to file in TOPIC'S directory --- | |
filename = sanitize_filename(request.prompt) | |
filepath = os.path.join(FILES_DIR, filename) | |
try: | |
with open(filepath, "w", encoding="utf-8") as f: | |
f.write(f"--- Question ---\n{request.prompt}\n\n--- Answer ---\n{extracted_text}") | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Saved Q&A attempt to file: {filepath}") | |
except Exception as e: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Warning: Error saving Q&A to file '{filepath}': {e}") | |
print(traceback.format_exc()) | |
return GeminiViewResponse(response=extracted_text) | |
except requests.exceptions.RequestException as e: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Error calling Gemini API: {e}") | |
raise HTTPException(status_code=503, detail=f"Gemini API communication error: {e}") | |
except Exception as e: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Unexpected error in /ask/gemini: {e}") | |
print(traceback.format_exc()) | |
raise HTTPException(status_code=500, detail=f"Internal server error: {e}") | |
@app.post("/api/ask", response_model=AskResponse, tags=["Gemini", "History", TOPIC_NAME_CAPITALIZED]) | |
async def ask_and_record(request: AskRequest): | |
""" | |
Asks Gemini a question, saves Q&A to history DB and file. | |
""" | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Received question for recording: {request.question[:100]}...") | |
if not GEMINI_API_KEY: | |
raise HTTPException(status_code=500, detail="Server configuration error: Gemini API key missing.") | |
headers = {'Content-Type': 'application/json'} | |
prompt = f"In the context of {TOPIC_NAME_CAPITALIZED}, explain:\n\n{request.question}" | |
data = {"contents": [{"parts": [{"text": prompt}]}], "generationConfig": {"temperature": 0.7}} | |
try: | |
response = requests.post(GEMINI_API_URL, headers=headers, json=data, timeout=90) | |
response.raise_for_status() | |
response_data = response.json() | |
# Parse Gemini Response (Robust parsing) | |
extracted_text = "Error: Could not parse answer from Gemini." | |
# Robust parsing of the Gemini response (compact walrus-operator form of the same checks).
if candidates := response_data.get('candidates'): | |
if len(candidates) > 0 and (content := candidates[0].get('content')) and (parts := content.get('parts')): | |
extracted_text = "".join(part.get("text", "") for part in parts).strip() | |
elif finish_reason := candidates[0].get('finishReason'): | |
extracted_text = f"Error: Generation stopped. Reason: {finish_reason}." | |
elif prompt_feedback := response_data.get('promptFeedback'): | |
if block_reason := prompt_feedback.get('blockReason'): | |
extracted_text = f"Error: Prompt blocked. Reason: {block_reason}" | |
if not extracted_text.strip() or extracted_text.startswith("Error:"): | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Gemini recording generation failed: {extracted_text}") | |
else: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Extracted answer for recording: {extracted_text[:100]}...") | |
# --- Save to TOPIC'S History SQLite DB --- | |
if not extracted_text.startswith("Error:"): | |
try: | |
save_history_item(request.question, extracted_text) | |
except Exception as e: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Warning: Error saving to history DB: {e}") | |
print(traceback.format_exc()) | |
else: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Skipping save to history DB due to generation error.") | |
# --- Save to file in TOPIC'S directory --- | |
filename = sanitize_filename(request.question) | |
filepath = os.path.join(FILES_DIR, filename) | |
try: | |
with open(filepath, "w", encoding="utf-8") as f: | |
f.write(f"--- Question ---\n{request.question}\n\n--- Answer ---\n{extracted_text}") | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Saved Q&A attempt to file: {filepath}") | |
except Exception as e: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Warning: Error saving Q&A to file '{filepath}': {e}") | |
print(traceback.format_exc()) | |
return AskResponse(answer=extracted_text) | |
except requests.exceptions.RequestException as e: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Error calling Gemini API: {e}") | |
raise HTTPException(status_code=503, detail=f"Gemini API communication error: {e}") | |
except Exception as e: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Unexpected error in /api/ask: {e}") | |
print(traceback.format_exc()) | |
raise HTTPException(status_code=500, detail=f"Internal server error: {e}") | |
# --- Search, Conversation, Essay, History, Files, Seed, Stock Endpoints --- | |
@app.post("/api/search_and_enhance", response_model=EnhancedResponse, tags=["Search", "Gemini", TOPIC_NAME_CAPITALIZED]) | |
async def search_and_enhance(request: SearchRequest): | |
""" | |
Searches the topic's ChromaDB, then uses Gemini to synthesize | |
an enhanced summary based on the results. Uses global resources. | |
""" | |
global embedding_model, chroma_collection # Ensure access to globals | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Received search query: {request.query[:100]}... (num_results={request.num_results})") | |
if request.num_results <= 0: | |
raise HTTPException(status_code=400, detail="Number of results must be positive.") | |
if embedding_model is None or chroma_collection is None: | |
print("Error in /api/search_and_enhance: ChromaDB or embedding model not initialized.") | |
raise HTTPException(status_code=503, detail="Service Unavailable: Knowledge base components not ready.") | |
try: | |
query_embedding = embedding_model.encode(request.query).tolist() | |
results = chroma_collection.query( | |
query_embeddings=[query_embedding], | |
n_results=request.num_results, | |
include=["documents", "metadatas"] # Include metadata for source info | |
) | |
if not results or not results.get("documents") or not results["documents"][0]: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] No relevant info found in ChromaDB for search.") | |
return EnhancedResponse(results=[], enhanced_summary="No relevant information found in the knowledge base for this topic.") | |
# Process results (extract text and source) | |
search_results = [] | |
context_texts = [] | |
docs = results["documents"][0] | |
metas = results.get("metadatas", [[]])[0] | |
metas = metas if len(metas) == len(docs) else [{}] * len(docs) # Ensure safety | |
for i in range(len(docs)): | |
doc_text = docs[i] | |
source = "Unknown" | |
actual_text = doc_text | |
try: | |
doc_data = json.loads(doc_text) | |
if isinstance(doc_data, dict): | |
actual_text = doc_data.get("text", doc_text) | |
source = doc_data.get("source_file", "Unknown") | |
except json.JSONDecodeError: pass | |
if source == "Unknown" and metas[i] and isinstance(metas[i], dict): | |
source = metas[i].get("source", "Unknown") | |
search_results.append(SearchResult(text=actual_text, source=source)) | |
context_texts.append(actual_text) | |
context = "\n---\n".join(context_texts) | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Found {len(context_texts)} context snippets for enhancement.") | |
# Enhance results with Gemini | |
enhancement_prompt = f""" | |
Based ONLY on the following relevant information retrieved for the '{TOPIC_NAME_LOWER}' topic, provide a synthesized answer to the query. Do NOT add external information. | |
User's Query: "{request.query}" | |
Relevant Context: | |
--- | |
{context} | |
--- | |
Synthesized Answer (for {TOPIC_NAME_LOWER}, based ONLY on context): | |
""" | |
headers = {'Content-Type': 'application/json'} | |
data = {"contents": [{"parts": [{"text": enhancement_prompt}]}], "generationConfig": {"temperature": 0.5}} | |
gemini_response_raw = requests.post(GEMINI_API_URL, headers=headers, json=data, timeout=90) | |
gemini_response_raw.raise_for_status() | |
gemini_response_data = gemini_response_raw.json() | |
# Parse Gemini Response (Robust parsing) | |
enhanced_summary = "Error: Could not parse summary from Gemini." | |
# Robust parsing of the Gemini response (candidates first, then promptFeedback).
if candidates := gemini_response_data.get('candidates'): | |
if len(candidates) > 0 and (content := candidates[0].get('content')) and (parts := content.get('parts')): | |
enhanced_summary = "".join(part.get("text", "") for part in parts).strip() | |
elif finish_reason := candidates[0].get('finishReason'): | |
enhanced_summary = f"Error: Generation stopped. Reason: {finish_reason}." | |
elif prompt_feedback := gemini_response_data.get('promptFeedback'): | |
if block_reason := prompt_feedback.get('blockReason'): | |
enhanced_summary = f"Error: Prompt blocked. Reason: {block_reason}" | |
if enhanced_summary.startswith("Error:"): | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Gemini enhancement failed: {enhanced_summary}") | |
else: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Enhanced summary generated: {enhanced_summary[:100]}...") | |
return EnhancedResponse(results=search_results, enhanced_summary=enhanced_summary) | |
except requests.exceptions.RequestException as e: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Error calling Gemini API during enhancement: {e}") | |
raise HTTPException(status_code=503, detail=f"Gemini API communication error: {e}") | |
except Exception as e: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Error searching/enhancing: {e}") | |
print(traceback.format_exc()) | |
raise HTTPException(status_code=500, detail=f"Error searching/enhancing: {e}") | |
@app.post("/api/generate_conversation", response_model=ConversationResponse, tags=["Conversation", "Gemini", TOPIC_NAME_CAPITALIZED]) | |
async def generate_conversation(request: ConversationRequest): | |
""" | |
Generates a conversation using the topic's structured prompt and ChromaDB context, | |
saves the result to the topic's conversation.db. Uses global resources. | |
""" | |
global base_conversation_prompt_template, embedding_model, chroma_collection # Access globals | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Received request to generate conversation starting with: {request.start_prompt[:100]}...") | |
if base_conversation_prompt_template is None: | |
print("ERROR: Base conversation prompt template not loaded during startup.") | |
raise HTTPException(status_code=500, detail="Server configuration error: Base prompt missing.") | |
if embedding_model is None or chroma_collection is None: | |
print("Error in /api/generate_conversation: ChromaDB or embedding model not initialized.") | |
raise HTTPException(status_code=503, detail="Service Unavailable: Knowledge base components not ready.") | |
try: | |
# 1. Embed the starting prompt | |
query_embedding = embedding_model.encode(request.start_prompt).tolist() | |
# 2. Query TOPIC'S ChromaDB for relevant context | |
results = chroma_collection.query( | |
query_embeddings=[query_embedding], | |
n_results=request.num_context_results, | |
include=["documents"] | |
) | |
# Extract context | |
context_str = "No specific context found in the knowledge base for this topic." | |
context_texts = [] | |
if results and results.get("documents") and results["documents"][0]: | |
docs = results["documents"][0] | |
extracted_count = 0 | |
for doc_string in docs: | |
actual_text = doc_string | |
try: | |
doc_data = json.loads(doc_string) | |
if isinstance(doc_data, dict) and "text" in doc_data: actual_text = doc_data["text"] | |
except json.JSONDecodeError: pass | |
if actual_text.strip(): | |
context_texts.append(actual_text.strip()) | |
extracted_count += 1 | |
if context_texts: | |
context_str = "\n---\n".join(context_texts) | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Found {extracted_count} context snippets for conversation.") | |
else: print(f"[{TOPIC_NAME_CAPITALIZED}] Context docs found but no text extracted for conversation.") | |
else: print(f"[{TOPIC_NAME_CAPITALIZED}] No relevant context found for conversation.") | |
# 3. Format the BASE prompt loaded at startup with current request details | |
try: | |
final_conversation_prompt = base_conversation_prompt_template.format( | |
start_prompt=request.start_prompt, | |
context=context_str | |
) | |
except KeyError as e: | |
print(f"ERROR: Placeholder {e} missing in the structured prompt file: {STRUCTURED_PROMPT_FILE}") | |
raise HTTPException(status_code=500, detail=f"Server configuration error: Invalid prompt template.") | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Sending formatted prompt to Gemini for conversation...") | |
# 4. Call Gemini | |
headers = {'Content-Type': 'application/json'} | |
data = { | |
"contents": [{"parts": [{"text": final_conversation_prompt}]}], | |
"generationConfig": {"temperature": 0.75} | |
} | |
response = requests.post(GEMINI_API_URL, headers=headers, json=data, timeout=180) | |
response.raise_for_status() | |
response_data = response.json() | |
# 5. Extract the response (Robust parsing) | |
generated_conversation = "Error: Could not parse conversation from Gemini." | |
# Robust parsing of the Gemini response (candidates first, then promptFeedback).
if candidates := response_data.get('candidates'): | |
if len(candidates) > 0 and (content := candidates[0].get('content')) and (parts := content.get('parts')): | |
generated_conversation = "".join(part.get("text", "") for part in parts).strip() | |
elif finish_reason := candidates[0].get('finishReason'): | |
generated_conversation = f"Error: Generation stopped. Reason: {finish_reason}." | |
elif prompt_feedback := response_data.get('promptFeedback'): | |
if block_reason := prompt_feedback.get('blockReason'): | |
generated_conversation = f"Error: Prompt blocked. Reason: {block_reason}" | |
if generated_conversation.startswith("Error:"): | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Gemini conversation generation failed: {generated_conversation}") | |
else: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Generated conversation: {generated_conversation[:150]}...") | |
# 6. Save the conversation to the TOPIC'S conversation.db | |
if not generated_conversation.startswith("Error:"): | |
try: | |
save_conversation_to_db(request.start_prompt, generated_conversation) | |
except Exception as db_save_error: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Warning: Error saving conversation to DB: {db_save_error}") | |
print(traceback.format_exc()) | |
else: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Skipping DB save for conversation due to generation issue.") | |
# 7. Return the generated conversation (or error string) | |
return ConversationResponse(conversation=generated_conversation) | |
except requests.exceptions.RequestException as e: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Error calling Gemini API during conversation: {e}") | |
raise HTTPException(status_code=503, detail=f"Gemini API communication error: {e}") | |
except Exception as e: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Error generating conversation: {e}") | |
print(traceback.format_exc()) | |
raise HTTPException(status_code=500, detail=f"Error generating conversation: {e}") | |
# --- Essay Generation Endpoint --- | |
@app.post("/api/generate_essay", response_model=EssayResponse, tags=["Essay", "Gemini", TOPIC_NAME_CAPITALIZED]) | |
async def generate_essay_endpoint(request: EssayRequest): | |
""" | |
Generates an essay section based on a topic, using context retrieved | |
from this topic's ChromaDB knowledge base. | |
""" | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Received request to generate essay on: {request.essay_topic[:100]}...") | |
# Check if resources are available (they should be if startup succeeded) | |
if embedding_model is None or chroma_collection is None: | |
print("Error in /api/generate_essay: ChromaDB or embedding model not initialized.") | |
raise HTTPException(status_code=503, detail="Service Unavailable: Knowledge base components not ready.") | |
# 1. Retrieve context from ChromaDB using the helper function | |
context_snippets = get_essay_context(request.essay_topic, request.num_context_results) | |
context_count = len(context_snippets) | |
# 2. Call Gemini using the helper function | |
generated_section = call_gemini_for_essay(request.essay_topic, context_snippets, request.temperature) | |
# 3. Return the result | |
# The generated_section might contain an error message from the helper function | |
return EssayResponse( | |
generated_essay_section=generated_section, | |
retrieved_context_count=context_count | |
) | |
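# Example request (sketch; EssayRequest exposes "essay_topic",
# "num_context_results", and "temperature", as used above):
#   curl -X POST http://localhost:8000/api/generate_essay \
#        -H "Content-Type: application/json" \
#        -d '{"essay_topic": "The role of embeddings in semantic search", "num_context_results": 5, "temperature": 0.7}'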
# --- History Endpoints --- | |
# (Unchanged from previous version) | |
@app.get("/api/history", response_model=List[HistoryItem], tags=["History", TOPIC_NAME_CAPITALIZED]) | |
async def get_history(): | |
""" Retrieves all history items from the topic's history database. """ | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Fetching all history items...") | |
try: | |
history_data = fetch_history() | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Found {len(history_data)} history items.") | |
return history_data | |
except Exception as e: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Error in /api/history endpoint: {e}") | |
raise HTTPException(status_code=500, detail=f"Failed to fetch history: {e}") | |
@app.get("/api/history/{item_id}", response_model=HistoryItem, tags=["History", TOPIC_NAME_CAPITALIZED]) | |
async def get_history_item(item_id: int): | |
""" Retrieves a specific history item by its ID from the topic's DB. """ | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Fetching history item ID: {item_id}") | |
conn = create_history_connection() | |
if conn is None: | |
raise HTTPException(status_code=503, detail=f"DB connection error ({TOPIC_NAME_LOWER} history)") | |
try: | |
cursor = conn.cursor() | |
cursor.execute("SELECT id, timestamp, question, answer FROM history WHERE id = ?", (item_id,)) | |
row = cursor.fetchone() | |
if row: | |
return HistoryItem(**row) # Directly unpack row into model | |
else: | |
raise HTTPException(status_code=404, detail=f"History item ID {item_id} not found in {TOPIC_NAME_LOWER} topic") | |
except Exception as e: | |
print(f"Error fetching history item {item_id}: {e}") | |
raise HTTPException(status_code=500, detail=f"Error fetching history item: {e}") | |
finally: | |
if conn: conn.close() | |
@app.put("/api/history/{item_id}", response_model=HistoryItem, tags=["History", TOPIC_NAME_CAPITALIZED]) | |
async def update_history_item(item_id: int, item_update: HistoryItemUpdate): | |
""" Updates a specific history item in the topic's DB. """ | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Updating history item ID: {item_id}") | |
conn = create_history_connection() | |
if conn is None: | |
raise HTTPException(status_code=503, detail=f"DB connection error ({TOPIC_NAME_LOWER} history)") | |
try: | |
cursor = conn.cursor() | |
# Check existence first | |
cursor.execute("SELECT id FROM history WHERE id = ?", (item_id,)) | |
if cursor.fetchone() is None: | |
raise HTTPException(status_code=404, detail=f"History item ID {item_id} not found for update in {TOPIC_NAME_LOWER}") | |
cursor.execute( | |
"UPDATE history SET question=?, answer=?, timestamp=CURRENT_TIMESTAMP WHERE id=?", | |
(item_update.question, item_update.answer, item_id), | |
) | |
conn.commit() | |
# Check if rows were affected if needed: rowcount = cursor.rowcount | |
# Fetch the updated item to return it | |
cursor.execute("SELECT id, timestamp, question, answer FROM history WHERE id = ?", (item_id,)) | |
row = cursor.fetchone() | |
if row: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Successfully updated history item {item_id}.") | |
return HistoryItem(**row) | |
else: | |
# This case should ideally not happen if the initial check passed and commit succeeded | |
print(f"Error: Failed to retrieve history item {item_id} after update.") | |
raise HTTPException(status_code=500, detail="Failed to retrieve updated item after update.") | |
except Exception as e: | |
print(f"Error updating history item {item_id}: {e}") | |
if conn: conn.rollback() # Rollback changes on error | |
raise HTTPException(status_code=500, detail=f"Error updating history item: {e}") | |
finally: | |
if conn: conn.close() | |
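# Example usage of the history endpoints (sketch; item ID 42 is hypothetical):
#   curl http://localhost:8000/api/history          # list all items
#   curl http://localhost:8000/api/history/42       # fetch one item
#   curl -X PUT http://localhost:8000/api/history/42 \
#        -H "Content-Type: application/json" \
#        -d '{"question": "edited question", "answer": "edited answer"}'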
# --- Text File CRUD Endpoints --- | |
# (Unchanged from previous version) | |
@app.post("/api/files", response_model=MessageResponse, tags=["Files", TOPIC_NAME_CAPITALIZED]) | |
async def create_topic_file(file_data: FileCreateRequest): | |
""" Creates a new text file in the topic's specific file directory. """ | |
filename = sanitize_filename(file_data.filename) | |
if ".." in filename or filename.startswith(("/", "\\")): | |
raise HTTPException(status_code=400, detail="Invalid filename component.") | |
filepath = os.path.join(FILES_DIR, filename) | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Attempting to create file: {filepath}") | |
try: | |
with open(filepath, "w", encoding="utf-8") as f: | |
f.write(file_data.content) | |
print(f"[{TOPIC_NAME_CAPITALIZED}] File '{filename}' created successfully.") | |
return MessageResponse(message=f"File '{filename}' created successfully.") | |
except Exception as e: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Error creating file '{filepath}': {e}") | |
raise HTTPException(status_code=500, detail=f"Error creating file: {e}") | |
@app.get("/api/files/{filename}", response_model=FileResponse, tags=["Files", TOPIC_NAME_CAPITALIZED]) | |
async def read_topic_file(filename: str = Path(..., title="Filename", description="The name of the file within the topic's directory")): | |
""" Reads the content of a text file from the topic's directory. """ | |
if ".." in filename or filename.startswith(("/", "\\")): | |
raise HTTPException(status_code=400, detail="Invalid filename path component.") | |
filepath = os.path.join(FILES_DIR, filename) | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Attempting to read file: {filepath}") | |
if not os.path.isfile(filepath): | |
raise HTTPException(status_code=404, detail=f"File '{filename}' not found in topic '{TOPIC_NAME_LOWER}'.") | |
try: | |
with open(filepath, "r", encoding="utf-8") as f: | |
content = f.read() | |
return FileResponse(filename=filename, content=content) | |
except Exception as e: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Error reading file '{filepath}': {e}") | |
raise HTTPException(status_code=500, detail=f"Error reading file: {e}") | |
@app.put("/api/files/{filename}", response_model=MessageResponse, tags=["Files", TOPIC_NAME_CAPITALIZED]) | |
async def update_topic_file(file_data: FileUpdateRequest, filename: str = Path(..., title="Filename", description="The name of the file within the topic's directory")): | |
""" Updates the content of an existing text file in the topic's directory. """ | |
if ".." in filename or filename.startswith(("/", "\\")): | |
raise HTTPException(status_code=400, detail="Invalid filename path component.") | |
filepath = os.path.join(FILES_DIR, filename) | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Attempting to update file: {filepath}") | |
if not os.path.isfile(filepath): | |
raise HTTPException(status_code=404, detail=f"File '{filename}' not found for update in topic '{TOPIC_NAME_LOWER}'.") | |
try: | |
with open(filepath, "w", encoding="utf-8") as f: | |
f.write(file_data.content) | |
print(f"[{TOPIC_NAME_CAPITALIZED}] File '{filename}' updated successfully.") | |
return MessageResponse(message=f"File '{filename}' updated successfully.") | |
except Exception as e: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Error updating file '{filepath}': {e}") | |
raise HTTPException(status_code=500, detail=f"Error updating file: {e}") | |
@app.delete("/api/files/{filename}", response_model=MessageResponse, tags=["Files", TOPIC_NAME_CAPITALIZED]) | |
async def delete_topic_file(filename: str = Path(..., title="Filename", description="The name of the file within the topic's directory")): | |
""" Deletes a text file from the topic's directory. """ | |
if ".." in filename or filename.startswith(("/", "\\")): | |
raise HTTPException(status_code=400, detail="Invalid filename path component.") | |
filepath = os.path.join(FILES_DIR, filename) | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Attempting to delete file: {filepath}") | |
if not os.path.isfile(filepath): | |
raise HTTPException(status_code=404, detail=f"File '{filename}' not found for deletion in topic '{TOPIC_NAME_LOWER}'.") | |
try: | |
os.remove(filepath) | |
print(f"[{TOPIC_NAME_CAPITALIZED}] File '{filename}' deleted successfully.") | |
return MessageResponse(message=f"File '{filename}' deleted successfully.") | |
except Exception as e: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Error deleting file '{filepath}': {e}") | |
raise HTTPException(status_code=500, detail=f"Error deleting file: {e}") | |
@app.get("/api/files", response_model=List[FileInfo], tags=["Files", TOPIC_NAME_CAPITALIZED]) | |
async def list_topic_files(): | |
""" Lists all text files (matching *.txt) in the topic's specific file directory. """ | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Listing files in directory: {FILES_DIR}") | |
try: | |
if not os.path.isdir(FILES_DIR): | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Files directory '{FILES_DIR}' not found.") | |
return [] # Return empty list if directory doesn't exist | |
filenames = [f for f in os.listdir(FILES_DIR) if os.path.isfile(os.path.join(FILES_DIR, f)) and f.lower().endswith(".txt")] | |
filenames.sort() | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Found {len(filenames)} '.txt' files.") | |
return [FileInfo(filename=name) for name in filenames] | |
except Exception as e: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Error listing files in '{FILES_DIR}': {e}") | |
raise HTTPException(status_code=500, detail=f"Error listing files: {e}") | |
# --- Data Seeding Endpoint --- | |
# (Unchanged from previous version, uses global resources) | |
@app.post("/api/seed_data", response_model=MessageResponse, tags=["Files", "ChromaDB", TOPIC_NAME_CAPITALIZED]) | |
async def seed_topic_files(request: SeedRequest): | |
""" | |
Processes specific text files from the topic's FILES_DIR, splits them | |
by paragraph, and adds them to the topic's ChromaDB collection. | |
Uses global `embedding_model` and `chroma_collection`. | |
""" | |
global embedding_model, chroma_collection # Access globals | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Starting data seeding for files: {request.filenames}") | |
processed_paragraphs = 0 | |
processed_files_count = 0 | |
errors = [] | |
files_not_found = [] | |
skipped_invalid_format = [] | |
if embedding_model is None or chroma_collection is None: | |
print("Error in /api/seed_data: ChromaDB or embedding model not initialized.") | |
raise HTTPException(status_code=503, detail="Service Unavailable: Knowledge base components not ready for seeding.") | |
if not request.filenames: | |
raise HTTPException(status_code=400, detail="No filenames provided for seeding.") | |
try: | |
if not os.path.isdir(FILES_DIR): | |
raise HTTPException(status_code=404, detail=f"Files directory '{FILES_DIR}' not found for seeding.") | |
all_files_in_topic_dir = {f for f in os.listdir(FILES_DIR) if os.path.isfile(os.path.join(FILES_DIR, f))} | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"Error listing topic files directory: {e}") | |
for requested_filename in request.filenames: | |
if not requested_filename.lower().endswith(".txt") or ".." in requested_filename or requested_filename.startswith(("/", "\\")): | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Skipping invalid seed filename format: {requested_filename}") | |
skipped_invalid_format.append(requested_filename) | |
continue | |
if requested_filename not in all_files_in_topic_dir: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Seed file not found in topic directory: {requested_filename}") | |
files_not_found.append(requested_filename) | |
continue | |
filepath = os.path.join(FILES_DIR, requested_filename) | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Processing seed file: {filepath}") | |
try: | |
with open(filepath, "r", encoding="utf-8") as file: text_content = file.read() | |
paragraphs = re.split(r'\n\s*\n+', text_content.strip()) # Split by blank lines | |
print(f" - Found {len(paragraphs)} potential paragraphs.") | |
docs_to_add, embeddings_to_add, ids_to_add, metadatas_to_add = [], [], [], [] | |
for para_idx, para in enumerate(paragraphs): | |
paragraph = para.strip() | |
if paragraph: | |
try: | |
doc_data = {"source_file": requested_filename, "text": paragraph} | |
doc_json = json.dumps(doc_data) | |
para_embedding = embedding_model.encode(paragraph).tolist() | |
para_hash = uuid.uuid5(uuid.NAMESPACE_DNS, f"{TOPIC_NAME_LOWER}_{requested_filename}_{para_idx}") | |
doc_id = f"seed_{para_hash}" | |
doc_metadata = {"type": "seeded_paragraph", "source": requested_filename, "topic": TOPIC_NAME_LOWER, "paragraph_index": para_idx} | |
ids_to_add.append(doc_id) | |
docs_to_add.append(doc_json) | |
embeddings_to_add.append(para_embedding) | |
metadatas_to_add.append(doc_metadata) | |
except Exception as encode_err: | |
print(f" - Error encoding/preparing paragraph {para_idx} from {requested_filename}: {encode_err}") | |
errors.append(f"Encoding error: {requested_filename}[{para_idx}]") | |
if docs_to_add: | |
try: | |
chroma_collection.add( | |
documents=docs_to_add, embeddings=embeddings_to_add, | |
ids=ids_to_add, metadatas=metadatas_to_add | |
) | |
print(f" - Added {len(docs_to_add)} paragraphs from {requested_filename} to ChromaDB.") | |
processed_paragraphs += len(docs_to_add) | |
except Exception as chroma_err: | |
print(f" - Error adding batch from {requested_filename} to ChromaDB: {chroma_err}") | |
print(traceback.format_exc()) | |
errors.append(f"ChromaDB add error: {requested_filename}") | |
else: | |
print(f" - No valid paragraphs found to add from {requested_filename}.") | |
processed_files_count += 1 | |
except Exception as file_err: | |
print(f"[{TOPIC_NAME_CAPITALIZED}] Error processing seed file {requested_filename}: {file_err}") | |
errors.append(f"File processing error: {requested_filename}") | |
print(traceback.format_exc()) | |
summary_parts = [f"[{TOPIC_NAME_CAPITALIZED}] Seeding attempted for {len(request.filenames)} files."] | |
summary_parts.append(f"Processed {processed_paragraphs} paragraphs from {processed_files_count} files.") | |
if skipped_invalid_format: summary_parts.append(f"Skipped invalid names: {len(skipped_invalid_format)}.") | |
if files_not_found: summary_parts.append(f"Files not found: {len(files_not_found)}.") | |
if errors: summary_parts.append(f"Encountered {len(errors)} errors (see server logs for details).") | |
summary_message = " ".join(summary_parts) | |
print(summary_message) | |
return MessageResponse(message=summary_message) | |
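# Example usage (sketch; the listed files must already exist as *.txt in the
# topic's FILES_DIR, e.g. created via /api/files):
#   curl -X POST http://localhost:8000/api/seed_data \
#        -H "Content-Type: application/json" \
#        -d '{"filenames": ["notes.txt", "chapter1.txt"]}'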
# --- Stock Data Endpoint (Shared - Unchanged) --- | |
@app.get("/api/stockdata/{ticker}", response_model=List[StockData], tags=["Stock Data"]) | |
async def get_stock_data_api(ticker: str): | |
""" Retrieves historical stock data (shared, not topic-specific). """ | |
safe_ticker = re.sub(r'[^A-Z0-9.]', '', ticker.upper()) | |
if not safe_ticker: raise HTTPException(status_code=400, detail="Invalid ticker format.") | |
print(f"[Shared] Fetching stock data for ticker: {safe_ticker}") | |
conn = create_stock_connection() | |
if conn is None: raise HTTPException(status_code=503, detail="DB connection error (stock)") | |
try: | |
cursor = conn.cursor() | |
cursor.execute("SELECT ticker, date, open, high, low, close, volume FROM stock_prices WHERE ticker = ? ORDER BY date ASC", (safe_ticker,)) | |
rows = cursor.fetchall() | |
if not rows: return [] | |
print(f"[Shared] Found {len(rows)} records for ticker: {safe_ticker}") | |
return [ StockData(**row) for row in rows ] | |
except sqlite3.Error as e: | |
print(f"[Shared] DB error fetching stock data for {safe_ticker}: {e}") | |
raise HTTPException(status_code=500, detail=f"Stock DB error: {e}") | |
finally: | |
if conn: conn.close() | |
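# Example usage (sketch; assumes the shared stock database contains rows for
# the requested ticker):
#   curl http://localhost:8000/api/stockdata/AAPL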
# --- Static Video ID Endpoints (Unchanged) --- | |
@app.get("/video_id", response_model=VideoIdResponse, tags=["Static"]) | |
async def get_video_id(): | |
return VideoIdResponse(video_id="bt_i7sQgqEs") | |
@app.get("/postman_video", response_model=VideoIdResponse, tags=["Static"]) | |
async def postman_video(): | |
return VideoIdResponse(video_id="Zd1t0PI-YHk") | |
# --- Main execution block --- | |
# This allows running the script directly using uvicorn | |
if __name__ == "__main__": | |
import uvicorn | |
print(f"--- Preparing to start FastAPI server for topic: {TOPIC_NAME_CAPITALIZED} ---") | |
# Get port from environment or use default (e.g., 8000) | |
port = int(os.getenv("PORT", 8000)) | |
# Determine reload status for development | |
reload_status = os.getenv("DEV_RELOAD", "false").lower() == "true" | |
log_level = "info" if not reload_status else "debug" | |
print(f"Attempting to run on http://0.0.0.0:{port} (Reload={reload_status}, LogLevel={log_level})") | |
print("Note: Initialization of DBs, ChromaDB, and models happens during the application lifespan startup.") | |
print("Watch console output for initialization status and potential errors (like ChromaDB version mismatches).") | |
# Run the FastAPI app using uvicorn | |
# The string format 'filename:app_variable' tells uvicorn where to find the app.
# The App Generator replaces the %%TOPIC_LOWER%% placeholder when writing this file,
# so TOPIC_NAME_LOWER must match the generated filename (e.g. nutrition.py -> "nutrition:app").
uvicorn.run(f"{TOPIC_NAME_LOWER}:app", host="0.0.0.0", port=port, reload=reload_status, log_level=log_level) | |