-
-
Save dotemacs/35f709c2536248d2ea2b05d67a4543e4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns er-model.connectors.kafka.serializers | |
(:import (org.apache.kafka.common.serialization | |
LongSerializer | |
Serializer | |
IntegerSerializer | |
StringSerializer | |
ByteArraySerializer) | |
(java.io ByteArrayOutputStream))) | |
;; from https://github.com/ymilky/franzy/blob/master/src/franzy/serialization/serializers.clj
(defn byte-array-serializer
  "Builds a new instance of Kafka's raw byte array serializer
  (`ByteArraySerializer`).
  Handy for value serialization."
  ^Serializer []
  (new ByteArraySerializer))
(defn string-serializer
  "Builds a new instance of Kafka's string serializer (`StringSerializer`).
  Takes no arguments; every call returns a fresh serializer."
  ^Serializer []
  (new StringSerializer))
(defn integer-serializer
  "Builds a new instance of Kafka's integer serializer (`IntegerSerializer`).
  Handy for key serialization."
  ^Serializer []
  (new IntegerSerializer))
(defn long-serializer
  "Builds a new instance of Kafka's long serializer (`LongSerializer`).
  Handy for key serialization."
  ^Serializer []
  (new LongSerializer))
;; Kafka Serializer that prints a value as EDN text with `pr` and returns the
;; resulting bytes.
;;
;; `opts` holds options for `clojure.java.io/writer` (for example :encoding).
;; It may be a map ({:encoding "UTF-8"}), a flat key/value sequence
;; ([:encoding "UTF-8"]), or nil for the writer defaults.
;;
;; `configure` and `close` are no-ops. The topic argument of `serialize` is
;; ignored. Note that nil data is printed as the EDN text `nil`; it is not
;; turned into a nil payload.
(deftype EdnSerializer [opts]
  Serializer
  (configure [_ _ _])
  (serialize [_ _ data]
    ;; `writer` takes its options as trailing key/value arguments, so the
    ;; options must be spliced in with `apply`; a map is flattened first.
    ;; Handing the whole collection over as a single argument makes `writer`
    ;; throw while building its option map.
    (let [writer-args (if (map? opts) (apply concat opts) opts)]
      ;; Closing bos is not strictly needed, but keeps resource handling tidy.
      (with-open [bos (ByteArrayOutputStream. 1024)]
        ;; The inner with-open closes (and therefore flushes) the writer
        ;; before the buffer is read below.
        (with-open [w (apply clojure.java.io/writer bos writer-args)]
          ;; Pin the printer settings so bindings inherited from the caller
          ;; (e.g. REPL print limits) cannot truncate or alter the payload.
          (binding [*out*            w
                    *print-length*   nil
                    *print-level*    nil
                    *print-dup*      false
                    *print-readably* true]
            (pr data)))
        ;; Copies the whole buffer; simple rather than efficient.
        (.toByteArray bos))))
  (close [_]))
(defn edn-serializer
  "Builds an `EdnSerializer`.
  With no arguments the serializer is created with nil options; with one
  argument, `opts` is handed to the `EdnSerializer` constructor unchanged."
  (^EdnSerializer []
   (EdnSerializer. nil))
  (^EdnSerializer [opts]
   (new EdnSerializer opts)))
;; Kafka Serializer that renders a value to an EDN string with `pr-str` and
;; returns that string's UTF-8 bytes. Nil data yields nil. `configure` and
;; `close` are no-ops, and the topic argument of `serialize` is ignored.
(deftype SimpleEdnSerializer []
  Serializer
  (configure [_ _ _])
  (serialize [_ _ data]
    (when (some? data)
      ;; Pin the printer settings so bindings inherited from the caller
      ;; (e.g. REPL print limits) cannot truncate or alter the payload.
      (binding [*print-length*   nil
                *print-level*    nil
                *print-dup*      false
                *print-readably* true]
        ;; Explicit charset: a bare .getBytes uses the JVM default encoding,
        ;; which differs between machines.
        (.getBytes ^String (pr-str data)
                   java.nio.charset.StandardCharsets/UTF_8))))
  (close [_]))
(defn simple-edn-serializer
  "Builds a `SimpleEdnSerializer`, a simple EDN serializer for Kafka meant
  for small amounts of data.
  Handy for value serialization."
  ^SimpleEdnSerializer []
  (new SimpleEdnSerializer))
;; Kafka Serializer that converts its input to a string with `name` and
;; returns that string's UTF-8 bytes. Nil data yields nil. `configure` and
;; `close` are no-ops, and the topic argument of `serialize` is ignored.
;; Because `name` is used, a keyword's namespace is not part of the output.
(deftype KeywordSerializer []
  Serializer
  (configure [_ _ _])
  (serialize [_ _ data]
    (when (some? data)
      ;; Explicit charset: a bare .getBytes uses the JVM default encoding,
      ;; which differs between machines.
      (.getBytes ^String (name data)
                 java.nio.charset.StandardCharsets/UTF_8)))
  (close [_]))
(defn keyword-serializer
  "Builds a `KeywordSerializer`, which serializes a keyword (or any other
  value `name` accepts) as the bytes of its name string.
  Handy for key serializers."
  ^KeywordSerializer []
  (new KeywordSerializer))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment