Last active
October 14, 2018 13:51
-
-
Save dazld/424fe19380f1a1227e0024af319b758d to your computer and use it in GitHub Desktop.
accumulate values for processing in another thread
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns batches.core | |
(:require [clojure.core.async :as a])) | |
(defprotocol Accumulating
  "Operations on a running accumulator of values."
  (add [this v]
    "Hands the value v to the accumulator.")
  (stop [this]
    "Stops the accumulator."))
(defn accumulate
  "Collects values and periodically hands them to `action` in batches.

  Returns an Accumulating. Values passed to `add` are conj'd onto an internal
  vector. Roughly every `time-ms` milliseconds `action` is invoked with the
  vector gathered since the previous invocation (possibly empty) and a fresh
  vector is started.

  Calling `stop` cancels further invocations: values gathered since the last
  invocation are discarded, and subsequent `add` calls return false instead
  of queueing up.

  `action` is called directly inside the go block, so it should not block for
  long. An exception thrown by `action` ends the loop.

  Example:
    (def acc (accumulate println 1000))
    (add acc :x)
    (stop acc)"
  [action time-ms]
  (let [stop-ch (a/chan 1)
        push-ch (a/chan)]
    ;; The pending tick is carried in the loop state rather than produced by a
    ;; second go-loop, so nothing is left running (or parked) after a stop.
    (a/go-loop [batch []
                tick  (a/timeout time-ms)]
      ;; With :priority true the ports are tried in the listed order. stop-ch
      ;; comes first so that a steady stream of adds cannot delay cancellation.
      (let [[v ch] (a/alts! [stop-ch push-ch tick] :priority true)]
        (condp = ch
          stop-ch (do
                    ;; Close both channels so later add/stop calls return
                    ;; false immediately instead of piling up pending puts.
                    (a/close! push-ch)
                    (a/close! stop-ch)
                    batch)
          push-ch (recur (conj batch v) tick)
          ;; Arm the next tick before running action so the period is measured
          ;; from when the tick fired, not from when action finished.
          tick    (let [next-tick (a/timeout time-ms)]
                    (action batch)
                    (recur [] next-tick)))))
    (reify
      Accumulating
      (add [_ v] (a/put! push-ch v))
      (stop [_] (a/put! stop-ch :stop)))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment