Created
December 2, 2019 19:24
-
-
Save jjttjj/f2f3dfed3dd5651863b0b12e44e7dd03 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns mult | |
(:require[clojure.core.async :as a])) | |
(defprotocol MultFn | |
:extend-via-metadata true | |
(add-tap [this-fn f] [this-fn xf f] | |
"Adds a function `f` to 'tapset', a set of functions which are | |
each called on every value passed to `this-fn`. If a transducer `xf` | |
is supplied, subjects all values to `xf` before calling `f` on the | |
result. When a value is reduced, the function is removed from the | |
tapset. Returns a reference to the added function (which may be | |
different than `f`) for later removal.") | |
(remove-tap [this-fn f] | |
"Removes `f` from tapset")) | |
(defn mult-impl [handlers] | |
{`add-tap (fn ([this f] | |
(swap! handlers conj f) | |
f) | |
([this xf f] | |
(let [xf' (xf (fn [_acc x] x)) | |
f' (fn this-fn [x] | |
(let [result (xf' ::nothing x)] | |
(cond | |
(reduced? result) | |
(do (remove-tap this this-fn) | |
(f @result)) | |
(= result ::nothing) nil | |
:else | |
(f result))))] | |
(swap! handlers conj f') | |
f'))) | |
`remove-tap (fn [this f] | |
(swap! handlers disj f))}) | |
"Constructs a tap-fn, which can have functions added and removed from it's 'tapset'. Every time the function is called with a value, that value is passed to all values in the tapset." | |
(defn go-mult | |
"Creates a multfn which returns immediately when called and calls all | |
taps in a `core.async/go` processs." | |
[] | |
(let [handlers (atom #{}) | |
tapq (a/chan)] | |
(a/go-loop [] | |
(when-let [x (a/<! tapq)] | |
(try (doseq [f @handlers] | |
(f (if (identical? ::tap-nil x) nil x))) | |
(catch #?(:clj Throwable :cljs js/Object) t | |
;;todo: error handling | |
;;(log/error t x) | |
(println t x))) | |
(recur))) | |
(with-meta | |
(fn [x] | |
(a/put! tapq (if (nil? x) ::tap-nil x)) | |
nil) | |
(mult-impl handlers)))) | |
#?(:clj | |
(defn thread-mult | |
"Creates a multfn which returns immediately when called and calls all | |
taps in a seperate thread processs." | |
[] | |
(let [handlers (atom #{}) | |
tapq (a/chan)] | |
(a/thread | |
(loop [] | |
(when-let [x (a/<!! tapq)] | |
(try (doseq [f @handlers] | |
(f (if (identical? ::tap-nil x) nil x))) | |
(catch #?(:clj Throwable :cljs js/Object) t | |
;;todo: error handling | |
;;(log/error t x) | |
(println t x))) | |
(recur)))) | |
(with-meta | |
(fn [x] | |
(a/put! tapq (if (nil? x) ::tap-nil x)) | |
nil) | |
(mult-impl handlers))))) | |
(defn sync-mult | |
"Creates a multfn which blocks when called until all taps are complete" | |
[] | |
(let [handlers (atom #{})] | |
(with-meta | |
(fn [x] | |
(try (doseq [f @handlers] | |
(f x)) | |
(catch #?(:clj Throwable :cljs js/Object) t | |
;;todo: error handling | |
;;(log/error t x) | |
(println t x))) | |
nil) | |
(mult-impl handlers)))) | |
;;Usage | |
(def m1 (go-mult)) | |
(add-tap m1 | |
#(println "received value in m1:" %)) | |
(add-tap m1 (filter even?) | |
#(println "received even value in m1:" %)) | |
(add-tap m1 (comp (filter odd?) (partition-all 2)) | |
#(println "received two odd values:" %)) | |
(def m2 (sync-mult)) | |
(add-tap m1 (take 5) m2) | |
(add-tap m2 (map #(* 100 %)) #(println "m2 received" %)) | |
(doseq [x (range 10)] | |
(m1 x)) | |
;;prints: | |
;; received value in m1: 0 | |
;; received even value in m1: 0 | |
;; received value in m1: 1 | |
;; received value in m1: 2 | |
;; received even value in m1: 2 | |
;; received value in m1: 3 | |
;; received two odd values: [1 3] | |
;; received value in m1: 4 | |
;; received even value in m1: 4 | |
;; received value in m1: 5 | |
;; received value in m1: 6 | |
;; received even value in m1: 6 | |
;; received value in m1: 7 | |
;; received two odd values: [5 7] | |
;; received value in m1: 8 | |
;; received even value in m1: 8 | |
;; received value in m1: 9 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment