Last active
December 17, 2015 14:59
-
-
Save mjwillson/5628308 to your computer and use it in GitHub Desktop.
scoped streams in clojure
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; This namespace deliberately defines its own stream-based `map`, `filter`,
;; `reduce` and `vec`, so the clojure.core versions are excluded from the
;; automatic refer to avoid "already refers to" warnings when the file loads.
(ns streams.core
  (:refer-clojure :exclude [map filter reduce vec])
  (:require [clojure.java.io :as io]))
(def END
  "Unique sentinel object returned by a generator once it is exhausted.
  Compare against it with `identical?`."
  (new Object))
(defprotocol Stream
  "A source of values that is consumed through a generator handed to a
  callback."
  (with-generator [stream callback]
    "Should call `callback` with a generator function, and finally close any
  resources associated with the stream after the callback returns.

  A 'generator' is a function of no arguments which returns a new value
  each time it is called, or the special END object once it is exhausted.

  Generators are stateful and are meant to be an implementation detail:
  end users need not deal with them directly and can instead use the
  combinators in this namespace to compose and consume streams in a
  functional style."))
;; Any java.lang.Iterable can be used as a Stream: the generator walks a fresh
;; iterator and yields END once the iterator has no more elements.
(extend-protocol Stream
  Iterable
  (with-generator [coll callback]
    (let [it   (.iterator coll)
          pull (fn []
                 (if-not (.hasNext it)
                   END
                   (.next it)))]
      (callback pull))))
(defn lines-of-file-stream
  "Returns a Stream over the lines of `file`, where `file` is anything
  accepted by clojure.java.io/reader.

  The reader is opened each time the stream is consumed and is closed by
  `with-open` as soon as the callback returns."
  [file]
  (reify Stream
    (with-generator [_ callback]
      ;; io/reader returns a java.io.BufferedReader; the hint lets the
      ;; compiler resolve .readLine statically instead of reflecting on
      ;; every line read.
      (with-open [^java.io.BufferedReader reader (io/reader file)]
        ;; .readLine returns nil at end of input, which maps to END.
        (callback #(or (.readLine reader) END))))))
(defn map
  "Returns a Stream whose values are the result of applying `f` to each
  value of `stream`. END is passed through without being given to `f`."
  [f stream]
  (reify Stream
    (with-generator [_ callback]
      (let [wrap (fn [generator]
                   (callback
                    (fn []
                      (let [item (generator)]
                        (if-not (identical? item END)
                          (f item)
                          END)))))]
        (with-generator stream wrap)))))
(defn filter
  "Returns a Stream containing only the values of `stream` for which
  `pred` returns a truthy value."
  [pred stream]
  (reify Stream
    (with-generator [_ callback]
      (with-generator stream
        (fn [generator]
          (callback
           (fn []
             ;; Keep pulling until the source is exhausted or a value
             ;; satisfies pred; END is never handed to pred.
             (loop [item (generator)]
               (if (or (identical? item END) (pred item))
                 item
                 (recur (generator)))))))))))
;; Maybe a CollReduce implementation could be supplied here?
;; Also note: parallel versions of reduce or fold could be implemented in
;; various ways on top of the generator interface.
(defn reduce
  "Folds `reduce-fn` over the values of `stream`, starting from `identity`,
  and returns the final accumulated value. Values are pulled from the
  generator one at a time, so unlike with a lazy sequence there is no
  possibility of holding on to the head."
  [identity reduce-fn stream]
  (let [consume (fn [generator]
                  (loop [acc  identity
                         item (generator)]
                    (if (identical? item END)
                      acc
                      (recur (reduce-fn acc item) (generator)))))]
    (with-generator stream consume)))
(defn vec
  "Realises the whole stream in memory and returns it as a persistent
  vector."
  [stream]
  ;; Accumulate into a transient vector through this namespace's reduce,
  ;; then freeze the result.
  (->> stream
       (reduce (transient []) conj!)
       persistent!))
(defn- generator-seq
  "Wraps `generator` in a lazy seq that pulls one value per realised element
  and ends when the generator returns END."
  [generator]
  (lazy-seq
   (let [item (generator)]
     (if (identical? item END)
       nil
       (cons item (generator-seq generator))))))
(defn with-lazy-seq
  "Calls `callback` with a lazy seq over the values of `stream` and returns
  whatever `callback` returns.

  The seq is backed by the stream's generator, which the Stream contract
  only keeps valid until the callback returns, so the seq should be
  consumed inside `callback`."
  [stream callback]
  (with-generator stream (comp callback generator-seq)))
(comment
  ;; e.g.:
  (->> (lines-of-file-stream "/tmp/foob")
       (map #(.toLowerCase %))
       (filter #(< (.length %) 5))
       (reduce {} #(merge-with + %1 {%2 1}))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment