Last active
October 23, 2025 13:32
-
-
Save richhickey/fbc1439372b1f5319bb37aa1934d19b8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ;;=============== play with flow ============== | |
| (require '[clojure.core.async :as async] | |
| '[clojure.core.async.flow :as flow] | |
| '[clojure.pprint :as pp] | |
| '[clojure.datafy :as d]) | |
| (set! *warn-on-reflection* true) | |
(defn monitoring
  "Starts a background thread that prints everything arriving on
  `report-chan` and `error-chan`, tagging each message with the channel
  it came from. The thread exits once a take yields nil (i.e. one of the
  channels has been closed). Returns nil."
  [{:keys [report-chan error-chan]}]
  (prn "========= monitoring start")
  (async/thread
    (loop []
      (let [[msg source] (async/alts!! [report-chan error-chan])]
        (if (some? msg)
          (let [label (if (= source error-chan) :error-chan :report-chan)]
            (prn (str "======== message from " label))
            (pp/pprint msg)
            (recur))
          (prn "========= monitoring shutdown")))))
  nil)
(defn ddupe
  "Step fn that drops consecutive duplicate values.

  0-arity: describes the step (one input, one output, listens for the
           :craps/report signal).
  1-arity: initial state, remembering no previous value.
  2-arity: transition; state is returned unchanged.
  4-arity: transform. On :craps/report it reports the last value seen
           and leaves state alone; on any other input it remembers `v`
           and forwards it on :out only when it differs from the
           previously remembered value."
  ([]
   {:ins           {:in "stuff"}
    :outs          {:out "stuff w/o consecutive dupes"}
    :signal-select #{:craps/report}
    :workload      :compute})
  ([_]
   {:last nil})
  ([state _]
   state)
  ([{prev :last :as state} in v]
   (if (= in :craps/report)
     [state {::flow/report [{:ddupe-last prev}]}]
     [{:last v} (when-not (= prev v) {:out [v]})])))
(defn time-thread
  "Returns an unbuffered channel onto which a background thread puts the
  current time in milliseconds, sleeping `period-ms` after each
  successful put. The thread prints a message and exits when a put
  fails (>!! returns false once the channel is closed)."
  [^long period-ms]
  (let [ticks (async/chan)]
    (async/thread
      (loop []
        (if-not (async/>!! ticks (System/currentTimeMillis))
          (prn "time thread stopped")
          (do
            (Thread/sleep period-ms)
            (recur)))))
    ticks))
(defn time-proc
  "Casts the currentTimeMillis over :craps/time.

  0-arity: describes the single :period-ms param.
  1-arity: initial state; creates the ticking channel via `time-thread`
           and registers it as the :ticks in-port.
  2-arity: transition; closes the :ticks channel when the status is
           ::flow/stop, and always returns state unchanged.
  3-arity: transform; re-broadcasts each received timestamp as a
           :craps/time cast."
  ([]
   {:params {:period-ms "time interval"}})
  ([args]
   (let [ticks (time-thread (:period-ms args))]
     {::flow/in-ports {:ticks ticks}}))
  ([state status]
   (when (= ::flow/stop status)
     (async/close! (get-in state [::flow/in-ports :ticks])))
   state)
  ([state _ ms]
   [state {[::flow/cast :craps/time] [ms]}]))
(defn roller-proc
  "No inputs, responds to :craps/time signal.

  0-arity: describes the step (:pace-ms param, one :out output, selects
           the :craps/time signal).
  1-arity: initial state {:last 0 :pace-ms pace-ms}.
  2-arity: transition; state is returned unchanged.
  4-arity: transform. `ms` is the timestamp carried by the signal. When
           more than `pace-ms` milliseconds have elapsed since the last
           roll, records `ms` under :last and emits one roll (a vector
           of two dice, each 1-6) on :out; otherwise emits nothing."
  ([] {:params {:pace-ms "range of roll intervals"}
       :outs {:out "roll the dice!"}
       :signal-select #{:craps/time}})
  ([{:keys [pace-ms]}] {:last 0 :pace-ms pace-ms})
  ([state _] state)
  ([{:keys [last pace-ms] :as state} in ms]
   (case in
     :craps/time
     (if (> (- ms last) pace-ms)
       ;; Key must be the keyword :last. Using the destructured local
       ;; `last` would assoc under its *value* (e.g. 0), so :last would
       ;; never advance and every tick would produce a roll.
       [(assoc state :last ms) {:out [[(inc (rand-int 6)) (inc (rand-int 6))]]}]
       [state nil]))))
;; Flow graph definition.
;;   :time-ticker  has no connections; time-proc emits its ticks as
;;                 :craps/time casts rather than on an output.
;;   :dice-source -> :dedupe -> :craps-finder -> :prn-sink
;;   :craps-finder passes a roll through only when its sum is 2, 3 or 12.
(def gdef
  {:procs
   {:time-ticker  {:proc (flow/process #'time-proc)
                   :args {:period-ms 100}}

    :dice-source  {:proc (flow/process #'roller-proc)
                   :args {:pace-ms 250}}

    :craps-finder {:proc (flow/process
                          (flow/lift1->step
                           (fn [roll]
                             (when (contains? #{2 3 12} (apply + roll))
                               roll))))}

    :dedupe       {:proc (flow/process #'ddupe)}

    :prn-sink     {:proc (flow/process
                          (flow/map->step
                           {:describe  (fn [] {:ins {:in "gimme stuff to print!"}})
                            :transform (fn [_state _in v] (prn v))}))}}

   :conns
   [[[:dice-source :out]  [:dedupe :in]]
    [[:dedupe :out]       [:craps-finder :in]]
    [[:craps-finder :out] [:prn-sink :in]]]})
;; ---- interactive session: evaluate these forms one at a time ----
;; pp/pprint is used throughout (matching `monitoring`): the file only
;; requires clojure.pprint under the `pp` alias, and a bare `pprint`
;; resolves only in namespaces that happen to refer it.

;; Build the flow from the graph definition and inspect it as data.
(def g (flow/create-flow gdef))
(pp/pprint (-> g d/datafy))

;; flow/start's return value is handed to `monitoring`, which reads its
;; :report-chan and :error-chan.
(monitoring (flow/start g))
(flow/resume g) ;;wait a bit for craps to print

;; Broadcast :craps/report; ddupe answers with the last value it saw.
(def temp (flow/inject g [::flow/cast :craps/report] [:unused]))

(flow/pause g)
(flow/pause-proc g :prn-sink)
(flow/resume-proc g :prn-sink)

;; Feed rolls straight into :craps-finder, bypassing :dice-source and :dedupe.
(def dummy (flow/inject g [:craps-finder :in] [[1 2] [2 1] [2 1] [6 6] [6 6] [4 3] [1 1]]))

(pp/pprint (flow/ping g))
(pp/pprint (flow/ping-proc g :prn-sink))
(flow/stop g)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment