Created
July 11, 2025 14:47
-
-
Save deepanshumehtaa/882db0abcaf57f933b9457e6c03102c1 to your computer and use it in GitHub Desktop.
FastAPI with dequeue and custom threadpool executor.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import concurrent.futures | |
import collections | |
import threading | |
import time | |
import random | |
from fastapi import FastAPI, BackgroundTasks | |
from fastapi.responses import JSONResponse | |
# Shared worker pool: at most three tasks execute at the same time.
t_pool = concurrent.futures.ThreadPoolExecutor(max_workers=3)

# Swagger UI options handed to FastAPI as-is.
_SWAGGER_UI_PARAMETERS = {
    "defaultModelsExpandDepth": -1,  # hide the schema list at the bottom of the page
    "tryItOutEnabled": True,
    "docExpansion": "none",  # start with every section collapsed
}

app = FastAPI(swagger_ui_parameters=_SWAGGER_UI_PARAMETERS)
@app.on_event("shutdown")
async def shutdown_event():
    """Stop the consumer thread, then shut the worker pool down cleanly.

    Order matters: after ``stop_event`` is set the consumer thread keeps
    submitting whatever is still in the deque. If the pool were shut down
    first, those late ``submit()`` calls would raise ``RuntimeError`` and the
    remaining queued tasks would be lost.
    """
    # Ask the consumer loop to exit once the deque is empty.
    stop_event.set()
    # Wait until the consumer has handed every queued task to the pool.
    # NOTE: this blocks the event loop, just like the pool shutdown below;
    # acceptable here because the application is terminating.
    consumer_thread.join()
    # No further submissions can happen now; wait for running tasks to end.
    t_pool.shutdown(wait=True)
# Set to ask the consumer loop to stop.
stop_event: threading.Event = threading.Event()

# Held around every append to / pop from the task deque.
deque_lock = threading.Lock()

# Pending tasks, appended on the right and popped from the left (FIFO).
task_deque: collections.deque = collections.deque()
def execute_task(task_name) -> None:
    """Pretend to run ``task_name`` by sleeping for a random duration.

    The pause is a whole number of seconds between 2 and 8 inclusive. One
    line is printed before the pause and one after it.
    """
    duration = random.randint(2, 8)
    start_message = f"Starting task: {task_name}, it will take {duration} seconds"
    print(start_message)
    time.sleep(duration)
    done_message = f"Finished task: {task_name}"
    print(done_message)
def task_consumer(executor):
    """Move tasks from ``task_deque`` to ``executor`` until told to stop.

    The loop runs while ``stop_event`` is unset or the deque still holds
    items, so tasks queued before a stop request are still submitted.

    Args:
        executor: Object exposing ``submit(fn, *args)``; each popped task is
            submitted as ``execute_task(task_name)``.
    """
    while not stop_event.is_set() or task_deque:
        try:
            # Pop under the lock; an empty deque raises IndexError. Using the
            # exception (rather than the truthiness of the popped value)
            # means falsy task names such as "" are not mistaken for
            # "nothing to do" and silently dropped.
            with deque_lock:
                task_name = task_deque.popleft()
        except IndexError:
            # Nothing queued: idle briefly, but wake immediately if a stop
            # is requested instead of always sleeping the full interval.
            stop_event.wait(0.1)
            continue
        executor.submit(execute_task, task_name)
# Launch the background consumer. It is a daemon thread, so it does not keep
# the process alive once the main thread exits.
consumer_thread = threading.Thread(
    target=task_consumer,
    args=(t_pool,),
    daemon=True,
)
consumer_thread.start()
def add_task_to_queue(task_name) -> None:
    """Append ``task_name`` to the shared deque and print a confirmation."""
    confirmation = f"Added task to queue: {task_name}"
    # Only the append itself needs the lock.
    with deque_lock:
        task_deque.append(task_name)
    print(confirmation)
@app.post("/submit-task/")
async def submit_task(task_name: str, background_tasks: BackgroundTasks):
    # Comments are used here instead of a docstring on purpose: FastAPI would
    # publish a handler docstring as the endpoint description in the API docs.
    #
    # The enqueue is registered as a background task rather than done inline;
    # the handler then answers 202 Accepted with a confirmation message.
    background_tasks.add_task(add_task_to_queue, task_name)
    payload = {"message": f"Task {task_name} submitted successfully"}
    return JSONResponse(status_code=202, content=payload)
if __name__ == "__main__":
    # Imported here so uvicorn is only required when the file is run directly.
    import uvicorn

    # Serve the app on all interfaces, port 8000.
    uvicorn.run(app, port=8000, host="0.0.0.0")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment