Last active
August 9, 2023 09:26
-
-
Save UtsavChokshiCNU/d8c1660b5cf3b4b58bce0877c64f4a91 to your computer and use it in GitHub Desktop.
Sample Websocket Server that follows Query Pipeline contract and mocks it
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
### How to use it ? | |
### Run following commands : | |
pip install simple-websocket-server==0.4.4 pydantic==2.1.1 | |
python3 qa_websocket_server.py | |
### Send following request in postman using websocket : | |
ws://localhost:8000 | |
{ | |
"id": "test123", | |
"service": "qa", | |
"attributes": { | |
"stream": true, | |
"question": "Hello, how are you ?", | |
"top_k": 10 | |
} | |
} | |
""" | |
import json | |
import logging | |
import secrets | |
import string | |
import threading | |
from datetime import datetime | |
from enum import Enum | |
from time import sleep | |
from typing import Dict, List | |
from pydantic import BaseModel | |
from simple_websocket_server import WebSocket, WebSocketServer | |
import simple_websocket_server | |
# Logging setup: a dedicated "server_logger" whose records (DEBUG and above)
# are written to the console through a single stream handler.

# Layout of every emitted line: timestamp - severity - message text.
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")

# Console handler; it lets DEBUG records through and applies the layout above.
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
handler.setFormatter(formatter)

# The logger used by the rest of this file.
logger = logging.getLogger("server_logger")
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
# Replace the library's handshake response template with one that also sends
# an "Access-Control-Allow-Origin: *" header.
# NOTE(review): presumably added so browser-based clients accept the upgrade
# response — confirm against the installed simple-websocket-server version.
simple_websocket_server.HANDSHAKE_STR = "\r\n".join(
    (
        "HTTP/1.1 101 Switching Protocols",
        "Upgrade: WebSocket",
        "Connection: Upgrade",
        "Access-Control-Allow-Origin: *",
        # %(acceptstr)s is a %-style placeholder; presumably the library
        # substitutes the computed accept key when it answers a handshake.
        "Sec-WebSocket-Accept: %(acceptstr)s",
    )
) + "\r\n\r\n"
class ServiceEnum(str, Enum):
    """Service names allowed in the ``service`` field of a websocket request.

    The members also subclass ``str``, so each one compares equal to its raw
    string value (for example ``ServiceEnum.QA == "qa"``).
    """

    Summary = "summary"
    QA = "qa"
class WebsocketRequestAttributes(BaseModel):
    """Schema of the ``attributes`` object carried by a websocket request."""

    # Streaming flag supplied by the client.
    # NOTE(review): not read by the code shown in this file — confirm intent.
    stream: bool
    # Question text; it is echoed back in the response messages.
    question: str
    # NOTE(review): presumably the number of results requested; not read by
    # the code shown in this file — confirm.
    top_k: int
class WebsocketRequest(BaseModel):
    """Schema of one incoming websocket request message."""

    # Client-chosen request identifier; copied into every response message.
    id: str
    # Which service the request targets (one of the ServiceEnum values).
    service: ServiceEnum
    # Request parameters (stream flag, question, top_k).
    attributes: WebsocketRequestAttributes
def get_start_notification_message(request_id: str, llm_corpus_id: str, question: str) -> Dict:
    """Build the "START" notification sent when processing of a question begins.

    Args:
        request_id: Identifier of the request, returned under ``"id"``.
        llm_corpus_id: Identifier placed in the response section.
        question: The question text; echoed back and embedded in the
            human-readable ``"message"``.

    Returns:
        A JSON-serialisable dict with ``"id"``, ``"service"`` and
        ``"response"`` keys; the response carries the current local time
        formatted as ``%Y-%m-%d %H:%M:%S.%f``.
    """
    notification_body = {
        "notification_type": "START",
        "message": f"Searching for: {question}",
        "question": question,
    }
    response_section = {
        "llm_corpus_id": llm_corpus_id,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),
        "type": "NOTIFICATION",
        "data": notification_body,
    }
    return {"id": request_id, "service": "qa", "response": response_section}
def get_intermediate_notification_message(request_id: str, llm_corpus_id: str, question: str) -> Dict:
    """Build the "INTERMEDIATE" notification sent while the answer is prepared.

    Args:
        request_id: Identifier of the request, returned under ``"id"``.
        llm_corpus_id: Identifier placed in the response section.
        question: The question text, echoed back in the payload.

    Returns:
        A JSON-serialisable dict with ``"id"``, ``"service"`` and
        ``"response"`` keys; the response carries the current local time
        formatted as ``%Y-%m-%d %H:%M:%S.%f`` and a fixed progress message.
    """
    notification_body = {
        "notification_type": "INTERMEDIATE",
        "message": "Generating answer for you....",
        "question": question,
    }
    response_section = {
        "llm_corpus_id": llm_corpus_id,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),
        "type": "NOTIFICATION",
        "data": notification_body,
    }
    return {"id": request_id, "service": "qa", "response": response_section}
def split_into_chunks(input_string: str, chunk_size: int = 3) -> List[str]:
    """Split *input_string* into consecutive pieces of at most *chunk_size* characters.

    Args:
        input_string: Text to split. An empty string yields an empty list.
        chunk_size: Maximum length of each piece; must be at least 1.

    Returns:
        The pieces in order. Every piece has ``chunk_size`` characters except
        possibly the last, which holds the remainder.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
    """
    # A zero step makes range() fail with an unhelpful message, and a negative
    # step would silently produce no chunks at all (dropping the whole text),
    # so reject both up front.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")
    return [
        input_string[start:start + chunk_size]
        for start in range(0, len(input_string), chunk_size)
    ]
def get_streaming_answer_chunk(request_id: str, llm_corpus_id: str, question: str, chunk: str) -> Dict:
    """Wrap one piece of answer text in a "STREAMING_ANSWER" message.

    Args:
        request_id: Identifier of the request, returned under ``"id"``.
        llm_corpus_id: Identifier placed in the response section.
        question: The question text, echoed back in the payload.
        chunk: The piece of answer text carried by this message.

    Returns:
        A JSON-serialisable dict with ``"id"``, ``"service"`` and
        ``"response"`` keys. The payload starts with a fresh, empty
        ``"references"`` list and the response carries the current local time
        formatted as ``%Y-%m-%d %H:%M:%S.%f``.
    """
    chunk_body = {
        "question": question,
        "chunk": chunk,
        # A new list per message, so callers can fill it in without affecting
        # other messages.
        "references": [],
    }
    response_section = {
        "llm_corpus_id": llm_corpus_id,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),
        "type": "STREAMING_ANSWER",
        "data": chunk_body,
    }
    return {"id": request_id, "service": "qa", "response": response_section}
def get_streaming_answer_messages(request_id: str, llm_corpus_id: str, question: str) -> List[Dict]:
    """Build the full sequence of streaming-answer messages for a canned answer.

    The hard-coded sample answer is cut into pieces by ``split_into_chunks``
    (default chunk size) and bracketed by the literal chunks ``"START"`` and
    ``"END"``. Each piece becomes one message via
    ``get_streaming_answer_chunk``. The final (``"END"``) message additionally
    carries three sample references plus fixed ``latency`` and ``cost`` values.

    Args:
        request_id: Identifier of the request, copied into every message.
        llm_corpus_id: Identifier copied into every message's response section.
        question: The question text, echoed back in every message.

    Returns:
        The list of message dicts in the order they should be sent.
    """
    sample_answer = """
Yes, you can add more direct numbers to an existing user extension.
Login to your account portal with an admin user's credentials, select a user and see the screenshot below to add a direct number.
https://community.ringcentral.com/storage/attachments/3290-1650565501457.png
No, you cannot send SMS on behalf of another person within a user group and using the RC app. However, you can implement your own app using the /sms or /batches (for high volume SMS) API to send SMS messages. Your app can be a password flow authentication which always uses the user name and password (or JWT token) of the user who owns the SMS phone number for sending and receiving SMS messages. Now, you can control your app's SMS feature access by requiring users to login the app with their own credentials, then detect their identity (e.g. extension id) to allow the user continue using the app or not.
Think about a fax machine in a locked room and you allow only certain employees to enter the room to send faxes.
"""
    # Sentinel chunks mark the beginning and the end of the stream.
    text_pieces = split_into_chunks(sample_answer)
    messages = []
    for piece in ["START", *text_pieces, "END"]:
        messages.append(
            get_streaming_answer_chunk(request_id, llm_corpus_id, question, piece)
        )

    # Only the closing message carries the references and the metrics.
    closing_data = messages[-1]["response"]["data"]
    closing_data["references"] = [
        {
            "id": "1",
            "title": "Sample Doc 1",
            "link": "https://google.co.in",
            "date": "2022-02-28T15:03:19.600+0000",
        },
        {
            "id": "2",
            "title": "Sample Doc 2",
            "link": "https://google.co.in",
            "date": "2023-02-28T15:03:19.600+0000",
        },
        {
            "id": "3",
            "title": "Sample Doc 3",
            "link": "https://google.co.in",
            "date": "2021-02-28T15:03:19.600+0000",
        },
    ]
    closing_data["latency"] = 19
    closing_data["cost"] = 0.003
    return messages
def generate_random_string(length: int = 22):
    """Return a random string of ASCII letters and digits.

    Characters are drawn one at a time with ``secrets.choice``.

    Args:
        length: Number of characters to generate (22 by default).

    Returns:
        A string of exactly ``length`` characters; empty when ``length`` is 0.
    """
    alphabet = string.ascii_letters + string.digits
    picked = [secrets.choice(alphabet) for _ in range(length)]
    return "".join(picked)
def send_messages(client, ws_request: WebsocketRequest):
    """Send the complete mocked response sequence for one request.

    The sequence is: a START notification, a 3 second pause, an INTERMEDIATE
    notification, another 3 second pause, then every streaming-answer message
    with a 0.2 second pause after each one. A failure in any stage is logged
    and the function carries on with the next stage. The number of active
    threads is logged at the end.

    Args:
        client: Object exposing ``send_message(str)``; every message is sent
            through it as a JSON string.
        ws_request: The validated request; its ``id`` and
            ``attributes.question`` are copied into every message.
    """
    # One identifier shared by all messages of this response.
    llm_corpus_id = generate_random_string()

    # Both notification stages have the same shape: build, log, send, pause.
    notification_stages = (
        (get_start_notification_message, "Error in sending start notification"),
        (get_intermediate_notification_message, "Error in sending intermediate notification"),
    )
    for build_notification, failure_text in notification_stages:
        try:
            payload = build_notification(
                request_id=ws_request.id,
                llm_corpus_id=llm_corpus_id,
                question=ws_request.attributes.question,
            )
            logger.debug(payload)
            client.send_message(json.dumps(payload))
        except Exception:
            logger.exception(failure_text)
        # The pause happens whether or not the send succeeded.
        sleep(3)

    try:
        answer_messages = get_streaming_answer_messages(
            request_id=ws_request.id,
            llm_corpus_id=llm_corpus_id,
            question=ws_request.attributes.question,
        )
        logger.debug(
            f"Sending {len(answer_messages)} messages for streaming answer ..")
        for payload in answer_messages:
            client.send_message(json.dumps(payload))
            sleep(0.2)
        logger.info(f"Sent {len(answer_messages)} messages for streaming answer")
    except Exception:
        logger.exception("Error in sending streaming answers")

    logger.info(f"Total active threads: {threading.active_count()}")
class QAWebSocket(WebSocket):
    """Websocket connection handler that answers each request with mocked QA output."""

    def handle(self):
        """Parse the received message and start a worker thread to answer it.

        ``self.data`` is decoded as JSON and validated as a
        ``WebsocketRequest``. On success, ``send_messages`` runs in a new
        thread so this handler returns without waiting for the slow,
        sleep-paced response sequence. If decoding or validation fails, the
        error is logged and the message is ignored.
        """
        try:
            ws_request = WebsocketRequest(**json.loads(self.data))
        except Exception:
            logger.exception("Error in handling request message")
            # There is no valid request to answer. Returning here avoids
            # falling through to the thread creation below, where the unbound
            # ``ws_request`` would raise UnboundLocalError.
            return
        thread = threading.Thread(
            target=send_messages, args=(self, ws_request))
        logger.info("Offloading sending messages to new thread !")
        thread.start()

    def connected(self):
        """Log the address of a newly connected client."""
        logger.info(f"{self.address} connected")

    def handle_close(self):
        """Log the address of a client whose connection was closed."""
        logger.info(f"{self.address} closed")
# Script entry point: create the websocket server and run it.
# NOTE(review): the empty host string presumably means "listen on all
# interfaces" — confirm against the simple-websocket-server documentation.
_bind_host = ""
_listen_port = 8000
server = WebSocketServer(_bind_host, _listen_port, QAWebSocket)
logger.info("Starting server at : ws://localhost:8000")
# Hands control to the server loop; QAWebSocket handles each connection.
server.serve_forever()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment