Created
March 31, 2023 08:21
-
-
Save liviaerxin/ee6c1548ef739dda87f49b64890784d3 to your computer and use it in GitHub Desktop.
Serving long-running/heavy-computation task via FastAPI and multiprocessing.Process #python #fastapi #ml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Serve a long-running / heavy-computation task via FastAPI and multiprocessing.Process.

Usage:
```sh
uvicorn main:app --reload
```
"""
import asyncio
import json
import os
import queue
import sys
import time
from multiprocessing import Pool, Process, Queue, TimeoutError
from multiprocessing.pool import AsyncResult
from typing import List

from fastapi import FastAPI
# ASGI application object; the module docstring runs it as `uvicorn main:app`.
app = FastAPI()

# Shared multiprocessing queue: worker processes put completion records on it
# and the background consumer coroutine polls it.
q = Queue()
def task(q: Queue, x: int):
    """Simulate a long-running job, then report completion on ``q``.

    Prints the worker and parent PIDs, sleeps for ``x`` seconds, prints the
    PIDs again and finally puts ``{"pid": <worker pid>, "completed": True}``
    on the queue.

    Args:
        q: Queue that receives the completion record.
        x: Number of seconds to sleep.
    """
    print(f"PID[{os.getpid()}] Parent[{os.getppid()}] start task")
    time.sleep(x)
    print(f"PID[{os.getpid()}] Parent[{os.getppid()}] end task")
    q.put({"pid": os.getpid(), "completed": True})
def callback(result):
    """Write ``result`` to ``test.txt`` as JSON and log it.

    Args:
        result: JSON-serializable value. It is dumped to ``test.txt`` in the
            current working directory (the file is opened in ``"w"`` mode, so
            previous content is overwritten) and echoed in the log line.
    """
    with open("test.txt", "w") as f:
        json.dump(result, f)
    print(f"PID[{os.getpid()}] Parent[{os.getppid()}] callback result[{result}]")
async def task_consumer():
    """Poll the shared queue ``q`` forever and log each record found.

    Uses the non-blocking ``get_nowait`` so the coroutine never blocks while
    waiting for a result; when the queue is empty it logs a ``.`` instead.
    It sleeps one second between polls.
    """
    # `q` is only read here, so no `global` declaration is needed.
    while True:
        try:
            result = q.get_nowait()
        except queue.Empty:
            print(f"PID[{os.getpid()}] Parent[{os.getppid()}] .")
        else:
            print(f"PID[{os.getpid()}] Parent[{os.getppid()}] task_consumer [{result}]")
        # Yield to the event loop on every iteration so other coroutines
        # (request handlers) can run between polls.
        await asyncio.sleep(1)
@app.on_event("startup")
async def startup():
    """Log application start-up and launch the background queue consumer."""
    print(f"PID[{os.getpid()}] Parent[{os.getppid()}] app startup")
    # The event loop keeps only a weak reference to tasks. Hold a strong
    # reference on the application so the consumer cannot be garbage-collected
    # while it is still running.
    app.state.consumer_task = asyncio.create_task(task_consumer())
@app.get("/")
async def root():
    """Log the handling process's PIDs and return a fixed greeting payload."""
    print(f"PID[{os.getpid()}] Parent[{os.getppid()}] root")
    return {"message": "Hello World"}
@app.get("/analyze")
async def analyze():
    """Start ``task`` (10-second sleep) in a separate process and return.

    The handler does not wait for the worker. The worker reports completion
    by putting a record on the shared queue ``q``. Nothing is returned
    explicitly, so the handler's return value is ``None``.
    """
    print(f"PID[{os.getpid()}] Parent[{os.getppid()}] analyze")
    # A dedicated Process is used rather than Pool.apply_async() followed by
    # res.get(): waiting on res.get() blocks the main asyncio loop, so other
    # requests are not available while the task runs.
    worker = Process(target=task, args=(q, 10))
    worker.start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment