Created
September 3, 2025 10:00
-
-
Save ant1fact/0dc34164fcb8ce2d0e89952124cb20dd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import os | |
| from dotenv import load_dotenv | |
| from pyzeebe import Job, JobController, ZeebeWorker, create_camunda_cloud_channel | |
# Call load_dotenv() before touching os.environ so that any values it loads
# (presumably from a local .env file) are visible to the lookups below.
load_dotenv()

# Index os.environ directly (no defaults): a missing credential raises
# KeyError here, at startup, rather than failing later inside the client.
client_id, client_secret, cluster_id = (
    os.environ[variable]
    for variable in (
        "CAMUNDA_CLIENT_ID",
        "CAMUNDA_CLIENT_SECRET",
        "CAMUNDA_CLUSTER_ID",
    )
)

# One channel built from the three credentials, wrapped by the single worker
# that the task decorators further down register against.
channel = create_camunda_cloud_channel(
    client_id,
    client_secret,
    cluster_id,
)
worker = ZeebeWorker(channel)
async def on_error(exception: Exception, job: Job, job_controller: JobController):
    """Report a failed job: echo the exception and set an error status.

    Passed as ``exception_handler`` to the ``worker.task`` registrations in
    this file.

    Args:
        exception: The exception raised while handling the job.
        job: The job being handled; interpolated into the status message.
        job_controller: Controller whose ``set_error_status`` receives the
            failure message.
    """
    print(exception)
    failure_message = f"Failed to handle job {job}. Error: {exception!s}"
    await job_controller.set_error_status(failure_message)
@worker.task(task_type="credit-deduction", exception_handler=on_error)
async def credit_deduction_task() -> dict:
    """Handle `credit-deduction` jobs.

    Takes no arguments and does no work beyond reporting which task type ran.

    Returns:
        A dict with a single ``output`` key holding ``{"task_type": ...}``.
    """
    details = {"task_type": "credit-deduction"}
    return {"output": details}
@worker.task(task_type="credit-card-charging", exception_handler=on_error)
async def credit_card_charging_task() -> dict:
    """Handle `credit-card-charging` jobs.

    Takes no arguments and does no work beyond reporting which task type ran.

    Returns:
        A dict with a single ``output`` key holding ``{"task_type": ...}``.
    """
    details = {"task_type": "credit-card-charging"}
    return {"output": details}
# Script entry point: run the worker's coroutine on the current event loop and
# block until it finishes.
# NOTE(review): asyncio.get_event_loop() with no running loop is deprecated in
# recent Python releases. Before moving to asyncio.run(), confirm the channel
# created above is not tied to the loop that exists at import time.
loop = asyncio.get_event_loop()
_worker_run = worker.work()  # coroutine object; nothing runs until the loop drives it
loop.run_until_complete(_worker_run)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment