Gist echeran/98514f10ad087da95cb8759c94178187, created March 13, 2021 22:12
Multi-threaded Spark jobs via Clojure with Claypoole (custom pmap) + mapPartitions glue code
(require '[com.climate.claypoole :as tpool])
;;
;; helper fns
;;
(defn your-multi-threaded-spark-fn
  "Create a parallelized version of your-fn using a fixed-size thread pool of
  num-threads threads, with extra-args partially applied to your-fn.
  The returned fn fits the signature of .mapPartitions in Spark: it takes an
  Iterator over one partition and returns an object that is both a
  java.util.Iterator and a java.lang.Iterable."
  [num-threads your-fn & extra-args]
  (fn
    [elem-iter]
    ;; The "hard part" of this fn is creating an Iterator + Iterable out of a
    ;; Clojure seq. The thread pool management and the custom parallel map
    ;; (pmap) call are made easy by the Claypoole library.
    (let [elem-seq (iterator-seq elem-iter) ; nil for an empty partition
          ;; If we are given an empty Spark partition, skip the thread pool and
          ;; return an empty Iterator/-able. Otherwise run your-fn in parallel;
          ;; Claypoole's pmap accepts a thread count in place of a pool.
          out-iter-rest-cache (atom (when elem-seq
                                      (seq (tpool/pmap num-threads
                                                       (apply partial your-fn extra-args)
                                                       elem-seq))))]
      ;; Implementing the Iterator interface means writing code like the body
      ;; of a 'loop' over a seq: check for more, take the first, keep the rest.
      ;; Both branches now return the same Iterator + Iterable object.
      (proxy [java.util.Iterator java.lang.Iterable]
             []
        (remove []
          (throw (UnsupportedOperationException.
                  "You don't want to call 'remove' on an iterator from a Spark mapPartitions call!")))
        (hasNext [] (boolean @out-iter-rest-cache))
        (next []
          (if-let [remaining @out-iter-rest-cache]
            (let [out-iter-next (first remaining)]
              (reset! out-iter-rest-cache (seq (rest remaining)))
              out-iter-next)
            ;; The Iterator contract calls for this rather than returning nil.
            (throw (java.util.NoSuchElementException.))))
        (iterator [] this)))))
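For readers without Claypoole or Spark on the classpath, the two ideas in `your-multi-threaded-spark-fn` can be tried with only clojure.core and the JDK. This is a minimal sketch under that assumption, not the gist's method: `seq->iterator+iterable` builds the Iterator + Iterable with `reify` instead of `proxy`, and `parallel-map-partition` swaps Claypoole's pmap for a `java.util.concurrent` fixed thread pool. Both names are hypothetical. Implementing both interfaces is presumably what lets one object serve Spark 1.x, whose Java `FlatMapFunction.call` returned an `Iterable`, as well as Spark 2.0+, where it returns an `Iterator`.

```clojure
;; Assumption: only clojure.core and the JDK are on the classpath
;; (no Claypoole, no Spark). Both fn names below are hypothetical.
(import '(java.util Iterator NoSuchElementException)
        '(java.util.concurrent Callable ExecutorService Executors Future))

(defn seq->iterator+iterable
  "Wraps a seq in an object that is both an Iterator and an Iterable
  whose .iterator returns the object itself."
  [s]
  (let [remaining (atom (seq s))]              ; nil once exhausted
    (reify
      Iterator
      (hasNext [_] (boolean @remaining))
      (next [_]
        (if-let [xs @remaining]
          (let [x (first xs)]
            (reset! remaining (seq (rest xs)))  ; advance by one element
            x)
          (throw (NoSuchElementException. "no more elements"))))
      (remove [_]
        (throw (UnsupportedOperationException. "remove is not supported")))
      Iterable
      (iterator [this] this))))

(defn parallel-map-partition
  "Returns a fn of one partition Iterator that applies f to every element
  on a fixed pool of n-threads threads, keeping input order."
  [n-threads f]
  (fn [^Iterator elem-iter]
    (let [^ExecutorService pool (Executors/newFixedThreadPool (int n-threads))]
      (try
        ;; One task per element; reify Callable makes .submit resolve to
        ;; submit(Callable), so each Future holds f's return value.
        (let [futures (mapv (fn [x] (.submit pool (reify Callable (call [_] (f x)))))
                            (iterator-seq elem-iter))
              ;; .get blocks until each task finishes; mapv keeps input order.
              results (mapv (fn [^Future fut] (.get fut)) futures)]
          (seq->iterator+iterable results))
        (finally
          ;; Every result is already collected, so the pool can stop here.
          (.shutdown pool))))))
```

Without Spark, `((parallel-map-partition 4 inc) (.iterator [1 2 3]))` returns an object that yields 2, 3, 4 in order. The design trades memory for simplicity: the whole partition's results are realized before the pool is shut down, so no threads outlive the call, but memory use grows with partition size.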