Skip to content

Instantly share code, notes, and snippets.

@tjunxiang92
Created June 8, 2024 06:21
Show Gist options
  • Save tjunxiang92/ae15e30c9fc9607d98b0c8e4773fc6bb to your computer and use it in GitHub Desktop.
Save tjunxiang92/ae15e30c9fc9607d98b0c8e4773fc6bb to your computer and use it in GitHub Desktop.
Converting Websocket to SSE Protocol with Python with websockets, fastapi, sse-starlette
import json
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
import asyncio
import uvicorn
from websockets.client import connect
app = FastAPI()
async def connect_websockets():
ws_connection = await connect("ws://localhost:5678")
# Close the connection when the client disconnects. Needs to be in a separate function as FastAPI will stop responding after an await is called while the request is ending.
async def close_connection():
print("Closing Websocket")
await ws_connection.close()
try:
while True:
# websocket.send('{"Hello": "world!"}')
msg = await ws_connection.recv()
yield {
"data": msg,# json.dumps()
}
except asyncio.CancelledError as e:
asyncio.create_task(close_connection())
@app.get("/")
def read_sse(req: Request):
return EventSourceResponse(connect_websockets())
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000, log_level='info')
import asyncio
import datetime
import random
import websockets.server as websockets
from websockets.exceptions import ConnectionClosed
async def time(websocket, path):
while True:
now = datetime.datetime.now().isoformat() + "Z"
try:
print(f"Sending {now}")
await websocket.send(now)
except ConnectionClosed:
print("Client disconnected. Do cleanup")
break
await asyncio.sleep(2)
start_server = websockets.serve(time, "127.0.0.1", 5678)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment