Skip to content

Instantly share code, notes, and snippets.

@helton
Created October 17, 2024 15:06
Show Gist options
  • Save helton/9dce0b19fa73bb02567e940596bb2df4 to your computer and use it in GitHub Desktop.
Save helton/9dce0b19fa73bb02567e940596bb2df4 to your computer and use it in GitHub Desktop.
Celery Dynamic Workflow
import json
import random
from shared.app import app
from datetime import datetime, timezone
import time
from ulid import ulid
def run_workflow(title: str, workflow, timeout: int = 600, poll_interval: int = 1) -> None:
    """Submit a workflow, poll until it finishes or times out, and print the outcome.

    Args:
        title: Heading printed once the workflow has been submitted.
        workflow: Object whose ``delay()`` returns a result handle exposing
            ``id``, ``ready()``, ``successful()``, ``get()`` and ``traceback``
            (e.g. a Celery signature or chain).
        timeout: Maximum number of seconds to keep polling before giving up.
        poll_interval: Seconds to sleep between two readiness checks.

    Returns:
        None. Progress, the final result (or traceback) and a separator line
        are written to stdout.
    """
    # Monotonic clock: the measured duration cannot be skewed by wall-clock
    # adjustments while we are waiting.
    started = time.monotonic()
    result = workflow.delay()
    print(result)
    workflow_id = result.id
    print(title)
    print(f"🔄 Workflow '{workflow_id}' created:")
    print(f"⏳ Waiting until result is available (timeout {timeout}s)...")

    while not result.ready():
        # Check the deadline first so we never announce another poll that
        # is not going to happen.
        if time.monotonic() - started > timeout:
            print(f"⏰ Timeout reached. Workflow id='{workflow_id}' did not complete in time. 🛑")
            print('=' * 100)
            return
        print(f"🔍 Result not ready yet. Checking again in {poll_interval}s...")
        time.sleep(poll_interval)

    # Measure after the loop so the reported duration includes the last sleep.
    elapsed_time = time.monotonic() - started
    if result.successful():
        workflow_result = result.get(disable_sync_subtasks=False)
        print(f"✅ Workflow id='{workflow_id}' completed successfully in {elapsed_time:.2f}s. Result:\n{workflow_result}")
    else:
        print(f"❌ Workflow id='{workflow_id}' failed. 😞")
        print(f"📝 Traceback:\n{result.traceback}")
    print('=' * 100)
if __name__ == "__main__":
    # Stage 1: ask "generate_list" for a randomly chosen amount (1-10).
    make_numbers = app.signature(
        "generate_list",
        kwargs={"amount": random.randint(1, 10)},
    )

    # Per-element pipeline handed to the fan-out task: double, then square.
    per_number_pipeline = app.signature("double_number") | app.signature("square_number")

    # Stage 2: run the per-element pipeline for each generated number.
    fan_out = app.signature(
        "process_numbers_individually",
        args=(per_number_pipeline,),
    )

    # Stage 3: sum the individual results.
    add_up = app.signature("sum_numbers")

    run_workflow(
        title="Dynamic Workflow",
        workflow=make_numbers | fan_out | add_up,
    )
import celery
from celery import chain, group, subtask
from celery.utils.log import get_task_logger
from typing import List
from shared.app import app
# Celery task logger named after this module; the tasks below log through it.
logger = get_task_logger(__name__)
@app.task(name="generate_list")
def generate_list(amount: int) -> List[int]:
    """Return the integers from 1 to ``amount`` inclusive, in ascending order."""
    logger.info(f"Generating list of integers up to {amount}.")
    upper_bound = amount + 1  # range() excludes its stop value
    return [*range(1, upper_bound)]
@app.task(name="double_number")
def double_number(x: int) -> int:
    """Return ``x`` multiplied by two."""
    logger.info(f"Doubling number: {x}")
    doubled = x * 2
    return doubled
@app.task(name="square_number")
def square_number(x: int) -> int:
    """Return ``x`` multiplied by itself."""
    logger.info(f"Squaring number: {x}")
    squared = x * x
    return squared
@app.task(name="sum_numbers")
def sum_numbers(numbers: List[int]) -> int:
    """Return the sum of ``numbers`` (0 for an empty list)."""
    logger.info(f"Summing numbers: {numbers}")
    total = sum(numbers)
    return total
# reference: https://blog.det.life/replacing-celery-tasks-inside-a-chain-b1328923fb02
@app.task(name="process_numbers_individually", bind=True)
def process_numbers_individually(self, numbers: List[int], task_chain):
    """Replace this task with a group that runs ``task_chain`` once per number.

    Args:
        numbers: Values to process; each one becomes the argument of its own
            copy of ``task_chain``.
        task_chain: Signature (single task or chain) to run per number. It is
            passed through ``subtask`` first, so a serialized form is
            presumably accepted as well -- confirm against the caller.

    Returns:
        Whatever ``self.replace`` returns for the per-number group.
    """
    logger.info(f"Processing all numbers: {numbers}")
    template = subtask(task_chain)

    per_number_signatures = []
    for value in numbers:
        clone = template.clone(args=(value,))
        if not isinstance(clone, celery.canvas._chain):
            per_number_signatures.append(clone)
            continue
        # For a chain, also place the value directly on its first task.
        # NOTE(review): presumably needed because the args given to a cloned
        # chain do not reach its first task here -- see the reference above.
        clone.tasks[0].args = (value,)
        per_number_signatures.append(clone)

    return self.replace(group(per_number_signatures))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment