-
-
Save laurentpetit/403bb61bd69765482e6a to your computer and use it in GitHub Desktop.
Reimplementation of transducers, in terms of processing functions instead of reducing functions. WIP.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns mutabots | |
"Reimplementation of transducers, in terms of processing functions instead | |
of reducing functions. | |
tl;dr: reducing-fn based transducers are a special case, influenced by reducers, | |
of processing-fn based transducers. | |
In Clojure 1.7.0-alpha2, transducers are expressed in terms of the existing | |
concept of reducing functions. | |
To sum it up, a transducer has currently the signature : | |
reducing-fn -> reducing-fn | |
The following implementation proposes that transducers get the more general | |
signature of: | |
processing-fn -> processing-fn | |
With a processing fn being a generalization of a process consuming inputs, | |
and cleaning/flushing things at the end (when it receives the signal that there | |
is no more input). | |
While the signature of a reducing-fn is .... | |
(fn | |
([]) ; <- used to pass init value down the transducers chain. | |
; Arity used by the transduce/into functions only | |
([acc]) ; <- used when the upstream has no more value. | |
; Arity used by all functions | |
; the acc(umulator) value must be passed unchanged down | |
; the transducers chain for consumption by the final | |
; reducing-fn (the real and only reducing-fn collecting | |
; the results). | |
; the acc argument is only used when a collector reducing-fn | |
; is required (e.g. only with transduce and into) | |
([acc input])) ; <- used to process a new input value from upstream. | |
; as for the 1-arity, the acc(umulator) value must be passed | |
; unchanged down the transducers chain ... | |
; Must return a reduced value if it won't accept any additional | |
; input. | |
.... the signature of a processing-fn is : | |
(fn | |
([]) ; <- used when the upstream has no more value. | |
([input]) ; <- used to process a new input value from upstream. | |
; Must return true if it won't accept any additional input, | |
; false otherwise. | |
Comparing transducer implementations. | |
Identity transducer: | |
(defn rf-identity [] | |
(fn [rf] | |
(fn [] (rf)) | |
(fn [acc] (rf acc)) | |
(fn [acc input] (rf acc input)))) | |
(defn pf-identity [] | |
(fn [p] | |
(fn [] (p)) | |
(fn [input] (p input)))) | |
note: simpler arity. It is yet to be proven that the 0-arity form will | |
ever return something different than (rf), and that 1-arity and 2-arity | |
can modify acc for a valuable reason. | |
Partition-all transducer: | |
;; with current transducers | |
(defn partition-all [^long n] | |
(fn [rf] | |
(let [a (java.util.ArrayList. n)] | |
(fn | |
([] (rf)) | |
([result] | |
(let [result (if (.isEmpty a) | |
result | |
(let [v (vec (.toArray a))] | |
;;clear first! | |
(.clear a) | |
(unreduced (rf result v))))] | |
(rf result))) | |
([result input] | |
(.add a input) | |
(if (= n (.size a)) | |
(let [v (vec (.toArray a))] | |
(.clear a) | |
(rf result v)) | |
result)))))) | |
;; with processing-based transducers | |
(defn partition-all [^long n] | |
(fn [p] | |
(let [a (java.util.ArrayList. n) | |
flush! (fn [] (let [v (vec (.toArray a))] | |
(.clear a) | |
(p v)))] | |
(fn | |
([] | |
(when-not (.isEmpty a) (flush!)) | |
(p)) | |
([x] | |
(.add a x) | |
(when (= n (.size a)) (flush!))))))) | |
note: notice the call to #'unreduced in the rf-based version? Subtle bugs | |
waiting for you. | |
alsos notice that the pf-based does not need to wrap / unwrap values | |
with #'reduced => just use truethy/falsy values instead." | |
(:refer-clojure | |
:exclude [map filter remove | |
take take-while take-nth | |
drop drop-while replace | |
partition-by partition-all | |
transduce sequence | |
keep keep-indexed cycle | |
dedupe cat mapcat])) | |
(defn map [f] | |
(fn [p] | |
(fn | |
([] (p)) | |
([x] (p (f x)))))) | |
(defn filter [pred] | |
(fn [p1] | |
(fn | |
([] (p1)) | |
([x] (and (pred x) (p1 x)))))) | |
(defn remove [pred] (filter (complement pred))) | |
(defn take [n] | |
(fn [p1] | |
(let [vn (volatile! (dec n))] | |
(fn | |
([] (p1)) | |
([x] (or (neg? @vn) (p1 x) (neg? (vswap! vn dec)))))))) | |
(defn take-while [pred] | |
(fn [p1] | |
(fn | |
([] (p1)) | |
([x] (if (pred x) (p1 x) true))))) | |
(defn take-nth [n] | |
(fn [p1] | |
(let [vn (volatile! n)] | |
(fn | |
([] (p1)) | |
([x] (if (== @vn n) | |
(do (vreset! vn 1) (p1 x)) | |
(do (vswap! vn inc) false))))))) | |
(defn drop [n] | |
(fn [p1] | |
(let [vn (volatile! n)] | |
(fn | |
([] (p1)) | |
([x] (if (pos? @vn) | |
(do (vswap! vn dec) false) | |
(p1 x))))))) | |
(defn drop-while [pred] | |
(fn [p1] | |
(let [vtake? (volatile! false) | |
start-take? (complement pred)] | |
(fn | |
([] (p1)) | |
([x] | |
(cond | |
@vtake? (p1 x) | |
(start-take? x) | |
(do | |
(vreset! vtake? true) | |
(p1 x)))))))) | |
(defn replace [smap] | |
(map #(if-let [e (find smap %)] (val e) %))) | |
(defn keep [f] | |
(fn [p1] | |
(fn | |
([] (p1)) | |
([x] (let [v (f x)] (when-not (nil? v) (p1 x))))))) | |
(defn keep-indexed [f] | |
(fn [p1] | |
(let [vi (volatile! -1)] | |
(fn | |
([] (p1)) | |
([x] | |
(let [i (vswap! vi inc) | |
v (f i x)] | |
(when-not (nil? v) | |
(p1 v)))))))) | |
(defn cycle [] | |
(fn [p1] | |
(let [xs (java.util.ArrayList.)] | |
(fn | |
([] | |
(let [max (dec (.size xs))] | |
(loop [i 0] | |
(when-not (p1 (.get xs i)) | |
(recur (if (< i max) (inc i) 0))))) | |
(p1)) | |
([x] (.add xs x) (p1 x)))))) | |
(defn partition-by [f] | |
(fn [p] | |
(let [a (java.util.ArrayList.) | |
pv (volatile! ::none)] | |
(fn | |
([] | |
(when-not (.isEmpty a) | |
(let [v (vec (.toArray a))] | |
;;clear first! | |
(.clear a) | |
(p v))) | |
(p)) | |
([input] | |
(let [pval @pv | |
val (f input)] | |
(vreset! pv val) | |
(if (or (identical? pval ::none) | |
(= val pval)) | |
(do (.add a input) false) ; .add returns true | |
(let [v (vec (.toArray a))] | |
(.clear a) | |
(or (p v) | |
(do (.add a input) false)))))))))) | |
(defn partition-all [^long n] | |
(fn [p1] | |
(let [a (java.util.ArrayList. n) | |
flush! (fn [] (let [v (vec (.toArray a))] | |
(.clear a) | |
(p1 v)))] | |
(fn | |
([] | |
(when-not (.isEmpty a) (flush!)) | |
(p1)) | |
([x] | |
(.add a x) | |
(when (= n (.size a)) (flush!))))))) | |
(defn dedupe [] | |
(fn [p1] | |
(let [vprev (volatile! (Object.))] | |
(fn | |
([] (p1)) | |
([x] (when (not= @vprev x) | |
(vreset! vprev x) | |
(p1 x))))))) | |
(defn cat [] | |
(fn [p1] | |
(let [rf (fn [_ x] (when (p1 x) (reduced true)))] | |
(fn | |
([] (p1)) | |
([c] (reduce rf false c)))))) | |
(defn mapcat [f] (comp (map f) (cat))) | |
(defn transduce [xform f init coll] | |
(let [vacc (volatile! init) | |
p (fn | |
([] (vswap! vacc f)) | |
([x] (reduced? (vreset! vacc (f @vacc x))))) | |
p (xform p)] | |
(reduce (fn [_ x] (when (p x) (reduced nil))) ::unused coll) | |
(p) | |
@vacc)) | |
;; bleh :-( | |
(defn- promised-seq-proc! [pstep! p] | |
(let [vp (volatile! p)] | |
(fn | |
([] | |
(deliver @vp nil)) | |
([x] | |
(deliver @vp (cons x (let [p (vreset! vp (promise))] | |
(lazy-seq (@pstep! p) @p)))) | |
false)))) | |
(defn sequence [xform coll] | |
(let [vcoll (volatile! coll) | |
p (promise) | |
promised-seq (lazy-seq p) | |
pstep! (promise) | |
proc! (xform (promised-seq-proc! pstep! p)) | |
step! (fn [p] | |
(loop [coll @vcoll] | |
(if (realized? p) | |
(vreset! vcoll coll) | |
(if-let [[x :as s] (seq coll)] | |
(recur (if (proc! x) nil (rest s))) | |
(proc!)))))] | |
(deliver pstep! step!) ; tying the knot | |
(lazy-seq (step! p) @p))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
detritus on line 85?