Last active
March 8, 2024 18:42
-
-
Save cddr/46cbb9cfb631cb896f12309cb6e68efe to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns kafka.test-helpers
  "Here, we define a simpler version of jackdaw's test-machine.

  Tries to capture the two essential features of jackdaw via
  a simple functional interface rather than having to learn a
  whole new data format to write tests.

  See `journal`/`with-journal` and `wait-for` for the main
  entrypoints to this ns.

  Also, `with-bg-process` for running tests that require
  programs running in the background."
  ;; NOTE: the original `:require` clause was closed with `]])`, an unmatched
  ;; `]` that stops the reader from loading this file at all.
  (:require
   [clojure.stacktrace :as strace]
   [clojure.java.io :as io]
   [com.brunobonacci.mulog :as u])
  (:import
   (java.io File)
   (java.util UUID Properties)
   (java.time Duration)
   (org.apache.kafka.common TopicPartition)
   (io.confluent.kafka.serializers KafkaAvroSerializer)
   (org.apache.kafka.common.serialization Serdes)
   (org.apache.kafka.clients.consumer KafkaConsumer ConsumerRebalanceListener)
   (org.apache.kafka.clients.producer KafkaProducer ProducerRecord)))
(defn properties
  "Builds a `java.util.Properties` from `in`.

  `in` may be either:
    - a `java.io.File`, whose contents are loaded in `.properties` format, or
    - a map, whose entries are copied into a fresh `Properties`. Keys may be
      strings, keywords or symbols (converted with `name`); values are
      converted with `str`, so numeric settings are accepted.

  Returns nil for any other kind of input."
  [in]
  (cond
    (instance? File in)
    ;; `with-open` guarantees the stream is closed once the file is loaded
    (with-open [stream (io/input-stream in)]
      (doto (Properties.)
        (.load stream)))

    (map? in)
    (let [p (Properties.)]
      (doseq [[k v] in]
        ;; the target `p` must be the first argument; without it the call
        ;; is dispatched on the key and the Properties is never populated
        (.setProperty p (name k) (str v)))
      p)))
(defn lazy-log
  "Sequence of the records obtained by repeatedly polling `consumer`.

  The options map supplies:
    :poll-duration  passed straight to `.poll` on every call
    :fuse-fn        called with each polled batch; while it returns truthy
                    the batch is followed by the result of further polling,
                    otherwise the batch itself is returned and polling ends

  The first poll happens immediately; each later poll is deferred until the
  sequence is consumed past the preceding batch."
  [consumer {:keys [poll-duration fuse-fn] :as opts}]
  (let [batch (.poll consumer poll-duration)]
    (if-not (fuse-fn batch)
      ;; fuse blown: this batch is the tail of the log
      batch
      (concat (lazy-seq batch)
              (lazy-seq (lazy-log consumer opts))))))
(defn clojure-record
  "Converts `rec` (anything exposing `topic`, `partition`, `key` and `value`
  accessors, such as a kafka `ConsumerRecord`) into a plain map with the keys
  :topic, :partition, :key and :value."
  [rec]
  (let [topic     (.topic rec)
        partition (.partition rec)
        k         (.key rec)
        v         (.value rec)]
    {:topic     topic
     :partition partition
     :key       k
     :value     v}))
(defn kafka-record
  "Builds a kafka `ProducerRecord` from the :topic, :key and :value entries
  of the clojure map `clj`."
  [clj]
  (let [topic (:topic clj)
        k     (:key clj)
        v     (:value clj)]
    (ProducerRecord. topic k v)))
(defn wait-for-assignment
  "Builds a `ConsumerRebalanceListener` whose only effect is to deliver
  `:partitions-assigned` to `start-promise` once partitions have been
  assigned to the consumer it is registered with. Revoked and lost
  partitions are ignored.

  This lets a test block until its consumer is ready to receive messages
  before it starts sending them."
  [start-promise]
  (reify ConsumerRebalanceListener
    (onPartitionsAssigned [_ _assigned]
      (deliver start-promise :partitions-assigned))
    (onPartitionsLost [_ _lost])
    (onPartitionsRevoked [_ _revoked])))
(defn journal
  "Takes an atom `j` and, in another thread, copies the records observed on
  the provided `topics` into the atom.

  Options:
    :consumer-fn  zero-arg function returning the consumer to read with; the
                  consumer is closed when the journal stops
    :topics       what to subscribe to; defaults to the pattern `.*`

  Blocks until the consumer has been assigned partitions, then returns a
  function that must be called to stop the journal. Calling it waits for the
  background thread to finish and rethrows anything that thread threw.

  If the background thread fails before partitions are assigned (for example
  `consumer-fn` throws), the failure is rethrown from this function, wrapped
  by the future, instead of blocking the caller forever.

  See `with-journal` for a way to wrap your test-function so that the kafka
  output is made available to it as an atom.

  Data Structures

  After a few messages have been written to topic 'credits' for example, `@j`
  might look something like this

    [
     {:topic \"credits\" :partition 0 :key 1 :value \"ben\"}
     {:topic \"credits\" :partition 0 :key 1 :value \"ellie\"}
     {:topic \"credits\" :partition 0 :key 1 :value \"simon\"}
     {:topic \"credits\" :partition 0 :key 1 :value \"andy\"}
     {:topic \"credits\" :partition 0 :key 1 :value \"tom\"}
    ]"
  [j {:keys [consumer-fn topics]}]
  (let [strt     (promise)
        stopped? (promise)
        done?    (promise)
        bg-fn    (fn []
                   (try
                     (with-open [consumer (consumer-fn)]
                       (.subscribe consumer (or topics #".*") (wait-for-assignment strt))
                       (doseq [rec (->> (lazy-log consumer
                                                  {:poll-duration (Duration/ofMillis 500)
                                                   ;; keep polling until `stop` is called
                                                   :fuse-fn (fn [_batch]
                                                              (not (realized? stopped?)))})
                                        (map clojure-record))]
                         (swap! j conj rec))
                       ;; commit all offsets we've read in this consumer
                       (.commitSync consumer)
                       (deliver done? :success))
                     (catch Exception e
                       (strace/print-cause-trace e)
                       (deliver done? :fail)
                       ;; unblock a caller still waiting for assignment; this is
                       ;; a no-op when partitions were already assigned
                       (deliver strt :failed)
                       (throw e))))
        bg-fut   (future (bg-fn))
        stop     (fn []
                   (deliver stopped? true)
                   @bg-fut)]
    ;; wait for the consumer to be ready; on start-up failure, surface the
    ;; background exception by dereferencing the failed future
    (when (= :failed @strt)
      @bg-fut)
    stop))
(defn with-journal
  "Runs the test function `f` with an atom that mirrors one or more kafka
  topics, and stops the underlying journal when `f` finishes, whether it
  returns or throws.

  `journal-args` is the options map handed to `journal`, containing
                 :consumer-fn and :topics. The consumer-fn is invoked with
                 zero parameters to produce the consumer, which then
                 subscribes to the topics listed in :topics.

  `f` is invoked with the journal atom as its single parameter; its result
      is returned. Typically some test is performed within `f`."
  [journal-args f]
  (let [journal-atom  (atom [])
        stop-journal! (journal journal-atom journal-args)]
    (try
      (f journal-atom)
      (finally
        (stop-journal!)))))
(defn wait-for
  "Blocks until the value of the atom `jrnl` satisfies the predicate `pred`,
  then returns the atom's current value.

  Primary use for this is to wait until all expected output has arrived on
  some topic before making test-assertions.

  `pred` is applied to the value already in `jrnl` as well as to every
  subsequent new value, so the call returns promptly when the expected
  output arrived before `wait-for` was invoked. `pred` may be called from
  whichever thread updates the atom."
  [jrnl pred]
  (let [id     (str (UUID/randomUUID))
        done?  (promise)
        check! (fn [k v]
                 (when (pred v)
                   (remove-watch jrnl k)
                   (deliver done? :done)))]
    (add-watch jrnl id (fn [k _ref _old-val new-val]
                         (check! k new-val)))
    ;; watches only fire on *changes*: without this check a journal that
    ;; already satisfies `pred` (and never changes again) would block forever
    (check! id @jrnl)
    (u/log ::wait-for :id id :done? @done?)
    @jrnl))
(defn call-with-bg-proc
  "Invoke `f` after starting the background process represented by `cmd`
  and waiting until we've seen a line in its standard output that satisfies
  the predicate `started?`.

  `cmd`      sequence of strings: the program followed by its arguments
  `f`        zero-arg function to run once the process has started
  `started?` predicate applied to each output line in turn

  Returns whatever `f` returns. The process is always destroyed afterwards.

  Throws `ex-info` if the process output ends before any line satisfies
  `started?` (e.g. the program crashed on start-up)."
  [cmd f started?]
  (let [pb   (ProcessBuilder. (into-array String cmd))
        proc (.start pb)]
    (try
      (with-open [^java.io.BufferedReader br (io/reader (.getInputStream proc))]
        ;; read process output until we find a line that tells us the app
        ;; has successfully started
        (loop [line (.readLine br)]
          (cond
            ;; nil means end-of-stream: the process will never report that
            ;; it has started, so fail instead of looping forever
            (nil? line)
            (throw (ex-info "Background process output ended before it reported startup"
                            {:cmd (vec cmd)}))

            (started? line)
            nil

            :else
            (recur (.readLine br))))
        ;; invoke the test-function
        (f))
      ;; clean-up
      (finally
        (.destroy proc)))))
(defmacro with-bg-process
  "Evaluates `body` while the background process described by `cmd` is
  running. Expands to a `call-with-bg-proc` call with `body` wrapped in a
  zero-arg function and `started?` as the start-up predicate."
  [cmd started? & body]
  (let [thunk (list* `fn [] body)]
    (list `call-with-bg-proc cmd thunk started?)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment