Skip to content

Instantly share code, notes, and snippets.

@mlimotte
Created August 31, 2025 13:20
Show Gist options
  • Save mlimotte/bb060acaf5c76652cebcc8e9a189ec91 to your computer and use it in GitHub Desktop.
Save mlimotte/bb060acaf5c76652cebcc8e9a189ec91 to your computer and use it in GitHub Desktop.
Clojure interface for Camunda, wraps io.camunda/zeebe-client-java
;; Clojure interface for Camunda. It's a nice product, but way too expensive for a startup.
;; I'm going to look at n8n.io instead. But in case you want to use Camunda, this code
;; may give you a head start. Basic support for a client, external service task workers,
;; and starting jobs.
;; This namespace provides:
;; - Camunda client
;; - retrieving variables from a job
;; - complete a job
;; - fail a job
;; - A job handler (i.e. create a worker from a fn)
;; - Start a job (aka process instance)
;; - publish messages (another way to start a process instance)
;; - cancel a process instance
;; - Push (set) variables into an existing process instance
;; deps.edn
; io.camunda/zeebe-client-java {:mvn/version "8.7.9"}
(ns skipp.util.camunda
(:require [com.rpl.specter :as sp]
[skipp.util.data :as sudata]
[com.stuartsierra.component :as component]
[jsonista.core :as jsonista]
[taoensso.timbre :as timbre])
(:import [java.net URI]
[java.time Duration]
io.camunda.zeebe.client.ZeebeClient
io.camunda.zeebe.client.api.response.ActivatedJob
io.camunda.zeebe.client.impl.oauth.OAuthCredentialsProviderBuilder
[io.camunda.zeebe.client.api.worker JobWorker JobHandler JobClient]))
;; Reference: https://javadoc.io/doc/io.camunda/zeebe-client-java/8.7.9/index.html
;;; Config and Client
(def CREDS_PATH
  "Relative path of the local Camunda credentials file read by `load-local-config`."
  "tmp/camunda-credentials.json")

(defn load-local-config
  "Loads the Camunda configuration stored at `CREDS_PATH` using
  `skipp.util.data/load-path` and returns whatever that helper produces.
  NOTE(review): presumably a map keyed by the ZEEBE_* keywords that
  `create-zeebe-client` reads -- confirm against `load-path`."
  []
  (sudata/load-path CREDS_PATH))
(defn create-zeebe-client
  "Builds a ZeebeClient that authenticates through OAuth.

  `camunda-config` is looked up with these keys:
  - :ZEEBE_AUTHORIZATION_SERVER_URL, :ZEEBE_TOKEN_AUDIENCE,
    :ZEEBE_CLIENT_ID, :ZEEBE_CLIENT_SECRET - fed to the OAuth credentials provider
  - :ZEEBE_GRPC_ADDRESS, :ZEEBE_REST_ADDRESS - turned into URIs for the client builder

  Returns the built client."
  [camunda-config]
  (let [oauth (.. (OAuthCredentialsProviderBuilder.)
                  (authorizationServerUrl (:ZEEBE_AUTHORIZATION_SERVER_URL camunda-config))
                  (audience (:ZEEBE_TOKEN_AUDIENCE camunda-config))
                  (clientId (:ZEEBE_CLIENT_ID camunda-config))
                  (clientSecret (:ZEEBE_CLIENT_SECRET camunda-config))
                  ;; For additional target resources:
                  ;; (resource (:RESOURCE_INDICATOR camunda-config))
                  build)
        grpc-uri (URI/create (:ZEEBE_GRPC_ADDRESS camunda-config))
        rest-uri (URI/create (:ZEEBE_REST_ADDRESS camunda-config))]
    (.. (ZeebeClient/newClientBuilder)
        (grpcAddress grpc-uri)
        (restAddress rest-uri)
        (credentialsProvider oauth)
        build)))
(defn test-client
  "Sends a topology request through `client`, blocks on the response and returns it.
  Handy as a connectivity check."
  [client]
  (.. client newTopologyRequest send join))
;; For local development:
(def zeebe-client
  "Delay wrapping a ZeebeClient built from the local config file.
  Nothing is loaded or connected until the delay is dereferenced."
  (delay (-> (load-local-config)
             (create-zeebe-client))))
(defrecord ZeebeClientComponent [creds-and-config]
  ;; System component that owns a ZeebeClient.
  ;; `creds-and-config` is the map handed to `create-zeebe-client`: either get it from a
  ;; local file with `(load-local-config)`, or store and retrieve it from an AWS secret.
  component/Lifecycle
  (start [self]
    ;; The freshly built client is exposed under :conn.
    (let [conn (create-zeebe-client creds-and-config)]
      (assoc self :conn conn)))
  (stop [self]
    ;; Close the client when one is present, then drop the :conn key.
    (when-some [conn (:conn self)]
      (.close conn))
    (dissoc self :conn)))
;;; Helpers
(defn keyword->str
  "Renders a keyword as a string without the leading colon.
  A namespaced keyword becomes 'namespace/name'; an unqualified one becomes just
  its name."
  [kw]
  (let [ns-part (namespace kw)
        base (name kw)]
    (if ns-part
      (str ns-part "/" base)
      base)))
(defn normalize-variable-map
  "Returns `m` with every keyword that specter's `walker` reaches replaced by its
  string form (see `keyword->str`). Used before handing variables to the Zeebe client."
  [m]
  (let [every-keyword (sp/walker keyword?)]
    (sp/transform every-keyword keyword->str m)))
(defn keywordize
  "Recursively converts string map keys to keywords.
  - maps: string keys become keywords, other keys are kept; values are processed recursively
  - any other collection: returned as a vector of processed elements
  - everything else: returned unchanged"
  [x]
  (cond
    ;; Scalars (and nil) pass straight through.
    (not (coll? x)) x
    (map? x) (into {}
                   (for [[k v] x]
                     [(cond-> k (string? k) keyword) (keywordize v)]))
    :else (into [] (map keywordize) x)))
(defn get-variables
  "Parses the JSON returned by `(.getVariables job)` into Clojure data with keyword
  map keys.

  Uses jsonista's shared `keyword-keys-object-mapper` rather than constructing a new
  ObjectMapper on every call; mapper construction is comparatively expensive and this
  function runs once per handled job."
  [job]
  (jsonista/read-value (.getVariables job) jsonista/keyword-keys-object-mapper))
;;; Workers and Jobs (doc: https://docs.camunda.io/docs/components/concepts/job-workers/)
(defn complete-job
  "Completes `activated-job` through `job-client` and blocks until the command returns.
  When `variables` is truthy it is passed through `normalize-variable-map` and attached
  to the completion; otherwise the job is completed without variables."
  [job-client activated-job variables]
  (let [base-command (.newCompleteCommand job-client (.getKey activated-job))
        command (if variables
                  (.variables base-command (normalize-variable-map variables))
                  base-command)]
    (.join (.send command))))
(defn fail-job
  "Fail a job with an error message and optional remaining retries.

  Args:
  - job-client    - JobClient used to send the fail command
  - activated-job - the job being failed (its key identifies it)
  - error-msg     - message recorded on the failure; nil is sent as an empty string
  - retries       - remaining retries to set on the job, defaults to 0

  Blocks until the command returns and returns its response."
  ([job-client activated-job error-msg]
   (fail-job job-client activated-job error-msg 0))
  ([job-client activated-job error-msg retries]
   (-> job-client
       (.newFailCommand (.getKey activated-job))
       (.retries retries)
       ;; Coerce with `str`: a nil message (e.g. from an exception without one) is
       ;; expected to be rejected by the underlying command builder with a
       ;; NullPointerException, which would mask the original failure.
       (.errorMessage (str error-msg))
       (.send)
       (.join))))
(defn ^JobHandler clj-job-handler
  "Creates a JobHandler from a Clojure function.

  `handler-fn` takes [job-client activated-job] and returns a job result or nil.
  The job result is a map and can include variables to put back into the flow,
  e.g. `{:variables {:foo \"bar\"}}`.

  On normal return the job is completed with the returned :variables (if any).
  If `handler-fn` (or the completion) throws an Exception, the error is logged and the
  job is failed with the exception's message, falling back to the exception's string
  form when it has no message."
  [handler-fn]
  (reify JobHandler
    (handle [_ job-client activated-job]
      (try
        (let [{:keys [variables]} (handler-fn job-client activated-job)]
          (complete-job job-client activated-job variables))
        (catch Exception e
          (timbre/error e "Camunda JobHandler error, failing job"
                        {:job-id (.getKey activated-job)})
          ;; Many exceptions (NullPointerException, custom ones, ...) carry no message.
          ;; Passing nil on as the error message would make the fail command itself
          ;; throw, so always hand over a non-nil description.
          (fail-job job-client activated-job (or (.getMessage e) (str e))))))))
(defn create-job-worker
  "Opens a job worker and returns it.

  Args:
  - client   - ZeebeClient instance
  - job-type - (String) job type name, should match a job-type name in one of our
               Camunda BPMN flows
  - handler  - function taking [job-client activated-job]; it is wrapped with
               `clj-job-handler`
  - options map:
    - :timeout         - Duration, defaults to 10 seconds
    - :max-jobs-active - Int, defaults to 16
    - :worker-name     - String worker name; only set on the worker when given"
  [client job-type handler
   {:keys [timeout max-jobs-active worker-name]
    :or {timeout (Duration/ofSeconds 10)
         max-jobs-active 16}}]
  ;; Based on code at https://docs.camunda.io/docs/apis-tools/java-client-examples/job-worker-open/
  (let [builder (-> (.newWorker client)
                    (.jobType job-type)
                    (.handler (clj-job-handler handler))
                    (.timeout timeout)
                    (.maxJobsActive max-jobs-active))
        builder (if worker-name
                  (.name builder worker-name)
                  builder)]
    (.open builder)))
(defrecord JobWorkerManager [zeebe-client worker-configs]
  ;; A System Component that starts one or more Workers.
  ;; `zeebe-client` must expose the client under :conn.
  ;; `worker-configs` is a Map of job-type-name => options Map; each options Map carries
  ;; the :handler fn plus the options accepted by skipp.util.camunda/create-job-worker.
  component/Lifecycle
  (start [self]
    (let [conn (:conn zeebe-client)
          ;; Each handler gets this component as its first argument, ahead of the
          ;; job-client and activated-job supplied per job.
          open-worker (fn [[job-type config]]
                        (create-job-worker conn
                                           (name job-type)
                                           (partial (:handler config) self)
                                           (dissoc config :handler)))]
      (assoc self :workers (into [] (map open-worker) worker-configs))))
  (stop [self]
    ;; Close every opened worker, then forget them.
    (run! (fn [worker] (.close worker)) (:workers self))
    (dissoc self :workers)))
;(defn example-handler
; "Example job handler that just logs and completes."
; [component-map job-client activated-job]
; (println "Processing job:"
; (.getKey activated-job)
; (.getVariables activated-job)))
;;; Process Instance Management
;; https://docs.camunda.io/docs/components/concepts/processes/
(defn start-process-instance
  "Start a BPMN process instance.

  Args:
  - client          - ZeebeClient instance
  - bpmn-process-id - BPMN process id (String) of the process definition to start
  - options (optional map):
    - :variables    - Map of process variables (will be JSON serialized)
    - :version      - Specific process version (defaults to latest)
    - :await-result - Boolean, wait for process completion (default false)

  Returns a map with :process-id, :process-definition-key, :process-instance-key,
  :tenant-id and :version. When :await-result is true the map additionally contains
  :variables, the variables reported by the awaited result, parsed from JSON with
  keyword keys."
  ([client bpmn-process-id]
   (start-process-instance client bpmn-process-id nil))
  ([client bpmn-process-id
    {:keys [variables version await-result]
     :or {await-result false}}]
   (let [command (-> client
                     (.newCreateInstanceCommand)
                     (.bpmnProcessId bpmn-process-id)
                     ;; Exactly one of .version / .latestVersion is applied.
                     (cond-> version (.version version)
                             (not version) (.latestVersion)
                             variables (.variables (jsonista/write-value-as-string variables))))
         response (if await-result
                    (-> command (.withResult) (.send) (.join))
                    (-> command (.send) (.join)))]
     (cond-> {:process-id bpmn-process-id
              :process-definition-key (.getProcessDefinitionKey response)
              :process-instance-key (.getProcessInstanceKey response)
              :tenant-id (.getTenantId response)
              :version (.getVersion response)}
       ;; Only the awaited result carries the final variables; without this the
       ;; caller blocks for completion but cannot see what the process produced.
       await-result (assoc :variables
                           (jsonista/read-value (.getVariables response)
                                                jsonista/keyword-keys-object-mapper))))))
(defn publish-message
  "Publishes a message, which can trigger message start events or correlate with
  running processes. Blocks until the command returns and returns its response.

  Args:
  - client       - ZeebeClient instance
  - message-name - String name of the message
  - options map:
    - :correlation-key - String correlation key (required for intermediate message events)
    - :variables       - Map of message variables (JSON serialized before sending)
    - :time-to-live    - Duration for message TTL (default 1 minute)"
  [client message-name
   {:keys [correlation-key variables time-to-live]
    :or {time-to-live (Duration/ofMinutes 1)}}]
  (let [named (.messageName (.newPublishMessageCommand client) message-name)
        command (cond-> named
                  correlation-key (.correlationKey correlation-key)
                  variables (.variables (jsonista/write-value-as-string variables)))]
    (.join (.send (.timeToLive command time-to-live)))))
(defn cancel-process-instance
  "Cancels a running process instance and blocks until the command returns.

  Args:
  - client               - ZeebeClient instance
  - process-instance-key - Long key of the process instance to cancel"
  [client process-instance-key]
  (.. client
      (newCancelInstanceCommand process-instance-key)
      send
      join))
(defn set-variables-for-running-job
  "Pushes `variables-map` onto the element instance that `job` belongs to.

  The map is passed through `normalize-variable-map` first, and the names of the
  variables being set are logged at info level. The command is sent but NOT joined:
  the value returned is whatever `.send` returns, so callers that need confirmation
  (or want to see errors) must wait on it themselves."
  [zeebe-client job variables-map]
  (let [scope-key (.getElementInstanceKey job)]
    (timbre/info (format "Camunda: setting variables on %s: %s"
                         scope-key
                         (keys variables-map)))
    (let [command (.newSetVariablesCommand zeebe-client scope-key)]
      (.send (.variables command (normalize-variable-map variables-map))))))
(comment
(require '[skipp.util.camunda :as camunda :refer [zeebe-client]])
;; Start a process instance without waiting for it to finish
;; (pass :await-result true to block until the process completes)
(start-process-instance @zeebe-client "quick-price-1"
{:await-result false
:variables {:foo 6}})
;; Start via message trigger
(publish-message @zeebe-client "CustomerOrder"
{:correlation-key "customer-123"
:variables {:order-id "ord-456"}})
;; Cancel a process instance
(cancel-process-instance @zeebe-client 12345)
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment