Created
August 20, 2014 22:17
-
-
Save aphyr/52ca0aec41abe8921313 to your computer and use it in GitHub Desktop.
Macros to make it easier to write serializers for Fressian.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns audience.analysis.hadoop.serialization | |
(:require [clojure.data.fressian :as fress] | |
[clojure.walk :as walk] | |
[parkour.wrapper :as wrapper]) | |
(:import (audience.analysis.hadoop FressianWritable) | |
(java.io DataInput | |
DataOutput | |
ByteArrayInputStream | |
ByteArrayOutputStream) | |
(java.nio ByteBuffer) | |
(org.fressian.handlers WriteHandler | |
ReadHandler) | |
(com.clearspring.analytics.stream.quantile QDigest))) | |
(defn ^"[B" byte-buffer->bytes
  "Copies every byte between the buffer's current position and its limit into
  a freshly allocated byte array and returns that array. The buffer's position
  is advanced to its limit as a side effect of the bulk read."
  [^ByteBuffer buffer]
  (let [n   (.remaining buffer)
        out (byte-array n)]
    ; Bulk relative get: fills `out` from offset 0 with the n remaining bytes.
    (.get buffer out 0 n)
    out))
(defmacro handler
  "Takes a classname as a symbol, a tag name as a string, and bodies for write
  and read functions. Provides a special syntax for writing the component
  count: (write-tag! some-number), which expands to (.writeTag writer tag
  some-number), where `writer` is the second parameter of the write body.
  Returns a map with two keys: :readers, and :writers, each value being a map
  suitable for use as a Fressian reader or writer, respectively.

      (handler QDigest \"q-digest\"
        (write [_ writer digest]
          (write-tag! 1)
          (.writeBytes writer (QDigest/serialize digest)))
        (read [_ reader tag component-count]
          (QDigest/deserialize ^bytes (.readObject reader))))

  Throws an AssertionError at macroexpansion time if write-tag! is not given
  exactly one argument."
  [classname tag write-expr read-expr]
  (let [; The writer is the second parameter of the write method's arg vector.
        writer-sym (-> write-expr second second)
        expand-tag (fn [form]
                     ; seq? rather than list?: forms produced by syntax-quote
                     ; or cons in a calling macro are Cons/LazySeq instances,
                     ; which list? rejects, leaving write-tag! unexpanded.
                     (if (and (seq? form)
                              (= 'write-tag! (first form)))
                       (do
                         (assert
                           (= 2 (count form))
                           "write-tag! takes 1 argument: a component count.")
                         `(.writeTag ~writer-sym ~tag ~(second form)))
                       form))
        write-expr (walk/prewalk expand-tag write-expr)]
    `{:writers {~classname {~tag (reify WriteHandler ~write-expr)}}
      :readers {~tag (reify ReadHandler ~read-expr)}}))
(defmacro handlers
  "Takes a flat series of handler quartets: class-name, tag, writer, reader, as
  per `handler`. Returns a {:writers {...}, :readers {...}} map, where all
  writers are merged into a unified map, merged with the clojure default
  handlers, and wrapped with inheritance/associative lookups. Does the same for
  the readers map, but without inheritance lookups. :readers and :writers may
  be passed to Fressian.

  Throws IllegalArgumentException at macroexpansion time when the number of
  forms is not a multiple of four, rather than silently dropping the trailing
  incomplete handler.

      (handlers
        QDigest \"q-digest\"
        (write [_ writer digest]
          (write-tag! 1)
          (.writeBytes writer (QDigest/serialize digest)))
        (read [_ reader tag component-count]
          (QDigest/deserialize ^bytes (.readObject reader)))

        clojure.lang.PersistentVector \"vector\"
        (write [_ writer v]
          (write-tag! (count v))
          (doseq [e v]
            (.writeObject writer e)))
        (read [_ rdr tag component-count]
          (loop [i 0, v (transient [])]
            (if (< i component-count)
              (recur (inc i) (conj! v (.readObject rdr)))
              (persistent! v)))))"
  [& quartets]
  ; partition would quietly discard a trailing partial group; fail loudly.
  (when-not (zero? (rem (count quartets) 4))
    (throw (IllegalArgumentException.
             (str "handlers expects groups of 4 forms "
                  "(class-name, tag, writer, reader); got "
                  (count quartets) " forms."))))
  (let [handlers (partition 4 quartets)
        names    (repeatedly (count handlers) (partial gensym "handler"))]
    ; Bind each handler to a symbol
    `(let [~@(->> handlers
                  (map (partial cons `handler))
                  (interleave names))
           ; Wrap up handlers into a vector
           handlers# [~@names]
           ; Extract writers and readers
           writers#  (map :writers handlers#)
           readers#  (map :readers handlers#)]
       ; Merge writers/readers together into unified maps
       {:writers (->> writers#
                      (cons fress/clojure-write-handlers)
                      (reduce merge)
                      fress/associative-lookup
                      fress/inheritance-lookup)
        :readers (->> readers#
                      (cons fress/clojure-read-handlers)
                      (reduce merge)
                      fress/associative-lookup)})))
(def fress-handlers
  "Fressian {:writers ..., :readers ...} handler maps for QDigest and the
  Clojure vector, hash-map, sorted-map, hash-set and sorted-set types, built
  with the `handlers` macro.

  For the two map types the component count written in the tag is twice the
  entry count (one component per key and one per value), so their readers
  consume components two at a time."
  (handlers
    QDigest "q-digest"
    (write [_ writer digest]
      (write-tag! 1)
      (.writeBytes writer (QDigest/serialize digest)))
    (read [_ reader tag component-count]
      (QDigest/deserialize ^bytes (.readObject reader)))

    clojure.lang.PersistentVector "vector"
    (write [_ writer v]
      (write-tag! (count v))
      (doseq [e v]
        (.writeObject writer e)))
    (read [_ rdr tag component-count]
      ; Always continue with the value conj! returns; transients must not be
      ; relied on to mutate in place.
      (loop [i 0
             v (transient [])]
        (if (< i component-count)
          (recur (inc i) (conj! v (.readObject rdr)))
          (persistent! v))))

    clojure.lang.PersistentHashMap "hash-map"
    (write [_ w m]
      (write-tag! (* 2 (count m)))
      (doseq [[k v] m]
        (.writeObject w k)
        (.writeObject w v)))
    (read [_ rdr tag component-count]
      ; component-count counts keys AND values, so step by 2 per entry.
      (loop [i 0
             m (transient {})]
        (if (< i component-count)
          (let [k (.readObject rdr)
                v (.readObject rdr)]
            (recur (+ i 2) (assoc! m k v)))
          (persistent! m))))

    clojure.lang.PersistentTreeMap "sorted-map"
    (write [_ w m]
      (write-tag! (* 2 (count m)))
      (doseq [[k v] m]
        (.writeObject w k)
        (.writeObject w v)))
    (read [_ rdr tag component-count]
      (loop [i component-count
             m (sorted-map)]
        (if (pos? i)
          (recur
            (- i 2)
            (assoc m (.readObject rdr) (.readObject rdr)))
          m)))

    clojure.lang.PersistentHashSet "hash-set"
    (write [_ w set]
      (write-tag! (count set))
      (doseq [e set]
        (.writeObject w e)))
    (read [_ rdr tag component-count]
      (loop [i 0
             s (transient #{})]
        (if (< i component-count)
          (recur (inc i) (conj! s (.readObject rdr)))
          (persistent! s))))

    clojure.lang.PersistentTreeSet "sorted-set"
    (write [_ w set]
      (write-tag! (count set))
      (doseq [e set]
        (.writeObject w e)))
    (read [_ rdr tag component-count]
      (loop [i component-count
             s (sorted-set)]
        (if (pos? i)
          (recur (dec i)
                 (conj s (.readObject rdr)))
          s)))))
; Serializes values to and from Fressian records, delimited by a 32-bit int | |
; length header. Lord have mercy on my soul. | |
(set! FressianWritable/readFieldsFn
      (fn read-fields [^FressianWritable w ^DataInput in]
        ; Wire format: a 32-bit length header followed by that many bytes of
        ; Fressian-encoded data.
        (let [length  (.readInt in)
              payload (byte-array length)]
          ; Pull the whole record into memory before decoding it.
          (.readFully in payload)
          (let [reader (fress/create-reader (ByteArrayInputStream. payload)
                                            :handlers (:readers fress-handlers))]
            ; Decode one object and store it as the writable's state.
            (set! (.state w) (fress/read-object reader))))))
(set! FressianWritable/writeFn
      (fn write [^FressianWritable w ^DataOutput out]
        (let [value (.state w)
              sink  (ByteArrayOutputStream.)]
          ; Encode the writable's state into an in-memory buffer first, so the
          ; encoded length is known before anything is written to `out`.
          (-> sink
              (fress/create-writer :handlers (:writers fress-handlers))
              (fress/write-object value))
          ; 32-bit length header, then the encoded bytes.
          (.writeInt out (.size sink))
          (.write out (.toByteArray sink)))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment