Created
July 9, 2025 18:23
-
-
Save tomconnors/245fb69ed757b34502c8d57637db8de2 to your computer and use it in GitHub Desktop.
core.async.flow examples
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; There are some patterns with c.a.flow that weren't immediately obvious to me so I made this gist to | |
;; figure them out. | |
;; Those patterns are: | |
;; - backpressure across processes | |
;; - long-running tasks | |
;; - getting inputs for the first process in the flow | |
(require '[clojure.core.async :as async] | |
'[clojure.core.async.flow :as flow] | |
'[clojure.pprint :as pp] | |
'[clojure.datafy :as d] | |
'[clojure.walk :as w]) | |
;; Backpressure across processes: | |
;; The first process produces values very quickly. | |
;; The second process handles them more slowly. | |
;; When running this code, you'll see the number-maker produce 10 values immediately, | |
;; then slows its pace to match the pace of consumption. | |
;; That's due to backpressure from the consumer. | |
;; The channels used for ins and outs by default used a fixed buffer of size 10. | |
;; (per flow/create-flow's docs) | |
;; In this example, the producer sends 10 values to its "out" channel, | |
;; which c.a.flow handles putting to the consumer's "in" channel. | |
;; Once the consumer's "in" channel is saturated, puts by c.a.flow to that channel | |
;; are no longer immediate, so takes from the producer's "out" channel are no longer immediate, | |
;; so the producer's `number-maker-thread` doesn't/can't write to its channel as frequently. | |
(defn number-maker-thread [pace-ms] | |
(let [chan (async/chan)] | |
(async/thread | |
(loop [i 0] | |
(if (async/>!! chan i) | |
(do | |
(prn "Number maker produced a number:" i) | |
(Thread/sleep pace-ms) | |
(recur (inc i))) | |
(prn "Number maker stopped.")))) | |
chan)) | |
(def number-maker | |
(flow/map->step | |
{:describe (fn [] {:params {:pace-ms "How frequently to produce a value"} | |
:ins {} | |
:outs {:out "Output channel"} | |
:workload :io}) | |
:init (fn [{:keys [pace-ms]}] {::flow/in-ports {:in (number-maker-thread pace-ms)}}) | |
:transition (fn [state status] | |
(when (= status ::flow/stop) | |
(async/close! (-> state ::flow/in-ports :in)))) | |
:transform (fn [state _ n] [state {:out [n]}])})) | |
(def number-calculator | |
(flow/map->step | |
{:describe (fn [] {:params {:pace-ms "How frequently to consume a number"} | |
:ins {:in "Source of numbers"} | |
:workload :compute}) | |
:init (fn [state] (assoc state :total 0)) | |
:transition (fn [state status] state) | |
:transform (fn [state _ n] | |
(Thread/sleep (:pace-ms state 1000)) | |
(let [state (update state :total + n)] | |
(prn "Number calculator consumed a number:" n) | |
(prn "New total is:" (:total state)) | |
[state]))})) | |
(def backpressure-demo | |
{:procs {:producer {:proc (flow/process #'number-maker) | |
:args {:pace-ms 100} | |
;; :chan-opts {:out {:buf-or-n 1}} | |
} | |
:consumer {:proc (flow/process #'number-calculator) | |
:args {:pace-ms 1000} | |
;; :chan-opts {:in {:buf-or-n 1}} | |
}} | |
:conns [[[:producer :out] [:consumer :in]]]}) | |
(do | |
(def bp (flow/create-flow backpressure-demo)) | |
(flow/start bp) | |
(flow/resume bp)) | |
(flow/stop bp) | |
;; Long-running tasks (such as loading a DB) | |
;; Here we'll have 3 processes: | |
;; the first makes single 'items' which we are to write to the database | |
;; the second batches items into groups of 10 | |
;; the third 'writes to the database' - it'll actually just sleep for a long time. | |
;; `prn`s are getting garbled, so using telemere instead. | |
(require '[taoensso.telemere :as t]) | |
(defn db-item-maker-thread "Just writes unimportant values to `chan`. Returns `chan`." [] | |
(let [chan (async/chan)] | |
(async/thread | |
(loop [i 0] | |
(let [event {:event-id (random-uuid) | |
:event-count i | |
:timestamp (java.time.Instant/now)}] | |
(if (async/>!! chan event) | |
(do | |
#_(t/log! "DB Item Maker produced an item.") | |
(recur (inc i))) | |
(t/log! "DB Item Maker stopped."))))) | |
chan)) | |
(def db-item-maker | |
(flow/map->step | |
{:describe (fn [] {:outs {:out "Single items"}}) | |
:init (fn [state] {::flow/in-ports {:in (db-item-maker-thread)}}) | |
:transition (fn [state status] | |
(when (= status ::flow/stop) | |
(async/close! (-> state ::flow/in-ports :in)))) | |
:transform (fn [state _ n] [state {:out [n]}])})) | |
(def db-item-batcher | |
(flow/map->step | |
{:describe (fn [] {:ins {:in "Single items"} | |
:outs {:out "Batched items"}}) | |
:init (fn [state] (assoc state :batch [])) | |
:transition (fn [state status] state) | |
:transform (fn [{:keys [batch] :as state} _ item] | |
(let [batch (conj batch item)] | |
(if (>= (count batch) 10) | |
[(assoc state :batch []) {:out [batch]}] | |
[(assoc state :batch batch)])))})) | |
(def db-loader | |
(flow/map->step | |
{:describe (fn [] {:workload :io | |
:ins {:in "Batched items"}}) | |
:init (fn [state] state) | |
:transition (fn [state status] state) | |
:transform (fn [state _ batch] | |
(t/log! "DB Loader is loading a batch.") | |
(Thread/sleep 10000) | |
(t/log! "DB Loader loaded a batch.") | |
[state])})) | |
(def long-task-demo | |
{:procs {:item-maker {:proc (flow/process #'db-item-maker)} | |
:item-batcher {:proc (flow/process #'db-item-batcher) | |
:chan-opts {:in {:buf-or-n 1} | |
:out {:buf-or-n 1}}} | |
:batch-loader {:proc (flow/process #'db-loader) | |
:chan-opts {:in {:buf-or-n 1}}}} | |
:conns [[[:item-maker :out] [:item-batcher :in]] | |
[[:item-batcher :out] [:batch-loader :in]]]}) | |
(require '[clojure.core.async.flow-monitor :as mon]) | |
(do | |
(def longtask (flow/create-flow long-task-demo)) | |
(flow/start longtask) | |
(flow/resume longtask)) | |
(def flow-server (mon/start-server {:flow longtask})) | |
(mon/stop-server flow-server) | |
(flow/stop longtask) | |
;; With the above approach, you'll see that flow monitor can't ping the db loader | |
;; as often as it would like. Probably because db-loader's thread is occupied | |
;; with the db load. | |
;; I don't know what the solution is to that or whether it's actually a problem. | |
;; Inputs from I/O | |
;; We already looked at this in the first example. If you want to get inputs from s3, | |
;; a database, Kafka, the user typing things, or some other I/O place, you'd use | |
;; a process that sends those messages to itself on a ::flow/in-port. | |
;; Here's another example where we get input w/ read-line. | |
(defn input-request-thread [] | |
(let [chan (async/chan)] | |
(async/thread | |
(loop [] | |
(let [_ (println "Give me an input.") | |
in (read-line)] | |
(if in | |
(if (async/>!! chan in) | |
;; sleep to give printer a chance to do its work. | |
;; How might I restructure this code to avoid this? | |
(do (Thread/sleep 50) | |
(recur)) | |
(prn "Input request process stopped.")) | |
(prn "Didn't get input."))))) | |
chan)) | |
(def input-requester | |
(flow/map->step | |
{:describe (fn [] {:outs {:out "Output channel"} | |
:workload :io}) | |
:init (fn [state] {::flow/in-ports {:in (input-request-thread)}}) | |
:transition (fn [state status] | |
(when (= status ::flow/stop) | |
(async/close! (-> state ::flow/in-ports :in)))) | |
:transform (fn [state _ n] [state {:out [n]}])})) | |
(require '[clojure.string :as string]) | |
(def input-processor | |
(flow/map->step | |
{:describe (fn [] {:ins {:in "Source of inputs"} | |
:outs {:out "Processed inputs"} | |
:workload :compute}) | |
:transform (fn [state _ msg] | |
(let [msg (apply str (map-indexed (fn [idx s] | |
(if (even? idx) | |
(string/lower-case s) | |
(string/upper-case s))) | |
msg))] | |
[state {:out [msg]}]))})) | |
(def input-printer | |
(flow/map->step | |
{:describe (fn [] {:ins {:in "Source of inputs"} | |
:workload :io}) | |
:transform (fn [state _ msg] | |
(t/log! (str "\"" msg "\". That's what you sound like.")) | |
[state])})) | |
(def io-input-demo | |
{:procs {:requester {:proc (flow/process #'input-requester) | |
:chan-opts {:out {:buf-or-n 1}}} | |
:processor {:proc (flow/process #'input-processor)} | |
:printer {:proc (flow/process #'input-printer)}} | |
:conns [[[:requester :out] [:processor :in]] | |
[[:processor :out] [:printer :in]]]}) | |
(do | |
(def io-demo (flow/create-flow io-input-demo)) | |
(flow/start io-demo) | |
(flow/resume io-demo)) | |
(flow/stop io-demo) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment