Last active
August 29, 2015 14:10
-
-
Save tcoupland/83c47d8ce2f78570a54c to your computer and use it in GitHub Desktop.
A channel modifier that creates batches of events either when a given number of events have been put into it or when a given time period has expired.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn partition-or-time | |
"Returns a channel that will either contain vectors of n items taken from ch or | |
if beat-rate millis elapses then a vector with the available items. The | |
final vector in the return channel may be smaller than n if ch closed before | |
the vector could be completely filled." | |
[n ch beat-rate buf-or-n] | |
(let [out (chan buf-or-n)] | |
(go (loop [arr (make-array Object n) | |
idx 0 | |
beat (timeout beat-rate)] | |
(let [[v c] (alts! [ch beat])] | |
(if (= c beat) | |
(do | |
(if (> idx 0) | |
(do (>! out (vec (take idx arr))) | |
(recur (make-array Object n) | |
0 | |
(timeout beat-rate))) | |
(recur arr idx (timeout beat-rate)))) | |
(if-not (nil? v) | |
(do (aset ^objects arr idx v) | |
(let [new-idx (inc idx)] | |
(if (< new-idx n) | |
(recur arr new-idx beat) | |
(do (>! out (vec arr)) | |
(recur (make-array Object n) 0 (timeout beat-rate)))))) | |
(do (when (> idx 0) | |
(let [narray (make-array Object idx)] | |
(System/arraycopy arr 0 narray 0 idx) | |
(>! out (vec narray)))) | |
(close! out))))))) | |
out)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment