Last active
January 1, 2016 15:29
gerritjvv/8164850
There are many times when you need to buffer up a series of results and then perform an operation on them and, if the count is not reached within a predefined timeout, perform the operation on the results collected so far.
For my use case, I'm writing a Kafka producer and want messages received on a send function to be buffered before being sent to Kafka. Th…
(require '[clojure.core.async :refer [go alts! >! <! >!! <!! chan timeout close!]])

(defn buffered-chan
  "Reads from ch-source and, when either buffer-count items have been read or
   timeout-ms has elapsed, sends the buffered items as a vector to the channel
   that is returned from this function."
  ([ch-source buffer-count timeout-ms]
   (buffered-chan ch-source buffer-count timeout-ms 1))
  ([ch-source buffer-count timeout-ms buffer-or-n]
   (let [ch-target (chan buffer-or-n)]
     (go
       (loop [buff [] t (timeout timeout-ms)]
         (let [[v port] (alts! [ch-source t])
               b (if (some? v) (conj buff v) buff)] ;some? so that a false value is still buffered
           (cond
             ;; ch-source was closed: flush what is left and close ch-target,
             ;; otherwise alts! would return nil immediately and the loop would spin
             (and (nil? v) (= port ch-source))
             (do
               (when (seq b) (>! ch-target b))
               (close! ch-target))
             ;; the buffer is full or the timeout fired
             (or (>= (count b) buffer-count) (nil? v))
             (do
               (when (seq b) ;never send an empty buffer
                 (>! ch-target b)) ;send the buffer to the channel
               (recur [] (timeout timeout-ms))) ;create a new buffer and new timeout
             :else
             (recur b t))))) ;pass the new buffer and the current timeout
     ch-target)))
;; test
(let [ch-source (chan)
      buff-ch (buffered-chan ch-source 10 50000 11)]
  (go
    (dotimes [i 100]
      (>! ch-source i)))
  (dotimes [i 10]
    (let [v (<!! buff-ch)]
      (prn "got " v))))

;; "got " [0 1 2 3 4 5 6 7 8 9]
;; "got " [10 11 12 13 14 15 16 17 18 19]
;; "got " [20 21 22 23 24 25 26 27 28 29]
;; "got " [30 31 32 33 34 35 36 37 38 39]
;; "got " [40 41 42 43 44 45 46 47 48 49]
;; "got " [50 51 52 53 54 55 56 57 58 59]
;; "got " [60 61 62 63 64 65 66 67 68 69]
;; "got " [70 71 72 73 74 75 76 77 78 79]
;; "got " [80 81 82 83 84 85 86 87 88 89]
;; "got " [90 91 92 93 94 95 96 97 98 99]
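
The test above only exercises the count-based flush, since 100 values arrive well inside the 50-second timeout. The following is a minimal sketch of the other two paths: a flush triggered by the timeout and a final flush when the source channel is closed. To keep it self-contained it uses a compact helper named `batch-chan`, which is a hypothetical name for illustration and not part of the gist; closing the output channel when the input closes is likewise an assumption of this sketch.

```clojure
(require '[clojure.core.async :refer [go-loop alts! >! >!! <!! chan timeout close!]])

;; Hypothetical compact helper (not the gist's buffered-chan): batches values
;; from `in` and flushes on size, on timeout, or when `in` is closed.
(defn batch-chan [in n ms]
  (let [out (chan 1)]
    (go-loop [buff [] t (timeout ms)]
      (let [[v port] (alts! [in t])]
        (cond
          ;; the timeout fired: flush a non-empty buffer and start a new window
          (= port t)
          (do (when (seq buff) (>! out buff))
              (recur [] (timeout ms)))

          ;; `in` was closed: flush the remainder and close `out`
          (nil? v)
          (do (when (seq buff) (>! out buff))
              (close! out))

          ;; the buffer reached n items: flush and start a new window
          (>= (inc (count buff)) n)
          (do (>! out (conj buff v))
              (recur [] (timeout ms)))

          ;; otherwise keep collecting under the current timeout
          :else
          (recur (conj buff v) t))))
    out))

(def in (chan))
(def out (batch-chan in 10 500))

;; Only three values arrive, so the count of 10 is never reached;
;; the batch is delivered once the 500 ms timeout fires.
(doseq [i (range 3)] (>!! in i))
(def timed-out-batch (<!! out))

;; Two more values followed by close!: the remainder is flushed, then `out` closes.
(>!! in :a)
(>!! in :b)
(close! in)
(def final-batch (<!! out))
(def after-close (<!! out)) ;a closed, drained channel yields nil
```

Checking which port `alts!` returned, rather than only whether the value is nil, is what lets the loop tell a fired timeout apart from a closed source channel.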