Skip to content

Instantly share code, notes, and snippets.

@jvelezmagic
Last active September 10, 2024 19:40
Show Gist options
  • Save jvelezmagic/f3653cc2ddab1c91e86751c8b423a1b6 to your computer and use it in GitHub Desktop.
Save jvelezmagic/f3653cc2ddab1c91e86751c8b423a1b6 to your computer and use it in GitHub Desktop.
QA Chatbot streaming with source documents example using FastAPI, LangChain Expression Language, OpenAI, and Chroma.
"""QA Chatbot streaming using FastAPI, LangChain Expression Language , OpenAI, and Chroma.
Features
--------
- Persistent Chat Memory:
Stores chat history in a local file.
- Persistent Vector Store:
Stores document embeddings in a local vector store.
- Standalone Question Generation:
Rephrases follow-up questions to standalone questions in their original language.
- Document Retrieval:
Searches and retrieves relevant documents based on user queries.
- Context-Aware Responses:
Generates responses based on a combined context from relevant documents.
- Streaming Responses:
Streams responses in real time either as plain text or as Server-Sent Events (SSE).
SSE also sends the relevant documents as context.
Next Steps
----------
- Add a proper exception handling mechanism during the streaming process.
- Add pruning to the conversation buffer memory to prevent it from growing too large.
- Combine documents using a more sophisticated method than simply concatenating them.
Usage
-----
1. Install dependencies:
```bash
pip install fastapi==0.99.1 uvicorn==0.23.2 python-dotenv==1.0.0 chromadb==0.4.5 tiktoken==0.4.0 langchain==0.0.257 openai==0.27.8
```
or
```bash
poetry install
```
2. Run the server:
```bash
uvicorn main:app --reload
```
3. curl the server:
With plain text:
```bash
curl --no-buffer -X 'POST' \
'http://localhost:8000/chat' \
-H 'accept: text/plain' \
-H 'Content-Type: application/json' \
-d '{
"session_id": "session_1",
"message": "who'\''s playing in the river?"
}'
```
With SSE:
```bash
curl --no-buffer -X 'POST' \
'http://localhost:8000/chat/sse/' \
-H 'accept: text/event-stream' \
-H 'Content-Type: application/json' \
-d '{
"session_id": "session_2",
"message": "who'\''s playing in the garden?"
}'
```
Cheers!
@jvelezmagic"""
import os
from functools import lru_cache
from typing import AsyncGenerator, Literal

from fastapi import Depends, FastAPI
from fastapi.responses import StreamingResponse
from langchain.chat_models import ChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from langchain.memory import ConversationBufferMemory, FileChatMessageHistory
from langchain.prompts import PromptTemplate
from langchain.schema import BaseChatMessageHistory, Document, format_document
from langchain.schema.output_parser import StrOutputParser
from langchain.vectorstores import Chroma
from pydantic import BaseModel, BaseSettings, validator
class Settings(BaseSettings):
    """Application configuration, populated by pydantic's settings loader."""

    # OpenAI credential passed to the chat model and the embeddings client.
    openai_api_key: str

    class Config:  # type: ignore
        # Values may be supplied through a UTF-8 encoded `.env` file.
        env_file_encoding = "utf-8"
        env_file = ".env"
class ChatRequest(BaseModel):
    """Request body accepted by the `/chat` and `/chat/sse/` endpoints."""

    # Conversation identifier. The endpoints interpolate it into the history
    # file path "./message_store/{session_id}.json", so it must be a plain
    # name and never a path.
    session_id: str
    # The user's latest message (possibly a follow-up question).
    message: str

    @validator("session_id")
    def session_id_must_not_contain_path_parts(cls, value: str) -> str:
        """Reject ids that could move the history file out of `message_store`.

        The id comes straight from the client; allowing path separators would
        let a caller create or read `.json` files in arbitrary directories
        (e.g. ``"../../somewhere/else"``). NUL is rejected as well because it
        is never valid inside a file name.

        Raises:
            ValueError: if the id contains ``/``, ``\\`` or a NUL character
                (reported to the client as a validation error).
        """
        if any(char in value for char in ("/", "\\", "\x00")):
            raise ValueError("session_id must not contain path separators")
        return value
class ChatSSEResponse(BaseModel):
    """One event of the `/chat/sse/` stream; serialized with `.json()`."""

    # Event kind. "context" carries the retrieved documents, "start"/"end"
    # bracket the answer, "streaming" carries one answer chunk and "error"
    # carries the text of an exception raised while streaming.
    type: Literal["context", "start", "streaming", "end", "error"]
    # Payload: a list of documents for "context" events, a string otherwise
    # (empty for "start" and "end").
    value: str | list[Document]
@lru_cache()
def get_settings() -> Settings:
    """Return the application settings, built once and then served from cache."""
    settings = Settings()  # type: ignore
    return settings
@lru_cache()
def get_vectorstore() -> Chroma:
    """Return the cached Chroma store backed by OpenAI embeddings.

    Both the collection and its persistence directory are named "chroma".
    """
    api_key = get_settings().openai_api_key
    openai_embeddings = OpenAIEmbeddings(openai_api_key=api_key)  # type: ignore
    return Chroma(
        persist_directory="chroma",
        embedding_function=openai_embeddings,
        collection_name="chroma",
    )
def combine_documents(
    docs: list[Document],
    document_prompt: PromptTemplate = PromptTemplate.from_template("{page_content}"),
    document_separator: str = "\n\n",
) -> str:
    """Render every document with `document_prompt` and join the results.

    Args:
        docs: Documents to render, in the order they should appear.
        document_prompt: Template applied to each document; by default only
            the page content is emitted.
        document_separator: Text placed between consecutive rendered documents.

    Returns:
        The concatenated text (an empty string when `docs` is empty).
    """
    return document_separator.join(
        format_document(document, document_prompt) for document in docs
    )
# ASGI application object; the endpoints and the startup hook register on it.
app = FastAPI(
    version="0.1.0",
    title="QA Chatbot Streaming using FastAPI, LangChain Expression Language , OpenAI, and Chroma",
)
@app.on_event("startup")
async def startup_event() -> None:
    """Prepare persistent state before the first request is served.

    Seeds the vector store with three demo sentences when its collection is
    empty, and makes sure the `message_store` directory used for per-session
    chat history files exists.
    """
    vectorstore = get_vectorstore()
    # Chroma's wrapper exposes no public "count", hence the private attribute.
    is_collection_empty: bool = vectorstore._collection.count() == 0  # type: ignore
    if is_collection_empty:
        vectorstore.add_texts(  # type: ignore
            texts=[
                "Cats are playing in the garden.",
                "Dogs are playing in the river.",
                "Dogs and cats are mortal enemies, but they often play together.",
            ]
        )
    # `exist_ok=True` avoids the check-then-create race when several workers
    # start at the same time and all try to create the directory.
    os.makedirs("message_store", exist_ok=True)
async def generate_standalone_question(
    chat_history: str, question: str, settings: Settings
) -> str:
    """Rewrite a follow-up question so that it can be understood on its own.

    Args:
        chat_history: The conversation so far, as plain text.
        question: The user's latest (possibly context-dependent) input.
        settings: Supplies the OpenAI API key for the chat model.

    Returns:
        The model's standalone rephrasing of `question`.
    """
    condense_prompt = PromptTemplate.from_template(
        template="""Given the following conversation and a follow up question, rephrase the follow up question to be a standalone question, in its original language.
Chat History:
{chat_history}
Follow Up Input: {question}
Standalone question:"""
    )
    model = ChatOpenAI(temperature=0, openai_api_key=settings.openai_api_key)
    # prompt -> chat model -> plain string
    condense_chain = condense_prompt | model | StrOutputParser()  # type: ignore
    inputs = {"chat_history": chat_history, "question": question}
    standalone_question = await condense_chain.ainvoke(inputs)  # type: ignore
    return standalone_question
async def search_relevant_documents(query: str, k: int = 5) -> list[Document]:
    """Return up to `k` documents from the vector store that best match `query`.

    Args:
        query: Text to search for (typically the standalone question).
        k: Maximum number of documents to retrieve.

    Returns:
        The retrieved documents, as returned by the retriever.
    """
    vectorstore = get_vectorstore()
    # The result size has to be configured on the retriever itself: extra
    # keyword arguments given to `aget_relevant_documents` are not forwarded
    # to the similarity search, so passing `k` there had no effect.
    retriever = vectorstore.as_retriever(search_kwargs={"k": k})
    return await retriever.aget_relevant_documents(query=query)
async def generate_response(
    context: str, chat_memory: BaseChatMessageHistory, message: str, settings: Settings
) -> AsyncGenerator[str, None]:
    """Stream the model's answer to `message` as plain-text chunks.

    Once the stream has been fully consumed, the user message and the complete
    answer are appended to `chat_memory`.

    Args:
        context: Text the answer must be based on.
        chat_memory: History store that receives the finished exchange.
        message: The user's question.
        settings: Supplies the OpenAI API key for the chat model.

    Yields:
        The content of each chunk produced by the model, in order.
    """
    qa_prompt = PromptTemplate.from_template(
        """Answer the question based only on the following context:
{context}
Question: {question}"""
    )
    model = ChatOpenAI(temperature=0, openai_api_key=settings.openai_api_key)
    answer_chain = qa_prompt | model  # type: ignore
    pieces: list[str] = []
    async for chunk in answer_chain.astream({"context": context, "question": message}):  # type: ignore
        text = chunk.content
        yield text
        pieces.append(text)
    # Persist the exchange only after the whole answer has been produced.
    chat_memory.add_user_message(message=message)
    chat_memory.add_ai_message(message="".join(pieces))
async def generate_sse_response(
    context: list[Document],
    chat_memory: BaseChatMessageHistory,
    message: str,
    settings: Settings,
) -> AsyncGenerator[str, None]:
    """Stream the answer to `message` as JSON-encoded `ChatSSEResponse` events.

    Event order: one "context" event carrying the retrieved documents, then
    "start", one "streaming" event per model chunk, and "end". After "end" the
    exchange is appended to `chat_memory`. If anything in that sequence
    raises, a single "error" event with the exception text is emitted instead
    and the history is left untouched.

    Args:
        context: Documents the answer must be based on.
        chat_memory: History store that receives the finished exchange.
        message: The user's question.
        settings: Supplies the OpenAI API key for the chat model.

    Yields:
        `ChatSSEResponse(...).json()` strings.
    """
    # NOTE(review): events are emitted as bare JSON strings without the
    # `data: ...` framing of the SSE wire format, although the endpoint
    # declares `text/event-stream` - confirm what clients expect before
    # changing it.
    prompt = PromptTemplate.from_template(
        """Answer the question based only on the following context:
{context}
Question: {question}"""
    )
    llm = ChatOpenAI(temperature=0, openai_api_key=settings.openai_api_key)
    chain = prompt | llm  # type: ignore
    # The prompt needs the documents' text; formatting the list itself would
    # inject its Python repr (class names, metadata) into the prompt. This
    # mirrors what the plain-text `/chat` endpoint feeds the model.
    context_text = combine_documents(context)
    response_parts: list[str] = []
    yield ChatSSEResponse(type="context", value=context).json()
    try:
        yield ChatSSEResponse(type="start", value="").json()
        async for token in chain.astream({"context": context_text, "question": message}):  # type: ignore
            yield ChatSSEResponse(type="streaming", value=token.content).json()
            response_parts.append(token.content)
        yield ChatSSEResponse(type="end", value="").json()
        chat_memory.add_user_message(message=message)
        chat_memory.add_ai_message(message="".join(response_parts))
    except Exception as e:  # TODO: Add proper exception handling
        # Stream boundary: report the failure to the client as an event
        # rather than dropping the connection without explanation.
        yield ChatSSEResponse(type="error", value=str(e)).json()
@app.post("/chat")
async def chat(
    request: ChatRequest, settings: Settings = Depends(get_settings)
) -> StreamingResponse:
    """Answer a chat message, streaming the reply as plain text.

    Loads the session's file-backed history, condenses the message into a
    standalone question, retrieves matching documents and streams an answer
    grounded in their combined text.
    """
    history = FileChatMessageHistory(
        file_path=f"./message_store/{request.session_id}.json"
    )
    # return_messages=False makes `buffer` a single string for the prompt.
    buffer_memory = ConversationBufferMemory(chat_memory=history, return_messages=False)
    question = await generate_standalone_question(
        chat_history=buffer_memory.buffer, question=request.message, settings=settings
    )
    documents = await search_relevant_documents(query=question)
    token_stream = generate_response(
        context=combine_documents(documents),
        chat_memory=history,
        message=request.message,
        settings=settings,
    )
    return StreamingResponse(token_stream, media_type="text/plain")
@app.post("/chat/sse/")
async def chat_sse(
    request: ChatRequest, settings: Settings = Depends(get_settings)
) -> StreamingResponse:
    """Answer a chat message, streaming JSON events as `text/event-stream`.

    Same pipeline as the plain-text endpoint, except that retrieval is asked
    for two documents and they are handed to the event generator as-is so it
    can also send them to the client.
    """
    history = FileChatMessageHistory(
        file_path=f"./message_store/{request.session_id}.json"
    )
    # return_messages=False makes `buffer` a single string for the prompt.
    buffer_memory = ConversationBufferMemory(chat_memory=history, return_messages=False)
    question = await generate_standalone_question(
        chat_history=buffer_memory.buffer, question=request.message, settings=settings
    )
    documents = await search_relevant_documents(query=question, k=2)
    event_stream = generate_sse_response(
        context=documents,
        chat_memory=history,
        message=request.message,
        settings=settings,
    )
    return StreamingResponse(event_stream, media_type="text/event-stream")
[tool.poetry]
name = "langchain-language-expression-streaming-fastapi"
version = "0.1.0"
description = ""
authors = ["Jesús Vélez Santiago"]
packages = [{include = "app"}]
[tool.poetry.dependencies]
python = "^3.10"
langchain = "^0.0.257"
openai = "^0.27.8"
fastapi = "0.99.1"
uvicorn = "^0.23.2"
python-dotenv = "^1.0.0"
chromadb = "^0.4.5"
tiktoken = "^0.4.0"
[tool.poetry.group.dev.dependencies]
black = "^23.7.0"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
@sowsan
Copy link

sowsan commented Dec 15, 2023

Great content! I am struggling to make this work with AgentExecutor: the output never comes back as streamed tokens. Any ideas on how to make the LCEL implementation of AgentExecutor stream?

agent_executor = AgentExecutor(agent=agent_chain, memory=memory, verbose=True, tools=tool_list,return_intermediate_steps=False)

async for token in agent_executor .astream({"input ": "Hello"}):
yield token.content
response += token.content

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment