Skip to content

Instantly share code, notes, and snippets.

@ant1fact
Created September 3, 2025 10:00
Show Gist options
  • Select an option

  • Save ant1fact/0dc34164fcb8ce2d0e89952124cb20dd to your computer and use it in GitHub Desktop.

Select an option

Save ant1fact/0dc34164fcb8ce2d0e89952124cb20dd to your computer and use it in GitHub Desktop.
import asyncio
import os
from dotenv import load_dotenv
from pyzeebe import Job, JobController, ZeebeWorker, create_camunda_cloud_channel
# load_dotenv() runs before any os.environ lookup so that values supplied by
# a local .env file (python-dotenv) are visible to the reads below.
load_dotenv()

# All three Camunda credentials are mandatory: a missing variable raises
# KeyError right here instead of surfacing later as a connection problem.
client_id, client_secret, cluster_id = (
    os.environ[name]
    for name in ("CAMUNDA_CLIENT_ID", "CAMUNDA_CLIENT_SECRET", "CAMUNDA_CLUSTER_ID")
)

# A single channel is created from the credentials and handed to the one
# worker instance that the task handlers in this file register with.
channel = create_camunda_cloud_channel(client_id, client_secret, cluster_id)
worker = ZeebeWorker(channel)
async def on_error(exception: Exception, job: Job, job_controller: JobController):
    """Report a failed task back through its job controller.

    Passed as ``exception_handler`` to the ``@worker.task`` registrations in
    this file.

    Args:
        exception: The exception raised while handling the job.
        job: The job that was being handled; interpolated into the message.
        job_controller: Controller whose ``set_error_status`` is awaited with
            the failure message.
    """
    # Echo the raw exception to stdout first so it is visible locally even
    # if reporting the error status itself fails.
    print(exception)

    failure_message = f"Failed to handle job {job}. Error: {str(exception)}"
    await job_controller.set_error_status(failure_message)
@worker.task(task_type="credit-deduction", exception_handler=on_error)
async def credit_deduction_task() -> dict:
    """Handle a ``credit-deduction`` job.

    Takes no job variables and performs no work of its own; it only reports
    which task type ran.

    Returns:
        ``{"output": {"task_type": "credit-deduction"}}``.
    """
    payload = {"task_type": "credit-deduction"}
    return {"output": payload}
@worker.task(task_type="credit-card-charging", exception_handler=on_error)
async def credit_card_charging_task() -> dict:
    """Handle a ``credit-card-charging`` job.

    Takes no job variables and performs no work of its own; it only reports
    which task type ran.

    Returns:
        ``{"output": {"task_type": "credit-card-charging"}}``.
    """
    payload = {"task_type": "credit-card-charging"}
    return {"output": payload}
# Drive the worker on the current event loop until worker.work() finishes.
# NOTE(review): asyncio.get_event_loop() without a running loop is deprecated
# in recent Python versions. Moving to asyncio.run() would start a fresh loop
# after `channel` was already created at module scope — confirm that is safe
# for the channel before changing this.
loop = asyncio.get_event_loop()
work_coroutine = worker.work()
loop.run_until_complete(work_coroutine)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment