Created: July 10, 2025 06:46
Save upeter/f869725f5ce6cb6bd38ae99b7fdad5cc to your computer and use it in GitHub Desktop.
Solution homework 1 of AI-SDLC-July-2025 course
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import logging
import os
import sqlite3
import uuid
from datetime import datetime

import fitz  # PyMuPDF
import gradio as gr
import httpx
import tiktoken
from openai import OpenAI
# SQLite database file where every PDF Q&A interaction is persisted.
DB_FILE = "pdf_qa_logs_homework.db"

# Verbose root logging so httpx's per-request records are emitted and can be
# captured by the custom handler installed below.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# Create a custom handler to capture API logs | |
class APILogHandler(logging.Handler):
    """Logging handler that buffers httpx log records in memory.

    Each matching record is stored as a dict with 'timestamp', 'level' and
    'message' keys so HTTP traffic can be saved next to API payloads later.
    """

    def __init__(self):
        super().__init__()
        # Accumulated records; callers clear and copy this list per API call.
        self.api_logs = []

    def emit(self, record):
        """Capture the record only if it came from an httpx logger."""
        from_httpx = 'httpx' in record.name
        if not (from_httpx and record.levelno >= logging.DEBUG):
            return
        entry = {
            'timestamp': datetime.now().isoformat(),
            'level': record.levelname,
            'message': record.getMessage(),
        }
        self.api_logs.append(entry)
# Install the in-memory capture handler on httpx's logger so every HTTP
# request/response line it emits is buffered for later inspection.
api_log_handler = APILogHandler()
httpx_logger = logging.getLogger("httpx")
httpx_logger.setLevel(logging.DEBUG)
httpx_logger.addHandler(api_log_handler)


def _log_request(request):
    """httpx event hook: log the outgoing request line."""
    logger.debug(f"Request: {request.method} {request.url}")


def _log_response(response):
    """httpx event hook: log the response status code."""
    logger.debug(f"Response: {response.status_code}")


# OpenAI client whose underlying HTTP client reports each request/response
# through the hooks above (and therefore into api_log_handler).
client = OpenAI(
    api_key=os.getenv("OPENAI_API_KEY"),
    http_client=httpx.Client(
        event_hooks={
            'request': [_log_request],
            'response': [_log_response],
        }
    ),
)
# Initialize the database | |
def init_db(db_path=None):
    """Create the interactions table if it does not already exist.

    Args:
        db_path: Optional path to the SQLite file; defaults to DB_FILE.
                 (Backward-compatible generalization — existing callers
                 passing no argument behave exactly as before.)

    Fix vs. original: the connection is now closed even if the CREATE
    statement raises (the original leaked it on error).
    """
    target = DB_FILE if db_path is None else db_path
    conn = sqlite3.connect(target)
    try:
        conn.execute('''CREATE TABLE IF NOT EXISTS interactions (
                        id TEXT PRIMARY KEY,
                        timestamp TEXT,
                        pdf_name TEXT,
                        query TEXT,
                        response TEXT,
                        raw_payloads TEXT)''')
        conn.commit()
    finally:
        conn.close()
# Ensure the schema exists as soon as the module is imported.
init_db()
# Extract text from PDF | |
def extract_text_from_pdf(pdf_bytes):
    """Extract plain text from every page of a PDF given as raw bytes.

    Args:
        pdf_bytes: The PDF file content as a bytes-like object.

    Returns:
        The concatenated text of all pages (may be empty for scanned PDFs).

    Fix vs. original: the fitz Document was never closed, leaking the
    underlying resources; it is now closed in a finally block.
    """
    pdf_doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        pages = [
            pdf_doc.load_page(page_num).get_text("text")
            for page_num in range(pdf_doc.page_count)
        ]
    finally:
        pdf_doc.close()
    return "".join(pages)
# Chunk text to fit within token limits | |
def chunk_text(text, max_tokens=3000):
    """Split text into pieces of at most max_tokens model tokens each.

    Args:
        text: The full document text.
        max_tokens: Maximum tokens per chunk (gpt-4o-mini tokenizer).

    Returns:
        A list of text chunks; a single-element list when the text fits.

    Fix vs. original: the decoded piece was bound to a local named
    `chunk_text`, shadowing this very function — renamed via a
    comprehension to avoid the confusing (and recursion-breaking) shadow.
    """
    encoding = tiktoken.encoding_for_model("gpt-4o-mini")
    tokens = encoding.encode(text)
    return [
        encoding.decode(tokens[start:start + max_tokens])
        for start in range(0, len(tokens), max_tokens)
    ]
# Query PDF using OpenAI API | |
def _logged_chat(api_calls, call_type, messages, max_tokens, temperature):
    """Run one chat completion and record request/response/HTTP logs.

    Clears the shared httpx log buffer first so the captured http_logs
    belong to this call only, then appends a structured record to
    api_calls. Returns the raw OpenAI response object.
    """
    api_log_handler.api_logs.clear()
    request = {
        "model": "gpt-4o-mini",
        "messages": messages,
        "max_tokens": max_tokens,
        "temperature": temperature,
    }
    response = client.chat.completions.create(**request)
    api_calls.append({
        "type": call_type,
        "request": request,
        "response": response.model_dump(),
        "http_logs": api_log_handler.api_logs.copy(),
    })
    return response


def query_pdf_with_openai(pdf_text, query):
    """Answer a question about pdf_text using gpt-4o-mini.

    Short documents (one chunk) are answered in a single call. Longer
    documents are chunked; each chunk is screened with a cheap YES/NO
    relevance call, and up to the first three relevant chunks feed the
    final answer (falling back to the first chunk if none are relevant).

    Args:
        pdf_text: Full text extracted from the PDF.
        query: The user's question.

    Returns:
        (answer_text, api_calls) where api_calls is a list of
        request/response/http-log records for every API call made.

    Fix vs. original: the three copy-pasted "build request dict / call /
    append record" sections are factored into _logged_chat, removing the
    duplication and keeping request payloads and real calls in sync.
    """
    api_calls = []
    chunks = chunk_text(pdf_text)

    # Short document: answer directly from the full text.
    if len(chunks) == 1:
        messages = [
            {"role": "system", "content": "You are a helpful assistant that answers questions based on the provided PDF content. Use only the information from the PDF to answer questions."},
            {"role": "user", "content": f"PDF Content:\n{pdf_text}\n\nQuestion: {query}"}
        ]
        response = _logged_chat(api_calls, "main_query", messages, 500, 0.1)
        return response.choices[0].message.content, api_calls

    # Long document: screen each chunk for relevance to the question.
    relevant_chunks = []
    for i, chunk in enumerate(chunks):
        relevance_messages = [
            {"role": "system", "content": "Determine if the following text chunk contains information relevant to the user's question. Respond with 'YES' if relevant, 'NO' if not relevant."},
            {"role": "user", "content": f"Text chunk:\n{chunk}\n\nQuestion: {query}"}
        ]
        relevance_response = _logged_chat(
            api_calls, f"relevance_check_chunk_{i}", relevance_messages, 10, 0
        )
        if "YES" in relevance_response.choices[0].message.content.upper():
            relevant_chunks.append(chunk)

    # Use the top 3 relevant chunks; fall back to the first chunk if none.
    if relevant_chunks:
        combined_text = "\n\n".join(relevant_chunks[:3])
    else:
        combined_text = chunks[0]

    messages = [
        {"role": "system", "content": "You are a helpful assistant that answers questions based on the provided PDF content. Use only the information from the PDF to answer questions. If the information is not available in the provided content, say so."},
        {"role": "user", "content": f"PDF Content:\n{combined_text}\n\nQuestion: {query}"}
    ]
    response = _logged_chat(api_calls, "final_query", messages, 500, 0.1)
    return response.choices[0].message.content, api_calls
# Log to SQLite | |
def log_interaction(pdf_name, query, response, raw_payloads, db_path=None):
    """Persist one Q&A interaction to the SQLite log database.

    Args:
        pdf_name: Name of the uploaded PDF.
        query: The user's question.
        response: The model's answer text.
        raw_payloads: JSON-serializable record of all API calls made.
        db_path: Optional path to the SQLite file; defaults to DB_FILE.
                 (Backward-compatible generalization.)

    Fix vs. original: the connection is now closed even if the INSERT
    raises (the original leaked it on error).
    """
    target = DB_FILE if db_path is None else db_path
    record = (
        str(uuid.uuid4()),                                      # id
        datetime.now().isoformat(),                             # timestamp
        pdf_name,
        query,
        response,
        # Pretty-printed JSON keeps the payload human-readable in the DB.
        json.dumps(raw_payloads, indent=4, ensure_ascii=False),
    )
    conn = sqlite3.connect(target)
    try:
        conn.execute("INSERT INTO interactions VALUES (?, ?, ?, ?, ?, ?)", record)
        conn.commit()
    finally:
        conn.close()
# Query the PDF | |
def query_pdf(pdf, query):
    """Gradio callback: validate inputs, then answer a question about the PDF.

    Returns either the model's answer or a human-readable error message
    (the UI shows whatever string comes back).
    """
    # Guard clauses: reject missing/blank inputs and a missing API key early.
    if pdf is None:
        return "Please upload a PDF."
    if not query.strip():
        return "Please enter a valid query."
    if not os.getenv("OPENAI_API_KEY"):
        return "OpenAI API key not found. Please set the OPENAI_API_KEY environment variable."

    try:
        document_name = getattr(pdf, 'name', "Uploaded PDF")
        extracted = extract_text_from_pdf(pdf)
        if not extracted.strip():
            return "No text could be extracted from the PDF."
        answer, payloads = query_pdf_with_openai(extracted, query)
        log_interaction(document_name, query, answer, payloads)
        return answer
    except Exception as error:
        # Surface any failure as a message rather than crashing the UI.
        return f"An error occurred: {str(error)}"
# Gradio app | |
# --- Gradio UI: file upload + question box wired to query_pdf ---
with gr.Blocks() as app:
    gr.Markdown("# PDF Q&A with OpenAI")
    gr.Markdown("Upload a PDF and ask questions about its content using OpenAI's GPT model.")
    file_input = gr.File(label="Upload PDF", type="binary")
    question_box = gr.Textbox(label="Ask a question about the PDF")
    answer_box = gr.Textbox(label="Answer", lines=5)
    submit_button = gr.Button("Submit")
    submit_button.click(query_pdf, inputs=[file_input, question_box], outputs=answer_box)

if __name__ == "__main__":
    app.launch()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.