(ns kafka.test-helpers
  "A simpler version of jackdaw's test-machine.

  Tries to capture the two essential features of jackdaw's test-machine via
  a small functional interface, rather than requiring a whole new data
  format to be learned in order to write tests.

  See `journal`/`with-journal` and `wait-for` for the main entry points to
  this ns. Also see `with-bg-process` for running tests that require a
  program running in the background."
  (:require [clojure.stacktrace :as strace]
            [clojure.java.io :as io]
            [com.brunobonacci.mulog :as u])
  (:import (java.io File)
           (java.util UUID Properties)
           (java.time Duration)
           (org.apache.kafka.common TopicPartition)
           (io.confluent.kafka.serializers KafkaAvroSerializer)
           (org.apache.kafka.common.serialization Serdes)
           (org.apache.kafka.clients.consumer KafkaConsumer ConsumerRebalanceListener)
           (org.apache.kafka.clients.producer KafkaProducer ProducerRecord)))

(defn properties
  "Builds a `java.util.Properties` from `in`.

  * When `in` is a `java.io.File`, the file is loaded in the standard
    `.properties` format; the stream is closed once loading finishes.
  * When `in` is a Clojure map, every entry is copied into a fresh
    `Properties`. Keys are converted with `name` (so keywords and strings
    both work) and values with `str`.

  Returns nil for any other kind of input."
  [in]
  (cond
    (instance? File in)
    (with-open [stream (io/input-stream in)]
      (doto (Properties.)
        (.load stream)))

    (map? in)
    (let [p (Properties.)]
      (doseq [[k v] in]
        ;; `p` must be the target of the call; `Properties` only stores strings
        (.setProperty p (name k) (str v)))
      p)))

(defn lazy-log
  "A lazy sequence of the records received by polling the supplied consumer.

  `poll-duration` is handed to the consumer's `.poll` on every poll.
  `fuse-fn` is called with the result of each poll; while it returns truthy
  the sequence continues with another poll, and once it returns falsey the
  records of that final poll are the tail of the sequence.

  Note that the first poll happens eagerly, when this function is called."
  [consumer {:keys [poll-duration fuse-fn] :as opts}]
  (let [records (.poll consumer poll-duration)]
    (if (fuse-fn records)
      (lazy-cat records (lazy-log consumer opts))
      records)))

(defn clojure-record
  "A map representing the key features of a kafka `ConsumerRecord`:
  its :topic, :partition, :key and :value."
  [rec]
  {:topic     (.topic rec)
   :partition (.partition rec)
   :key       (.key rec)
   :value     (.value rec)})

(defn kafka-record
  "Creates a kafka `ProducerRecord` from a clojure map with :topic, :key and
  :value entries. Any other entries of the map (e.g. :partition) are not
  used."
  [clj]
  (ProducerRecord. (:topic clj) (:key clj) (:value clj)))

(defn wait-for-assignment
  "Returns a `ConsumerRebalanceListener` that delivers `:partitions-assigned`
  to the provided `start-promise` when partitions have been assigned to the
  consumer it is registered with. Revoked and lost partitions are ignored.

  This helps ensure the test consumer is ready to receive messages before we
  start sending them in a test."
  [start-promise]
  (reify ConsumerRebalanceListener
    (onPartitionsRevoked [_this _partitions-xs])
    (onPartitionsLost [_this _partitions-xs])
    (onPartitionsAssigned [_this _partitions-xs]
      (deliver start-promise :partitions-assigned))))

(defn journal
  "Takes an atom `j` and, in another thread, copies the output observed on
  the provided `topics` to the atom. Blocks until the consumer has been
  assigned partitions, then returns a zero-argument function that must be
  called to stop the journal.

  Options:
    :consumer-fn  zero-argument function returning the consumer to use; the
                  consumer is closed when the journal stops
    :topics       what to subscribe to; when nil, the pattern `.*` is used

  If the background thread fails before partitions are assigned, this
  function does not hang: the failure is re-thrown to the caller (wrapped
  in the `ExecutionException` produced by dereferencing the future).

  The stop function signals the background thread to finish, waits for it,
  and returns `:success`; if the background thread failed, the stop function
  throws instead.

  See `with-journal` for a way to wrap your test-function so that the kafka
  output is made available as an atom.

  Data Structures

  After a few messages are written to topic 'credits' for example, `@j`
  might look something like this

    [
     {:topic \"credits\" :partition 0 :key 1 :value \"ben\"}
     {:topic \"credits\" :partition 0 :key 1 :value \"ellie\"}
     {:topic \"credits\" :partition 0 :key 1 :value \"simon\"}
     {:topic \"credits\" :partition 0 :key 1 :value \"andy\"}
     {:topic \"credits\" :partition 0 :key 1 :value \"tom\"}
    ]"
  [j {:keys [consumer-fn topics]}]
  (let [started  (promise)
        stopped? (promise)
        bg-fn    (fn []
                   (try
                     (with-open [consumer (consumer-fn)]
                       (.subscribe consumer (or topics #".*") (wait-for-assignment started))
                       ;; keep polling until the stop function has been called
                       (doseq [rec (->> (lazy-log consumer
                                                  {:poll-duration (Duration/ofMillis 500)
                                                   :fuse-fn       (fn [_records]
                                                                    (not (realized? stopped?)))})
                                        (map clojure-record))]
                         (swap! j conj rec))
                       ;; commit all offsets we've read in this consumer
                       (.commitSync consumer)
                       :success)
                     (catch Exception e
                       (strace/print-cause-trace e)
                       ;; unblock a caller still waiting for assignment; this is
                       ;; a no-op when assignment has already been delivered
                       (deliver started :failed)
                       (throw e))))
        bg-fut   (future (bg-fn))
        stop     (fn []
                   (deliver stopped? true)
                   @bg-fut)]
    ;; wait until the consumer is ready; on early failure, surface the error
    (when (= :failed @started)
      @bg-fut)
    stop))

(defn with-journal
  "Convenience wrapper for running some test function `f` and providing it
  with access to an atom that mirrors one or more kafka topics.

  `journal-args` is a map containing :consumer-fn and :topics keys. The
  consumer-fn will be invoked with zero parameters to produce a
  KafkaConsumer, which will then immediately subscribe to the topics listed
  in :topics.

  `f` is a function that will be invoked with the journal atom as its single
  parameter. Typically some test will be performed within `f`.

  Returns whatever `f` returns. The journal is always stopped afterwards,
  even when `f` throws."
  [journal-args f]
  (let [j       (atom [])
        stop-fn (journal j journal-args)]
    (try
      (f j)
      (finally
        (stop-fn)))))

(defn wait-for
  "Blocks until the value of the atom `jrnl` satisfies the predicate `pred`,
  then returns the current value of `jrnl`.

  The primary use for this is to wait until all expected output has arrived
  on some topic before making test assertions.

  `pred` is checked against the current value as well as against every
  subsequent change, so this returns promptly when the condition already
  holds at the time of the call. There is no timeout: if `pred` is never
  satisfied, this never returns."
  [jrnl pred]
  (let [id    (str (UUID/randomUUID))
        done? (promise)
        check (fn [value]
                (when (pred value)
                  (deliver done? :done)))]
    (add-watch jrnl id (fn [_key _ref _old-val new-val]
                         (check new-val)))
    (try
      ;; a watch only fires on *changes*; without this check we would block
      ;; forever when the journal already satisfies `pred`
      (check @jrnl)
      (u/log ::wait-for :id id :done? @done?)
      @jrnl
      (finally
        (remove-watch jrnl id)))))

(defn call-with-bg-proc
  "Invokes `f` after starting the background process represented by `cmd`
  (a sequence of strings: the program and its arguments) and waiting until
  a line has been seen on the process's output stream that satisfies the
  predicate `started?`.

  Returns whatever `f` returns. The process is always destroyed afterwards.

  Throws an `ex-info` when the output stream ends before any line satisfies
  `started?` (e.g. because the process exited during start-up)."
  [cmd f started?]
  (let [pb   (ProcessBuilder. (into-array String cmd))
        proc (.start pb)]
    (try
      ;; read process output until we find a line that tells us the app
      ;; has successfully started
      (let [br (io/reader (.getInputStream proc))]
        (loop [line (.readLine br)]
          (cond
            ;; nil means end-of-stream: no further lines will ever arrive
            (nil? line)
            (throw (ex-info "Background process output ended before it reported a successful start"
                            {:cmd (vec cmd)}))

            (started? line)
            nil

            :else
            (recur (.readLine br)))))
      ;; invoke the test-function
      (f)
      ;; clean-up
      (finally
        (.destroy proc)))))

(defmacro with-bg-process
  "Runs `body` while the background process described by `cmd` is running.

  Expands to a call to `call-with-bg-proc`, with `body` wrapped in a
  zero-argument function; see that function for the meaning of `cmd` and
  `started?`."
  [cmd started? & body]
  `(call-with-bg-proc ~cmd (fn [] ~@body) ~started?))