Last active
February 25, 2023 04:46
-
-
Save altescy/8bf2c57710daf6153d20fde1498b5cea to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import time | |
from multiprocessing import Process, Queue | |
from typing import Callable, Generic, Iterator, TypeVar | |
from uuid import uuid4 | |
import fastapi | |
from pydantic import BaseModel | |
# Type of the values produced by a task's callable.
T = TypeVar("T")

logger = logging.getLogger(__name__)


class Done:
    """Sentinel put on a task's queue after its last output."""


class Task(Process, Generic[T]):
    """Process that runs an iterator-returning callable and streams its outputs.

    Each value yielded by ``func`` is put on an internal multiprocessing
    queue; the creating process collects them incrementally via ``result()``.

    Args:
        taskid: Identifier exposed through the ``taskid`` property.
        func: Zero-argument callable returning an iterator of outputs.
    """

    def __init__(self, taskid: str, func: Callable[[], Iterator[T]]) -> None:
        super().__init__()
        self._taskid = taskid
        self._func = func
        self._queue: Queue[T | Done] = Queue()
        self._done = False

    @property
    def taskid(self) -> str:
        """Identifier given at construction time."""
        return self._taskid

    @property
    def done(self) -> bool:
        """Whether ``result()`` has observed the end of the output stream."""
        return self._done

    def run(self) -> None:
        """Forward every output of the callable to the queue, then a ``Done``.

        The sentinel is enqueued in a ``finally`` clause so that a callable
        which raises still ends the stream; otherwise ``done`` would never
        become true and pollers would wait forever. The exception itself is
        not swallowed and propagates after the sentinel is queued.
        """
        try:
            for output in self._func():
                self._queue.put(output)
        finally:
            self._queue.put(Done())

    def result(self) -> list[T]:
        """Return the outputs queued since the previous call, without waiting.

        Sets ``done`` when the ``Done`` sentinel is read. It also sets
        ``done`` when the process had already exited before this call and
        the queue was drained without finding a sentinel (for example after
        a hard crash or an external kill).

        Returns:
            The newly available outputs, in production order; possibly empty.
        """
        # Sample the exit state *before* draining: if the child had already
        # exited, nothing more can arrive after the drain below.
        exited = self.exitcode is not None

        outputs: list[T] = []
        while not self._queue.empty():
            item = self._queue.get()
            if isinstance(item, Done):
                self._done = True
                break
            outputs.append(item)
        else:
            # Queue drained without a sentinel.
            if exited:
                self._done = True
        return outputs
# Payload returned by the status endpoint; generic over the type of the
# individual task outputs. (Documented with comments rather than a docstring
# so the generated schema is left untouched.)
class Response(BaseModel, Generic[T]):
    # Identifier of the task this payload describes.
    taskid: str
    # True once the task's output stream has ended.
    finished: bool
    # Outputs collected since the previous status request.
    result: list[T]
# ASGI application object that the route decorators below register on.
app = fastapi.FastAPI()

# In-memory registry of tasks, keyed by task id.
tasks: dict[str, Task] = dict()
def multistep_task(steps: int = 10, delay: float = 1.0) -> Iterator[str]:
    """Yield one message per step, sleeping before each one.

    Demo workload standing in for a long-running, multi-step job.

    Args:
        steps: Number of messages to produce; values <= 0 produce none.
        delay: Seconds to sleep before each message; must be non-negative
            (``time.sleep`` rejects negative values).

    Yields:
        ``"result of step {i}"`` for ``i`` in ``range(steps)``.
    """
    for step in range(steps):
        time.sleep(delay)
        yield f"result of step {step}"
@app.post("/api/execute")
async def execute() -> str:
    """Start a new ``multistep_task`` in its own process and return its id.

    The task is stored in the module-level ``tasks`` registry only after the
    process has been started.
    """
    new_task = Task(uuid4().hex, multistep_task)
    new_task.start()
    tasks[new_task.taskid] = new_task
    return new_task.taskid
@app.get("/api/status/{task_id}")
async def status(task_id: str) -> Response[str]:
    """Report the outputs a task has produced since the last status call.

    A task that reports itself as done is removed from the registry, so a
    later request for the same id yields a 404.

    Raises:
        fastapi.HTTPException: 404 when no task is registered under
            ``task_id``.
    """
    if task_id not in tasks:
        raise fastapi.HTTPException(status_code=404, detail="Task not found")
    task = tasks[task_id]

    # result() must run before reading `done`: it is what flips the flag.
    outputs = task.result()
    finished = task.done
    if finished:
        del tasks[task.taskid]

    return Response(taskid=task.taskid, finished=finished, result=outputs)  # type: ignore[var-annotated]
@app.post("/api/status/{task_id}/terminate")
async def terminate(task_id: str) -> None:
    """Terminate a running task and drop it from the registry.

    Raises:
        fastapi.HTTPException: 404 when no task is registered under
            ``task_id``.
    """
    # Look up and unregister in a single step.
    task = tasks.pop(task_id, None)
    if task is None:
        raise fastapi.HTTPException(status_code=404, detail="Task not found")

    task.terminate()
    # Reap the child so it does not linger as a zombie process. The wait is
    # bounded because this call blocks the event loop.
    task.join(timeout=1.0)
@app.get("/", response_class=fastapi.responses.HTMLResponse)
async def root() -> str:
    """Serve the single-page demo UI.

    The page has an Execute button that POSTs to ``/api/execute`` and then
    polls ``/api/status/{task_id}`` once per second, appending each result
    line to the page, and a Terminate button that POSTs to
    ``/api/status/{task_id}/terminate``.
    """
    return """
    <!DOCTYPE html>
    <html>
    <head>
        <title>FastAPI</title>
    </head>
    <body>
        <button id="execute-btn">Execute</button>
        <button id="terminate-btn" disabled>Terminate</button>
        <div id="status">...</div>
        <div
            id="results"
            style="background-color: #000; color: #fff; padding: 1em;"
        ></div>
        <script>
            let running_taskid = null;
            let terminated = false;
            document.getElementById("execute-btn").addEventListener("click", async () => {
                const response = await fetch("/api/execute", {method: "POST"});
                // Keep a local copy: the shared variable is reset by the
                // terminate handler and reused by later executions.
                const taskid = await response.json();
                running_taskid = taskid;
                terminated = false;
                document.getElementById("status").innerText = `Running task ${taskid}...`;
                document.getElementById("results").innerText = "";
                document.getElementById("execute-btn").disabled = true;
                document.getElementById("terminate-btn").disabled = false;
                let finished = false;
                while (!terminated && running_taskid === taskid) {
                    const response = await fetch(`/api/status/${taskid}`);
                    if (!response.ok) {
                        // The task no longer exists on the server (for
                        // example it was terminated meanwhile); the error
                        // body has no result list, so stop polling.
                        break;
                    }
                    const status = await response.json();
                    for (const result of status.result) {
                        let line = document.createElement("div");
                        line.innerText = result;
                        document.getElementById("results").appendChild(line);
                    }
                    if (status.finished) {
                        finished = true;
                        break;
                    }
                    await new Promise(resolve => setTimeout(resolve, 1000));
                }
                if (running_taskid !== taskid) {
                    // Terminated or superseded by a newer task: the page
                    // state now belongs to someone else.
                    return;
                }
                if (finished) {
                    document.getElementById("status").innerText = `Task ${taskid} successfully finished!`;
                } else if (!terminated) {
                    document.getElementById("status").innerText = `Task ${taskid} is no longer available.`;
                }
                document.getElementById("execute-btn").disabled = false;
                document.getElementById("terminate-btn").disabled = true;
                running_taskid = null;
            });
            document.getElementById("terminate-btn").addEventListener("click", async () => {
                if (running_taskid === null) {
                    return;
                }
                const taskid = running_taskid;
                await fetch(`/api/status/${taskid}/terminate`, {method: "POST"});
                document.getElementById("status").innerText = `Task ${taskid} terminated!`;
                document.getElementById("execute-btn").disabled = false;
                document.getElementById("terminate-btn").disabled = true;
                running_taskid = null;
                terminated = true;
            });
        </script>
    </body>
    </html>
    """
# Script entry point: serve the app on all interfaces, port 8000.
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when run as a script.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8000)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
pip install fastapi
curl -s https://gist.githubusercontent.com/altescy/8bf2c57710daf6153d20fde1498b5cea/raw | python