Server-Sent Events (SSE) with FastAPI and (partially) Langchain
# I couldn't get chains to return generators, so I had to do a bit of low-level SSE. Hope this is useful.
# You'll probably use another vector store instead of OpenSearch, but if you want to mimic what I did here,
# please use the fork of `OpenSearchVectorSearch` at https://github.com/oneryalcin/langchain
import json
import logging
import os
from typing import Generator, List

import requests
import sseclient
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.docstore.document import Document
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import OpenSearchVectorSearch

logger = logging.getLogger(__name__)

OPENSEARCH_URL = '<VECTOR STORE URL>'
OPENSEARCH_INDEX = '<VECTOR STORE INDEX NAME>'
OPENAI_API_KEY = '<OPENAI KEY>'
OPENAI_COMPLETIONS_V1_URL = "https://api.openai.com/v1/completions"
app = FastAPI(
    title='Streaming Langchain with Vectorsearch',
    description='Example streaming API endpoint for VectorSearch and OpenAI',
    version='0.0.1',
)
@app.on_event("startup")
async def startup_event():
    global docsearch
    logger.info("Loading vector store")
    docsearch = OpenSearchVectorSearch(
        opensearch_url=OPENSEARCH_URL,
        index_name=OPENSEARCH_INDEX,
        embedding_function=OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY),
    )
def streaming_request(prompt: str, model: str = 'text-davinci-003') -> Generator[str, None, None]:
    """Generator yielding each chunk received from OpenAI as a response

    :param prompt: User prompt
    :param model: OpenAI model name
    :return: generator object for the streaming response from OpenAI
    """
    req_headers = {
        'Accept': 'text/event-stream',
        'Authorization': 'Bearer ' + OPENAI_API_KEY,
    }
    req_body = {
        'model': model,
        'prompt': prompt,
        'max_tokens': 400,
        'temperature': 0,
        'stream': True,
    }
    # OpenAI streams completions as Server-Sent Events; parse them with sseclient-py
    response = requests.post(url=OPENAI_COMPLETIONS_V1_URL, stream=True, headers=req_headers, json=req_body)
    client = sseclient.SSEClient(response)
    for event in client.events():
        if event.data != '[DONE]':
            text = json.loads(event.data)['choices'][0]['text']
            yield json.dumps({'data': text})
        else:
            yield '[DONE]'
def gen_prompt(docs: List[Document], query: str) -> str:
    """Build the question prompt

    :param docs: Documents returned from the vector search
    :param query: User question
    :return: prompt string sent to OpenAI
    """
    return f"""To answer the question please only use the Context given, nothing else. Do not make up an answer,
    simply say 'I don't know' if you are not sure.
    Question: {query}
    Context: {[doc.page_content for doc in docs]}
    Answer:
    """
@app.post('/streaming/ask')
async def main(query: str) -> StreamingResponse:
    """Streaming API; this endpoint uses Server-Sent Events

    :param query: User question
    :return: streaming response chunks from OpenAI
    """
    docs = docsearch.similarity_search(query, k=4)
    prompt = gen_prompt(docs, query)
    return StreamingResponse(streaming_request(prompt), media_type="application/json")
if __name__ == '__main__':
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="debug")
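A minimal client sketch for consuming the stream from Python (assuming the server above is running locally on port 8000; the question is just a sample value):

# Minimal client sketch: consume the streamed chunks from the /streaming/ask endpoint.
# Assumes the server runs locally on port 8000; the query value is only an example.
import requests

with requests.post(
    'http://localhost:8000/streaming/ask',
    params={'query': 'What is a vector store?'},
    stream=True,
) as resp:
    # The server yields JSON strings like {"data": "..."} and finally '[DONE]'
    for chunk in resp.iter_content(chunk_size=None):
        print(chunk.decode('utf-8'), flush=True)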
Check streaming with curl or Postman:
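For example (a sketch; the question in the query string is just a sample, assuming the default port 8000):

# -N disables curl's output buffering so chunks print as they arrive
curl -N -X POST 'http://localhost:8000/streaming/ask?query=What%20is%20a%20vector%20store%3F'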