Created
March 31, 2023 08:44
-
-
Save liviaerxin/4de85f251941bb385b6bcc8374e471be to your computer and use it in GitHub Desktop.
Serving long-running/heavy-computation task via FastAPI and concurrent.futures.ProcessPoolExecutor #python #fastapi #ml
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Serve long-running / heavy-computation tasks from FastAPI by offloading them to a
concurrent.futures.ProcessPoolExecutor.

Usage:
```sh
uvicorn main:app --reload
```
""" | |
from fastapi import FastAPI | |
import os | |
import sys | |
import asyncio | |
from typing import List | |
import queue | |
from concurrent.futures import ProcessPoolExecutor, Future | |
# ASGI application object; the route and startup handlers in this file attach to it.
app = FastAPI()

# Process pool with a single worker, so submitted tasks run one at a time
# in a separate process rather than in the server process.
executor = ProcessPoolExecutor(max_workers=1)

# Futures returned by executor.submit(); /analyze appends to this list and
# task_consumer prunes the finished ones.
task_list: List[Future] = []
def task(x: int):
    """Simulate a heavy job by sleeping for ``x`` seconds.

    Prints a "start task" and an "end task" line, each tagged with the
    current and parent process IDs, and returns a dict holding the PID that
    ran the job and a ``completed`` flag set to ``True``.
    """
    # Imported inside the function so the job carries its own dependencies.
    import os
    import time

    def whoami() -> str:
        # Re-evaluated on every call so each line reports the live PIDs.
        return f"PID[{os.getpid()}] Parent[{os.getppid()}]"

    print(f"{whoami()} start task")
    time.sleep(x)
    print(f"{whoami()} end task")

    outcome = {"pid": os.getpid(), "completed": True}
    return outcome
def callback(future: Future):
    """Done-callback that prints the outcome of a finished future.

    Each line is tagged with the PID and parent PID of the process in which
    the callback runs.

    Args:
        future: A future that is already done (finished, failed or cancelled).

    The outcome is inspected before ``future.result()`` is called, because
    ``result()`` re-raises the task's exception (or ``CancelledError``).
    Raising from a done-callback only produces a generic "exception calling
    callback" log entry, so failures are reported explicitly here instead.
    """
    import os

    prefix = f"PID[{os.getpid()}] Parent[{os.getppid()}] callback"

    # cancelled() must be checked first: exception() raises on a cancelled future.
    if future.cancelled():
        print(f"{prefix} cancelled")
        return

    error = future.exception()
    if error is not None:
        print(f"{prefix} error[{error!r}]")
        return

    print(f"{prefix} result[{future.result()}]")
async def task_consumer():
    """Background loop that prunes finished futures from ``task_list``.

    Once per second it prints every future in ``task_list`` that is done,
    drops those futures from the list, prints a heartbeat line, and sleeps.
    The loop never returns on its own; it ends only when the surrounding
    asyncio task is cancelled.
    """
    while True:
        # Build the list of survivors instead of calling remove() while
        # iterating: removing during iteration shifts later items left and
        # makes the loop skip the element after each removed one.
        pending = []
        for future in task_list:
            if future.done():
                print(
                    f"PID[{os.getpid()}] Parent[{os.getppid()}] task_consumer [{future}]"
                )
            else:
                pending.append(future)

        # Slice-assign so the same list object is kept; other code holding a
        # reference to task_list keeps seeing the current contents. There is
        # no await between the scan and this assignment, so nothing can be
        # appended in between.
        task_list[:] = pending

        print(f"PID[{os.getpid()}] Parent[{os.getppid()}] .")
        await asyncio.sleep(1)
@app.on_event("startup")
async def startup():
    """Startup hook: log the server process IDs and launch ``task_consumer``.

    The consumer runs as a background asyncio task on the server's event loop.
    """
    print(f"PID[{os.getpid()}] Parent[{os.getppid()}] app startup")
    # The event loop keeps only weak references to tasks, so a task whose
    # result is discarded may be garbage-collected before it finishes. Keep a
    # strong reference on the application state for the app's lifetime.
    app.state.consumer_task = asyncio.create_task(task_consumer())
@app.get("/")
async def root():
    """Handle ``GET /``: log the serving process IDs and return a fixed greeting."""
    print(f"PID[{os.getpid()}] Parent[{os.getppid()}] root")
    greeting = {"message": "Hello World"}
    return greeting
@app.get("/analyze")
async def analyze():
    """Handle ``GET /analyze``: queue a 10-second ``task`` and reply at once.

    The job is submitted to the process pool, ``callback`` is registered to
    print its outcome, and the future is appended to ``task_list`` so that
    ``task_consumer`` can prune it once it is done. The response reports
    ``future.done()`` as observed right after submission.
    """
    print(f"PID[{os.getpid()}] Parent[{os.getppid()}] analyze")

    job = executor.submit(task, 10)
    job.add_done_callback(callback)
    task_list.append(job)

    # NOTE: the result is deliberately not waited for here. Blocking on it
    # would block the main asyncio loop and make other requests unavailable
    # until the job finished.
    return {"result": job.done()}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment