Skip to content

Instantly share code, notes, and snippets.

@shishkin
Created October 22, 2014 10:37
Show Gist options
  • Save shishkin/cfd0617b2e1fd8783b52 to your computer and use it in GitHub Desktop.
Save shishkin/cfd0617b2e1fd8783b52 to your computer and use it in GitHub Desktop.
Clojure core.async configurable polling for a change of a non-reactive function
;### Tested with clojure "1.6.0" and core.async "0.1.346.0-17112a-alpha" ###
(use 'clojure.core.async)
;### Lib functions ###
(defn take-until
"Creates a channel that replicates values from the in channel until the
control channel either emits a value or closes."
[control in]
(let [out (chan)]
(go-loop []
(let [[value ch] (alts! [in control])]
(if (and
(not (nil? value))
(not= ch control)
(>! out value))
(recur)
(close! out))))
out))
(defn every
"Creates a channel that emits a result of calling f every millis."
[millis f]
(let [out (chan)]
(go-loop [value (f)]
(if (or
(nil? value)
(not (>! out value)))
(close! out)
(do
(<! (timeout millis))
(recur (f)))))
out))
(defn poll-until-changed
"Creates a channel which eventually emits a single value of calling
referentially non-transparent f once it has been changed from the value of
its initial call."
[f poll-millis timeout-millis]
(let [out (chan)
poll (every poll-millis f)
in (->> (unique poll)
(take-until (timeout timeout-millis)))]
(go
(let [init (<! in)
changed (<! in)
result (if (nil? changed) init changed)]
(close! poll)
(>! out result)
(close! out)))
out))
;### Usage ###
(def x (atom :old))
(defn foo []
(let [v @x]
(println "Value is" v)
v))
(defn start-process []
(go
(<! (timeout 2000))
(reset! x :new)))
(<!! (poll-until-changed foo 500 2000))
#_( Prints:
Value is :old
Value is :old
Value is :old
Value is :old
=> :old
Value is :old
)
(do
(start-process)
(<!! (poll-until-changed foo 500 4000)))
#_( Prints:
Value is :old
Value is :old
Value is :old
Value is :old
Value is :new
=> :new
Value is :new
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment