Created
July 22, 2020 13:40
-
-
Save codemation/b05e36a916404df0b26c41beb1a2e051 to your computer and use it in GitHub Desktop.
A generic implementation for post-response hooks, using websocket & async generators with fastapi
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# The following assumes run(server) was called with an existing fastapi router as its input variable
def run(server):
    """Attach a "post-response" websocket route and an HTTP trigger to *server*.

    Calling this function:

    * sets ``server.clients`` to an empty dict (client_id -> async generator
      holding an open test websocket session),
    * registers the websocket route ``/post_response``, and
    * registers the HTTP route ``GET /work/{work}``.

    A request to ``/work/{work}`` opens an in-process websocket session
    against ``/post_response``, sends the work description, waits only for the
    acknowledgement message and returns it. The websocket handler then
    continues with the work and finally asks for the session to be cleaned up.

    NOTE(review): ``log`` is not defined in this block; it is presumably a
    module-level logger defined elsewhere in the file - confirm.

    Args:
        server: object exposing ``websocket_route`` and ``get`` decorators and
            accepted by ``fastapi.testclient.TestClient``.
    """
    import asyncio
    import time
    import uuid

    from fastapi.websockets import WebSocket
    from fastapi.testclient import TestClient

    # Registry of open sessions: client_id -> async generator (see client()).
    server.clients = {}

    async def get_client_by_id(client_id: str, path: str):
        """Return the open websocket session for *client_id*, creating it if needed.

        The session lives inside an async generator so that the
        ``TestClient.websocket_connect`` context stays open between calls.
        """

        async def client():
            log.warning(f"started client with id: {client_id} and path {path}")
            c = TestClient(server)
            with c.websocket_connect(path) as websocket:
                while True:
                    # Hand out the open websocket; resume when something is sent in.
                    result = yield websocket
                    if result == 'finished':
                        log.warning(f"cleaning up client with id: {client_id} and path {path}")
                        break

        if client_id not in server.clients:
            # Create the generator, register it, and prime it with asend(None)
            # so that it runs up to the first ``yield`` and returns the websocket.
            server.clients[client_id] = client()
            return await server.clients[client_id].asend(None)
        # Already open: any value other than 'finished' just yields the websocket again.
        return await server.clients[client_id].asend(client_id)

    async def cleanup_client(client_id):
        """Close the websocket session registered for *client_id*, if any."""
        # Remove the entry so server.clients does not grow with every request.
        generator = server.clients.pop(client_id, None)
        if generator is None:
            return
        try:
            await generator.asend('finished')
        except Exception:
            # The generator returns after 'finished', so asend() raises
            # StopAsyncIteration; errors raised while leaving the websocket
            # context also end up here. Cleanup is deliberately best-effort.
            print(f"I cleaned up client with {client_id}")

    async def post_response_work(client_id, duration):
        """Simulate *duration* seconds of work, then clean up the client session."""
        print(f"I started work with {client_id} which will take {duration} seconds")
        await asyncio.sleep(duration)
        print(f"I finished work with {client_id} after {duration} seconds")
        await cleanup_client(client_id)

    @server.websocket_route("/post_response")
    async def attach_databases(websocket: WebSocket):
        """Receive a work item, acknowledge it, then carry out the work."""
        await websocket.accept()
        # Wait for work.
        work = await websocket.receive_json()
        # Send receipt of the work received.
        await websocket.send_json({"message": f"I started a post_response with {work['work']}"})
        # Start the work, which continues after the client got its response.
        await post_response_work(work['client'], work['work'])
        await websocket.close()

    async def trigger_post_response(work):
        """Send *work* to ``/post_response`` and return its acknowledgement."""
        start = time.time()
        client_id = str(uuid.uuid1())
        # Open (or fetch) the websocket client connection.
        websocket = await get_client_by_id(client_id, "/post_response")
        # Send the work to the websocket server.
        websocket.send_json({"work": work, 'client': client_id})
        # Wait for the acknowledgement that the work was received.
        result = websocket.receive_json()
        print(f"trigger_post_response: {result} after {time.time()-start} seconds")
        return result

    @server.get("/work/{work}")
    async def start_work(work: int):
        """HTTP entry point: trigger post-response work lasting *work* seconds."""
        return await trigger_post_response(work)
""" | |
$ uvicorn server:app | |
INFO: Started server process [18160] | |
INFO: Waiting for application startup. | |
INFO: Application startup complete. | |
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit) | |
WARNING: started client with id: a11a7248-cc1c-11ea-9ada-f3f7bc2ffe6c and path /post_response | |
I started work with a11a7248-cc1c-11ea-9ada-f3f7bc2ffe6c which will take 5 seconds | |
trigger_post_response: {'message': 'I started a post_response with 5'} after 0.008193492889404297 seconds | |
INFO: 127.0.0.1:45868 - "GET /work/5 HTTP/1.1" 200 OK | |
I finished work with a11a7248-cc1c-11ea-9ada-f3f7bc2ffe6c after 5 seconds | |
I cleaned up client with a11a7248-cc1c-11ea-9ada-f3f7bc2ffe6c | |
""" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment