Skip to content

Instantly share code, notes, and snippets.

@dotemacs
Forked from mccraigmccraig/deserializers.clj
Created October 25, 2018 22:15
Show Gist options
  • Save dotemacs/d22d5b32dce7bf719e8dba9c9ad8e6e1 to your computer and use it in GitHub Desktop.
Save dotemacs/d22d5b32dce7bf719e8dba9c9ad8e6e1 to your computer and use it in GitHub Desktop.
(ns er-model.connectors.kafka.deserializers
(:require
[clojure.edn :as edn])
(:import
[org.apache.kafka.common.serialization
Deserializer
StringDeserializer
LongDeserializer
IntegerDeserializer
ByteArrayDeserializer]
[java.io PushbackReader ByteArrayInputStream]))
;; Adapted from https://github.com/ymilky/franzy/blob/master/src/franzy/serialization/deserializers.clj
(defn byte-array-deserializer
  "Builds a fresh Kafka `ByteArrayDeserializer` (raw byte arrays).
  Handy when deserializing record values."
  ^Deserializer []
  (new ByteArrayDeserializer))
(defn integer-deserializer
  "Builds a fresh Kafka `IntegerDeserializer`.
  Handy when deserializing record keys."
  ^Deserializer []
  (new IntegerDeserializer))
(defn long-deserializer
  "Builds a fresh Kafka `LongDeserializer`.
  Handy when deserializing record keys."
  ^Deserializer []
  (new LongDeserializer))
(defn string-deserializer
  "Builds a fresh Kafka `StringDeserializer`.
  Handy when deserializing record keys."
  ^Deserializer []
  (new StringDeserializer))
;; Kafka `Deserializer` that reads one EDN form from each payload.
;; `opts` is the option map passed to `clojure.edn/read`; nil is treated as {}.
(deftype EdnDeserializer [opts]
  Deserializer
  ;; Nothing is read from the Kafka configuration.
  (configure [_ _ _])
  (deserialize [_ _ data]
    ;; A nil payload deserializes to nil instead of throwing.
    (when data
      ;; Decode with a fully-qualified java.io.InputStreamReader rather than
      ;; clojure.java.io/reader: this namespace does not require
      ;; clojure.java.io, so relying on it only works when some other
      ;; namespace happens to have loaded it first. UTF-8 is spelled out so
      ;; the result never depends on the JVM's default charset.
      (with-open [r (PushbackReader.
                      (java.io.InputStreamReader.
                        (ByteArrayInputStream. ^bytes data)
                        "UTF-8"))]
        ;; Kept as a belt-and-braces measure: any code reached from here that
        ;; uses the core reader (e.g. a custom tag reader supplied in `opts`)
        ;; runs with evaluation disabled.
        (binding [*read-eval* false]
          (edn/read (or opts {}) r)))))
  (close [_]))
(defn edn-deserializer
  "Creates an EDN deserializer for Kafka.

  With no arguments the deserializer is built with nil options; otherwise
  `opts` is stored on the deserializer and used when reading EDN.

  Every serialized item has to fit in memory.

  > Note: the usual serialization/deserialization attack vectors apply to EDN
  deserializers. Validate data before it is serialized so that nothing
  malicious can be triggered on deserialization. The EDN facilities try to
  protect you, but nothing is guaranteed. Be vigilant."
  (^EdnDeserializer []
   (EdnDeserializer. nil))
  (^EdnDeserializer [opts]
   (->EdnDeserializer opts)))
;; Kafka `Deserializer` that decodes the payload as a UTF-8 string and parses
;; it with `clojure.edn/read-string`.
;; `opts` is the option map passed to `read-string`; nil is treated as {}.
(deftype SimpleEdnDeserializer [opts]
  Deserializer
  ;; Nothing is read from the Kafka configuration.
  (configure [_ _ _])
  (deserialize [_ _ data]
    ;; Guard against a nil payload: constructing a String from nil bytes
    ;; throws a NullPointerException. Returning nil matches what
    ;; EdnDeserializer and KeywordDeserializer in this file do.
    (when data
      (edn/read-string (or opts {}) (String. ^bytes data "UTF-8"))))
  (close [_]))
(defn simple-edn-deserializer
  "Creates a simple EDN deserializer for Kafka.
  With no arguments it is built with nil options; otherwise `opts` is stored
  on the deserializer. Handy when deserializing record values."
  ;; NOTE(review): these tags sit on the arity lists rather than on the
  ;; argument vectors (unlike edn-deserializer) — confirm whether they are
  ;; meant to act as return type hints.
  ^SimpleEdnDeserializer
  ([]
   (SimpleEdnDeserializer. nil))
  ^SimpleEdnDeserializer
  ([opts]
   (->SimpleEdnDeserializer opts)))
;; Kafka `Deserializer` that decodes the payload as a UTF-8 string and turns
;; it into a keyword. A nil payload yields nil.
(deftype KeywordDeserializer []
  Deserializer
  (configure [_this _configs _key?])
  (deserialize [_this _topic payload]
    (if payload
      (let [text (String. ^bytes payload "UTF-8")]
        (keyword text))
      nil))
  (close [_this]))
(defn keyword-deserializer
  "Creates a deserializer that turns string payloads into keywords.
  Handy when deserializing record keys."
  ^Deserializer []
  (->KeywordDeserializer))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment