QA Chatbot streaming with source documents example using FastAPI, LangChain Expression Language, OpenAI, and Chroma.
"""QA Chatbot streaming using FastAPI, LangChain Expression Language , OpenAI, and Chroma. | |
Features | |
-------- | |
- Persistent Chat Memory: | |
Stores chat history in a local file. | |
- Persistent Vector Store: | |
Stores document embeddings in a local vector store. | |
- Standalone Question Generation: | |
Rephrases follow-up questions to standalone questions in their original language. | |
- Document Retrieval: | |
Searches and retrieves relevant documents based on user queries. | |
- Context-Aware Responses: | |
Generates responses based on a combined context from relevant documents. | |
- Streaming Responses: | |
Streams responses in real time either as plain text or as Server-Sent Events (SSE). | |
SSE also sends the relevant documents as context. | |
Next Steps | |
---------- | |
- Add a proper exception handling mechanism during the streaming process. | |
- Add pruning to the conversation buffer memory to prevent it from growing too large. | |
- Combine documents using a more sophisticated method than simply concatenating them. | |
Usage | |
----- | |
1. Install dependencies: | |
```bash | |
pip install fastapi==0.99.1 uvicorn==0.23.2 python-dotenv==1.0.0 chromadb==0.4.5 tiktoken==0.4.0 langchain==0.0.257 openai==0.27.8 | |
``` | |
or | |
```bash | |
poetry install | |
``` | |
2. Run the server: | |
```bash | |
uvicorn main:app --reload | |
``` | |
3. curl the server: | |
With plain text: | |
```bash | |
curl --no-buffer -X 'POST' \ | |
'http://localhost:8000/chat' \ | |
-H 'accept: text/plain' \ | |
-H 'Content-Type: application/json' \ | |
-d '{ | |
"session_id": "session_1", | |
"message": "who'\''s playing in the river?" | |
}' | |
``` | |
With SSE: | |
```bash | |
curl --no-buffer -X 'POST' \ | |
'http://localhost:8000/chat/sse/' \ | |
-H 'accept: text/event-stream' \ | |
-H 'Content-Type: application/json' \ | |
-d '{ | |
"session_id": "session_2", | |
"message": "who'\''s playing in the garden?" | |
}' | |
Cheers! | |
@jvelezmagic""" | |
import os
from functools import lru_cache
from typing import AsyncGenerator, Literal

from fastapi import Depends, FastAPI
from fastapi.responses import StreamingResponse
from langchain.chat_models import ChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from langchain.memory import ConversationBufferMemory, FileChatMessageHistory
from langchain.prompts import PromptTemplate
from langchain.schema import BaseChatMessageHistory, Document, format_document
from langchain.schema.output_parser import StrOutputParser
from langchain.vectorstores import Chroma
from pydantic import BaseModel, BaseSettings


class Settings(BaseSettings):
    openai_api_key: str

    class Config:  # type: ignore
        env_file = ".env"
        env_file_encoding = "utf-8"


class ChatRequest(BaseModel):
    session_id: str
    message: str


class ChatSSEResponse(BaseModel):
    type: Literal["context", "start", "streaming", "end", "error"]
    value: str | list[Document]


@lru_cache()
def get_settings() -> Settings:
    return Settings()  # type: ignore


@lru_cache()
def get_vectorstore() -> Chroma:
    settings = get_settings()
    embeddings = OpenAIEmbeddings(openai_api_key=settings.openai_api_key)  # type: ignore
    vectorstore = Chroma(
        collection_name="chroma",
        embedding_function=embeddings,
        persist_directory="chroma",
    )
    return vectorstore


def combine_documents(
    docs: list[Document],
    document_prompt: PromptTemplate = PromptTemplate.from_template("{page_content}"),
    document_separator: str = "\n\n",
) -> str:
    doc_strings = [format_document(doc, document_prompt) for doc in docs]
    return document_separator.join(doc_strings)


app = FastAPI(
    title="QA Chatbot Streaming using FastAPI, LangChain Expression Language, OpenAI, and Chroma",
    version="0.1.0",
)
@app.on_event("startup") | |
async def startup_event() -> None: | |
vectorstore = get_vectorstore() | |
is_collection_empty: bool = vectorstore._collection.count() == 0 # type: ignore | |
if is_collection_empty: | |
vectorstore.add_texts( # type: ignore | |
texts=[ | |
"Cats are playing in the garden.", | |
"Dogs are playing in the river.", | |
"Dogs and cats are mortal enemies, but they often play together.", | |
] | |
) | |
if not os.path.exists("message_store"): | |
os.mkdir("message_store") | |
async def generate_standalone_question( | |
chat_history: str, question: str, settings: Settings | |
) -> str: | |
prompt = PromptTemplate.from_template( | |
template="""Given the following conversation and a follow up question, rephrase the follow up question to be a standalone question, in its original language. | |
Chat History: | |
{chat_history} | |
Follow Up Input: {question} | |
Standalone question:""" | |
) | |
llm = ChatOpenAI(temperature=0, openai_api_key=settings.openai_api_key) | |
chain = prompt | llm | StrOutputParser() # type: ignore | |
return await chain.ainvoke( # type: ignore | |
{ | |
"chat_history": chat_history, | |
"question": question, | |
} | |
) | |
async def search_relevant_documents(query: str, k: int = 5) -> list[Document]: | |
vectorstore = get_vectorstore() | |
retriever = vectorstore.as_retriever() | |
return await retriever.aget_relevant_documents(query=query, k=k) | |

async def generate_response(
    context: str, chat_memory: BaseChatMessageHistory, message: str, settings: Settings
) -> AsyncGenerator[str, None]:
    prompt = PromptTemplate.from_template(
        """Answer the question based only on the following context:
{context}
Question: {question}"""
    )

    llm = ChatOpenAI(temperature=0, openai_api_key=settings.openai_api_key)

    chain = prompt | llm  # type: ignore

    response = ""
    async for token in chain.astream({"context": context, "question": message}):  # type: ignore
        yield token.content
        response += token.content

    chat_memory.add_user_message(message=message)
    chat_memory.add_ai_message(message=response)


async def generate_sse_response(
    context: list[Document],
    chat_memory: BaseChatMessageHistory,
    message: str,
    settings: Settings,
) -> AsyncGenerator[str, ChatSSEResponse]:
    prompt = PromptTemplate.from_template(
        """Answer the question based only on the following context:
{context}
Question: {question}"""
    )

    llm = ChatOpenAI(temperature=0, openai_api_key=settings.openai_api_key)

    chain = prompt | llm  # type: ignore

    response = ""
    yield ChatSSEResponse(type="context", value=context).json()
    try:
        yield ChatSSEResponse(type="start", value="").json()
        async for token in chain.astream({"context": context, "question": message}):  # type: ignore
            yield ChatSSEResponse(type="streaming", value=token.content).json()
            response += token.content
        yield ChatSSEResponse(type="end", value="").json()
        chat_memory.add_user_message(message=message)
        chat_memory.add_ai_message(message=response)
    except Exception as e:  # TODO: Add proper exception handling
        yield ChatSSEResponse(type="error", value=str(e)).json()


@app.post("/chat")
async def chat(
    request: ChatRequest, settings: Settings = Depends(get_settings)
) -> StreamingResponse:
    memory_key = f"./message_store/{request.session_id}.json"
    chat_memory = FileChatMessageHistory(file_path=memory_key)
    memory = ConversationBufferMemory(chat_memory=chat_memory, return_messages=False)

    standalone_question = await generate_standalone_question(
        chat_history=memory.buffer, question=request.message, settings=settings
    )

    relevant_documents = await search_relevant_documents(query=standalone_question)
    combined_documents = combine_documents(relevant_documents)

    return StreamingResponse(
        generate_response(
            context=combined_documents,
            chat_memory=chat_memory,
            message=request.message,
            settings=settings,
        ),
        media_type="text/plain",
    )


@app.post("/chat/sse/")
async def chat_sse(
    request: ChatRequest, settings: Settings = Depends(get_settings)
) -> StreamingResponse:
    memory_key = f"./message_store/{request.session_id}.json"
    chat_memory = FileChatMessageHistory(file_path=memory_key)
    memory = ConversationBufferMemory(chat_memory=chat_memory, return_messages=False)

    standalone_question = await generate_standalone_question(
        chat_history=memory.buffer, question=request.message, settings=settings
    )

    relevant_documents = await search_relevant_documents(query=standalone_question, k=2)

    return StreamingResponse(
        generate_sse_response(
            context=relevant_documents,
            chat_memory=chat_memory,
            message=request.message,
            settings=settings,
        ),
        media_type="text/event-stream",
    )
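The `Settings` class above reads the OpenAI key from the environment or from a local `.env` file (pydantic's `BaseSettings` matches the `openai_api_key` field to the `OPENAI_API_KEY` variable case-insensitively). A minimal `.env` sketch, with a placeholder value:

```bash
# .env — file name as configured in Settings.Config; only the OpenAI API key is required
OPENAI_API_KEY=your-openai-api-key
```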
[tool.poetry]
name = "langchain-language-expression-streaming-fastapi"
version = "0.1.0"
description = ""
authors = ["Jesús Vélez Santiago"]
packages = [{include = "app"}]

[tool.poetry.dependencies]
python = "^3.10"
langchain = "^0.0.257"
openai = "^0.27.8"
fastapi = "0.99.1"
uvicorn = "^0.23.2"
python-dotenv = "^1.0.0"
chromadb = "^0.4.5"
tiktoken = "^0.4.0"

[tool.poetry.group.dev.dependencies]
black = "^23.7.0"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
If you get `TypeError: unsupported operand type(s) for |: 'type' and 'types.GenericAlias'` on the line `value: str | list[Document]`, you should replace it with `value: Union[str, list[Document]]`.
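That error appears on Python 3.9, where `list[Document]` is valid but the `|` union operator between types is not (the pyproject pins Python ^3.10, so it mainly affects the pip-install route on older interpreters). A minimal sketch of the adjusted model using `typing.Union`:

```python
from typing import Literal, Union

from langchain.schema import Document
from pydantic import BaseModel


class ChatSSEResponse(BaseModel):
    type: Literal["context", "start", "streaming", "end", "error"]
    value: Union[str, list[Document]]  # instead of `str | list[Document]`
```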
Hi, if you want an MVP (minimal working example), this is what I've extracted from the script:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate
from pydantic import BaseModel
from typing import AsyncGenerator


class ChatRequest(BaseModel):
    session_id: str
    message: str


app = FastAPI(
    title="QA Chatbot Streaming using FastAPI, LangChain Expression Language, OpenAI, and Chroma",
    version="0.1.0",
)


async def generate_response(
    context: str,
    message: str,
) -> AsyncGenerator[str, None]:
    prompt = PromptTemplate.from_template(
        """Answer the question based only on the following context:
{context}
Question: {question}"""
    )

    llm = ChatOpenAI(
        temperature=0,
        openai_api_key=YOUR_OPENAI_KEY,  # replace with your OpenAI API key
    )

    chain = prompt | llm  # type: ignore

    response = ""
    async for token in chain.astream({"context": context, "question": message}):  # type: ignore
        yield token.content
        response += token.content


@app.post("/chat")
async def chat(
    request: ChatRequest,
) -> StreamingResponse:
    return StreamingResponse(
        generate_response(
            context="",
            message=request.message,
        ),
        media_type="text/plain",
    )
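Assuming the snippet above is saved as `main.py` and served with `uvicorn main:app --reload`, it can be exercised with the same plain-text request used in the gist:

```bash
curl --no-buffer -X 'POST' \
  'http://localhost:8000/chat' \
  -H 'accept: text/plain' \
  -H 'Content-Type: application/json' \
  -d '{
    "session_id": "session_1",
    "message": "who'\''s playing in the river?"
  }'
```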
Great content! I am struggling to make this work with AgentExecutor. The output never comes as streamed tokens. Any ideas on how to make the LCEL implementation of AgentExecutor stream?

agent_executor = AgentExecutor(
    agent=agent_chain,
    memory=memory,
    verbose=True,
    tools=tool_list,
    return_intermediate_steps=False,
)

async for token in agent_executor.astream({"input": "Hello"}):
    yield token.content
    response += token.content
Just found out how to do this.