Last active
October 8, 2019 23:52
-
-
Save cddr/d2564689a9cd10367bd0d2a86e860016 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Example namespace: a jackdaw Kafka Streams topology using Avro serdes,
;; exercised through the jackdaw test machine.
(ns mock-avro-example
  (:require
   [clojure.data.json :as json]
   ;; `deftest` is used below, so clojure.test must be in scope.
   [clojure.test :refer [deftest]]
   [jackdaw.streams :as k]
   [jackdaw.serdes.avro :as avro]
   ;; Fixed typo: was `jackddaw.serdes.avro.schema-registry`.
   [jackdaw.serdes.avro.schema-registry :as reg]
   [jackdaw.test :as jdt]))
;; Avro record schema for "foo" values, expressed as Clojure data.
;; It is serialised to JSON (see `avro-serde`) before being handed to the
;; Avro serde.
(def foo-schema
  {:type :record
   :name "foo"
   :namespace "big.data.co"
   :fields [{:name "id"
             :type :string}
            {:name "amount"
             :type :long}
            {:name "updated_at"
             ;; Logical type name must be spelled exactly "timestamp-millis"
             ;; (was misspelled "tmestamp-millis").
             :type {:type :long
                    :logicalType "timestamp-millis"}}]})
(defn avro-serde
  "Builds an Avro serde via `avro/serde`.

  - `schema`         Avro schema as Clojure data; it is encoded to a JSON
                     string with `json/write-str`.
  - `key?`           passed through as the `:key?` serde option.
  - `client-options` schema-registry options map, passed to `avro/serde`
                     unchanged.

  Returns whatever `avro/serde` returns."
  [schema key? client-options]
  (let [schema-options {:avro/schema (json/write-str schema)
                        :key? key?}]
    ;; Previously referenced the unbound symbols `client-opts` and
    ;; `schema-opts`; use the names that are actually bound here.
    (avro/serde avro/+base-schema-type-registry+
                client-options
                schema-options)))
(defn topic-config
  "Returns a map of topic metadata keyed by `:foo` and `:bar`.

  Takes a map with `:registry-url` and `:registry-client`; both are forwarded
  to `avro-serde` as the schema-registry client options, so every serde built
  here uses the same registry client.

  Each topic uses a string Avro schema for keys and a record schema for
  values."
  [{:keys [registry-url registry-client]}]
  (let [client-options {:avro.schema-registry/client registry-client
                        :avro.schema-registry/url registry-url}]
    {:foo {:topic-name "foo"
           :replication-factor 1
           :partition-count 1
           :key-serde (avro-serde {:type "string"} true client-options)
           :value-serde (avro-serde foo-schema false client-options)}
     ;; Topic name was a copy-paste of "foo"; the output topic must differ
     ;; from the input topic the topology consumes.
     :bar {:topic-name "bar"
           :replication-factor 1
           :partition-count 1
           :key-serde (avro-serde {:type "string"} true client-options)
           ;; NOTE(review): `bar-schema` is not defined in the visible source;
           ;; it must be defined before this function is compiled.
           :value-serde (avro-serde bar-schema false client-options)}}))
(defn topology-builder
  "Wires the example topology onto `builder` and returns `builder`.

  Consumes the `:foo` topic from `topics` as a stream, maps
  `transform-foos` over it and sends the result to the `:bar` topic.

  NOTE(review): `transform-foos` is not defined in the visible source."
  [topics builder]
  ;; jackdaw.streams names its stream constructor `kstream` (not `stream`).
  (-> (k/kstream builder (:foo topics))
      (k/map transform-foos)
      (k/to (:bar topics)))
  ;; The threaded expression is evaluated for its effect on `builder`;
  ;; callers receive the builder itself.
  builder)
;; Runs `test-commands` against the topology using a mock schema-registry
;; client and the mock test-machine transport.
;; NOTE(review): `test-commands` is not defined in the visible source.
;; NOTE(review): confirm the arity of `jdt/mock-transport` for the jackdaw
;; version in use.
(deftest test-topology-builder
  (let [topics (topic-config {:registry-url "http://schema-registry.test"
                              :registry-client (reg/mock-client)})
        ;; The mock driver is given a one-argument fn that receives the builder.
        build-topology (fn [builder] (topology-builder topics builder))
        transport (jdt/mock-transport
                   {:driver (jdt/mock-test-driver build-topology)})]
    (jdt/with-test-machine transport
      (fn [tm]
        (let [{journal :journal
               results :results} (jdt/run-test tm test-commands)]
          ;; then in here perform the assertions
          )))))
(defn -main
  "Entry point: builds the topology against a real schema registry and
  starts it. `args` are ignored."
  [& args]
  ;; This represents where you'd start the topology in a live env
  (let [registry-url "https://schema-registry.ccloud:8081"
        builder (k/streams-builder)
        ;; Was `topic-registry`, which is not defined; the topic metadata
        ;; function in this file is `topic-config`.
        topics (topic-config {:registry-url registry-url
                              :registry-client (reg/client registry-url 100)})]
    (-> (topology-builder topics builder)
        (k/kafka-streams {"bootstrap.servers" "confluent.cloud:9092"
                          "group.id" "my-app-group"
                          "application.id" "my-app-id"})
        (k/start))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment