Created
February 7, 2021 22:39
-
-
Save souenzzo/003c15bd5a6c140debb580c555d7354c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns digital-wallet.main-test-old | |
(:require [clojure.test :refer [deftest]] | |
[midje.sweet :refer [fact =>]]) | |
(:import (clojure.lang ILookup) | |
(java.lang AutoCloseable) | |
(java.util Properties) | |
(org.apache.kafka.common.serialization Serdes Serdes$StringSerde Serdes$LongSerde) | |
(org.apache.kafka.streams TopologyTestDriver StreamsConfig Topology KeyValue) | |
(org.apache.kafka.streams.processor.api ProcessorSupplier Processor ProcessorContext Record) | |
(org.apache.kafka.streams.state Stores KeyValueStore) | |
(java.time Duration) | |
(org.apache.kafka.streams.processor PunctuationType Punctuator))) | |
;; Make the compiler report every interop call it cannot resolve statically
;; (i.e. calls that fall back to runtime reflection) for the rest of this file.
(set! *warn-on-reflection* true) | |
;; Reference notes only: the body of a `comment` form is never evaluated.
;; The data shapes sketched below are not referenced by the code visible in
;; this file.
(comment | |
;; This is the model of a transaction entry | |
;; (use this model for the providers and BinoBank statements) | |
[{:transaction-id "{{UUID}}" | |
:description "CASH{{IN|OUT}} VIA {{TYPE}}" | |
:transaction-type "PIX|CARD" | |
:entry-date "yyyy-MM-dd'T'HH:mm:ss" | |
:amount 1000 | |
;; obs: CARD transactions are only DEBIT | |
:type "CREDIT|DEBIT"}] | |
;; This is the model of a webhook notification | |
{:message "new transaction for you and only you"} | |
;; useless message, just an alert | |
;; This is the model of the user (shared resource) | |
{:balance 0}) | |
(defn ^AutoCloseable ->app
  "Builds a self-contained test harness around a TopologyTestDriver.

  Topology: input-topic -> aggregator processor -> result-topic, with an
  in-memory key/value store named aggStore (String keys, Long values,
  changelog logging disabled) attached to the aggregator.

  * The aggregator writes an incoming value to the store only when the key
    has no stored value yet or the incoming value is greater than the stored
    one.
  * A punctuator, scheduled every 60s of wall-clock time and every 10s of
    stream time, forwards every entry currently in the store downstream,
    stamped with the punctuation timestamp.
  * The store is seeded with a -> 21 before the harness is returned.

  Returns an object that
  * supports keyword lookup (ILookup) of ::input-topic, ::output-topic,
    ::store and ::test-driver, and
  * is AutoCloseable; closing it closes the underlying test driver, so it
    can be used with with-open."
  []
  (let [;; Delivered later: the store comes from the test driver once it
        ;; exists, the context from the processor's init callback.
        *store (promise)
        *ctx (promise)
        flush-storage-punctuator
        (reify Punctuator
          (punctuate [this time]
            (let [^KeyValueStore store @*store
                  ^ProcessorContext ctx @*ctx]
              ;; The iterator returned by .all must be closed after use;
              ;; doseq is eager, so it is fully consumed before with-open
              ;; closes it.
              (with-open [entries (.all store)]
                (doseq [^KeyValue kv (iterator-seq entries)]
                  (.forward ctx (Record. (.key kv)
                                         (.value kv)
                                         time)))))))
        processor-supplier
        (reify ProcessorSupplier
          (get [this]
            (reify Processor
              (init [this ctx]
                (deliver *ctx ctx)
                (.schedule ctx
                           (Duration/ofSeconds 60)
                           PunctuationType/WALL_CLOCK_TIME
                           flush-storage-punctuator)
                (.schedule ctx
                           (Duration/ofSeconds 10)
                           PunctuationType/STREAM_TIME
                           flush-storage-punctuator))
              (process [this record]
                (let [key (.key record)
                      value (.value record)
                      ^KeyValueStore store @*store
                      old-value (.get store key)]
                  ;; Keep the running maximum per key.
                  (when (or (nil? old-value)
                            (> value old-value))
                    (.put store key value)))))))
        store-builder (-> (Stores/keyValueStoreBuilder
                            (Stores/inMemoryKeyValueStore "aggStore")
                            (Serdes/String)
                            (Serdes/Long))
                          (.withLoggingDisabled))
        topology (doto (Topology.)
                   (.addSource "sourceProcessor"
                               ^"[Ljava.lang.String;" (into-array ["input-topic"]))
                   (.addProcessor "aggregator"
                                  processor-supplier
                                  ^"[Ljava.lang.String;" (into-array ["sourceProcessor"]))
                   (.addStateStore store-builder
                                   ^"[Ljava.lang.String;" (into-array ["aggregator"]))
                   (.addSink "sinkProcessor"
                             "result-topic"
                             ^"[Ljava.lang.String;" (into-array ["aggregator"])))
        props (doto (Properties.)
                (.setProperty StreamsConfig/APPLICATION_ID_CONFIG "maxAggregation")
                (.setProperty StreamsConfig/BOOTSTRAP_SERVERS_CONFIG "dummy:1234")
                (.setProperty StreamsConfig/DEFAULT_KEY_SERDE_CLASS_CONFIG
                              (-> (Serdes/String) class .getName))
                (.setProperty StreamsConfig/DEFAULT_VALUE_SERDE_CLASS_CONFIG
                              (-> (Serdes/Long) class .getName)))
        test-driver (TopologyTestDriver. topology props)
        string-serde (Serdes$StringSerde.)
        long-serde (Serdes$LongSerde.)
        input-topic (.createInputTopic test-driver "input-topic"
                                       (.serializer string-serde)
                                       (.serializer long-serde))
        output-topic (.createOutputTopic test-driver "result-topic"
                                         (.deserializer string-serde)
                                         (.deserializer long-serde))
        store (.getKeyValueStore test-driver "aggStore")
        kv {::output-topic output-topic
            ::input-topic input-topic
            ::store store
            ::test-driver test-driver}]
    (deliver *store store)
    ;; Seed value the tests rely on.
    (.put store "a" 21)
    (reify
      ILookup
      (valAt [this key]
        (kv key))
      ;; 3-argument `get` (with a default) dispatches here; without this
      ;; arity it would fail with AbstractMethodError.
      (valAt [this key not-found]
        (get kv key not-found))
      AutoCloseable
      (close [this]
        (.close test-driver)))))
(deftest should-flush-store-for-first-input
  ;; After a single record is piped in, exactly one record is expected on the
  ;; output topic: the value 21 that ->app seeded for key "a".
  (with-open [harness (->app)]
    (let [in (get harness ::input-topic)
          out (get harness ::output-topic)]
      (.pipeInput in "a" 1)
      (fact (.readKeyValue out) => (KeyValue. "a" 21))
      ;; Nothing else may have been emitted.
      (fact (.isEmpty out) => true))))
(deftest should-not-update-store-for-smaller-value
  ;; 1 is smaller than the seeded 21, so both the store and the single
  ;; emitted record are expected to still carry 21.
  (with-open [harness (->app)]
    (let [in (get harness ::input-topic)
          out (get harness ::output-topic)
          agg-store (get harness ::store)]
      (.pipeInput in "a" 1)
      (fact (.get agg-store "a") => 21)
      (fact (.readKeyValue out) => (KeyValue. "a" 21))
      ;; Nothing else may have been emitted.
      (fact (.isEmpty out) => true))))
(deftest should-update-store-for-larger-value
  ;; 42 is larger than the seeded 21, so the store is expected to be
  ;; overwritten and the single emitted record to carry 42.
  (with-open [harness (->app)]
    (let [in (get harness ::input-topic)
          out (get harness ::output-topic)
          agg-store (get harness ::store)]
      (.pipeInput in "a" 42)
      (fact (.get agg-store "a") => 42)
      (fact (.readKeyValue out) => (KeyValue. "a" 42))
      ;; Nothing else may have been emitted.
      (fact (.isEmpty out) => true))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment