Skip to content

Instantly share code, notes, and snippets.

@teaforthecat
Created December 18, 2015 03:30
Show Gist options
  • Select an option

  • Save teaforthecat/ec1efa73e9514e229768 to your computer and use it in GitHub Desktop.

Select an option

Save teaforthecat/ec1efa73e9514e229768 to your computer and use it in GitHub Desktop.
An onyx job builder, basically a shortcut to this: kafka-input->function->kafka-output
(ns bones.jobs
" given a symbol and config, build all components of an onyx job
with three tasks kafka-input->function->kafka-output
(def x.y/fn [s] (str s \"-yo\"))
(api/submit-jobs (bones.jobs/build-jobs {} [:x.y/fn]))
(kafka/produce \"x.y..fn-input\" \"hello\")
(kafka/consume \"x.y..fn-output\") => \"hello-yo\"
")
(defn topic-reader [^String topic]
"builds a catalog entry that reads from a kafka topic"
{:onyx/name (keyword topic)
:onyx/plugin :onyx.plugin.kafka/read-messages
:onyx/batch-size 1
:onyx/type :input
:onyx/medium :kafka
:kafka/topic topic
:kafka/zookeeper "127.0.0.1:2181" ;; can be updated in conf
:kafka/deserializer-fn :bones.serializer/deserializer}) ;; can be updated in conf
(defn topic-writer [^String topic]
"builds a catalog entry that writes to a kafka topic"
{:onyx/name (keyword topic)
:onyx/plugin :onyx.plugin.kafka/write-messages
:onyx/batch-size 1
:onyx/type :output
:onyx/medium :kafka
:kafka/topic topic
:kafka/zookeeper "127.0.0.1:2181" ;; can be updated in conf
:kafka/serializer-fn :bones.serializer/serializer}) ;; can be updated in conf
(defn topic-function [^clojure.lang.Keyword ns-fn]
"builds a catalog entry that performs some user function"
{:onyx/name ns-fn
:onyx/fn ns-fn
:onyx/batch-size 1
:onyx/type :function})
(defn kafka-lifecycle [input-task output-task]
[{:lifecycle/task input-task
:lifecycle/calls :onyx.plugin.kafka/read-messages-calls}
{:lifecycle/task output-task
:lifecycle/calls :onyx.plugin.kafka/write-messages-calls}])
(defn sym-to-topic [^clojure.lang.Keyword job-sym]
(-> (str job-sym)
(clojure.string/replace "/" "..") ;; / is illegal in kafka topic name
(subs 1))) ;; remove leading colon
(defn topic-to-sym [^String topic-name]
(-> topic-name
(clojure.string/replace ".." "/") ;; puts the / back
(keyword))) ;; put the colon back
(defn topic-name-input [^clojure.lang.Keyword job-sym]
(str (sym-to-topic job-sym) "-input"))
(defn topic-name-output [^clojure.lang.Keyword job-sym]
(str (sym-to-topic job-sym) "-output"))
(defn inject-zookeeper-address [catalog address]
(mapv (fn [c]
(if (= (:onyx/medium c) :kafka)
(assoc c :kafka/zookeeper address)
c))
catalog))
(defn build-catalog-for-job [job-sym]
[(topic-reader (topic-name-input job-sym))
(topic-function job-sym)
(topic-writer (topic-name-output job-sym))])
(defn build-workflow-entry [job-sym]
"routes segments through a function in a kafka-function-kafka sandwich
given a symbol x, create two vectors as: [[x-input x] [x x-output]]"
(let [input (topic-name-input job-sym)
output (topic-name-output job-sym)
first-flow (mapv keyword [input job-sym])
second-flow (mapv keyword [job-sym output])]
[first-flow
second-flow]))
(defn build-lifecycle-entries [job-sym]
(kafka-lifecycle (keyword (topic-name-input job-sym))
(keyword (topic-name-output job-sym))))
(defn build-default-job [job-sym]
{:workflow (build-workflow-entry job-sym)
:catalog (build-catalog-for-job job-sym)
:lifecycles (build-lifecycle-entries job-sym)
:task-scheduler :onyx.task-scheduler/greedy})
(defn build-configured-job [conf job-sym]
"here we combine the configurable bits with the built bits"
(let [job (build-default-job job-sym)]
(cond-> job
(:zookeeper/address conf) (update :catalog #(inject-zookeeper-address % (:zookeeper/address conf)))
(:onyx.task-scheduler conf) (assoc :task-scheduler (:onyx.task-scheduler conf)))))
(defn build-jobs [conf jobs]
(mapv (partial build-configured-job conf) jobs))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment