Skip to content

Instantly share code, notes, and snippets.

@richhickey
Last active April 24, 2025 21:12
Show Gist options
  • Save richhickey/fbc1439372b1f5319bb37aa1934d19b8 to your computer and use it in GitHub Desktop.
Save richhickey/fbc1439372b1f5319bb37aa1934d19b8 to your computer and use it in GitHub Desktop.
;;=============== play with flow ==============
(require '[clojure.core.async :as async]
'[clojure.core.async.flow :as flow]
'[clojure.pprint :as pp]
'[clojure.datafy :as d]
'[clojure.walk :as w])
;; Have the compiler emit a warning whenever it has to fall back to reflection
;; for Java interop in the forms that follow.
(set! *warn-on-reflection* true)
(defn monitoring
  "Starts a background thread that pretty-prints every message arriving on
  either report-chan or error-chan, preceded by a banner naming the channel
  it came from. The thread exits (after printing a shutdown banner) the first
  time alts!! yields nil. Always returns nil."
  [{:keys [report-chan error-chan]}]
  (prn "========= monitoring start")
  (async/thread
    (loop []
      (let [[msg source] (async/alts!! [report-chan error-chan])]
        (if (some? msg)
          (let [origin (if (= source error-chan) :error-chan :report-chan)]
            (prn (str "======== message from " origin))
            (pp/pprint msg)
            (recur))
          ;; nil from alts!! ends the loop
          (prn "========= monitoring shutdown")))))
  nil)
(defn ddupe
  "Four-arity step fn that drops consecutive duplicate messages.
    []            - describes one input (:in) and one output (:out)
    [args]        - initial state; nothing seen yet ({:last nil})
    [state _]     - returns state unchanged
    [state _ v]   - remembers v as the latest value and forwards it on :out
                    only when it differs from the previously remembered one"
  ([]
   {:ins  {:in "stuff"}
    :outs {:out "stuff w/o consecutive dupes"}})
  ([_]
   {:last nil})
  ([state _]
   state)
  ([state _ v]
   (let [prev (:last state)]
     [{:last v}
      (if (= prev v)
        nil
        {:out [v]})])))
(defn roller-thread
  "Creates an unbuffered channel and starts a thread that repeatedly puts a
  pair of dice rolls (each 1-6) onto it, sleeping a random number of
  milliseconds below pace-ms between puts. Returns the channel.

  The thread prints a message and exits as soon as a put returns falsey,
  so closing the returned channel stops it."
  [pace-ms]
  (let [chan (async/chan)]
    (async/thread
      ;; Explicit loop so `recur` has a visible target, rather than relying on
      ;; the thread macro wrapping its body in a zero-arg fn.
      (loop []
        (if (async/>!! chan [(inc (rand-int 6)) (inc (rand-int 6))])
          (do (Thread/sleep (long (rand-int pace-ms)))
              (recur))
          (prn "roller thread stopped"))))
    chan))
(defn roller-proc
  "Four-arity step fn for the dice source.
    []               - describes the :pace-ms param and the :out output
    [{:keys [pace-ms]}] - initial state: a roller-thread channel registered
                       as the :in entry of ::flow/in-ports
    [state transition] - on ::flow/stop, closes that channel; returns state
    [state _ roll]   - forwards each roll on :out, state unchanged"
  ([]
   {:params {:pace-ms "range of roll intervals"}
    :outs   {:out "roll the dice!"}})
  ([{:keys [pace-ms]}]
   (let [rolls (roller-thread pace-ms)]
     {::flow/in-ports {:in rolls}}))
  ([state transition]
   (when (= ::flow/stop transition)
     (async/close! (get-in state [::flow/in-ports :in])))
   state)
  ([state _ roll]
   [state {:out [roll]}]))
;; Flow definition: dice rolls -> drop consecutive dupes -> keep only craps
;; (totals of 2, 3 or 12) -> print.
(def gdef
  {:procs
   {;; emits [die die] pairs at random intervals below :pace-ms
    :dice-source
    {:proc (flow/process #'roller-proc)
     :args {:pace-ms 1000}}

    ;; passes a roll through only when its total is 2, 3 or 12
    :craps-finder
    {:proc (-> #(when (#{2 3 12} (apply + %)) %) flow/lift1->step flow/process)}

    :dedupe
    {:proc (flow/process #'ddupe)}

    ;; flow/process takes a step fn like the other procs here, so the map form
    ;; is adapted with map->step. The transform returns an explicit
    ;; [state outputs] pair (no outputs) rather than the nil from prn.
    :prn-sink
    {:proc (flow/process
            (flow/map->step
             {:describe (fn [] {:ins {:in "gimme stuff to print!"}})
              :transform (fn [state _ v]
                           (prn v)
                           [state nil])}))}}

   :conns
   [[[:dice-source :out] [:dedupe :in]]
    [[:dedupe :out] [:craps-finder :in]]
    [[:craps-finder :out] [:prn-sink :in]]]})
;; REPL driver: evaluate these forms one at a time.
;; pprint is called through the `pp` alias required at the top of the file;
;; the unqualified name only resolves where the REPL has already referred it.
(def g (flow/create-flow gdef))
(pp/pprint (d/datafy g))
;; the value returned by flow/start is handed to monitoring, which reads its
;; :report-chan and :error-chan entries
(monitoring (flow/start g))
(flow/resume g) ;;wait a bit for craps to print
(flow/pause g)
(flow/pause-proc g :prn-sink)
(flow/resume-proc g :prn-sink)
;; feed rolls straight into :craps-finder, bypassing :dice-source and :dedupe
(flow/inject g [:craps-finder :in] [[1 2] [2 1] [2 1] [6 6] [6 6] [4 3] [1 1]])
(pp/pprint (flow/ping g))
(pp/pprint (flow/ping-proc g :prn-sink))
(flow/stop g)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment