Created
August 31, 2025 13:20
-
-
Save mlimotte/bb060acaf5c76652cebcc8e9a189ec91 to your computer and use it in GitHub Desktop.
Clojure interface for Camunda, wraps io.camunda/zeebe-client-java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ;; Clojure interface for Camunda. It's a nice product, but way too expensive for a startup. | |
| ;; I'm going to look at n8n.io instead. But in case, if you want to use Camunda, this code | |
| ;; may give you a headstart. Basic support for client, external service task workers, | |
| ;; starting jobs. | |
| ;; This namespace provides: | |
| ;; - Camunda client | |
| ;; - retrieving variables froma job | |
| ;; - complete a job | |
| ;; - fail a job | |
| ;; - A job handler (i.e. create a worker from a fn) | |
| ;; - Start a job (aka process instance) | |
| ;; - publish messages (another way to start a process instance) | |
| ;; - cancel a process instance | |
| ;; - Push (set) variables into an existing process instance | |
| ;; deps.edn | |
| ; io.camunda/zeebe-client-java {:mvn/version "8.7.9"} | |
| (ns skipp.util.camunda | |
| (:require [com.rpl.specter :as sp] | |
| [skipp.util.data :as sudata] | |
| [com.stuartsierra.component :as component] | |
| [jsonista.core :as jsonista] | |
| [taoensso.timbre :as timbre]) | |
| (:import [java.net URI] | |
| [java.time Duration] | |
| io.camunda.zeebe.client.ZeebeClient | |
| io.camunda.zeebe.client.api.response.ActivatedJob | |
| io.camunda.zeebe.client.impl.oauth.OAuthCredentialsProviderBuilder | |
| [io.camunda.zeebe.client.api.worker JobWorker JobHandler JobClient])) | |
| ;; Reference: https://javadoc.io/doc/io.camunda/zeebe-client-java/8.7.9/index.html | |
| ;;; Config and Client | |
| (def CREDS_PATH "tmp/camunda-credentials.json") | |
| (defn load-local-config | |
| [] | |
| (sudata/load-path CREDS_PATH)) | |
| (defn create-zeebe-client | |
| [camunda-config] | |
| (let [credentials-provider (-> (OAuthCredentialsProviderBuilder.) | |
| (.authorizationServerUrl (get camunda-config :ZEEBE_AUTHORIZATION_SERVER_URL)) | |
| (.audience (get camunda-config :ZEEBE_TOKEN_AUDIENCE)) | |
| (.clientId (get camunda-config :ZEEBE_CLIENT_ID)) | |
| (.clientSecret (get camunda-config :ZEEBE_CLIENT_SECRET)) | |
| ;; For additional target resources: | |
| ;; (.resource (get camunda-config :RESOURCE_INDICATOR)) | |
| (.build)) | |
| client (-> (ZeebeClient/newClientBuilder) | |
| (.grpcAddress (URI/create (get camunda-config :ZEEBE_GRPC_ADDRESS))) | |
| (.restAddress (URI/create (get camunda-config :ZEEBE_REST_ADDRESS))) | |
| (.credentialsProvider credentials-provider) | |
| (.build))] | |
| client)) | |
| (defn test-client | |
| [client] | |
| (-> client | |
| (.newTopologyRequest) | |
| (.send) | |
| (.join))) | |
| ;; For local development: | |
| (def zeebe-client (delay (create-zeebe-client (load-local-config)))) | |
| (defrecord ZeebeClientComponent [creds-and-config] | |
| ;; Note: for creds-and-config, either get from a local file `(load-local-config)`, or | |
| ;; Store and retrieve from an AWS secret. | |
| component/Lifecycle | |
| (start [this] | |
| (assoc this :conn (create-zeebe-client creds-and-config))) | |
| (stop [this] | |
| (some-> (:conn this) (.close)) | |
| (dissoc this :conn))) | |
| ;;; Helpers | |
| (defn keyword->str | |
| "Convert a keyword to its string representation. | |
| For namespaced keywords, returns 'namespace/name'. | |
| For simple keywords, returns just the name." | |
| [kw] | |
| (if-let [n (namespace kw)] | |
| (str n "/" (name kw)) | |
| (name kw))) | |
| (defn normalize-variable-map | |
| [m] | |
| (sp/transform [(sp/walker keyword?)] keyword->str m)) | |
| (defn keywordize | |
| [x] | |
| (cond | |
| (map? x) (into {} | |
| (map (fn [[k v]] | |
| [(if (string? k) (keyword k) k) (keywordize v)])) | |
| x) | |
| (coll? x) (mapv keywordize x) | |
| :else x)) | |
| (defn get-variables | |
| [job] | |
| (jsonista/read-value (.getVariables job) (jsonista/object-mapper {:decode-key-fn true}))) | |
| ;;; Workers and Jobs (doc: https://docs.camunda.io/docs/components/concepts/job-workers/) | |
| (defn complete-job | |
| "Complete a job successfully." | |
| [job-client activated-job variables] | |
| (-> job-client | |
| (.newCompleteCommand (.getKey activated-job)) | |
| (cond-> variables (.variables (normalize-variable-map variables))) | |
| (.send) | |
| (.join))) | |
| (defn fail-job | |
| "Fail a job with error message and optional retries." | |
| ([job-client activated-job error-msg] | |
| (fail-job job-client activated-job error-msg 0)) | |
| ([job-client activated-job error-msg retries] | |
| (-> job-client | |
| (.newFailCommand (.getKey activated-job)) | |
| (.retries retries) | |
| (.errorMessage error-msg) | |
| (.send) | |
| (.join)))) | |
| (defn ^JobHandler clj-job-handler | |
| "Creates a JobHandler from a Clojure function. | |
| Handler fn should take [job-client activated-job] and return job result or nil. | |
| job-result is a Map, and can include variables to put back into the flow. | |
| E.g. `{:variables {:foo \"bar\"}}` | |
| " | |
| [handler-fn] | |
| (reify JobHandler | |
| (handle [this job-client activated-job] | |
| (try | |
| (let [{:keys [variables]} (handler-fn job-client activated-job)] | |
| (complete-job job-client activated-job variables)) | |
| (catch Exception e | |
| (timbre/error e "Camunda JobHandler error, failing job" | |
| {:job-id (.getKey activated-job)}) | |
| (fail-job job-client activated-job (.getMessage e))))))) | |
| (defn create-job-worker | |
| "Creates a job worker. | |
| Required args: | |
| - client - ZeebeClient instance | |
| - job-type - (String) job type name, should match a job-type name in one of our Camunda BPMN flows. | |
| - handler - Function taking [job-client activated-job] (see skipp.util.camunda/clj-job-handler) | |
| - Options: | |
| - :timeout - Duration, defaults to 10 seconds | |
| - :max-jobs-active - Int, defaults to 16 | |
| - :worker-name - String worker name" | |
| [client job-type handler | |
| {:keys [timeout max-jobs-active worker-name] | |
| :as options | |
| :or {timeout (Duration/ofSeconds 10) | |
| max-jobs-active 16}}] | |
| ;; Based on code at https://docs.camunda.io/docs/apis-tools/java-client-examples/job-worker-open/ | |
| (-> client | |
| (.newWorker) | |
| (.jobType job-type) | |
| (.handler (clj-job-handler handler)) | |
| (.timeout timeout) | |
| (.maxJobsActive max-jobs-active) | |
| (cond-> worker-name (.name worker-name)) | |
| (.open))) | |
| (defrecord JobWorkerManager [zeebe-client worker-configs] | |
| ;; A System Component to start one or more Workers | |
| ;; worker-configs is a Map of job-type-name => Map | |
| ;; See skipp.util.camunda/create-job-worker | |
| component/Lifecycle | |
| (start [this] | |
| (let [client (:conn zeebe-client) | |
| workers (mapv (fn [[job-type {:keys [handler] :as options}]] | |
| (create-job-worker client | |
| (name job-type) | |
| (partial handler this) | |
| (dissoc options :handler))) | |
| worker-configs)] | |
| (assoc this :workers workers))) | |
| (stop [this] | |
| (doseq [worker (:workers this)] | |
| (.close worker)) | |
| (dissoc this :workers))) | |
| ;(defn example-handler | |
| ; "Example job handler that just logs and completes." | |
| ; [component-map job-client activated-job] | |
| ; (println "Processing job:" | |
| ; (.getKey activated-job) | |
| ; (.getVariables activated-job))) | |
| ;;; Process Instance Management | |
| ;; https://docs.camunda.io/docs/components/concepts/processes/ | |
| (defn start-process-instance | |
| "Start a BPMN process instance. Returns the process-instance-key (Long). | |
| Args: | |
| - client - ZeebeClient instance | |
| - bpmn-process-id - BPMN process definition key (string) | |
| - Options: | |
| - :variables - Map of process variables (will be JSON serialized) | |
| - :version - Specific process version (defaults to latest) | |
| - :await-result - Boolean, wait for process completion (default false)" | |
| [client bpmn-process-id | |
| {:keys [variables version await-result] | |
| :as options | |
| :or {await-result false}}] | |
| (let [command (-> client | |
| (.newCreateInstanceCommand) | |
| (.bpmnProcessId bpmn-process-id) | |
| (cond-> version (.version version) | |
| (not version) (.latestVersion) | |
| variables (.variables (jsonista/write-value-as-string variables)))) | |
| response (if await-result | |
| (-> command (.withResult) (.send) (.join)) | |
| (-> command (.send) (.join)))] | |
| {:process-id bpmn-process-id | |
| :process-definition-key (.getProcessDefinitionKey response) | |
| :process-instance-key (.getProcessInstanceKey response) | |
| :tenant-id (.getTenantId response) | |
| :version (.getVersion response)})) | |
| (defn publish-message | |
| "Publish a message to trigger message start events or correlate with running processes. | |
| Required args: | |
| - client - ZeebeClient instance | |
| - message-name - String name of the message | |
| - Options: | |
| - :correlation-key - String correlation key (required for intermediate message events) | |
| - :variables - Map of message variables | |
| - :time-to-live - Duration for message TTL (default 1 minute)" | |
| [client message-name | |
| {:keys [correlation-key variables time-to-live] | |
| :as options | |
| :or {time-to-live (Duration/ofMinutes 1)}}] | |
| (-> client | |
| (.newPublishMessageCommand) | |
| (.messageName message-name) | |
| (cond-> correlation-key (.correlationKey correlation-key) | |
| variables (.variables (jsonista/write-value-as-string variables))) | |
| (.timeToLive time-to-live) | |
| (.send) | |
| (.join))) | |
| (defn cancel-process-instance | |
| "Cancel a running process instance. | |
| Args: | |
| - client - ZeebeClient instance | |
| - process-instance-key - Long key of the process instance to cancel" | |
| [client process-instance-key] | |
| (-> client | |
| (.newCancelInstanceCommand process-instance-key) | |
| (.send) | |
| (.join))) | |
| (defn set-variables-for-running-job | |
| [zeebe-client job variables-map] | |
| (let [element-instance-key (.getElementInstanceKey job)] | |
| (timbre/info (format "Camunda: setting variables on %s: %s" | |
| element-instance-key | |
| (keys variables-map))) | |
| (-> zeebe-client | |
| (.newSetVariablesCommand element-instance-key) | |
| (.variables (normalize-variable-map variables-map)) | |
| (.send)))) | |
| (comment | |
| (require '[skipp.util.camunda :as camunda :refer [zeebe-client]]) | |
| ;; Start and wait for completion | |
| (start-process-instance @zeebe-client "quick-price-1" | |
| {:await-result false | |
| :variables {:foo 6}}) | |
| ;; Start via message trigger | |
| (publish-message @zeebe-client "CustomerOrder" | |
| {:correlation-key "customer-123" | |
| :variables {:order-id "ord-456"}}) | |
| ;; Cancel a process instance | |
| (cancel-process-instance @zeebe-client 12345) | |
| ) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment