Created
October 17, 2024 15:06
-
-
Save helton/9dce0b19fa73bb02567e940596bb2df4 to your computer and use it in GitHub Desktop.
Celery Dynamic Workflow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import random | |
from shared.app import app | |
from datetime import datetime, timezone | |
import time | |
from ulid import ulid | |
def run_workflow(title: str, workflow, timeout: int = 600, poll_interval: int = 1):
    """Submit ``workflow`` and poll until it finishes or ``timeout`` elapses.

    Progress and the final outcome are printed to stdout.

    Args:
        title: Heading printed once the workflow has been submitted.
        workflow: Object exposing ``delay()``. The handle it returns must
            provide ``id``, ``ready()``, ``successful()``, ``get()`` and
            ``traceback`` (presumably a Celery signature / AsyncResult --
            confirm against callers).
        timeout: Maximum number of seconds to wait for completion.
        poll_interval: Seconds to sleep between readiness checks.

    Returns:
        The value returned by ``result.get()`` on success, or ``None`` when
        the workflow timed out or failed.
    """
    # A monotonic clock is immune to wall-clock adjustments while waiting.
    started = time.monotonic()
    result = workflow.delay()
    print(result)
    workflow_id = result.id
    print(title)
    print(f"π Workflow '{workflow_id}' created:")
    print(f"β³ Waiting until result is available (timeout {timeout}s)...")

    while not result.ready():
        # Check the deadline first so we never announce another poll right
        # before giving up.
        if time.monotonic() - started > timeout:
            print(f"β° Timeout reached. Workflow id='{workflow_id}' did not complete in time. π")
            print('=' * 100)
            return None
        print(f"π Result not ready yet. Checking again in {poll_interval}s...")
        time.sleep(poll_interval)

    # Measure after the loop so the reported duration includes the last sleep
    # and is meaningful even when the result was ready on the first check.
    elapsed_time = time.monotonic() - started

    workflow_result = None
    if result.successful():
        workflow_result = result.get(disable_sync_subtasks=False)
        print(f"β Workflow id='{workflow_id}' completed successfully in {elapsed_time:.2f}s. Result:\n{workflow_result}")
    else:
        print(f"β Workflow id='{workflow_id}' failed. π")
        print(f"π Traceback:\n{result.traceback}")
    print('=' * 100)
    return workflow_result
if __name__ == "__main__":
    # First stage: produce a list whose length is picked at random (1..10).
    list_stage = app.signature(
        "generate_list",
        kwargs={"amount": random.randint(1, 10)},
    )
    # Sub-chain handed to the fan-out task: "double_number" then "square_number".
    per_number_chain = app.signature("double_number") | app.signature("square_number")
    fan_out_stage = app.signature(
        "process_numbers_individually",
        args=(per_number_chain,),
    )
    # Full pipeline: generate -> process each number -> sum.
    pipeline = list_stage | fan_out_stage | app.signature("sum_numbers")
    run_workflow(title="Dynamic Workflow", workflow=pipeline)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import celery | |
from celery import chain, group, subtask | |
from celery.utils.log import get_task_logger | |
from typing import List | |
from shared.app import app | |
logger = get_task_logger(__name__) | |
@app.task(name="generate_list")
def generate_list(amount: int) -> List[int]:
    """Return the integers from 1 through ``amount`` inclusive, in order."""
    logger.info(f"Generating list of integers up to {amount}.")
    upper_bound = amount + 1  # range() excludes its stop value
    return [*range(1, upper_bound)]
@app.task(name="double_number")
def double_number(x: int) -> int:
    """Return ``x`` multiplied by two."""
    logger.info(f"Doubling number: {x}")
    doubled = x * 2
    return doubled
@app.task(name="square_number")
def square_number(x: int) -> int:
    """Return ``x`` multiplied by itself."""
    logger.info(f"Squaring number: {x}")
    squared = x * x
    return squared
@app.task(name="sum_numbers")
def sum_numbers(numbers: List[int]) -> int:
    """Return the sum of all values in ``numbers``."""
    logger.info(f"Summing numbers: {numbers}")
    total = sum(numbers)
    return total
# reference: https://blog.det.life/replacing-celery-tasks-inside-a-chain-b1328923fb02
@app.task(name="process_numbers_individually", bind=True)
def process_numbers_individually(self, numbers: List[int], task_chain):
    """Replace this task with a group running ``task_chain`` once per number.

    Args:
        numbers: Values to process; one branch is built for each of them.
        task_chain: Signature-like object passed to ``subtask`` to rebuild
            the signature that is cloned per number (the caller in this gist
            passes a two-task chain -- other shapes unverified).

    Returns:
        Whatever ``self.replace`` returns for the group of per-number
        signatures.
    """
    logger.info(f"Processing all numbers: {numbers}")
    template = subtask(task_chain)
    per_number_sigs = []
    for value in numbers:
        branch = template.clone(args=(value,))
        # When the clone is a chain, also set the args directly on its first
        # task so that task receives the number.
        if isinstance(branch, celery.canvas._chain):
            branch.tasks[0].args = (value,)
        per_number_sigs.append(branch)
    return self.replace(group(per_number_sigs))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment