Created
December 31, 2014 08:44
-
-
Save optevo/aa9e64b5bd732544d05d to your computer and use it in GitHub Desktop.
Select an execution mode for a task with one parameter (immediate, cached, delayed, agent send, agent send-off) and add priority queues.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns easy.scheduler | |
(:require [clojure.data.priority-map :refer :all])) | |
(def ^:dynamic *task* identity) | |
(def result (atom (transient {}))) | |
(def todo (atom (priority-map))) | |
(deftype Now [o] | |
clojure.lang.IDeref | |
(deref [this] o) | |
clojure.lang.IPending | |
(isRealized [this] true)) | |
(defn now [o] (Now. o)) | |
(defn next-todo! [] | |
(loop [t0 @todo] | |
(if-not (empty? t0) | |
(let [k (first (first t0)) t1 (dissoc t0 k) pass? (compare-and-set! todo t0 t1)] | |
(if pass? k (recur @todo)))))) | |
(defn process! [p] | |
(let [k (next-todo!)] | |
(if k (deliver p (*task* k))) k)) | |
(defn agent-task! [new-key send-fn priority] | |
(let [p (promise) a (agent p)] | |
(swap! result assoc! new-key p) | |
(swap! todo assoc new-key (if priority priority 5)) | |
(send-fn a process!))) | |
(defn add-task! [new-key mode & [option]] | |
(cond | |
(= :now mode) (*task* new-key) | |
(@result new-key) (throw (IllegalArgumentException. (str "Key already added: " new-key))) | |
(= :send mode) (agent-task! new-key send option) | |
(= :send-off mode) (agent-task! new-key send-off option) | |
(= :cache mode) (swap! result assoc! new-key (now (*task* new-key))) | |
(= :delay mode) (swap! result assoc! new-key (delay (*task* new-key))) | |
:else (throw (IllegalArgumentException. (str mode))))) | |
(defn add-data [k v] | |
(swap! result assoc! k (now v))) | |
(defn prioritize! [k f] | |
(loop [t0 @todo] | |
(if (contains? t0 k) | |
(let [p0 (t0 k) p1 (f p0) t1 (assoc t0 k p1) pass? (compare-and-set! todo t0 t1)] | |
(if-not pass? (recur @todo)))))) | |
(defn get-result [priority-fn k] | |
(let [r @result p (r k)] | |
(if (nil? p) (*task* k) | |
(if (or (realized? p) (delay? p)) @p | |
(do (prioritize! k priority-fn) @p))))) | |
(defn get-eventually [k] | |
(let [r @result p (r k)] | |
(if (nil? p) (*task* k) @p))) | |
(def get-now (partial get-result dec)) | |
;------------- Tests | |
(def workers (atom [])) | |
(defn restart! [] | |
(reset! todo (priority-map)) | |
(apply await @workers) | |
(reset! result (transient {})) | |
(reset! workers [])) | |
(defn calculation [x] | |
(loop [n x f 1] | |
(if (= n 1) f (recur (dec n) (* f n))))) | |
(defn simulate-io [_] | |
(Thread/sleep 3)) | |
(defn agent? [o] (instance? clojure.lang.Agent o)) | |
(defn exec-test [func mode] | |
(binding [*task* func] | |
(let [t0 (System/nanoTime) | |
r (range 1N 1000N) | |
k1 500N | |
k2 600N | |
_ (doseq [n r] (let [w (add-task! n mode (if (= n k2) 0))] | |
(if (agent? w) (swap! workers conj w)))) | |
t1 (System/nanoTime) | |
_ (get-now k1) | |
t2 (System/nanoTime) | |
_ (get-now k2) | |
t3 (System/nanoTime) | |
_ (apply await @workers) | |
t4 (System/nanoTime) | |
_ (get-now k1) | |
t5 (System/nanoTime) | |
_ (get-now k2) | |
t6 (System/nanoTime) | |
_ (doseq [n r] (get-now n)) | |
t7 (System/nanoTime) | |
_ (Thread/sleep 10) | |
_ (restart!) | |
t (fn [s f] (format "%10.2f" (/ (- f s) 100000.0)))] | |
(str (t t0 t1) " " (t t1 t2) " " (t t2 t3) " " (t t4 t5) " " (t t0 t4) " " (t t5 t6) " " (t t6 t7))))) | |
(exec-test calculation :now ) | |
(exec-test calculation :cache ) | |
(exec-test calculation :delay ) | |
(exec-test calculation :send ) | |
(exec-test simulate-io :now ) | |
(exec-test simulate-io :cache ) | |
(exec-test simulate-io :delay ) | |
(exec-test simulate-io :send ) | |
(exec-test simulate-io :send-off ) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment