Last active
November 28, 2017 03:03
-
-
Save reborg/3056a50af4f977b280b9b6f526670add to your computer and use it in GitHub Desktop.
Clojure parallel-lazy merge-sort
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; PARALLEL LAZY MERGE-SORT
;; Allows large datasets (that would otherwise not fit into memory)
;; to be sorted in parallel on a single machine.
;; The data to fetch is identified by a range of IDs. The IDs are split
;; into chunks that are sent in parallel to a fork-join thread pool
;; (using reducers). A protocol lets you define the policy used to fetch
;; the data for the current ID range. Each chunk is sorted and saved
;; to disk. Each thread returns a file handle pointing at a temp file
;; that contains its sorted chunk. The list of file handles is finally
;; used to merge the pre-sorted chunks lazily while maintaining order.
;; The provided fetch-ids implementation is for testing only and
;; generates shuffled integer ranges.
(require '[clojure.java.io :as io]) | |
(require '[clojure.core.reducers :as r]) | |
(defn- save-chunk!
  "Prints `data` in readable form (via `pr`) to a fresh temp file named
  mergesort-*.tmp and returns the java.io.File handle.

  The printer vars are pinned so that an ambient `*print-length*`,
  `*print-level*` or `*print-readably*` setting cannot truncate or
  otherwise corrupt the chunk written to disk. The temp file is
  registered for deletion when the JVM exits so chunks do not
  accumulate in the temp directory."
  [data]
  (let [file (doto (java.io.File/createTempFile "mergesort-" ".tmp")
               (.deleteOnExit))]
    (with-open [fw (io/writer file)]
      (binding [*out* fw
                ;; Guarantee a complete, re-readable dump of the chunk.
                *print-length* nil
                *print-level* nil
                *print-readably* true]
        (pr data)))
    file))
(defprotocol DataProvider
  "Policy for obtaining the data that belongs to a range of IDs."
  (fetch-ids [id-range]
    "Returns the collection of items to be sorted for `id-range`."))
(defn- process-leaf
  "Handles one leaf range: fetches its data with `fetch-ids`, orders it
  with `sortf`, writes the result to disk with `save-chunk!`, and returns
  a one-element vector holding the resulting file handle."
  [id-range sortf]
  (let [fetched    (fetch-ids id-range)
        ordered    (sortf fetched)
        chunk-file (save-chunk! ordered)]
    [chunk-file]))
;; A range of IDs between `from` and `to` that can be folded in parallel
;; with `r/fold`. The bundled `fetch-ids` implementation treats the range
;; as half-open (`from` inclusive, `to` exclusive), which matches the way
;; the range is split at the midpoint below.
;;
;; coll-fold arguments, as used here:
;;   n      - maximum number of IDs handled by a single leaf
;;   mergef - combines the results of the two halves
;;   sortf  - applied to the fetched data of each leaf (see process-leaf)
(defrecord IdRange [from to]
  r/CollFold
  (coll-fold [{:keys [from to] :as id-range} n mergef sortf]
    ;; Clamp the leaf size to at least 1: with n < 1 a one-element range
    ;; would split into [from, from) and [from, to) and never reach a leaf.
    (let [chunk-size (max 1 n)]
      (if (<= (- to from) chunk-size)
        (process-leaf id-range sortf)
        (let [half (+ from (quot (- to from) 2))
              r1 (IdRange. from half)
              r2 (IdRange. half to)
              ;; Thunk that folds a sub-range with the same settings.
              fc (fn [sub-range] #(r/fold chunk-size mergef sortf sub-range))]
          ;; The fj* helpers are reached through their vars (#'), presumably
          ;; because they are private to clojure.core.reducers.
          (#'r/fjinvoke
           #(let [f1 (fc r1)
                  t2 (#'r/fjtask (fc r2))]
              ;; Fork the right half, compute the left half in this thread,
              ;; then join the right half and merge both results.
              (#'r/fjfork t2)
              (mergef (f1) (#'r/fjjoin t2)))))))))
;; Test-only data source: yields the integers of the range in random order.
(extend-type IdRange
  DataProvider
  (fetch-ids [id-range]
    (let [lo (:from id-range)
          hi (:to id-range)]
      (shuffle (range lo hi)))))
(defn sort-all
  "Lazily merges already sorted collections into a single sorted lazy
  sequence. Order is maintained through the given comparator `cmp`
  (`compare` by default).

  Empty collections in `colls` are skipped. Termination depends only on
  every collection being exhausted, so `nil` or `false` elements in the
  data are emitted like any other value."
  ([colls]
   (sort-all compare colls))
  ([cmp colls]
   (lazy-seq
    ;; Keep only non-empty collections (as seqs). This avoids emitting a
    ;; spurious nil for an empty collection and avoids handing nil to cmp.
    (when-let [pending (seq (keep seq colls))]
      (let [[[winner & losers] & others] (sort-by first cmp pending)]
        (cons winner
              (sort-all cmp (if losers (conj others losers) others))))))))
(defn- load-chunk
  "Reads the whole content of `fname` with `slurp` and parses it back
  into a data structure with `read-string`."
  [fname]
  (-> fname
      slurp
      read-string))
(defn psort
  "Sorts the data identified by `id-range` and returns the merged result.

  The range is folded with `r/fold` using a leaf size of 512; the fold
  yields the concatenation of the per-chunk results, which are then
  loaded with `load-chunk` and merged with `sort-all`. `cmp` defaults to
  `compare`."
  ([id-range]
   (psort compare id-range))
  ([cmp id-range]
   (let [sort-chunk  (partial sort cmp)
         chunk-files (r/fold 512 concat sort-chunk id-range)
         chunks      (map load-chunk chunk-files)]
     (sort-all cmp chunks))))
;; Example usage: | |
;; (take 10 (psort (IdRange. 0 10000000))) | |
;; 13 secs later on my machine: | |
;; (0 1 2 3 4 5 6 7 8 9) | |
;; (take 10 (psort > (IdRange. 0 1000))) | |
;; (999 998 997 996 995 994 993 992 991 990) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment