(ns kafka.test-helpers
  "Here, we define a simpler version of jackdaw's test-machine

   Tries to capture the two essential features of jackdaw via
   a simple functional interface rather than having to learn a
   whole new data format to write tests.

   See `journal`/`with-journal` and `wait-for` for the main
   entrypoints to this ns.

   Also, `with-bg-process` for running tests that require
   programs running in the background.
  "
  (:require
   [clojure.stacktrace :as strace]
   [clojure.java.io :as io]
   [com.brunobonacci.mulog :as u]])
  (:import
   (java.io File)
   (java.util UUID Properties)
   (java.time Duration)
   (org.apache.kafka.common TopicPartition)
   (io.confluent.kafka.serializers KafkaAvroSerializer)
   (org.apache.kafka.common.serialization Serdes)
   (org.apache.kafka.clients.consumer KafkaConsumer ConsumerRebalanceListener)
   (org.apache.kafka.clients.producer KafkaProducer ProducerRecord)))

(defn properties
  [in]
  (cond
    (instance? File in)
    (doto (Properties.)
      (.load (io/input-stream in)))

    (map? in)
    (let [p (Properties.)]
      (doseq [[k v] in]
        (.setProperty k v))
      p)))

(defn lazy-log
  "A lazy sequence of the records received by polling the supplied consumer"
  [consumer {:keys [poll-duration fuse-fn]}]
  (let [r (.poll consumer poll-duration)]
    (if (fuse-fn r)
      (lazy-cat r (lazy-log consumer {:poll-duration poll-duration
                                      :fuse-fn fuse-fn}))
      r)))

(defn clojure-record
  "A map representing the key features of a kafka `ConsumerRecord`"
  [rec]
  {:topic (.topic rec)
   :partition (.partition rec)
   :key (.key rec)
   :value (.value rec)})

(defn kafka-record
  "Creates a kafka `ProducerRecord` from a clojure map"
  [clj]
  (ProducerRecord. (:topic clj)
                   (:key clj)
                   (:value clj)))

(defn wait-for-assignment
  "Return a `ConsumerBalanceListener` that delivers `:partitions-assigned`
   to the provided `start-promise` when partitions have been assigned to
   this consumer

   This helps ensure the test consumer is ready to receive messages
   before we start sending them in a test."
  [start-promise]
  (reify ConsumerRebalanceListener
    (onPartitionsRevoked [_this _partitions-xs])
    (onPartitionsLost [_this _partitions-xs])
    (onPartitionsAssigned [_this partitions-xs]
      (deliver start-promise :partitions-assigned))))

(defn journal
  "Takes an atom `j` and in another thread, copy the output observed on the
   provided `topics` to the atom.

   Immediately returns a function that must be called to stop the journal.

   See `with-journal` for a way to call your wrap your test-function in a
   a way that makes the kafka output available as an atom

   Data Structures

   After reading a few messages are written to topic 'foo' for example, `@j`
   might look something like this

   [
     {:topic \"credits\" :partition 0 key 1 :value \"ben\"}
     {:topic \"credits\" :partition 0 key 1 :value \"ellie\"}
     {:topic \"credits\" :partition 0 key 1 :value \"simon\"}
     {:topic \"credits\" :partition 0 key 1 :value \"andy\"}
     {:topic \"credits\" :partition 0 key 1 :value \"tom\"}
   ]"
  [j {:keys [consumer-fn topics]}]
  (let [strt (promise)
        stopped? (promise)
        done? (promise)

        bg-fn (fn []
                (try
                  (with-open [consumer (consumer-fn)]
                    (.subscribe consumer (or topics #".*") (wait-for-assignment strt))
                    (doseq [rec (->> (lazy-log consumer
                                               {:poll-duration (Duration/ofMillis 500)
                                                :fuse-fn #(do % (not (realized? stopped?)))})
                                     (map clojure-record))]
                      (swap! j conj rec))
                    ;; commit all offsets we've read in this consumer
                    (.commitSync consumer)
                    (deliver done? :success))
                  (catch Exception e
                    (strace/print-cause-trace e)
                    (deliver done? :fail)
                    (throw e))))
        bg-fut (future (bg-fn))


        stop (fn []
               (deliver stopped? true)
               @bg-fut)]

    @strt

    stop))


(defn with-journal
  "Convenience wrapper for running some test function `f` and providing it with
   access to an atom that mirrors one or more kafka topics.

   `journal-args` is a map containing :consumer-fn and :topics keys
                  the consumer-fn will be invoked with zero parameters
                  to produce a KafkaConsumer which will then immediately
                  subscribe to the topics listed in :topics

   `f` is a function that will be invoked with the journal atom as the single
       parameter

   Typically some test will be performed within the `f`."
  [journal-args f]
  (let [j (atom [])
        stop-fn (journal j journal-args)]
    (try
      (f j)
      (finally
        (stop-fn)))))

(defn wait-for
  "Monitor changes to some atom `jrnl` until the new-value satisfies predicate `pred`.

   Primary use for this is to wait until all expected output has arrived on some
   topic before making test-assertions"
  [jrnl pred]
  (let [id (str (UUID/randomUUID))
        done? (promise)
        done-when-f (fn [k r _old-val new-val]
                      (let [rslt (pred new-val)]
                        (when rslt
                          (remove-watch jrnl k)
                          (deliver done? :done))))]

    (add-watch jrnl id done-when-f)
    (u/log ::wait-for :id id :done? @done?)
    @jrnl))

(defn call-with-bg-proc
  "Invoke `f` after starting the background process represented by `cmd`
   and waiting until we've seen a line in the output stream that satisfies
   the predicate `started?`"
  [cmd f started?]
  (let [pb (ProcessBuilder. (into-array String cmd))
        proc (.start pb)
        br (io/reader (.getInputStream proc))]
    (try
      ;; read process output until we find a line that tells us the app
      ;; has successfully started
      (loop [line (.readLine br)]
        (if (started? line)
          nil
          (recur (.readLine br))))

      ;; invoke the test-function
      (f)

      ;; clean-up
      (finally
        (.destroy proc)))))

(defmacro with-bg-process [cmd started? & body]
  `(call-with-bg-proc ~cmd (fn [] ~@body) ~started?))