Skip to content

Instantly share code, notes, and snippets.

@tomconnors
Created July 9, 2025 18:23
Show Gist options
  • Save tomconnors/245fb69ed757b34502c8d57637db8de2 to your computer and use it in GitHub Desktop.
Save tomconnors/245fb69ed757b34502c8d57637db8de2 to your computer and use it in GitHub Desktop.
core.async.flow examples
;; There are some patterns with c.a.flow that weren't immediately obvious to me so I made this gist to
;; figure them out.
;; Those patterns are:
;; - backpressure across processes
;; - long-running tasks
;; - getting inputs for the first process in the flow
(require '[clojure.core.async :as async]
'[clojure.core.async.flow :as flow]
'[clojure.pprint :as pp]
'[clojure.datafy :as d]
'[clojure.walk :as w])
;; Backpressure across processes:
;; The first process produces values very quickly.
;; The second process handles them more slowly.
;; When running this code, you'll see the number-maker produce 10 values immediately,
;; then slow its pace to match the pace of consumption.
;; That's due to backpressure from the consumer.
;; By default, the channels used for ins and outs use a fixed buffer of size 10.
;; (per flow/create-flow's docs)
;; In this example, the producer sends 10 values to its "out" channel,
;; which c.a.flow handles putting to the consumer's "in" channel.
;; Once the consumer's "in" channel is saturated, puts by c.a.flow to that channel
;; are no longer immediate, so takes from the producer's "out" channel are no longer immediate,
;; so the producer's `number-maker-thread` doesn't/can't write to its channel as frequently.
(defn number-maker-thread
  "Starts a thread that puts 0, 1, 2, ... onto a fresh unbuffered channel,
  sleeping `pace-ms` milliseconds after every successful put. The loop ends
  (printing a message) as soon as a put is refused, i.e. once the channel has
  been closed. Returns the channel."
  [pace-ms]
  (let [numbers (async/chan)]
    (async/thread
      (loop [n 0]
        (if-not (async/>!! numbers n)
          ;; >!! returned false: the channel is closed, so stop producing.
          (prn "Number maker stopped.")
          (do
            (prn "Number maker produced a number:" n)
            (Thread/sleep pace-ms)
            (recur (inc n))))))
    numbers))
;; Source step for the backpressure demo.
;; - describe:   takes a :pace-ms param, has no flow inputs and one output, :out.
;; - init:       starts `number-maker-thread` and registers its channel as the
;;               ::flow/in-ports :in port, so the step receives the numbers as
;;               messages.
;; - transition: closes that channel on ::flow/stop (which ends the producer
;;               thread's loop) and ALWAYS returns `state`. The value returned
;;               from a transition becomes the step's new state; returning nil
;;               (which a bare `when` does) would discard the state on the first
;;               pause/resume, and the later stop transition could then no longer
;;               find the channel to close.
;; - transform:  forwards every received number unchanged to :out.
(def number-maker
  (flow/map->step
    {:describe (fn [] {:params {:pace-ms "How frequently to produce a value"}
                       :ins {}
                       :outs {:out "Output channel"}
                       :workload :io})
     :init (fn [{:keys [pace-ms]}]
             {::flow/in-ports {:in (number-maker-thread pace-ms)}})
     :transition (fn [state status]
                   (when (= status ::flow/stop)
                     (async/close! (-> state ::flow/in-ports :in)))
                   ;; Return the (unchanged) state for every transition.
                   state)
     :transform (fn [state _ n] [state {:out [n]}])}))
;; Sink step for the backpressure demo: keeps a running :total of the numbers
;; it receives on :in. Each transform call first sleeps for the :pace-ms held
;; in state (1000 when the key is absent), then prints the number and the new
;; total. It emits no outputs.
(def number-calculator
  (flow/map->step
    {:describe (fn [] {:params {:pace-ms "How frequently to consume a number"}
                       :ins {:in "Source of numbers"}
                       :workload :compute})
     ;; The init argument (the proc's :args) becomes the state, plus a zeroed total.
     :init (fn [args] (assoc args :total 0))
     :transition (fn [state _status] state)
     :transform (fn [{:keys [pace-ms total] :or {pace-ms 1000} :as state} _ n]
                  (Thread/sleep pace-ms)
                  (let [new-total (+ total n)]
                    (prn "Number calculator consumed a number:" n)
                    (prn "New total is:" new-total)
                    [(assoc state :total new-total)]))}))
;; Flow definition for the backpressure demo: a producer paced at 100 ms whose
;; :out is connected to the :in of a consumer paced at 1000 ms.
(def backpressure-demo
  (let [producer {:proc (flow/process #'number-maker)
                  :args {:pace-ms 100}
                  ;; :chan-opts {:out {:buf-or-n 1}}
                  }
        consumer {:proc (flow/process #'number-calculator)
                  :args {:pace-ms 1000}
                  ;; :chan-opts {:in {:buf-or-n 1}}
                  }
        producer->consumer [[:producer :out] [:consumer :in]]]
    {:procs {:producer producer
             :consumer consumer}
     :conns [producer->consumer]}))
;; Create the backpressure flow, bind it to `bp`, then start and resume it.
;; NOTE(review): these forms look intended to be evaluated one at a time at a
;; REPL; evaluated back to back, the `flow/stop` below runs right after the
;; resume.
(do
(def bp (flow/create-flow backpressure-demo))
(flow/start bp)
(flow/resume bp))
;; Stop the flow once you are done watching the output.
(flow/stop bp)
;; Long-running tasks (such as loading a DB)
;; Here we'll have 3 processes:
;; the first makes single 'items' which we are to write to the database
;; the second batches items into groups of 10
;; the third 'writes to the database' - it'll actually just sleep for a long time.
;; `prn`s are getting garbled, so using telemere instead.
(require '[taoensso.telemere :as t])
(defn db-item-maker-thread
  "Starts a thread that puts throwaway event maps (:event-id, :event-count,
  :timestamp) onto a new unbuffered channel for as long as puts are accepted,
  and logs a message once a put is refused. Returns the channel."
  []
  (let [items (async/chan)
        make-event (fn [n]
                     {:event-id (random-uuid)
                      :event-count n
                      :timestamp (java.time.Instant/now)})]
    (async/thread
      (loop [n 0]
        (if (async/>!! items (make-event n))
          ;; Per-item logging is intentionally disabled:
          ;; (t/log! "DB Item Maker produced an item.")
          (recur (inc n))
          (t/log! "DB Item Maker stopped."))))
    items))
;; Source step for the long-running-task demo.
;; - init:       starts `db-item-maker-thread` and registers its channel as the
;;               ::flow/in-ports :in port.
;; - transition: closes that channel on ::flow/stop and ALWAYS returns `state`.
;;               The transition's return value becomes the step's new state, so
;;               returning nil (the value of a bare `when`) would throw the
;;               state away on the first resume and leave the stop transition
;;               without a channel to close.
;; - transform:  forwards each item unchanged to :out.
(def db-item-maker
  (flow/map->step
    {:describe (fn [] {:outs {:out "Single items"}})
     :init (fn [_args] {::flow/in-ports {:in (db-item-maker-thread)}})
     :transition (fn [state status]
                   (when (= status ::flow/stop)
                     (async/close! (-> state ::flow/in-ports :in)))
                   ;; Return the (unchanged) state for every transition.
                   state)
     :transform (fn [state _ n] [state {:out [n]}])}))
;; Batching step: accumulates incoming items in state under :batch. When the
;; accumulated vector reaches 10 items it is emitted as a single message on
;; :out and the accumulator is reset to []; otherwise nothing is emitted.
(def db-item-batcher
  (flow/map->step
    {:describe (fn [] {:ins {:in "Single items"}
                       :outs {:out "Batched items"}})
     :init (fn [args] (assoc args :batch []))
     :transition (fn [state _status] state)
     :transform (fn [state _ item]
                  (let [pending (conj (:batch state) item)
                        full? (<= 10 (count pending))]
                    (if full?
                      [(assoc state :batch []) {:out [pending]}]
                      [(assoc state :batch pending)])))}))
;; Stand-in for a slow database write: for every batch received on :in it logs,
;; sleeps for 10 seconds, logs again, and returns the state unchanged with no
;; outputs. The batch contents are not inspected.
(def db-loader
  (flow/map->step
    {:describe (fn [] {:ins {:in "Batched items"}
                       :workload :io})
     :init identity
     :transition (fn [state _status] state)
     :transform (fn [state _port _batch]
                  (t/log! "DB Loader is loading a batch.")
                  (Thread/sleep 10000)
                  (t/log! "DB Loader loaded a batch.")
                  [state])}))
;; Flow definition for the long-running-task demo:
;; item-maker -> item-batcher -> batch-loader.
;; The batcher's :in and :out and the loader's :in each get :buf-or-n 1.
(def long-task-demo
  (let [single-slot {:buf-or-n 1}]
    {:procs {:item-maker {:proc (flow/process #'db-item-maker)}
             :item-batcher {:proc (flow/process #'db-item-batcher)
                            :chan-opts {:in single-slot
                                        :out single-slot}}
             :batch-loader {:proc (flow/process #'db-loader)
                            :chan-opts {:in single-slot}}}
     :conns [[[:item-maker :out] [:item-batcher :in]]
             [[:item-batcher :out] [:batch-loader :in]]]}))
(require '[clojure.core.async.flow-monitor :as mon])
;; Create the long-task flow, bind it to `longtask`, then start and resume it.
;; NOTE(review): these forms look intended to be evaluated one at a time at a
;; REPL; evaluated back to back, the monitor server and the flow are stopped
;; immediately after being started.
(do
(def longtask (flow/create-flow long-task-demo))
(flow/start longtask)
(flow/resume longtask))
;; Start a flow-monitor server for the running flow and keep a handle to it.
(def flow-server (mon/start-server {:flow longtask}))
;; Tear down: stop the monitor server first, then the flow.
(mon/stop-server flow-server)
(flow/stop longtask)
;; With the above approach, you'll see that flow monitor can't ping the db loader
;; as often as it would like. Probably because db-loader's thread is occupied
;; with the db load.
;; I don't know what the solution is to that or whether it's actually a problem.
;; Inputs from I/O
;; We already looked at this in the first example. If you want to get inputs from s3,
;; a database, Kafka, the user typing things, or some other I/O place, you'd use
;; a process that sends those messages to itself on a ::flow/in-port.
;; Here's another example where we get input w/ read-line.
(defn input-request-thread
  "Starts a thread that repeatedly prompts with println, reads a line with
  `read-line`, and puts it onto a new unbuffered channel. The loop ends when
  `read-line` returns nil or when the put is refused because the channel was
  closed; a message is printed in either case. Returns the channel."
  []
  (let [lines (async/chan)]
    (async/thread
      (loop []
        (println "Give me an input.")
        (let [line (read-line)]
          (cond
            ;; read-line returned nil: no more input.
            (nil? line)
            (prn "Didn't get input.")

            ;; The put was refused: the channel has been closed.
            (not (async/>!! lines line))
            (prn "Input request process stopped.")

            ;; sleep to give printer a chance to do its work.
            ;; How might I restructure this code to avoid this?
            :else
            (do (Thread/sleep 50)
                (recur))))))
    lines))
;; Source step for the I/O-input demo.
;; - init:       starts `input-request-thread` and registers its channel as the
;;               ::flow/in-ports :in port.
;; - transition: closes that channel on ::flow/stop and ALWAYS returns `state`.
;;               The transition's return value becomes the step's new state, so
;;               returning nil (the value of a bare `when`) would throw the
;;               state away on the first resume and leave the stop transition
;;               without a channel to close.
;; - transform:  forwards each line unchanged to :out.
(def input-requester
  (flow/map->step
    {:describe (fn [] {:outs {:out "Output channel"}
                       :workload :io})
     :init (fn [_args] {::flow/in-ports {:in (input-request-thread)}})
     :transition (fn [state status]
                   (when (= status ::flow/stop)
                     (async/close! (-> state ::flow/in-ports :in)))
                   ;; Return the (unchanged) state for every transition.
                   state)
     :transform (fn [state _ n] [state {:out [n]}])}))
(require '[clojure.string :as string])
;; Transforms each incoming message into alternating case: elements at even
;; indices go through string/lower-case, elements at odd indices through
;; string/upper-case, and the results are concatenated with `str` and emitted
;; on :out. State is passed through untouched.
(def input-processor
  (flow/map->step
    {:describe (fn [] {:ins {:in "Source of inputs"}
                       :outs {:out "Processed inputs"}
                       :workload :compute})
     :transform (fn [state _ msg]
                  (let [alternate-case (fn [idx ch]
                                         (if (odd? idx)
                                           (string/upper-case ch)
                                           (string/lower-case ch)))
                        mocked (->> msg
                                    (map-indexed alternate-case)
                                    (apply str))]
                    [state {:out [mocked]}]))}))
;; Sink step: logs every message received on :in, wrapped in double quotes and
;; followed by a fixed remark. Emits nothing and leaves state unchanged.
(def input-printer
  (flow/map->step
    {:describe (fn [] {:workload :io
                       :ins {:in "Source of inputs"}})
     :transform (fn [state _port text]
                  (t/log! (str "\"" text "\". That's what you sound like."))
                  [state])}))
;; Flow definition for the I/O-input demo: requester -> processor -> printer.
;; The requester's :out gets :buf-or-n 1.
(def io-input-demo
  (let [procs {:requester {:proc (flow/process #'input-requester)
                           :chan-opts {:out {:buf-or-n 1}}}
               :processor {:proc (flow/process #'input-processor)}
               :printer {:proc (flow/process #'input-printer)}}
        conns [[[:requester :out] [:processor :in]]
               [[:processor :out] [:printer :in]]]]
    {:procs procs
     :conns conns}))
;; Create the I/O-input flow, bind it to `io-demo`, then start and resume it.
;; NOTE(review): these forms look intended to be evaluated one at a time at a
;; REPL; evaluated back to back, the `flow/stop` below runs right after the
;; resume.
(do
(def io-demo (flow/create-flow io-input-demo))
(flow/start io-demo)
(flow/resume io-demo))
;; Stop the flow once you are done entering inputs.
(flow/stop io-demo)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment