Skip to content

Instantly share code, notes, and snippets.

@richhickey
Last active October 23, 2025 13:32
Show Gist options
  • Save richhickey/fbc1439372b1f5319bb37aa1934d19b8 to your computer and use it in GitHub Desktop.
;;=============== play with flow ==============
(require '[clojure.core.async :as async]
'[clojure.core.async.flow :as flow]
'[clojure.pprint :as pp]
'[clojure.datafy :as d])
(set! *warn-on-reflection* true)
(defn monitoring
  "Prints a start banner, then spawns a thread that takes from whichever of
  report-chan / error-chan has a value ready, printing a header naming the
  source channel followed by the pretty-printed message. The thread prints a
  shutdown banner and exits when a take yields nil. Returns nil."
  [{:keys [report-chan error-chan]}]
  (prn "========= monitoring start")
  (async/thread
    (loop []
      (let [[msg source] (async/alts!! [report-chan error-chan])]
        (if (some? msg)
          (let [label (if (= source error-chan) :error-chan :report-chan)]
            (prn (str "======== message from " label))
            (pp/pprint msg)
            (recur))
          ;; a nil take ends the loop
          (prn "========= monitoring shutdown")))))
  nil)
(defn ddupe
  "Multi-arity step fn that drops consecutive duplicate values.
  0-arity: description map (one input :in, one output :out, selects the
           :craps/report signal, :compute workload).
  1-arity: initial state, {:last nil}.
  2-arity: returns state unchanged.
  4-arity: for input id :craps/report, leaves state alone and emits the last
           seen value under ::flow/report; for any other input id, records v
           as the new :last and emits v on :out only when it differs from the
           previous value."
  ([]
   {:ins {:in "stuff"}
    :outs {:out "stuff w/o consecutive dupes"}
    :signal-select #{:craps/report}
    :workload :compute})
  ([_] {:last nil})
  ([state _] state)
  ([{prev :last :as state} in v]
   (if (= in :craps/report)
     [state {::flow/report [{:ddupe-last prev}]}]
     ;; state is replaced wholesale with the newest value
     [{:last v} (when-not (= prev v) {:out [v]})])))
(defn time-thread
  "Creates an unbuffered channel and starts a thread that repeatedly puts
  (System/currentTimeMillis) onto it, sleeping period-ms milliseconds after
  each successful put. When a put returns a falsey value the thread prints
  \"time thread stopped\" and exits. Returns the channel."
  [^long period-ms]
  (let [chan (async/chan)]
    (async/thread
      ;; explicit loop so recur has a visible target instead of depending on
      ;; how the async/thread macro wraps its body; matches `monitoring`
      (loop []
        (if (async/>!! chan (System/currentTimeMillis))
          (do (Thread/sleep period-ms)
              (recur))
          (prn "time thread stopped"))))
    chan))
(defn time-proc
  "casts the currentTimeMillis over :craps/time

  0-arity: description map with the single param :period-ms.
  1-arity: initial state holding a :ticks channel (from time-thread) under
           ::flow/in-ports.
  2-arity: on ::flow/stop closes the :ticks channel; always returns state.
  4-arity: forwards the received ms value under [::flow/cast :craps/time],
           state unchanged."
  ([]
   {:params {:period-ms "time interval"}})
  ([{:keys [period-ms]}]
   {::flow/in-ports {:ticks (time-thread period-ms)}})
  ([state status]
   (when (= ::flow/stop status)
     (async/close! (get-in state [::flow/in-ports :ticks])))
   state)
  ([state _ ms]
   [state {[::flow/cast :craps/time] [ms]}]))
(defn roller-proc
  "no inputs, responds to :craps/time signal

  0-arity: description map (param :pace-ms, output :out, selects the
           :craps/time signal).
  1-arity: initial state {:last 0 :pace-ms pace-ms}.
  2-arity: returns state unchanged.
  4-arity: on :craps/time with timestamp ms, when more than pace-ms has
           elapsed since :last, records ms as the new :last and emits one
           roll [d1 d2] (each 1..6) on :out; otherwise returns
           [state nil]."
  ([] {:params {:pace-ms "range of roll intervals"}
       :outs {:out "roll the dice!"}
       :signal-select #{:craps/time}})
  ([{:keys [pace-ms] :as args}] {:last 0 :pace-ms pace-ms})
  ([state _] state)
  ([{:keys [last pace-ms] :as state} in ms]
   (case in
     :craps/time
     (if (> (- ms last) pace-ms)
       ;; key must be the keyword :last; using the destructured value `last`
       ;; as the key left :last at 0, so the pace check always passed
       [(assoc state :last ms) {:out [[(inc (rand-int 6)) (inc (rand-int 6))]]}]
       [state nil]))))
;; Flow definition: process specs under :procs and port-to-port wiring under
;; :conns. :time-ticker does not appear in :conns.
(def gdef
  (let [;; returns the roll when its dice sum to 2, 3 or 12, otherwise nil
        craps-roll (fn [roll]
                     (when (#{2 3 12} (apply + roll))
                       roll))
        ;; step that prints every value it receives on :in
        printer    (flow/map->step
                     {:describe  (fn [] {:ins {:in "gimme stuff to print!"}})
                      :transform (fn [_ _ v] (prn v))})]
    {:procs
     {:time-ticker  {:proc (flow/process #'time-proc)
                     :args {:period-ms 100}}
      :dice-source  {:proc (flow/process #'roller-proc)
                     :args {:pace-ms 250}}
      :craps-finder {:proc (flow/process (flow/lift1->step craps-roll))}
      :dedupe       {:proc (flow/process #'ddupe)}
      :prn-sink     {:proc (flow/process printer)}}

     :conns
     [[[:dice-source :out]  [:dedupe :in]]
      [[:dedupe :out]       [:craps-finder :in]]
      [[:craps-finder :out] [:prn-sink :in]]]}))
;; Interactive driver: build the flow from gdef, then step through its
;; lifecycle one form at a time.
(def g (flow/create-flow gdef))

;; pp/pprint rather than bare pprint: this file only requires clojure.pprint
;; under the alias pp, so the unqualified name resolves only in namespaces
;; that already refer it.
(pp/pprint (-> g d/datafy))

;; the map returned by flow/start is handed to monitoring, which reads
;; :report-chan and :error-chan from it
(monitoring (flow/start g))
(flow/resume g) ;;wait a bit for craps to print

;; ddupe selects the :craps/report signal and answers it with its last value
(def temp (flow/inject g [::flow/cast :craps/report] [:unused]))

(flow/pause g)
(flow/pause-proc g :prn-sink)
(flow/resume-proc g :prn-sink)

;; push hand-made rolls straight into :craps-finder's :in port
(def dummy (flow/inject g [:craps-finder :in] [[1 2] [2 1] [2 1] [6 6] [6 6] [4 3] [1 1]]))

(pp/pprint (flow/ping g))
(pp/pprint (flow/ping-proc g :prn-sink))
(flow/stop g)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment