Last active
July 29, 2020 06:41
-
-
Save codemation/755e4b8b32a47ef326a2a9bf50c35782 to your computer and use it in GitHub Desktop.
Task scheduling design pattern for Fastapi web framework, can be used for internal worker / consumer or as a method of sending post_request response hooks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio, uuid | |
from fastapi.websockets import WebSocket | |
from fastapi.testclient import TestClient | |
from fastapi import FastAPI | |
import logging as log | |
log.basicConfig() | |
server = FastAPI() | |
server.clients = {} | |
server.tasks = [] | |
async def get_client_by_id(client_id: str, path: str): | |
""" | |
creates an async generator for holding the context | |
of a TestClient.websocket_connect open | |
""" | |
async def client(): | |
log.warning(f"started client with id: {client_id} and path {path}") | |
c = TestClient(server) | |
with c.websocket_connect(f"{path}") as websocket: | |
while True: | |
result = yield websocket | |
if result == 'finished': | |
log.warning(f"cleaning up client with id: {client_id} and path {path}") | |
break | |
# returns open connetion, if exists | |
if not client_id in server.clients: | |
# creates & assigns generator to server.clients | |
server.clients[client_id] = client() | |
# initializes generator with .asend(None) & returns open websocket | |
return await server.clients[client_id].asend(None) | |
return await server.clients[client_id].asend(client_id) | |
async def worker(client, interval): | |
""" | |
waits for new work in queue & sleeps | |
""" | |
while True: | |
if len(server.tasks) == 0: | |
log.warning(f"worker {client} found no tasks to run, sleeping {interval}") | |
await asyncio.sleep(interval) | |
continue | |
job = server.tasks.pop(0) | |
if job == 'finished': | |
return | |
result = await job | |
log.warning(f"worker finished job: {result}") | |
@server.websocket_route("/worker") | |
async def run_worker(websocket: WebSocket): | |
await websocket.accept() | |
config = await websocket.receive_json() | |
client, interval = config['client'], config['interval'] | |
await websocket.send_json({"message": f"started worker {client} with config {config}"}) | |
try: | |
await worker(client, interval) | |
except Exception as e: | |
print(repr(e)) | |
finally: | |
log.warning(f"worker {client} exiting") | |
await cleanup_client(config['client']) | |
await websocket.close() | |
async def work(job_id, duration): | |
print(f"starting {job_id} for {duration} seconds") | |
await asyncio.sleep(duration) # Fake Work | |
print(f"finished {job_id} for {duration} seconds") | |
return {"message": f"{job_id} completed"} | |
@server.get("/work/add") | |
async def add_work(): | |
server.tasks.append( | |
work(str(uuid.uuid1()), 10) # | |
) | |
return {"message": "added work"} | |
@server.get("/worker/{interval}") | |
async def create_worker(interval: int): | |
return await add_worker(interval) | |
async def add_worker(interval: int): | |
import time | |
start = time.time() | |
client_id = str(uuid.uuid1()) | |
# pull open websocket client conection | |
websocket = await get_client_by_id(client_id, "/worker") | |
try: | |
# send work to websocket server | |
websocket.send_json({"interval": interval, 'client': client_id}) | |
# waits for ack of work received | |
result = websocket.receive_json() | |
log.warning(f"started worker: {result} after {time.time()-start} seconds") | |
except Exception: | |
await cleanup_client(client_id) | |
return result | |
async def cleanup_client(client_id): | |
""" | |
cleanup open websocket connection | |
""" | |
if client_id in server.clients: | |
try: | |
await sever.clients[client_id].asend('finished') | |
except Exception: | |
print(f"I cleaned up client with {client_id}") | |
@server.on_event("startup") | |
async def start_workers(): | |
""" | |
creates default running workers on app start | |
""" | |
for _ in range(2): | |
await add_worker(10) | |
@server.on_event("shutdown") | |
async def close_sessions(): | |
""" | |
on app shutdown, closes workers & exits open websocket context | |
""" | |
for client in server.clients: | |
server.tasks.append('finished') | |
await cleanup_client(client) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment