Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save deepanshumehtaa/882db0abcaf57f933b9457e6c03102c1 to your computer and use it in GitHub Desktop.
Save deepanshumehtaa/882db0abcaf57f933b9457e6c03102c1 to your computer and use it in GitHub Desktop.
FastAPI with dequeue and custom threadpool executor.py
import concurrent.futures
import collections
import threading
import time
import random
from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import JSONResponse
t_pool = concurrent.futures.ThreadPoolExecutor(max_workers=3)
app = FastAPI(
swagger_ui_parameters={
"defaultModelsExpandDepth": -1, # not render `schema` page in bottom of swagger
"tryItOutEnabled": True,
"docExpansion": "none", # collapse all
}
)
@app.on_event("shutdown")
async def shutdown_event():
stop_event.set()
t_pool.shutdown(wait=True)
# Task queue
task_deque = collections.deque()
# Lock for thread-safe access to deque
deque_lock = threading.Lock()
# Event to signal stopping the executor
stop_event = threading.Event()
def execute_task(task_name):
"""Simulate task execution"""
sleep_time = random.randint(2, 8)
print(f"Starting task: {task_name}, it will take {sleep_time} seconds")
time.sleep(sleep_time)
print(f"Finished task: {task_name}")
def task_consumer(executor):
"""Consume tasks from deque and execute them"""
while not stop_event.is_set() or task_deque:
with deque_lock:
if task_deque:
task_name = task_deque.popleft()
else:
task_name = None
if task_name:
executor.submit(execute_task, task_name)
else:
time.sleep(0.1) # Wait if deque is empty
# Start the task consumer
consumer_thread = threading.Thread(target=task_consumer, args=(t_pool, ))
consumer_thread.daemon = True # Set as daemon so it stops when main thread stops
consumer_thread.start()
def add_task_to_queue(task_name):
with deque_lock:
task_deque.append(task_name)
print(f"Added task to queue: {task_name}")
@app.post("/submit-task/")
async def submit_task(task_name: str, background_tasks: BackgroundTasks):
background_tasks.add_task(add_task_to_queue, task_name)
return JSONResponse(content={"message": f"Task {task_name} submitted successfully"}, status_code=202)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment