Chat with your uploaded file

# This code is a slight modification of the code at
# https://docs.chainlit.io/examples/qa
# that adds handling of PDF and Word .docx files in addition to text files.

# Load all necessary libraries and modules
import chainlit as cl
from langchain.memory import ChatMessageHistory, ConversationBufferMemory
from langchain.docstore.document import Document
from langchain.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
)
from langchain.chat_models import ChatOpenAI
from langchain.chains import ConversationalRetrievalChain
from langchain.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from PyPDF2 import PdfReader
import docxpy
from langchain.embeddings.openai import OpenAIEmbeddings
from typing import List
import os
import shutil
import tempfile

# Get your OpenAI API key from the .env file (there are other ways to do this)
from dotenv import load_dotenv

load_dotenv()
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
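
# Note: langchain's OpenAI wrappers read OPENAI_API_KEY from the environment on
# their own, so load_dotenv() is all that's strictly needed here. If you prefer
# to pass the key explicitly, both clients also accept it as a keyword
# argument, e.g. OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY) and
# ChatOpenAI(openai_api_key=OPENAI_API_KEY).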

# Create the system prompt that will be used for all user queries; tweak as
# desired. The {context} placeholder is filled with the retrieved document chunks.
system_template = """Use the following pieces of context to answer the user's question.
If you don't know the answer, just say that you don't know; don't try to make up an answer.
ALWAYS return a "SOURCES" part in your answer.
The "SOURCES" part should be a reference to the source of the document from which you got your answer.
If the user greets you with a greeting like "Hi", "Hello", or "How are you", reply in kind.
Your response should look like this:
The answer is foo
SOURCES: xyz
Begin!
----------------
{context}"""

messages = [
    SystemMessagePromptTemplate.from_template(system_template),
    HumanMessagePromptTemplate.from_template("{question}"),
]
prompt = ChatPromptTemplate.from_messages(messages)
chain_type_kwargs = {"prompt": prompt}
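# Note: chain_type_kwargs is handed to ConversationalRetrievalChain in
# on_chat_start below as combine_docs_chain_kwargs, which makes this prompt the
# prompt of the underlying "stuff" documents chain.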


# This Chainlit decorator tells the system what to do when a chat begins
@cl.on_chat_start
async def on_chat_start():
    files = None

    # Wait for the user to upload a file
    while files is None:
        files = await cl.AskFileMessage(
            content="Please upload a text, .pdf, or .docx file to begin!",
            accept=[
                "text/plain",
                "application/pdf",
                "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            ],
            max_size_mb=20,
            timeout=180,
        ).send()
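
    # AskFileMessage resolves to a list of uploaded files (or None if the user
    # doesn't respond before the timeout), so the loop above re-prompts until a
    # file arrives; only the first file is used.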
    file = files[0]

    msg = cl.Message(
        content=f"Processing `{file.name}`...", disable_human_feedback=True
    )
    await msg.send()

    # Decode the file directly if it's a .txt file
    if file.name.endswith(".txt"):
        text = file.content.decode("utf-8")

    # Save a temporary copy of non-.txt uploads so the PDF and Word libraries,
    # which expect a file path, can read them. The upload's raw bytes live in
    # file.content (the same attribute the .txt branch decodes above).
    def save_temp_copy(uploaded_file):
        # Create a temporary directory and write the uploaded bytes into it
        temp_dir = tempfile.mkdtemp()
        temp_file_path = os.path.join(temp_dir, uploaded_file.name)
        with open(temp_file_path, "wb") as f:
            f.write(uploaded_file.content)
        print(f"The temporary file path is {temp_file_path}")
        return temp_file_path

    # Process if a PDF file
    if file.name.endswith(".pdf"):
        pdf_path = save_temp_copy(file)
        text = ""
        reader = PdfReader(pdf_path)
        for page in reader.pages:
            text += page.extract_text() + "\n\n"

    # Process if a Word .docx file
    if file.name.endswith(".docx"):
        docx_path = save_temp_copy(file)
        text = docxpy.process(docx_path)
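
    # Note: the temporary directories created by save_temp_copy are never
    # cleaned up here. For anything beyond a quick demo you may want to remove
    # them once the text has been extracted, for example with
    # shutil.rmtree(os.path.dirname(pdf_path), ignore_errors=True).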

    # Split the text into chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000, chunk_overlap=100
    )
    texts = text_splitter.split_text(text)

    # Create metadata for each text chunk
    metadatas = [{"source": f"{i}-pl"} for i in range(len(texts))]
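    # The "i-pl" ids are synthetic chunk labels; the chunk text itself is
    # attached to each answer as cl.Text elements in on_message below, so the
    # user can inspect the passages an answer was drawn from.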

    # Create a Chroma vector store from the chunks
    embeddings = OpenAIEmbeddings()
    docsearch = await cl.make_async(Chroma.from_texts)(
        texts, embeddings, metadatas=metadatas
    )

    # Set up conversational memory so the chain can handle follow-up questions
    message_history = ChatMessageHistory()
    memory = ConversationBufferMemory(
        memory_key="chat_history",
        output_key="answer",
        chat_memory=message_history,
        return_messages=True,
    )

    # Create a chain that uses the Chroma vector store. Passing the custom
    # prompt as combine_docs_chain_kwargs makes it the prompt of the underlying
    # "stuff" documents chain, which fills {context} with the retrieved chunks.
    chain = ConversationalRetrievalChain.from_llm(
        ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0, streaming=True),
        chain_type="stuff",
        retriever=docsearch.as_retriever(),
        memory=memory,
        return_source_documents=True,
        combine_docs_chain_kwargs=chain_type_kwargs,
    )

    # Let the user know that the system is ready
    msg.content = f"Processing `{file.name}` done. You can now ask questions!"
    await msg.update()

    # Store the chain in the user session so on_message can reuse it
    cl.user_session.set("chain", chain)
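

# Note: this code targets the langchain 0.0.x and chainlit 0.7.x APIs that were
# current when the gist was written (AskFileResponse.content,
# disable_human_feedback, ChatMessageHistory, and so on); newer releases of
# both libraries have moved or renamed several of these.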


# This Chainlit decorator tells the system what to do each time the user sends a message
@cl.on_message
async def main(message: cl.Message):
    # Retrieve the chain that on_chat_start stored in the user session
    chain = cl.user_session.get("chain")  # type: ConversationalRetrievalChain
    cb = cl.AsyncLangchainCallbackHandler()
    res = await chain.acall(message.content, callbacks=[cb])
    answer = res["answer"]
    source_documents = res["source_documents"]  # type: List[Document]

    text_elements = []  # type: List[cl.Text]
    if source_documents:
        for source_idx, source_doc in enumerate(source_documents):
            source_name = f"source_{source_idx}"
            # Create the text element referenced in the message
            text_elements.append(
                cl.Text(content=source_doc.page_content, name=source_name)
            )
        source_names = [text_el.name for text_el in text_elements]

        if source_names:
            answer += f"\nSources: {', '.join(source_names)}"
        else:
            answer += "\nNo sources found"

    await cl.Message(content=answer, elements=text_elements).send()
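
# To launch the app locally (assuming this file is saved as app.py):
#   chainlit run app.py -w
# The -w flag watches the file and reloads the app whenever it changes.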