Last active
September 1, 2017 03:32
-
-
Save karlmikko/6cf998ac10a533c480acd460505f3f06 to your computer and use it in GitHub Desktop.
yet another pmap
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; You can use this as a drop-in replacement for map.
;; It is like pmap, but with a bounded number of threads that is shared with child fmap invocations,
;; making it safe to use in recursive algorithms like tree walking. Deadlocks are avoided
;; by continuing processing on the current thread if the pool is full.
;;
;; You can also specify the window size, i.e. the amount that fmap looks ahead;
;; this makes fmap semi-lazy like pmap.
;;
;; You don't need to set options - it will default to the same thread count as pmap (2 + N-CPUs).
;;
;; Implemented using futures and an atom to count the number of futures created.
;; Due to the nature of atoms in Clojure, this means the actual number of futures that can be created
;; may be larger than the pool-size provided. I don't expect this to be too large though.
;;
(ns future-map) | |
;; Dynamic configuration vars. All three default to nil at the root and are
;; given thread-local values by `start-options`.
(def ^:dynamic *future-counter*
  "Counter shared by fmap invocations; nil until bound by `start-options`,
  which binds it to an atom holding the number of in-flight futures."
  nil)

(def ^:dynamic *pool-size*
  "Upper bound used when deciding whether to spawn another future; nil until
  bound by `start-options`."
  nil)

(def ^:dynamic *window-size*
  "How many elements fmap looks ahead of the consumer; nil until bound by
  `start-options`."
  nil)
(defn processors
  "Returns the number of processors the JVM runtime reports as available."
  []
  (.availableProcessors (Runtime/getRuntime)))
(defmacro start-options
  "Evaluates `body` with `*pool-size*`, `*window-size*` and `*future-counter*`
  bound according to `opts`.

  `opts` is any expression yielding a map (or nil) with the optional keys:
    :pool-size   - maximum number of concurrently running futures
    :window-size - look-ahead window; capped at the pool size
    :size        - fallback used for both of the above when they are absent
    :new-counter - when truthy, start a fresh counter instead of sharing the
                   currently bound one

  Missing values fall back to the currently bound dynamic vars, then to
  (+ 2 (processors)) for the pool size and to the pool size for the window.

  The option map is evaluated exactly once at runtime, so option expressions
  are never evaluated twice and non-literal maps (symbols, function calls)
  are honoured."
  [opts & body]
  `(let [opts#           ~opts
         ;; Read :size once; it feeds both the pool and the window default.
         size#           (:size opts#)
         psize#          (or (:pool-size opts#) size# *pool-size* (+ 2 (processors)))
         wsize#          (or (:window-size opts#) size# *window-size* psize#)
         future-counter# (or (when (:new-counter opts#) (atom 0))
                             *future-counter*
                             (atom 0))]
     (with-bindings {#'*pool-size*      psize#
                     ;; Never look further ahead than the pool can serve.
                     #'*window-size*    (min psize# wsize#)
                     #'*future-counter* future-counter#}
       ~@body)))
(defn- future-deref
  "Returns the value of `x`: blocks on and dereferences it when it is a
  future, otherwise hands `x` back untouched."
  [x]
  (cond-> x
    (future? x) deref))
(defn- window-ahead
  "Returns a lazy seq of the values of `coll`, dereferencing any futures.
  A second cursor running `n` elements ahead of the consumer is advanced in
  lock-step, which forces the realization of `coll` that far ahead. Once the
  look-ahead cursor is exhausted the remaining elements are simply mapped
  through `future-deref`."
  [n coll]
  (letfn [(walk [[head & tail :as remaining] ahead]
            (lazy-seq
              (let [lookahead (seq ahead)]
                (if lookahead
                  (cons (future-deref head)
                        (walk tail (rest lookahead)))
                  (map future-deref remaining)))))]
    (walk coll (drop n coll))))
(defn fmap
  "Semi-lazy, parallel variant of `map` over a single collection.

  Applies `f` to every element of `coll` and returns a lazy seq of the
  results in order. While fewer than the pool size futures are in flight
  an element is processed in a new future; otherwise `f` is called directly
  on the realizing thread, so nested/recursive use cannot starve itself.
  Results are realized up to the window size ahead of the consumer.

  Pool size, window size and the in-flight counter come from the dynamic
  vars established by `start-options` and are re-bound around every call to
  `f`, so fmap calls made from inside `f` share the same counter and limits
  even though the returned seq is realized after `start-options` has exited.

  If `f` throws, the exception surfaces when the corresponding element is
  realized (wrapped by the future when it ran in one); the pool slot is
  released either way."
  [f coll]
  (start-options
    {}
    (let [future-counter *future-counter*
          pool-size      *pool-size*
          window-size    *window-size*
          ;; Atomically claim a pool slot. A plain check followed by swap!
          ;; lets several threads pass the check at once and overshoot the
          ;; limit; the compare-and-set! retry loop does not.
          acquire-slot!  (fn []
                           (loop []
                             (let [in-flight @future-counter]
                               (cond
                                 (>= in-flight pool-size) false
                                 (compare-and-set! future-counter
                                                   in-flight
                                                   (inc in-flight)) true
                                 :else (recur)))))
          ;; The surrounding with-bindings is gone by the time the lazy seq
          ;; is realized, so re-establish it for `f` (and thereby for any
          ;; nested fmap it calls).
          call-f         (fn [x]
                           (with-bindings {#'*future-counter* future-counter
                                           #'*pool-size*      pool-size
                                           #'*window-size*    window-size}
                             (f x)))]
      (->> coll
           (map (fn internal-mapper
                  [x]
                  (if (acquire-slot!)
                    (future
                      (try
                        (call-f x)
                        ;; Release the slot even when `f` throws, otherwise
                        ;; the pool would shrink permanently.
                        (finally
                          (swap! future-counter dec))))
                    (call-f x))))
           (window-ahead window-size)))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Example usage:
The last example may seem odd, as one might expect the elapsed time to be 3 seconds. In fact, one future is created and work continues on the current thread, so you have a concurrency level of 2.