Skip to content

Instantly share code, notes, and snippets.

@helton
Created October 7, 2024 14:30
Show Gist options
  • Save helton/8b5d60540337e7e725727e2c1bce4043 to your computer and use it in GitHub Desktop.
Save helton/8b5d60540337e7e725727e2c1bce4043 to your computer and use it in GitHub Desktop.
Run and wait (blocking) a celery workflow to complete
def run_workflow(title: str, workflow, timeout: int = 30, poll_interval: int = 1):
start_time = datetime.now(timezone.utc)
elapsed_time = 0
result = workflow.delay()
print(title)
print(f"πŸ”„ Workflow '{result}' created")
print(f"⏳ Waiting until result is available (timeout {timeout}s)...")
while not result.ready():
current_time = datetime.now(timezone.utc)
elapsed_time = (current_time - start_time).total_seconds()
print(f"πŸ” Result not ready yet. Checking again in {poll_interval}s...")
if elapsed_time > timeout:
print("⏰ Timeout reached. Workflow did not complete in time. πŸ›‘")
return
time.sleep(poll_interval)
if result.successful():
workflow_result = result.get()
print(f"βœ… Workflow '{result}' completed successfully in {elapsed_time:.2f}s. Result:\n{workflow_result}")
else:
print("❌ Workflow '{result}' failed. 😞")
print(f"πŸ“ Traceback:\n{result.traceback}")
print('=' * 100)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment