-
-
Save dotemacs/d22d5b32dce7bf719e8dba9c9ad8e6e1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns er-model.connectors.kafka.deserializers | |
(:require | |
[clojure.edn :as edn]) | |
(:import | |
[org.apache.kafka.common.serialization | |
Deserializer | |
StringDeserializer | |
LongDeserializer | |
IntegerDeserializer | |
ByteArrayDeserializer] | |
[java.io PushbackReader ByteArrayInputStream])) | |
;; Adapted from https://github.com/ymilky/franzy/blob/master/src/franzy/serialization/deserializers.clj
(defn byte-array-deserializer
  "Builds a fresh Kafka `ByteArrayDeserializer`.
  Useful for value deserialization.

  Takes no arguments and returns the new deserializer instance."
  ^Deserializer []
  (new ByteArrayDeserializer))
(defn integer-deserializer
  "Builds a fresh Kafka `IntegerDeserializer`.
  Useful for key deserialization.

  Takes no arguments and returns the new deserializer instance."
  ^Deserializer []
  (new IntegerDeserializer))
(defn long-deserializer
  "Builds a fresh Kafka `LongDeserializer`.
  Useful for key deserialization.

  Takes no arguments and returns the new deserializer instance."
  ^Deserializer []
  (new LongDeserializer))
(defn string-deserializer
  "Builds a fresh Kafka `StringDeserializer`.
  Useful for key deserialization.

  Takes no arguments and returns the new deserializer instance."
  ^Deserializer []
  (new StringDeserializer))
;; Kafka Deserializer that parses a record's bytes as a single EDN form.
;; `opts` is the options map handed to `clojure.edn/read`; nil is treated as {}.
(deftype EdnDeserializer [opts]
  Deserializer
  ;; No configuration is consumed.
  (configure [_this _configs _is-key])
  (deserialize [_this _topic data]
    ;; A nil payload yields nil instead of being parsed.
    (when data
      (let [read-opts (or opts {})]
        ;; Stream the bytes through a PushbackReader, which edn/read requires;
        ;; with-open closes the reader once the form has been read.
        (with-open [rdr (-> data
                            ByteArrayInputStream.
                            clojure.java.io/reader
                            PushbackReader.)]
          ;; NOTE(review): this binding is probably redundant with edn/read,
          ;; but it is kept as a belt-and-braces safety measure. Raw EDN is
          ;; still arriving over the network, so treat the input as untrusted.
          (binding [*read-eval* false]
            (edn/read read-opts rdr))))))
  ;; Nothing to release.
  (close [_this]))
(defn edn-deserializer
  "Creates an EDN deserializer for Kafka.

  With no arguments the deserializer is built with nil options; otherwise
  `opts` is passed straight to the `EdnDeserializer` constructor.

  Contents of each item serialized must fit in memory.

  > Note: Any users of EDN deserializers should note the usual serialization/deserialization attack vectors.
  You should always validate any data before it is serialized so that an attack may not be executed on deserialization.
  Although EDN facilities try to protect you against this, nothing in this life is ever for sure. Be vigilant."
  (^EdnDeserializer []
   (edn-deserializer nil))
  (^EdnDeserializer [opts]
   (new EdnDeserializer opts)))
;; Kafka Deserializer that decodes a record's bytes as a UTF-8 string and
;; parses it with `clojure.edn/read-string`.
;; `opts` is the options map handed to `edn/read-string`; nil is treated as {}.
(deftype SimpleEdnDeserializer [opts]
  Deserializer
  ;; No configuration is consumed.
  (configure [_ _ _])
  (deserialize [_ _ data]
    ;; Kafka hands deserializers a null payload for records without a value
    ;; (e.g. tombstones). Return nil rather than throwing a
    ;; NullPointerException from the String constructor, matching the other
    ;; deserializers in this namespace.
    (when data
      (edn/read-string (or opts {}) (String. ^bytes data "UTF-8"))))
  ;; Nothing to release.
  (close [_]))
(defn simple-edn-deserializer
  "A Simple EDN deserializer for Kafka.
  Useful for value deserialization.

  With no arguments the deserializer is built with nil options. Otherwise
  `opts` is passed to the `SimpleEdnDeserializer` constructor, which uses it
  as the options map for `clojure.edn/read-string` (nil is treated as {})."
  ;; Return type hints belong on the argument vectors; placed on the arity
  ;; list forms they are ignored by `defn`.
  (^SimpleEdnDeserializer []
   (simple-edn-deserializer nil))
  (^SimpleEdnDeserializer [opts]
   (SimpleEdnDeserializer. opts)))
;; Kafka Deserializer that decodes a record's bytes as a UTF-8 string and
;; turns that string into a keyword.
(deftype KeywordDeserializer []
  Deserializer
  ;; No configuration is consumed.
  (configure [_this _configs _is-key])
  (deserialize [_this _topic data]
    ;; A nil payload yields nil instead of being decoded.
    (when data
      (let [text (String. ^bytes data "UTF-8")]
        (keyword text))))
  ;; Nothing to release.
  (close [_this]))
(defn keyword-deserializer
  "Builds a deserializer that turns string values into keywords.
  Useful for key deserializers.

  Takes no arguments and returns a new `KeywordDeserializer`."
  ^Deserializer []
  (new KeywordDeserializer))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment