Last active January 19, 2019 05:27
Gist: lnostdal/26f7a83a037e3e4eecfe4c81bf58d4c9
Clojure for fast processing of streams of data via LAZY-SEQ and SEQUE
;; www.Quanto.ga
;;
;; * You have a stream of data where generating each new element takes some time.
;; * You consume each element from the stream and do some calculations on it; this too takes
;;   some time.
;; * When the consumer is done with its calculations for an element (or chunk of elements!), you
;;   do not want it to wait for the code that fetches new elements to finish fetching the next
;;   one, because that fetching could have happened in the background while the calculations
;;   were running.
;;
;; One way to deal with this is SEQUE, which keeps production up to N steps ahead of consumption
;; by realizing a (lazy) Clojure seq on a background thread.
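
;; A minimal sketch of that look-ahead, separate from the BLAH examples. PRODUCED and
;; SLOW-COUNT are hypothetical names used only here. The producer bumps an atom every time it
;; realizes an element. We take a single element from the SEQUE-wrapped seq and pause briefly,
;; and the atom then shows that the background worker has already realized elements nobody
;; has asked for yet.
(def produced (atom 0))

(defn slow-count
  "Infinite lazy seq i, i+1, ...; each element takes ~10 ms to produce."
  [^long i]
  (lazy-seq
    (Thread/sleep 10)
    (swap! produced inc)
    (cons i (slow-count (inc i)))))

(let [s (seque 3 (slow-count 0))]
  (first s)            ; consume exactly one element
  (Thread/sleep 200)   ; give the background worker time to refill its buffer
  @produced)           ;=> 4 or more (1 consumed + 3 buffered), not just 1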

;; The starting scenario, without SEQUE:
(defn blah1
  ([] (blah1 0))
  ([^long i]
   (lazy-seq
     (Thread/sleep 250)
     (println "i:" i)
     (cons i (blah1 (inc i))))))
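
;; Why BLAH1 on its own gives no overlap: a LAZY-SEQ body only runs when a consumer asks for
;; that element, so nothing is produced in advance. A small sketch with hypothetical names
;; REALIZED and TRACKED-COUNT (BLAH1 minus the sleep and println):
(def realized (atom []))

(defn tracked-count
  "Records every element in REALIZED at the moment it is realized."
  [^long i]
  (lazy-seq
    (swap! realized conj i)
    (cons i (tracked-count (inc i)))))

(def xs (tracked-count 0))
@realized            ;=> []       nothing realized yet
(doall (take 3 xs))  ;=> (0 1 2)
@realized            ;=> [0 1 2]  exactly the elements requested, no more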

quantataraxia.core> (time (run! #(do (Thread/sleep 1000) (println "output:" %))
                                (take 5 (blah1))))
i: 0
output: 0
i: 1
output: 1
i: 2
output: 2
i: 3
output: 3
i: 4
output: 4
"Elapsed time: 6259.611617 msecs"
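
;; A rough check of that timing, assuming each step costs exactly its Thread/sleep: with no
;; overlap, every element pays 250 ms of production plus 1000 ms of consumption.
;; SEQUENTIAL-MS is a hypothetical helper for this estimate only.
(defn sequential-ms
  "Estimated wall time (ms) when N elements are produced and consumed strictly in turn."
  [n produce-ms consume-ms]
  (* n (+ produce-ms consume-ms)))

(sequential-ms 5 250 1000) ;=> 6250, close to the measured 6259 ms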

;; We cannot simply wrap the BLAH1 body in a SEQUE form, because BLAH1 calls itself: every
;; recursive step would then create another SEQUE, i.e. another background worker. Instead we
;; set up an inner helper Fn that does the recursion, and wrap the seq it returns in a single
;; call to SEQUE, like this:
(defn blah2
  ([] (blah2 0))
  ([^long i]
   (seque 2
          ((fn inner [^long i]
             (lazy-seq
               (Thread/sleep 250)
               (println "i:" i)
               (cons i (inner (inc i)))))
           i))))
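
;; The helper does not have to be an anonymous inner Fn. A sketch of the same idea with a
;; separate private producer (SLOW-NATURALS and BLAH3 are hypothetical names): the producer
;; recurses on itself, and SEQUE is applied exactly once, around the whole seq it returns.
(defn- slow-naturals
  "Unbuffered producer with the same body as BLAH1."
  [^long i]
  (lazy-seq
    (Thread/sleep 250)
    (println "i:" i)
    (cons i (slow-naturals (inc i)))))

(defn blah3
  ([] (blah3 0))
  ([^long i]
   (seque 2 (slow-naturals i))))

;; (take 5 (blah3)) ;=> (0 1 2 3 4), with production running ahead in the background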

quantataraxia.core> (time (run! #(do (Thread/sleep 1000) (println "output:" %))
                                (take 5 (blah2))))
i: 0
i: 1
i: 2
i: 3
i: 4
output: 0
i: 5
output: 1
i: 6
output: 2
i: 7
output: 3
i: 8
output: 4
"Elapsed time: 5509.658327 msecs"
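
;; For comparison, the ideal for a two-stage pipeline: once the first element is produced, the
;; slower stage sets the pace. PIPELINED-MS is a hypothetical helper for this estimate, which
;; assumes the buffer never runs dry and both stages take exactly their Thread/sleep time.
(defn pipelined-ms
  "Estimated wall time (ms) when producing the next element overlaps consuming this one."
  [n produce-ms consume-ms]
  (+ produce-ms consume-ms (* (dec n) (max produce-ms consume-ms))))

(pipelined-ms 5 250 1000) ;=> 5250; the measured 5509 ms is about 260 ms above this ideal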