Created May 29, 2015 17:04
Save MichaelDrogalis/0c734b804d02b4e87b53 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters.
| diff --git a/src/onyx/api.clj b/src/onyx/api.clj | |
| index 504fdb1..00c41fd 100644 | |
| --- a/src/onyx/api.clj | |
| +++ b/src/onyx/api.clj | |
| @@ -2,7 +2,7 @@ | |
| (:require [clojure.string :refer [split]] | |
| [clojure.core.async :refer [chan alts!! >!! <!! close!]] | |
| [com.stuartsierra.component :as component] | |
| - [taoensso.timbre :refer [warn fatal]] | |
| + [taoensso.timbre :refer [warn fatal error]] | |
| [onyx.log.entry :refer [create-log-entry]] | |
| [onyx.system :as system] | |
| [onyx.extensions :as extensions] | |
| @@ -29,6 +29,23 @@ | |
| Double/POSITIVE_INFINITY)}) | |
| tasks))) | |
| +(defn ^{:no-doc true} min-required-peers [catalog tasks] | |
| + (into | |
| + {} | |
| + (map | |
| + (fn [task] | |
| + {(:id task) | |
| + (or (:onyx/min-peers (planning/find-task catalog (:name task))) 1)}) | |
| + tasks))) | |
| + | |
| +(defn ^{:no-doc true} flux-policies [catalog tasks] | |
| + (->> tasks | |
| + (map (fn [task] | |
| + (vector (:id task) | |
| + (:onyx/flux-policy (planning/find-task catalog (:name task)))))) | |
| + (filter second) | |
| + (into {}))) | |
| + | |
| (defn ^{:added "0.6.0"} map-set-workflow->workflow | |
| "Converts a workflow in format: | |
| {:a #{:b :c} | |
| @@ -88,12 +105,20 @@ | |
| scheduler (:task-scheduler job) | |
| sat (saturation (:catalog job)) | |
| task-saturation (task-saturation (:catalog job) tasks) | |
| + min-reqs (min-required-peers (:catalog job) tasks) | |
| + task-flux-policies (flux-policies (:catalog job) tasks) | |
| input-task-ids (find-input-tasks (:catalog job) tasks) | |
| output-task-ids (find-output-tasks (:catalog job) tasks) | |
| exempt-task-ids (find-exempt-tasks tasks (:acker/exempt-tasks job)) | |
| - args {:id id :tasks task-ids :task-scheduler scheduler | |
| - :saturation sat :task-saturation task-saturation | |
| - :inputs input-task-ids :outputs output-task-ids | |
| + args {:id id | |
| + :tasks task-ids | |
| + :task-scheduler scheduler | |
| + :saturation sat | |
| + :task-saturation task-saturation | |
| + :min-required-peers min-reqs | |
| + :flux-policies task-flux-policies | |
| + :inputs input-task-ids | |
| + :outputs output-task-ids | |
| :exempt-tasks exempt-task-ids | |
| :acker-percentage (or (:acker/percentage job) 1) | |
| :acker-exclude-inputs (or (:acker/exempt-input-tasks? job) false) | |
| @@ -102,11 +127,15 @@ | |
| (create-log-entry :submit-job args))) | |
| (defn ^{:added "0.6.0"} submit-job [config job] | |
| - (validator/validate-peer-config config) | |
| + (try (validator/validate-peer-config config) | |
| + (validator/validate-job (assoc job :workflow (:workflow job))) | |
| + (validator/validate-flow-conditions (:flow-conditions job) (:workflow job)) | |
| + (validator/validate-lifecycles (:lifecycles job) (:catalog job)) | |
| + (catch Throwable t | |
| + (println t) | |
| + (error t) | |
| + (throw t))) | |
| (let [id (java.util.UUID/randomUUID) | |
| - _ (validator/validate-job (assoc job :workflow (:workflow job))) | |
| - _ (validator/validate-flow-conditions (:flow-conditions job) (:workflow job)) | |
| - _ (validator/validate-lifecycles (:lifecycles job) (:catalog job)) | |
| tasks (planning/discover-tasks (:catalog job) (:workflow job)) | |
| entry (create-submit-job-entry id config job tasks) | |
| client (component/start (system/onyx-client config))] | |
| @@ -165,8 +194,7 @@ | |
| (extensions/write-log-entry (:log client) entry) | |
| (loop [replica (extensions/subscribe-to-log (:log client) ch)] | |
| - (let [position (<!! ch) | |
| - entry (extensions/read-log-entry (:log client) position) | |
| + (let [entry (<!! ch) | |
| new-replica (extensions/apply-log-entry entry replica)] | |
| (if (and (= (:fn entry) :gc) (= (:id (:args entry)) id)) | |
| (let [diff (extensions/replica-diff entry replica new-replica)] | |
| @@ -181,8 +209,7 @@ | |
| (let [client (component/start (system/onyx-client config)) | |
| ch (chan 100)] | |
| (loop [replica (extensions/subscribe-to-log (:log client) ch)] | |
| - (let [position (<!! ch) | |
| - entry (extensions/read-log-entry (:log client) position) | |
| + (let [entry (<!! ch) | |
| new-replica (extensions/apply-log-entry entry replica)] | |
| (if-not (some #{job-id} (:completed-jobs new-replica)) | |
| (recur new-replica) | |
| diff --git a/src/onyx/extensions.clj b/src/onyx/extensions.clj | |
| index 2834c75..e3e88c3 100644 | |
| --- a/src/onyx/extensions.clj | |
| +++ b/src/onyx/extensions.clj | |
| @@ -41,6 +41,9 @@ | |
| (defmulti peer-site (fn [messenger] (type messenger))) | |
| +(defmulti get-peer-site (fn [replica peer] | |
| + (:onyx.messaging/impl (:messaging replica)))) | |
| + | |
| (defmulti open-peer-site (fn [messenger assigned] (type messenger))) | |
| (defmulti connect-to-peer (fn [messenger event peer-site] (type messenger))) | |
| @@ -57,3 +60,6 @@ | |
| (defmulti internal-complete-message (fn [messenger event id peer-link] (type messenger))) | |
| (defmulti internal-retry-message (fn [messenger event id peer-link] (type messenger))) | |
| + | |
| +(defmethod open-peer-site :default | |
| + [_ _] "localhost") | |
| \ No newline at end of file | |
| diff --git a/src/onyx/log/commands/abort_join_cluster.clj b/src/onyx/log/commands/abort_join_cluster.clj | |
| index dc8b8c6..fe34cfb 100644 | |
| --- a/src/onyx/log/commands/abort_join_cluster.clj | |
| +++ b/src/onyx/log/commands/abort_join_cluster.clj | |
| @@ -21,18 +21,18 @@ | |
| (when (or (seq prepared) (seq accepted)) | |
| {:aborted (or (first prepared) (first accepted))}))) | |
| -(defmethod extensions/fire-side-effects! :abort-join-cluster | |
| - [{:keys [args]} old new diff state] | |
| - ;; Abort back-off/retry | |
| - (when (= (:id args) (:id state)) | |
| - (Thread/sleep (or (:onyx.peer/join-failure-back-off (:opts state)) 250))) | |
| - state) | |
| - | |
| (defmethod extensions/reactions :abort-join-cluster | |
| [{:keys [args]} old new diff peer-args] | |
| - (when (= (:id args) (:id peer-args)) | |
| + (when (and (= (:id args) (:id peer-args)) | |
| + (not (:onyx.peer/try-join-once? (:peer-opts (:messenger peer-args))))) | |
| [{:fn :prepare-join-cluster | |
| :args {:joiner (:id peer-args) | |
| :peer-site (extensions/peer-site (:messenger peer-args))} | |
| :immediate? true}])) | |
| +(defmethod extensions/fire-side-effects! :abort-join-cluster | |
| + [{:keys [args]} old new diff state] | |
| + ;; Abort back-off/retry | |
| + (when (= (:id args) (:id state)) | |
| + (Thread/sleep (or (:onyx.peer/join-failure-back-off (:opts state)) 250))) | |
| + state) | |
| diff --git a/src/onyx/log/commands/accept_join_cluster.clj b/src/onyx/log/commands/accept_join_cluster.clj | |
| index ed7ed1a..95632bd 100644 | |
| --- a/src/onyx/log/commands/accept_join_cluster.clj | |
| +++ b/src/onyx/log/commands/accept_join_cluster.clj | |
| @@ -1,6 +1,7 @@ | |
| (ns onyx.log.commands.accept-join-cluster | |
| (:require [clojure.core.async :refer [chan go >! <! >!! close!]] | |
| [clojure.data :refer [diff]] | |
| + [clojure.set :refer [union difference map-invert]] | |
| [taoensso.timbre :refer [info] :as timbre] | |
| [onyx.extensions :as extensions] | |
| [onyx.log.commands.common :as common] | |
| @@ -10,23 +11,27 @@ | |
| [{:keys [args]} replica] | |
| (let [{:keys [accepted-joiner accepted-observer]} args | |
| target (or (get-in replica [:pairs accepted-observer]) | |
| - accepted-observer)] | |
| - (-> replica | |
| - (update-in [:pairs] merge {accepted-observer accepted-joiner}) | |
| - (update-in [:pairs] merge {accepted-joiner target}) | |
| - (update-in [:accepted] dissoc accepted-observer) | |
| - (update-in [:peers] vec) | |
| - (update-in [:peers] conj accepted-joiner) | |
| - (assoc-in [:peer-state accepted-joiner] :idle) | |
| - (reconfigure-cluster-workload)))) | |
| + accepted-observer) | |
| + still-joining? (get (map-invert (:accepted replica)) accepted-joiner)] | |
| + (if still-joining? | |
| + (-> replica | |
| + (update-in [:pairs] merge {accepted-observer accepted-joiner}) | |
| + (update-in [:pairs] merge {accepted-joiner target}) | |
| + (update-in [:accepted] dissoc accepted-observer) | |
| + (update-in [:peers] vec) | |
| + (update-in [:peers] conj accepted-joiner) | |
| + (assoc-in [:peer-state accepted-joiner] :idle) | |
| + (reconfigure-cluster-workload)) | |
| + replica))) | |
| (defmethod extensions/replica-diff :accept-join-cluster | |
| [entry old new] | |
| - (let [rets (first (diff (:accepted old) (:accepted new)))] | |
| - (assert (<= (count rets) 1)) | |
| - (when (seq rets) | |
| - {:observer (first (keys rets)) | |
| - :subject (first (vals rets))}))) | |
| + (if-not (= old new) | |
| + (let [rets (first (diff (:accepted old) (:accepted new)))] | |
| + (assert (<= (count rets) 1)) | |
| + (when (seq rets) | |
| + {:observer (first (keys rets)) | |
| + :subject (first (vals rets))})))) | |
| (defmethod extensions/reactions :accept-join-cluster | |
| [entry old new diff state] | |
| @@ -43,5 +48,6 @@ | |
| (defmethod extensions/fire-side-effects! :accept-join-cluster | |
| [entry old new diff state] | |
| - (let [next-state (unbuffer-messages state diff new)] | |
| - (common/start-new-lifecycle old new diff next-state))) | |
| + (if-not (= old new) | |
| + (let [next-state (unbuffer-messages state diff new)] | |
| + (common/start-new-lifecycle old new diff next-state)))) | |
| diff --git a/src/onyx/log/commands/gc.clj b/src/onyx/log/commands/gc.clj | |
| index 1525d3e..808be9d 100644 | |
| --- a/src/onyx/log/commands/gc.clj | |
| +++ b/src/onyx/log/commands/gc.clj | |
| @@ -2,24 +2,29 @@ | |
| (:require [clojure.set :refer [difference]] | |
| [clojure.data :refer [diff]] | |
| [onyx.log.commands.common :as common] | |
| - [onyx.extensions :as extensions])) | |
| + [onyx.extensions :as extensions] | |
| + [taoensso.timbre :refer [warn]])) | |
| (defmethod extensions/apply-log-entry :gc | |
| [{:keys [args message-id]} replica] | |
| - (let [completed (:completed-jobs replica) | |
| - killed (:killed-jobs replica) | |
| - jobs (concat completed killed)] | |
| - (as-> replica x | |
| - (assoc x :killed-jobs []) | |
| - (assoc x :completed-jobs []) | |
| - (reduce (fn [new job] (update-in new [:tasks] dissoc job)) x jobs) | |
| - (reduce (fn [new job] (update-in new [:allocations] dissoc job)) x jobs) | |
| - (reduce (fn [new job] (update-in new [:task-schedulers] dissoc job)) x jobs) | |
| - (reduce (fn [new job] (update-in new [:percentages] dissoc job)) x jobs) | |
| - (reduce (fn [new job] (update-in new [:task-percentages] dissoc job)) x jobs) | |
| - (reduce (fn [new job] (update-in new [:saturation] dissoc job)) x jobs) | |
| - (reduce (fn [new job] (update-in new [:input-tasks] dissoc job)) x jobs) | |
| - (reduce (fn [new job] (update-in new [:output-tasks] dissoc job)) x jobs)))) | |
| + (try | |
| + (let [completed (:completed-jobs replica) | |
| + killed (:killed-jobs replica) | |
| + jobs (concat completed killed)] | |
| + (as-> replica x | |
| + (assoc x :killed-jobs []) | |
| + (assoc x :completed-jobs []) | |
| + (reduce (fn [new job] (update-in new [:tasks] dissoc job)) x jobs) | |
| + (reduce (fn [new job] (update-in new [:allocations] dissoc job)) x jobs) | |
| + (reduce (fn [new job] (update-in new [:task-schedulers] dissoc job)) x jobs) | |
| + (reduce (fn [new job] (update-in new [:percentages] dissoc job)) x jobs) | |
| + (reduce (fn [new job] (update-in new [:task-percentages] dissoc job)) x jobs) | |
| + (reduce (fn [new job] (update-in new [:saturation] dissoc job)) x jobs) | |
| + (reduce (fn [new job] (update-in new [:input-tasks] dissoc job)) x jobs) | |
| + (reduce (fn [new job] (update-in new [:output-tasks] dissoc job)) x jobs))) | |
| + (catch Throwable e | |
| + (warn e) | |
| + replica))) | |
| (defmethod extensions/replica-diff :gc | |
| [entry old new] | |
| @@ -39,4 +44,3 @@ | |
| (doseq [k (range 0 message-id)] | |
| (extensions/gc-log-entry (:log state) k)))) | |
| state) | |
| - | |
| diff --git a/src/onyx/log/commands/kill_job.clj b/src/onyx/log/commands/kill_job.clj | |
| index e66cf7d..dfb7158 100644 | |
| --- a/src/onyx/log/commands/kill_job.clj | |
| +++ b/src/onyx/log/commands/kill_job.clj | |
| @@ -4,23 +4,32 @@ | |
| [onyx.log.commands.common :refer [peer->allocated-job] :as common] | |
| [onyx.scheduling.common-job-scheduler :as cjs] | |
| [onyx.extensions :as extensions] | |
| - [onyx.scheduling.common-job-scheduler :refer [reconfigure-cluster-workload]])) | |
| + [onyx.scheduling.common-job-scheduler :refer [reconfigure-cluster-workload]] | |
| + [taoensso.timbre :refer [warn]])) | |
| -(defmethod extensions/apply-log-entry :kill-job | |
| - [{:keys [args]} replica] | |
| - (if-not (some #{(:job args)} (:killed-jobs replica)) | |
| - (let [peers (mapcat identity (vals (get-in replica [:allocations (:job args)])))] | |
| +;; Pulled this out of the defmethod because it's reused across other log entries. | |
| +(defn apply-kill-job [replica job-id] | |
| + (if-not (some #{job-id} (:killed-jobs replica)) | |
| + (let [peers (mapcat identity (vals (get-in replica [:allocations job-id])))] | |
| (-> replica | |
| - (update-in [:jobs] (fn [coll] (remove (partial = (:job args)) coll))) | |
| + (update-in [:jobs] (fn [coll] (remove (partial = job-id) coll))) | |
| (update-in [:jobs] vec) | |
| - (update-in [:killed-jobs] conj (:job args)) | |
| + (update-in [:killed-jobs] conj job-id) | |
| (update-in [:killed-jobs] vec) | |
| - (update-in [:allocations] dissoc (:job args)) | |
| - (update-in [:ackers] dissoc (:job args)) | |
| + (update-in [:allocations] dissoc job-id) | |
| + (update-in [:ackers] dissoc job-id) | |
| (merge {:peer-state (into {} (map (fn [p] {p :idle}) peers))}) | |
| (reconfigure-cluster-workload))) | |
| replica)) | |
| +(defmethod extensions/apply-log-entry :kill-job | |
| + [{:keys [args]} replica] | |
| + (try | |
| + (apply-kill-job replica (:job args)) | |
| + (catch Throwable e | |
| + (warn e) | |
| + replica))) | |
| + | |
| (defmethod extensions/replica-diff :kill-job | |
| [entry old new] | |
| (second (diff (into #{} (:killed-jobs old)) (into #{} (:killed-jobs new))))) | |
| diff --git a/src/onyx/log/commands/leave_cluster.clj b/src/onyx/log/commands/leave_cluster.clj | |
| index 02b8cf3..fa551d8 100644 | |
| --- a/src/onyx/log/commands/leave_cluster.clj | |
| +++ b/src/onyx/log/commands/leave_cluster.clj | |
| @@ -5,8 +5,15 @@ | |
| [com.stuartsierra.component :as component] | |
| [onyx.extensions :as extensions] | |
| [onyx.log.commands.common :as common] | |
| + [onyx.log.commands.kill-job :refer [apply-kill-job]] | |
| [onyx.scheduling.common-job-scheduler :refer [reconfigure-cluster-workload]])) | |
| +(defn enforce-flux-policy [replica id] | |
| + (let [allocation (common/peer->allocated-job (:allocations replica) id)] | |
| + (if (= (get-in replica [:flux-policies (:job allocation) (:task allocation)]) :kill) | |
| + (apply-kill-job replica (:job allocation)) | |
| + replica))) | |
| + | |
| (defmethod extensions/apply-log-entry :leave-cluster | |
| [{:keys [args]} replica] | |
| (let [{:keys [id]} args | |
| @@ -16,6 +23,7 @@ | |
| prep-observer (get (map-invert (:prepared replica)) id) | |
| accep-observer (get (map-invert (:accepted replica)) id)] | |
| (-> replica | |
| + (enforce-flux-policy id) | |
| (update-in [:peers] (partial remove #(= % id))) | |
| (update-in [:peers] vec) | |
| (update-in [:prepared] dissoc id) | |
| diff --git a/src/onyx/log/commands/seal_output.clj b/src/onyx/log/commands/seal_output.clj | |
| index 5bdc55c..6df3aee 100644 | |
| --- a/src/onyx/log/commands/seal_output.clj | |
| +++ b/src/onyx/log/commands/seal_output.clj | |
| @@ -23,6 +23,7 @@ | |
| (update-in [:jobs] (fn [coll] (remove (partial = (:job args)) coll))) | |
| (update-in [:jobs] vec) | |
| (update-in [:completed-jobs] conj (:job args)) | |
| + (update-in [:completed-jobs] vec) | |
| (update-in [:allocations] dissoc (:job args)) | |
| (update-in [:peer-state] merge (into {} (map (fn [p] {p :idle}) peers))) | |
| (reconfigure-cluster-workload))) | |
| diff --git a/src/onyx/log/commands/set_replica.clj b/src/onyx/log/commands/set_replica.clj | |
| new file mode 100644 | |
| index 0000000..3ead7b9 | |
| --- /dev/null | |
| +++ b/src/onyx/log/commands/set_replica.clj | |
| @@ -0,0 +1,19 @@ | |
| +(ns onyx.log.commands.set-replica | |
| + (:require [clojure.data :refer [diff]] | |
| + [onyx.extensions :as extensions])) | |
| + | |
| +(defmethod extensions/apply-log-entry :set-replica! | |
| + [{:keys [args message-id]} replica] | |
| + (:replica args)) | |
| + | |
| +(defmethod extensions/replica-diff :set-replica! | |
| + [entry old new] | |
| + {:diff (diff old new)}) | |
| + | |
| +(defmethod extensions/reactions :set-replica! | |
| + [{:keys [args]} old new diff peer-args] | |
| + []) | |
| + | |
| +(defmethod extensions/fire-side-effects! :set-replica! | |
| + [{:keys [args]} old new diff state] | |
| + state) | |
| diff --git a/src/onyx/log/commands/submit_job.clj b/src/onyx/log/commands/submit_job.clj | |
| index ee301dc..01e6fad 100644 | |
| --- a/src/onyx/log/commands/submit_job.clj | |
| +++ b/src/onyx/log/commands/submit_job.clj | |
| @@ -6,7 +6,8 @@ | |
| [onyx.scheduling.common-job-scheduler :as cjs] | |
| [onyx.scheduling.common-task-scheduler :as cts] | |
| [onyx.extensions :as extensions] | |
| - [onyx.scheduling.common-job-scheduler :refer [reconfigure-cluster-workload]])) | |
| + [onyx.scheduling.common-job-scheduler :refer [reconfigure-cluster-workload]] | |
| + [taoensso.timbre :refer [warn]])) | |
| (defmulti job-scheduler-replica-update | |
| (fn [replica entry] | |
| @@ -34,23 +35,29 @@ | |
| (defmethod extensions/apply-log-entry :submit-job | |
| [{:keys [args] :as entry} replica] | |
| - (-> replica | |
| - (update-in [:jobs] conj (:id args)) | |
| - (update-in [:jobs] vec) | |
| - (assoc-in [:task-schedulers (:id args)] (:task-scheduler args)) | |
| - (assoc-in [:tasks (:id args)] (vec (:tasks args))) | |
| - (assoc-in [:allocations (:id args)] {}) | |
| - (assoc-in [:saturation (:id args)] (:saturation args)) | |
| - (assoc-in [:task-saturation (:id args)] (:task-saturation args)) | |
| - (assoc-in [:input-tasks (:id args)] (vec (:inputs args))) | |
| - (assoc-in [:output-tasks (:id args)] (vec (:outputs args))) | |
| - (assoc-in [:exempt-tasks (:id args)] (vec (:exempt-tasks args))) | |
| - (assoc-in [:acker-percentage (:id args)] (:acker-percentage args)) | |
| - (assoc-in [:acker-exclude-inputs (:id args)] (:acker-exclude-inputs args)) | |
| - (assoc-in [:acker-exclude-outputs (:id args)] (:acker-exclude-outputs args)) | |
| - (job-scheduler-replica-update entry) | |
| - (task-scheduler-replica-update entry) | |
| - (reconfigure-cluster-workload))) | |
| + (try | |
| + (-> replica | |
| + (update-in [:jobs] conj (:id args)) | |
| + (update-in [:jobs] vec) | |
| + (assoc-in [:task-schedulers (:id args)] (:task-scheduler args)) | |
| + (assoc-in [:tasks (:id args)] (vec (:tasks args))) | |
| + (assoc-in [:allocations (:id args)] {}) | |
| + (assoc-in [:saturation (:id args)] (:saturation args)) | |
| + (assoc-in [:task-saturation (:id args)] (:task-saturation args)) | |
| + (assoc-in [:flux-policies (:id args)] (:flux-policies args)) | |
| + (assoc-in [:min-required-peers (:id args)] (:min-required-peers args)) | |
| + (assoc-in [:input-tasks (:id args)] (vec (:inputs args))) | |
| + (assoc-in [:output-tasks (:id args)] (vec (:outputs args))) | |
| + (assoc-in [:exempt-tasks (:id args)] (vec (:exempt-tasks args))) | |
| + (assoc-in [:acker-percentage (:id args)] (:acker-percentage args)) | |
| + (assoc-in [:acker-exclude-inputs (:id args)] (:acker-exclude-inputs args)) | |
| + (assoc-in [:acker-exclude-outputs (:id args)] (:acker-exclude-outputs args)) | |
| + (job-scheduler-replica-update entry) | |
| + (task-scheduler-replica-update entry) | |
| + (reconfigure-cluster-workload)) | |
| + (catch Throwable e | |
| + (warn e) | |
| + replica))) | |
| (defmethod extensions/replica-diff :submit-job | |
| [{:keys [args]} old new] | |
| diff --git a/src/onyx/log/curator.clj b/src/onyx/log/curator.clj | |
| new file mode 100644 | |
| index 0000000..3a87701 | |
| --- /dev/null | |
| +++ b/src/onyx/log/curator.clj | |
| @@ -0,0 +1,138 @@ | |
| +(ns onyx.log.curator | |
| + (:require [clojure.core.async :refer [chan >!! <!! close! thread]] | |
| + [taoensso.timbre :refer [fatal warn trace]] | |
| + [onyx.static.default-vals :refer [defaults]]) | |
| + (:import [org.apache.zookeeper CreateMode] | |
| + [org.apache.zookeeper KeeperException$NoNodeException KeeperException$NodeExistsException Watcher] | |
| + [org.apache.curator.test TestingServer] | |
| + [org.apache.zookeeper.data Stat] | |
| + [org.apache.curator.framework CuratorFrameworkFactory CuratorFramework] | |
| + [org.apache.curator.framework.api CuratorWatcher PathAndBytesable Versionable GetDataBuilder | |
| + SetDataBuilder DeleteBuilder ExistsBuilder GetChildrenBuilder Pathable Watchable] | |
| + [org.apache.curator.framework.state ConnectionStateListener ConnectionState] | |
| + [org.apache.curator.framework.imps CuratorFrameworkState] | |
| + [org.apache.curator RetryPolicy] | |
| + [org.apache.curator.retry BoundedExponentialBackoffRetry])) | |
| + | |
| +;; Thanks to zookeeper-clj | |
| +(defn stat-to-map | |
| + ([^org.apache.zookeeper.data.Stat stat] | |
| + ;;(long czxid, long mzxid, long ctime, long mtime, int version, int cversion, int aversion, long ephemeralOwner, int dataLength, int numChildren, long pzxid) | |
| + (when stat | |
| + {:czxid (.getCzxid stat) | |
| + :mzxid (.getMzxid stat) | |
| + :ctime (.getCtime stat) | |
| + :mtime (.getMtime stat) | |
| + :version (.getVersion stat) | |
| + :cversion (.getCversion stat) | |
| + :aversion (.getAversion stat) | |
| + :ephemeralOwner (.getEphemeralOwner stat) | |
| + :dataLength (.getDataLength stat) | |
| + :numChildren (.getNumChildren stat) | |
| + :pzxid (.getPzxid stat)}))) | |
| + | |
| +(defn event-to-map | |
| + ([^org.apache.zookeeper.WatchedEvent event] | |
| + (when event | |
| + {:event-type (keyword (.name (.getType event))) | |
| + :keeper-state (keyword (.name (.getState event))) | |
| + :path (.getPath event)}))) | |
| + | |
| +;; Watcher | |
| + | |
| +(defn make-watcher | |
| + ([handler] | |
| + (reify Watcher | |
| + (process [this event] | |
| + (handler (event-to-map event)))))) | |
| + | |
| +(defn ^CuratorFramework connect | |
| + ([connection-string] | |
| + (connect connection-string "")) | |
| + ([connection-string ns] | |
| + (connect connection-string ns | |
| + (BoundedExponentialBackoffRetry. | |
| + (:onyx.zookeeper/backoff-base-sleep-time-ms defaults) | |
| + (:onyx.zookeeper/backoff-max-sleep-time-ms defaults) | |
| + (:onyx.zookeeper/backoff-max-retries defaults)))) | |
| + ([connection-string ns ^RetryPolicy retry-policy] | |
| + (doto | |
| + (.. (CuratorFrameworkFactory/builder) | |
| + (namespace ns) | |
| + (connectString connection-string) | |
| + (retryPolicy retry-policy) | |
| + (build)) | |
| + .start))) | |
| + | |
| +(defn close | |
| + "Closes the connection to the ZooKeeper server." | |
| + [^CuratorFramework client] | |
| + (.close client)) | |
| + | |
| +(defn create-mode [opts] | |
| + (cond (and (:persistent? opts) (:sequential? opts)) | |
| + CreateMode/PERSISTENT_SEQUENTIAL | |
| + (:persistent? opts) | |
| + CreateMode/PERSISTENT | |
| + (:sequential? opts) | |
| + CreateMode/EPHEMERAL_SEQUENTIAL | |
| + :else | |
| + CreateMode/EPHEMERAL)) | |
| + | |
| +(defn create | |
| + [^CuratorFramework client path & {:keys [data] :as opts}] | |
| + (try | |
| + (let [cr ^SetDataBuilder (.. client | |
| + create | |
| + (withMode (create-mode opts)))] | |
| + (if data | |
| + (.forPath ^SetDataBuilder cr path data) | |
| + (.forPath ^SetDataBuilder cr path))) | |
| + (catch org.apache.zookeeper.KeeperException$NodeExistsException e | |
| + false))) | |
| + | |
| +(defn create-all | |
| + [^CuratorFramework client path & {:keys [data] :as opts}] | |
| + (try | |
| + (let [cr (.. client | |
| + create | |
| + creatingParentsIfNeeded | |
| + (withMode (create-mode opts)))] | |
| + (if data | |
| + (.forPath ^SetDataBuilder cr path data) | |
| + (.forPath ^SetDataBuilder cr path))) | |
| + (catch org.apache.zookeeper.KeeperException$NodeExistsException e | |
| + false) )) | |
| + | |
| +(defn delete | |
| + "Deletes the given node if it exists. Otherwise returns false." | |
| + [^CuratorFramework client path] | |
| + (try | |
| + (.forPath ^DeleteBuilder (.delete client) path ) | |
| + (catch KeeperException$NoNodeException e false))) | |
| + | |
| +(defn children | |
| + ([^CuratorFramework client path & {:keys [watcher]}] | |
| + (let [children-builder ^GetChildrenBuilder (.getChildren client)] | |
| + (if watcher | |
| + (.forPath ^GetChildrenBuilder (.usingWatcher children-builder ^Watcher (make-watcher watcher)) path) | |
| + (.forPath ^GetChildrenBuilder children-builder path))))) | |
| + | |
| +(defn data [^CuratorFramework client path] | |
| + (let [stat ^Stat (Stat.) | |
| + data (.forPath ^GetDataBuilder (.storingStatIn ^GetDataBuilder (.getData client) stat) path)] | |
| + {:data data | |
| + :stat (stat-to-map stat)})) | |
| + | |
| +(defn set-data [^CuratorFramework client path data version] | |
| + (.forPath ^SetDataBuilder (.withVersion ^SetDataBuilder (.setData client) | |
| + version) | |
| + path | |
| + data)) | |
| + | |
| +(defn exists [^CuratorFramework client path & {:keys [watcher]}] | |
| + (stat-to-map | |
| + (let [builder ^ExistsBuilder (.. client checkExists)] | |
| + (if watcher | |
| + (.forPath ^ExistsBuilder (.usingWatcher builder ^Watcher (make-watcher watcher)) path) | |
| + (.forPath builder path))))) | |
| diff --git a/src/onyx/log/zookeeper.clj b/src/onyx/log/zookeeper.clj | |
| index d5dbf32..33e4040 100644 | |
| --- a/src/onyx/log/zookeeper.clj | |
| +++ b/src/onyx/log/zookeeper.clj | |
| @@ -1,11 +1,14 @@ | |
| (ns onyx.log.zookeeper | |
| (:require [clojure.core.async :refer [chan >!! <!! close! thread]] | |
| [com.stuartsierra.component :as component] | |
| - [taoensso.timbre :refer [fatal warn]] | |
| - [zookeeper :as zk] | |
| + [taoensso.timbre :refer [fatal warn trace]] | |
| + [onyx.log.curator :as zk] | |
| [onyx.extensions :as extensions] | |
| - [onyx.compression.nippy :refer [compress decompress]]) | |
| - (:import [org.apache.curator.test TestingServer])) | |
| + [onyx.static.default-vals :refer [defaults]] | |
| + [onyx.compression.nippy :refer [compress decompress]] | |
| + [onyx.log.entry :refer [create-log-entry]]) | |
| + (:import [org.apache.curator.test TestingServer] | |
| + [org.apache.zookeeper KeeperException$NoNodeException KeeperException$NodeExistsException])) | |
| (def root-path "/onyx") | |
| @@ -55,8 +58,10 @@ | |
| (try | |
| (f) | |
| (catch org.apache.zookeeper.KeeperException$ConnectionLossException e | |
| + (trace e) | |
| (throw-subscriber-closed)) | |
| (catch org.apache.zookeeper.KeeperException$SessionExpiredException e | |
| + (trace e) | |
| (throw-subscriber-closed)))) | |
| (defn initialize-origin! [conn config prefix] | |
| @@ -147,6 +152,7 @@ | |
| (when-not (zk/exists conn (str (pulse-path prefix) "/" id) :watcher f) | |
| (>!! ch true)) | |
| (catch Throwable e | |
| + (trace e) | |
| ;; Node doesn't exist. | |
| (>!! ch true))))) | |
| @@ -176,6 +182,22 @@ | |
| (do (Thread/sleep 500) | |
| (recur))))) | |
| +(defn seek-to-new-origin! [log ch] | |
| + (let [origin (extensions/read-chunk log :origin nil) | |
| + starting-position (inc (:message-id origin)) | |
| + entry (create-log-entry :set-replica! {:replica origin})] | |
| + (>!! ch entry) | |
| + starting-position)) | |
| + | |
| +(defn seek-and-put-entry! "Puts the log entry at position onto ch; if its node is missing (e.g. deleted by GC), puts a :set-replica! entry built from the origin instead." [log position ch] | |
| + (try | |
| + (let [entry (extensions/read-log-entry log position)] | |
| + (>!! ch entry)) | |
| + (catch KeeperException$NoNodeException e | |
| + (seek-to-new-origin! log ch)) | |
| + (catch KeeperException$NodeExistsException e | |
| + (seek-to-new-origin! log ch)))) | |
| + | |
| (defmethod extensions/subscribe-to-log ZooKeeper | |
| [{:keys [conn opts prefix] :as log} ch] | |
| (let [rets (chan)] | |
| @@ -192,7 +214,7 @@ | |
| (loop [position starting-position] | |
| (let [path (str (log-path prefix) "/entry-" (pad-sequential-id position))] | |
| (if (zk/exists conn path) | |
| - (>!! ch position) | |
| + (seek-and-put-entry! log position ch) | |
| (loop [] | |
| (let [read-ch (chan 2)] | |
| (zk/children conn (log-path prefix) :watcher (fn [_] (>!! read-ch true))) | |
| @@ -205,13 +227,19 @@ | |
| ;; Requires one more check. Watch may have been triggered by a delete | |
| ;; from a GC call. | |
| (if (zk/exists conn path) | |
| - (>!! ch position) | |
| + (seek-and-put-entry! log position ch) | |
| (recur))))) | |
| (recur (inc position))))) | |
| + (catch java.lang.IllegalStateException e | |
| + (trace e) | |
| + ;; Curator client has been shutdown, close the subscriber cleanly. | |
| + (close! ch)) | |
| (catch org.apache.zookeeper.KeeperException$ConnectionLossException e | |
| + (trace e) | |
| ;; ZooKeeper has been shutdown, close the subscriber cleanly. | |
| (close! ch)) | |
| (catch org.apache.zookeeper.KeeperException$SessionExpiredException e | |
| + (trace e) | |
| (close! ch)) | |
| (catch Throwable e | |
| (fatal e)))) | |
| @@ -269,19 +297,10 @@ | |
| [{:keys [conn opts prefix] :as log} kw chunk id] | |
| (clean-up-broken-connections | |
| (fn [] | |
| - (let [unique-id (java.util.UUID/randomUUID) | |
| - node (str (chunk-path prefix) "/" id "/chunk-" unique-id) | |
| + (let [node (str (chunk-path prefix) "/" id "/chunk") | |
| bytes (compress chunk)] | |
| (zk/create-all conn node :persistent? true :data bytes) | |
| - unique-id)))) | |
| - | |
| -(defmethod extensions/write-chunk [ZooKeeper :chunk-index] | |
| - [{:keys [conn opts prefix] :as log} kw chunk id] | |
| - (clean-up-broken-connections | |
| - (fn [] | |
| - (let [node (str (chunk-path prefix) "/" id "/index") | |
| - bytes (compress chunk)] | |
| - (zk/create-all conn node :persistent? true :data bytes))))) | |
| + id)))) | |
| (defmethod extensions/write-chunk [ZooKeeper :job-scheduler] | |
| [{:keys [conn opts prefix] :as log} kw chunk id] | |
| @@ -303,10 +322,12 @@ | |
| [{:keys [conn opts prefix] :as log} kw chunk id] | |
| (clean-up-broken-connections | |
| (fn [] | |
| - (let [node (str (chunk-path prefix) "/" id "/chunk-" (:id chunk)) | |
| + (let [node (str (chunk-path prefix) "/" id "/chunk") | |
| version (:version (zk/exists conn node)) | |
| bytes (compress chunk)] | |
| - (zk/set-data conn node bytes version))))) | |
| + (if (nil? version) | |
| + (zk/create-all conn node :persistent? true :data bytes) | |
| + (zk/set-data conn node bytes version)))))) | |
| (defmethod extensions/read-chunk [ZooKeeper :catalog] | |
| [{:keys [conn opts prefix] :as log} kw id & _] | |
| @@ -351,17 +372,10 @@ | |
| (decompress (:data (zk/data conn node))))))) | |
| (defmethod extensions/read-chunk [ZooKeeper :chunk] | |
| - [{:keys [conn opts prefix] :as log} kw id & {:keys [task-id]}] | |
| - (clean-up-broken-connections | |
| - (fn [] | |
| - (let [node (str (chunk-path prefix) "/" task-id "/chunk-" id)] | |
| - (decompress (:data (zk/data conn node))))))) | |
| - | |
| -(defmethod extensions/read-chunk [ZooKeeper :chunk-index] | |
| - [{:keys [conn opts prefix] :as log} kw id & {:keys [task-id]}] | |
| + [{:keys [conn opts prefix] :as log} kw id & _] | |
| (clean-up-broken-connections | |
| (fn [] | |
| - (let [node (str (chunk-path prefix) "/" task-id "/index")] | |
| + (let [node (str (chunk-path prefix) "/" id "/chunk")] | |
| (decompress (:data (zk/data conn node))))))) | |
| (defmethod extensions/read-chunk [ZooKeeper :origin] | |
| diff --git a/src/onyx/messaging/acking_daemon.clj b/src/onyx/messaging/acking_daemon.clj | |
| index d36342a..2784a6f 100644 | |
| --- a/src/onyx/messaging/acking_daemon.clj | |
| +++ b/src/onyx/messaging/acking_daemon.clj | |
| @@ -1,23 +1,48 @@ | |
| (ns ^:no-doc onyx.messaging.acking-daemon | |
| (:require [clojure.core.async :refer [chan >!! close! sliding-buffer]] | |
| [com.stuartsierra.component :as component] | |
| - [onyx.static.default-vals :refer [defaults]] | |
| + [onyx.static.default-vals :refer [arg-or-default]] | |
| [onyx.types :refer [->Ack]] | |
| [taoensso.timbre :as timbre])) | |
| +(defn now [] | |
| + (System/currentTimeMillis)) | |
| + | |
| +(defn clear-messages-loop [state opts] | |
| + (let [timeout (arg-or-default :onyx.messaging/ack-daemon-timeout opts) | |
| + interval (arg-or-default :onyx.messaging/ack-daemon-clear-interval opts)] | |
| + (loop [] | |
| + (try | |
| + (Thread/sleep interval) | |
| + (let [t (now) | |
| + snapshot @state | |
| + dead (map first (filter (fn [[k v]] (>= (- t (:timestamp v)) timeout)) snapshot))] | |
| + (doseq [k dead] | |
| + (swap! state dissoc k))) | |
| + (catch InterruptedException e | |
| + (throw e)) | |
| + (catch Throwable e | |
| + (timbre/fatal e))) | |
| + (recur)))) | |
| + | |
| (defrecord AckingDaemon [opts] | |
| component/Lifecycle | |
| (start [component] | |
| (taoensso.timbre/info "Starting Acking Daemon") | |
| - (let [buffer-size (or (:onyx.messaging/completion-buffer-size opts) | |
| - (:onyx.messaging/completion-buffer-size defaults))] | |
| - (assoc component :ack-state (atom {}) :completions-ch (chan (sliding-buffer buffer-size))))) | |
| + (let [buffer-size (arg-or-default :onyx.messaging/completion-buffer-size opts) | |
| + state (atom {}) | |
| + timeout-fut (future (clear-messages-loop state opts))] | |
| + (assoc component | |
| + :ack-state state | |
| + :completions-ch (chan (sliding-buffer buffer-size)) | |
| + :timeout-fut timeout-fut))) | |
| (stop [component] | |
| (taoensso.timbre/info "Stopping Acking Daemon") | |
| (close! (:completions-ch component)) | |
| - (assoc component :ack-state nil :completions-ch nil))) | |
| + (future-cancel (:timeout-fut component)) | |
| + (assoc component :ack-state nil :completions-ch nil :timeout-fut nil))) | |
| (defn acking-daemon [config] | |
| (map->AckingDaemon {:opts config})) | |
| @@ -34,10 +59,10 @@ | |
| (assoc state message-id (assoc ack :ack-val updated-ack-val)))) | |
| (if (zero? ack-val) | |
| state | |
| - (assoc state message-id (->Ack nil completion-id ack-val))))))] | |
| + (assoc state message-id (->Ack nil completion-id ack-val (now)))))))] | |
| (when-not (get rets message-id) | |
| - (>!! (:completions-ch daemon) {:id message-id | |
| - :peer-id completion-id})))) | |
| + (>!! (:completions-ch daemon) | |
| + {:id message-id :peer-id completion-id})))) | |
| (defn gen-message-id | |
| "Generates a unique ID for a message - acts as the root id." | |
| @@ -58,5 +83,3 @@ | |
| (persistent! coll) | |
| (recur (inc n) | |
| (conj! coll (.nextLong rng))))))) | |
| - | |
| - | |
| diff --git a/src/onyx/messaging/aeron.clj b/src/onyx/messaging/aeron.clj | |
| index 4d6535b..7497923 100644 | |
| --- a/src/onyx/messaging/aeron.clj | |
| +++ b/src/onyx/messaging/aeron.clj | |
| @@ -1,5 +1,5 @@ | |
| (ns ^:no-doc onyx.messaging.aeron | |
| - (:require [clojure.core.async :refer [chan >!! <!! alts!! timeout close! dropping-buffer]] | |
| + (:require [clojure.core.async :refer [chan >!! <!! alts!! timeout close! sliding-buffer]] | |
| [com.stuartsierra.component :as component] | |
| [taoensso.timbre :refer [fatal] :as timbre] | |
| [onyx.messaging.protocol-aeron :as protocol] | |
| @@ -8,7 +8,7 @@ | |
| [onyx.extensions :as extensions] | |
| [onyx.compression.nippy :refer [compress decompress]] | |
| [onyx.static.default-vals :refer [defaults]]) | |
| - (:import [uk.co.real_logic.aeron Aeron FragmentAssemblyAdapter] | |
| + #_(:import [uk.co.real_logic.aeron Aeron FragmentAssemblyAdapter] | |
| [uk.co.real_logic.aeron Aeron$Context] | |
| [uk.co.real_logic.aeron.driver MediaDriver MediaDriver$Context ThreadingMode] | |
| [uk.co.real_logic.aeron.common.concurrent.logbuffer DataHandler] | |
| @@ -17,7 +17,7 @@ | |
| [uk.co.real_logic.agrona.concurrent IdleStrategy BackoffIdleStrategy] | |
| [java.util.function Consumer] | |
| [java.util.concurrent TimeUnit])) | |
| - | |
| +(comment | |
| (defrecord AeronPeerGroup [opts] | |
| component/Lifecycle | |
| (start [component] | |
| @@ -54,6 +54,10 @@ | |
| (assert port "Couldn't assign port - ran out of available ports.") | |
| {:aeron/port port})) | |
| +(defmethod extensions/get-peer-site :aeron | |
| + [replica peer] | |
| + (get-in replica [:peer-sites peer :aeron/external-addr])) | |
| + | |
| (defn handle-sent-message [inbound-ch decompress-f ^UnsafeBuffer buffer offset length header] | |
| (let [messages (protocol/read-messages-buf decompress-f buffer offset length)] | |
| (doseq [message messages] | |
| @@ -110,8 +114,8 @@ | |
| (start [component] | |
| (taoensso.timbre/info "Starting Aeron") | |
| (let [config (:config peer-group) | |
| - release-ch (chan (dropping-buffer (:onyx.messaging/release-ch-buffer-size defaults))) | |
| - retry-ch (chan (dropping-buffer (:onyx.messaging/retry-ch-buffer-size defaults))) | |
| + release-ch (chan (sliding-buffer (:onyx.messaging/release-ch-buffer-size defaults))) | |
| + retry-ch (chan (sliding-buffer (:onyx.messaging/retry-ch-buffer-size defaults))) | |
| bind-addr (bind-addr config) | |
| external-addr (external-addr config) | |
| ports (allowable-ports config) | |
| @@ -266,3 +270,4 @@ | |
| (reset! pub nil)) | |
| (.close ^uk.co.real_logic.aeron.Aeron (:conn peer-link)) | |
| {}) | |
| +) | |
| diff --git a/src/onyx/messaging/aeron_media_driver.clj b/src/onyx/messaging/aeron_media_driver.clj | |
| index 5a6b496..7bdd5d9 100644 | |
| --- a/src/onyx/messaging/aeron_media_driver.clj | |
| +++ b/src/onyx/messaging/aeron_media_driver.clj | |
| @@ -1,10 +1,10 @@ | |
| (ns onyx.messaging.aeron-media-driver | |
| (:require [clojure.core.async :refer [chan <!!]]) | |
| - (:import [uk.co.real_logic.aeron Aeron FragmentAssemblyAdapter] | |
| + #_(:import [uk.co.real_logic.aeron Aeron FragmentAssemblyAdapter] | |
| [uk.co.real_logic.aeron Aeron$Context] | |
| [uk.co.real_logic.aeron.driver MediaDriver MediaDriver$Context ThreadingMode])) | |
| -(defn -main [& args] | |
| +#_(defn -main [& args] | |
| (let [ctx (doto (MediaDriver$Context.) | |
| (.threadingMode ThreadingMode/SHARED) | |
| ;(.threadingMode ThreadingMode/DEDICATED) | |
| diff --git a/src/onyx/messaging/core_async.clj b/src/onyx/messaging/core_async.clj | |
| index b4ec374..8d6c0d0 100644 | |
| --- a/src/onyx/messaging/core_async.clj | |
| +++ b/src/onyx/messaging/core_async.clj | |
| @@ -1,5 +1,5 @@ | |
| (ns ^:no-doc onyx.messaging.core-async | |
| - (:require [clojure.core.async :refer [chan >!! <!! alts!! dropping-buffer timeout close!]] | |
| + (:require [clojure.core.async :refer [chan >!! <!! alts!! sliding-buffer timeout close!]] | |
| [com.stuartsierra.component :as component] | |
| [taoensso.timbre :refer [fatal] :as timbre] | |
| [onyx.messaging.acking-daemon :as acker] | |
| @@ -25,13 +25,17 @@ | |
| [config peer-site peer-sites] | |
| peer-site) | |
| +(defmethod extensions/get-peer-site :core.async | |
| + [replica peer] | |
| + "localhost") | |
| + | |
| (defrecord CoreAsync [peer-group] | |
| component/Lifecycle | |
| (start [component] | |
| (taoensso.timbre/info "Starting core.async Messaging Channel") | |
| - (let [release-ch (chan (dropping-buffer (:onyx.messaging/release-ch-buffer-size defaults))) | |
| - retry-ch (chan (dropping-buffer (:onyx.messaging/retry-ch-buffer-size defaults)))] | |
| + (let [release-ch (chan (sliding-buffer (:onyx.messaging/release-ch-buffer-size defaults))) | |
| + retry-ch (chan (sliding-buffer (:onyx.messaging/retry-ch-buffer-size defaults)))] | |
| (assoc component :release-ch release-ch :retry-ch retry-ch))) | |
| (stop [component] | |
| @@ -48,7 +52,7 @@ | |
| (let [chs (:channels (:messaging-group (:peer-group messenger))) | |
| id (java.util.UUID/randomUUID) | |
| inbound-ch (:inbound-ch (:messenger-buffer messenger)) | |
| - ch (chan (dropping-buffer 10000))] | |
| + ch (chan (sliding-buffer 100000))] | |
| (future | |
| (try | |
| (loop [] | |
| diff --git a/src/onyx/messaging/messenger_buffer.clj b/src/onyx/messaging/messenger_buffer.clj | |
| index 7785f38..d4db4c7 100644 | |
| --- a/src/onyx/messaging/messenger_buffer.clj | |
| +++ b/src/onyx/messaging/messenger_buffer.clj | |
| @@ -1,6 +1,7 @@ | |
| (ns ^:no-doc onyx.messaging.messenger-buffer | |
| - (:require [clojure.core.async :refer [chan >!! <!! thread alts!! close!]] | |
| + (:require [clojure.core.async :refer [chan >!! <!! thread alts!! close! sliding-buffer]] | |
| [com.stuartsierra.component :as component] | |
| + [onyx.static.default-vals :refer [defaults]] | |
| [taoensso.timbre :as timbre])) | |
| (defrecord MessengerBuffer [opts] | |
| @@ -9,7 +10,8 @@ | |
| (start [component] | |
| (taoensso.timbre/info "Starting Messenger Buffer") | |
| - (let [inbound-ch (chan (or (:onyx.messenger/inbound-capacity opts) 20000))] | |
| + (let [inbound-ch (chan (sliding-buffer (or (:onyx.messaging/inbound-buffer-size opts) | |
| + (:onyx.messaging/inbound-buffer-size defaults))))] | |
| (assoc component :inbound-ch inbound-ch))) | |
| (stop [component] | |
| diff --git a/src/onyx/messaging/netty_tcp.clj b/src/onyx/messaging/netty_tcp.clj | |
| index 6c7d50d..fa4e208 100644 | |
| --- a/src/onyx/messaging/netty_tcp.clj | |
| +++ b/src/onyx/messaging/netty_tcp.clj | |
| @@ -1,6 +1,6 @@ | |
| (ns ^:no-doc onyx.messaging.netty-tcp | |
| (:require [clojure.core.async :refer [chan >!! >! <!! alts!! timeout close! | |
| - thread go-loop sliding-buffer dropping-buffer]] | |
| + thread go-loop sliding-buffer]] | |
| [com.stuartsierra.component :as component] | |
| [taoensso.timbre :as timbre] | |
| [onyx.messaging.protocol-netty :as protocol] | |
| @@ -101,6 +101,10 @@ | |
| (assert port "Couldn't assign port - ran out of available ports.") | |
| {:netty/port port})) | |
| +(defmethod extensions/get-peer-site :netty | |
| + [replica peer] | |
| + (get-in replica [:peer-sites peer :netty/external-addr])) | |
| + | |
| (defn int32-frame-decoder | |
| [] | |
| ; Offset 0, 4 byte header, skip those 4 bytes. | |
| @@ -226,17 +230,14 @@ | |
| (.group client-group) | |
| (.channel channel) | |
| (.handler (client-channel-initializer (new-client-handler)))) | |
| - ch-fut ^ChannelFuture (.awaitUninterruptibly ^ChannelFuture (.connect ^Bootstrap b ^String host ^Integer port) | |
| - ;; TODO, add connection timeout | |
| - ;(:onyx.messaging.netty/connect-timeout-millis defaults) | |
| - ;TimeUnit/MILLISECONDS | |
| - ) | |
| - ch (.channel ch-fut)] | |
| - (if (and (.isSuccess ^ChannelFuture ch-fut) | |
| - (established? ch)) | |
| - ch | |
| - (do (.close ch) | |
| - nil)))) | |
| + ch-fut ^ChannelFuture (.connect ^Bootstrap b ^String host ^Integer port)] | |
| + (if (.awaitUninterruptibly ch-fut (:onyx.messaging.netty/connect-timeout-millis defaults)) | |
| + (let [ch (.channel ch-fut)] | |
| + (if (and (.isSuccess ch-fut) | |
| + (established? ch)) | |
| + ch | |
| + (do (.close ch) | |
| + nil)))))) | |
| (defrecord NettyTcpSockets [peer-group] | |
| component/Lifecycle | |
| @@ -245,8 +246,8 @@ | |
| (taoensso.timbre/info "Starting Netty TCP Sockets") | |
| (let [{:keys [client-group worker-group boss-group]} (:messaging-group peer-group) | |
| config (:config peer-group) | |
| - release-ch (chan (dropping-buffer (:onyx.messaging/release-ch-buffer-size defaults))) | |
| - retry-ch (chan (dropping-buffer (:onyx.messaging/retry-ch-buffer-size defaults))) | |
| + release-ch (chan (sliding-buffer (:onyx.messaging/release-ch-buffer-size defaults))) | |
| + retry-ch (chan (sliding-buffer (:onyx.messaging/retry-ch-buffer-size defaults))) | |
| bind-addr (bind-addr config) | |
| external-addr (external-addr config) | |
| ports (allowable-ports config) | |
| @@ -332,7 +333,7 @@ | |
| (.addListener f (reify GenericFutureListener | |
| (operationComplete [_ _] | |
| (when-not (.isSuccess f) | |
| - (timbre/error "Message failed to send: " (.cause f)) | |
| + (timbre/trace (ex-info "Message failed to send" {:cause (.cause f)})) | |
| (reset-connection connection)))))) | |
| (defn make-pending-chan [messenger] | |
| diff --git a/src/onyx/messaging/protocol_aeron.clj b/src/onyx/messaging/protocol_aeron.clj | |
| index bed72d9..b7df1a9 100644 | |
| --- a/src/onyx/messaging/protocol_aeron.clj | |
| +++ b/src/onyx/messaging/protocol_aeron.clj | |
| @@ -1,10 +1,11 @@ | |
| (ns ^:no-doc onyx.messaging.protocol-aeron | |
| (:require [taoensso.timbre :as timbre] | |
| [onyx.types :refer [->Leaf]]) | |
| - (:import [java.util UUID] | |
| + #_(:import [java.util UUID] | |
| [uk.co.real_logic.agrona.concurrent UnsafeBuffer] | |
| [uk.co.real_logic.agrona DirectBuffer MutableDirectBuffer])) | |
| +(comment | |
| ;;;;;; | |
| ;; Constants | |
| @@ -137,3 +138,4 @@ | |
| (next payloads) | |
| (+ offset message-base-length)) | |
| (persistent! messages))))) | |
| +) | |
| diff --git a/src/onyx/peer/function.clj b/src/onyx/peer/function.clj | |
| index 8f4eec1..560bdba 100644 | |
| --- a/src/onyx/peer/function.clj | |
| +++ b/src/onyx/peer/function.clj | |
| @@ -59,17 +59,17 @@ | |
| ack-vals)))) | |
| fast-concat)) | |
| -(defn pick-peer [active-peers hash-group] | |
| +(defn pick-peer [id active-peers hash-group max-downstream-links] | |
| (when-not (empty? active-peers) | |
| (if hash-group | |
| (nth active-peers | |
| (mod (hash hash-group) | |
| (count active-peers))) | |
| - (rand-nth active-peers)))) | |
| + (rand-nth (operation/select-n-peers id active-peers max-downstream-links))))) | |
| ;; Needs a performance boost | |
| (defmethod p-ext/write-batch :default | |
| - [{:keys [onyx.core/results onyx.core/messenger onyx.core/job-id] :as event}] | |
| + [{:keys [onyx.core/id onyx.core/results onyx.core/messenger onyx.core/job-id onyx.core/max-downstream-links] :as event}] | |
| (let [leaves (fast-concat (map :leaves results)) | |
| egress-tasks (:egress-ids (:onyx.core/serialized-task event))] | |
| (when-not (empty? leaves) | |
| @@ -80,7 +80,7 @@ | |
| (doseq [[[route hash-group] segs] groups] | |
| (let [peers (get allocations (get egress-tasks route)) | |
| active-peers (filter #(= (get-in replica [:peer-state %]) :active) peers) | |
| - target (pick-peer active-peers hash-group)] | |
| + target (pick-peer id active-peers hash-group max-downstream-links)] | |
| (when target | |
| (let [link (operation/peer-link event target)] | |
| (onyx.extensions/send-messages messenger event link segs))))) | |
| diff --git a/src/onyx/peer/operation.clj b/src/onyx/peer/operation.clj | |
| index 086890c..1c49a56 100644 | |
| --- a/src/onyx/peer/operation.clj | |
| +++ b/src/onyx/peer/operation.clj | |
| @@ -23,9 +23,22 @@ | |
| [{:keys [onyx.core/queue onyx.core/ingress-queues onyx.core/task-map]}] | |
| true) | |
| +;; TODO: may want to consider memoizing this | |
| +;; must be careful about ensuring we don't bloat memory wise | |
| +;; use clojure.core.memoize with LRU | |
| +(defn select-n-peers | |
| + "Stably select n peers using our id and the downstream task ids. | |
| + If a peer is added or removed, the set can only change by one value at max" | |
| + [id all-peers n] | |
| + (if (<= (count all-peers) n) | |
| + all-peers | |
| + (take n | |
| + (sort-by (fn [peer-id] (hash [id peer-id])) | |
| + all-peers)))) | |
| + | |
| (defn peer-link | |
| [{:keys [onyx.core/state] :as event} peer-id] | |
| - (if-let [link (get (:links @state) peer-id)] | |
| + (if-let [link (:link (get (:links @state) peer-id))] | |
| link | |
| (let [site (-> @(:onyx.core/replica event) | |
| :peer-sites | |
| @@ -35,6 +48,8 @@ | |
| [:links peer-id] | |
| (fn [link] | |
| (or link | |
| - (extensions/connect-to-peer (:onyx.core/messenger event) event site)))) | |
| + {:link (extensions/connect-to-peer (:onyx.core/messenger event) event site) | |
| + :timestamp (System/currentTimeMillis)}))) | |
| :links | |
| - (get peer-id))))) | |
| + (get peer-id) | |
| + :link)))) | |
| diff --git a/src/onyx/peer/task_lifecycle.clj b/src/onyx/peer/task_lifecycle.clj | |
| index d089b53..98cf696 100644 | |
| --- a/src/onyx/peer/task_lifecycle.clj | |
| +++ b/src/onyx/peer/task_lifecycle.clj | |
| @@ -1,5 +1,5 @@ | |
| (ns ^:no-doc onyx.peer.task-lifecycle | |
| - (:require [clojure.core.async :refer [alts!! <!! >!! <! >! timeout chan close! thread go dropping-buffer]] | |
| + (:require [clojure.core.async :refer [alts!! <!! >!! <! >! timeout chan close! thread go]] | |
| [com.stuartsierra.component :as component] | |
| [dire.core :as dire] | |
| [taoensso.timbre :refer [info warn trace fatal level-compile-time] :as timbre] | |
| @@ -14,7 +14,7 @@ | |
| [onyx.extensions :as extensions] | |
| [onyx.compression.nippy] | |
| [onyx.types :refer [->Leaf leaf ->Route ->Ack ->Result]] | |
| - [onyx.static.default-vals :refer [defaults]]) | |
| + [onyx.static.default-vals :refer [defaults arg-or-default]]) | |
| (:import [java.security MessageDigest])) | |
| ;; TODO: Are there any exceptions that a peer should autoreboot itself? | |
| @@ -38,13 +38,14 @@ | |
| (defn munge-start-lifecycle [event] | |
| ((:onyx.core/compiled-start-task-fn event) event)) | |
| -(defn add-acker-id [event m] | |
| +(defn add-acker-id [{:keys [onyx.core/id onyx.core/max-acker-links] :as event} m] | |
| (let [peers (get-in @(:onyx.core/replica event) [:ackers (:onyx.core/job-id event)])] | |
| (if-not (seq peers) | |
| (do (warn (format "[%s] This job no longer has peers capable of acking. This job will now pause execution." (:onyx.core/id event))) | |
| (throw (ex-info "Not enough acker peers" {:peers peers}))) | |
| - (let [n (mod (hash (:message m)) (count peers))] | |
| - (assoc m :acker-id (nth peers n)))))) | |
| + (let [candidates (operation/select-n-peers id peers max-acker-links) | |
| + n (mod (hash (:message m)) (count candidates))] | |
| + (assoc m :acker-id (nth candidates n)))))) | |
| (defn add-completion-id [event m] | |
| (assoc m :completion-id (:onyx.core/id event))) | |
| @@ -92,16 +93,12 @@ | |
| (choose-output-paths event compiled-ex-fcs result leaf serialized-task downstream) | |
| (choose-output-paths event compiled-norm-fcs result leaf serialized-task downstream))) | |
| -(defn hash-value [x] | |
| - (let [md5 (MessageDigest/getInstance "MD5")] | |
| - (apply str (.digest md5 (.getBytes (pr-str x) "UTF-8"))))) | |
| - | |
| (defn group-message [segment catalog task] | |
| (let [t (find-task-fast catalog task)] | |
| (if-let [k (:onyx/group-by-key t)] | |
| - (hash-value (get segment k)) | |
| + (hash (get segment k)) | |
| (when-let [f (:onyx/group-by-fn t)] | |
| - (hash-value ((operation/resolve-fn {:onyx/fn f}) segment)))))) | |
| + (hash ((operation/resolve-fn {:onyx/fn f}) segment)))))) | |
| (defn group-segments [leaf next-tasks catalog event] | |
| (let [post-transformation (:post-transformation (:routes leaf)) | |
| @@ -128,20 +125,21 @@ | |
| (defn build-new-segments | |
| [{:keys [onyx.core/results onyx.core/serialized-task onyx.core/catalog] :as event}] | |
| (let [downstream (keys (:egress-ids serialized-task)) | |
| - results (map (fn [{:keys [root leaves] :as result}] | |
| - (let [{:keys [id acker-id completion-id]} root] | |
| - (assoc result | |
| - :leaves | |
| - (map (fn [leaf] | |
| - (-> leaf | |
| - (assoc :routes (add-route-data event result leaf downstream)) | |
| - (assoc :id id) | |
| - (assoc :acker-id acker-id) | |
| - (assoc :completion-id completion-id) | |
| - (group-segments downstream catalog event) | |
| - (add-ack-vals))) | |
| - leaves)))) | |
| - results)] | |
| + results (doall | |
| + (map (fn [{:keys [root leaves] :as result}] | |
| + (let [{:keys [id acker-id completion-id]} root] | |
| + (assoc result | |
| + :leaves | |
| + (map (fn [leaf] | |
| + (-> leaf | |
| + (assoc :routes (add-route-data event result leaf downstream)) | |
| + (assoc :id id) | |
| + (assoc :acker-id acker-id) | |
| + (assoc :completion-id completion-id) | |
| + (group-segments downstream catalog event) | |
| + (add-ack-vals))) | |
| + leaves)))) | |
| + results))] | |
| (assoc event :onyx.core/results results))) | |
| (defn ack-routes? [routes] | |
| @@ -164,16 +162,18 @@ | |
| (defn ack-messages [{:keys [onyx.core/results onyx.core/task-map] :as event}] | |
| (doseq [[acker-id results-by-acker] (group-by (comp :acker-id :root) results)] | |
| (let [link (operation/peer-link event acker-id) | |
| - acks (map (fn [result] (let [fused-leaf-vals (gen-ack-fusion-vals task-map (:leaves result)) | |
| - fused-vals (if-let [ack-val (:ack-val (:root result))] | |
| - (bit-xor fused-leaf-vals ack-val) | |
| - fused-leaf-vals)] | |
| - (->Ack (:id (:root result)) | |
| - (:completion-id (:root result)) | |
| - ;; or'ing by zero covers the case of flow conditions where an | |
| - ;; input task produces a segment that goes nowhere. | |
| - (or fused-vals 0)))) | |
| - results-by-acker)] | |
| + acks (doall | |
| + (map (fn [result] (let [fused-leaf-vals (gen-ack-fusion-vals task-map (:leaves result)) | |
| + fused-vals (if-let [ack-val (:ack-val (:root result))] | |
| + (bit-xor fused-leaf-vals ack-val) | |
| + fused-leaf-vals)] | |
| + (->Ack (:id (:root result)) | |
| + (:completion-id (:root result)) | |
| + ;; or'ing by zero covers the case of flow conditions where an | |
| + ;; input task produces a segment that goes nowhere. | |
| + (or fused-vals 0) | |
| + (System/currentTimeMillis)))) | |
| + results-by-acker))] | |
| (extensions/internal-ack-messages (:onyx.core/messenger event) event link acks))) | |
| event) | |
| @@ -241,12 +241,13 @@ | |
| (assoc | |
| event | |
| :onyx.core/results | |
| - (map | |
| - (fn [segment] | |
| - (let [segments (collect-next-segments event (:message segment)) | |
| - leaves (map leaf segments)] | |
| - (->Result segment leaves))) | |
| - batch))) | |
| + (doall | |
| + (map | |
| + (fn [segment] | |
| + (let [segments (collect-next-segments event (:message segment)) | |
| + leaves (map leaf segments)] | |
| + (->Result segment leaves))) | |
| + batch)))) | |
| (defn apply-fn-bulk [{:keys [onyx.core/batch] :as event}] | |
| ;; Bulk functions intentionally ignore their outputs. | |
| @@ -255,11 +256,11 @@ | |
| (assoc | |
| event | |
| :onyx.core/results | |
| - (map | |
| - (fn [segment] | |
| - (let [leaves (map leaf segments)] | |
| - (->Result segment leaves))) | |
| - batch)))) | |
| + (doall | |
| + (map | |
| + (fn [segment] | |
| + (->Result segment (list (leaf (:message segment))))) | |
| + batch))))) | |
| (defn apply-fn [event] | |
| (if (:onyx/bulk? (:onyx.core/task-map event)) | |
| @@ -310,7 +311,7 @@ | |
| (let [tail (last (get-in @(:onyx.core/state event) [:timeout-pool]))] | |
| (doseq [m tail] | |
| (when (p-ext/pending? event m) | |
| - (taoensso.timbre/info (format "Input retry message %s" m)) | |
| + (taoensso.timbre/trace (format "Input retry message %s" m)) | |
| (p-ext/retry-message event m))) | |
| (swap! (:onyx.core/state event) update-in [:timeout-pool] rsc/expire-bucket) | |
| (recur)))))))) | |
| @@ -353,8 +354,8 @@ | |
| (apply-fn) | |
| (build-new-segments) | |
| (write-batch) | |
| - (ack-messages) | |
| (flow-retry-messages) | |
| + (ack-messages) | |
| (close-batch-resources))) | |
| (catch Throwable e | |
| (ex-f e)))) | |
| @@ -370,6 +371,28 @@ | |
| (operation/resolve-fn f) | |
| onyx.compression.nippy/compress))) | |
| +(defn gc-peer-links [event state opts] | |
| + (let [interval (or (:onyx.messaging/peer-link-gc-interval opts) | |
| + (:onyx.messaging/peer-link-gc-interval defaults)) | |
| + idle (or (:onyx.messaging/peer-link-idle-timeout opts) | |
| + (:onyx.messaging/peer-link-idle-timeout defaults))] | |
| + (loop [] | |
| + (try | |
| + (Thread/sleep interval) | |
| + (let [t (System/currentTimeMillis)] | |
| + (swap! state | |
| + (fn [x] | |
| + (let [to-remove (map first (filter (fn [[k v]] (>= (- t (:timestamp v)) idle)) (:links x))) | |
| + result (into {} (remove (fn [[k v]] (some #{k} to-remove)) (:links x)))] | |
| + (doseq [p to-remove] | |
| + (extensions/close-peer-connection (:onyx.core/messenger event) event (:link (get (:links x) p)))) | |
| + (assoc x :links result))))) | |
| + (catch InterruptedException e | |
| + (throw e)) | |
| + (catch Throwable e | |
| + (fatal e))) | |
| + (recur)))) | |
| + | |
| (defn any-ackers? [replica job-id] | |
| (> (count (get-in replica [:ackers job-id])) 0)) | |
| @@ -400,8 +423,8 @@ | |
| identity | |
| matched))) | |
| -(defn compile-before-task-functions [lifecycles task-name] | |
| - (compile-lifecycle-functions lifecycles task-name :lifecycle/before-task)) | |
| +(defn compile-before-task-start-functions [lifecycles task-name] | |
| + (compile-lifecycle-functions lifecycles task-name :lifecycle/before-task-start)) | |
| (defn compile-before-batch-task-functions [lifecycles task-name] | |
| (compile-lifecycle-functions lifecycles task-name :lifecycle/before-batch)) | |
| @@ -410,12 +433,17 @@ | |
| (compile-lifecycle-functions lifecycles task-name :lifecycle/after-batch)) | |
| (defn compile-after-task-functions [lifecycles task-name] | |
| - (compile-lifecycle-functions lifecycles task-name :lifecycle/after-task)) | |
| + (compile-lifecycle-functions lifecycles task-name :lifecycle/after-task-stop)) | |
| (defn resolve-task-fn [entry] | |
| (when (= (:onyx/type entry) :function) | |
| (operation/kw->fn (:onyx/fn entry)))) | |
| +(defn validate-pending-timeout [pending-timeout opts] | |
| + (when (> pending-timeout (arg-or-default :onyx.messaging/ack-daemon-timeout opts)) | |
| + (throw (ex-info "Pending timeout cannot be greater than acking daemon timeout" | |
| + {:opts opts :pending-timeout pending-timeout})))) | |
| + | |
| (defrecord TaskLifeCycle | |
| [id log messenger-buffer messenger job-id task-id replica restart-ch | |
| kill-ch outbox-ch seal-resp-ch completion-ch opts task-kill-ch] | |
| @@ -435,8 +463,10 @@ | |
| pending-timeout (or (:onyx/pending-timeout catalog-entry) | |
| (:onyx/pending-timeout defaults)) | |
| r-seq (rsc/create-r-seq pending-timeout input-retry-timeout) | |
| + state (atom {:timeout-pool r-seq}) | |
| _ (taoensso.timbre/info (format "[%s] Warming up Task LifeCycle for job %s, task %s" id job-id (:name task))) | |
| + _ (validate-pending-timeout pending-timeout opts) | |
| pipeline-data {:onyx.core/id id | |
| :onyx.core/job-id job-id | |
| @@ -446,7 +476,7 @@ | |
| :onyx.core/workflow (extensions/read-chunk log :workflow job-id) | |
| :onyx.core/flow-conditions flow-conditions | |
| :onyx.core/compiled-start-task-fn (compile-start-task-functions lifecycles (:name task)) | |
| - :onyx.core/compiled-before-task-fn (compile-before-task-functions lifecycles (:name task)) | |
| + :onyx.core/compiled-before-task-start-fn (compile-before-task-start-functions lifecycles (:name task)) | |
| :onyx.core/compiled-before-batch-fn (compile-before-batch-task-functions lifecycles (:name task)) | |
| :onyx.core/compiled-after-batch-fn (compile-after-batch-task-functions lifecycles (:name task)) | |
| :onyx.core/compiled-after-task-fn (compile-after-task-functions lifecycles (:name task)) | |
| @@ -462,16 +492,19 @@ | |
| :onyx.core/outbox-ch outbox-ch | |
| :onyx.core/seal-ch seal-resp-ch | |
| :onyx.core/peer-opts (resolve-compression-fn-impls opts) | |
| + :onyx.core/max-downstream-links (or (:onyx.messaging/max-downstream-links opts) | |
| + (:onyx.messaging/max-downstream-links defaults)) | |
| + :onyx.core/max-acker-links (or (:onyx.messaging/max-acker-links opts) | |
| + (:onyx.messaging/max-acker-links defaults)) | |
| :onyx.core/fn (resolve-task-fn catalog-entry) | |
| :onyx.core/replica replica | |
| - :onyx.core/state (atom {:timeout-pool r-seq})} | |
| + :onyx.core/state state} | |
| ex-f (fn [e] (handle-exception e restart-ch outbox-ch job-id)) | |
| - pipeline-data (merge pipeline-data ((:onyx.core/compiled-before-task-fn pipeline-data) pipeline-data))] | |
| - | |
| - (while (and (first (alts!! [kill-ch task-kill-ch] :default true)) | |
| - (not (munge-start-lifecycle pipeline-data))) | |
| - (Thread/sleep (or (:onyx.peer/sequential-back-off opts) 2000))) | |
| + _ (while (and (first (alts!! [kill-ch task-kill-ch] :default true)) | |
| + (not (munge-start-lifecycle pipeline-data))) | |
| + (Thread/sleep (or (:onyx.peer/peer-not-ready-back-off opts) 2000))) | |
| + pipeline-data (merge pipeline-data ((:onyx.core/compiled-before-task-start-fn pipeline-data) pipeline-data))] | |
| (>!! outbox-ch (entry/create-log-entry :signal-ready {:id id})) | |
| @@ -487,14 +520,16 @@ | |
| (let [input-retry-messages-ch (input-retry-messages! messenger pipeline-data input-retry-timeout task-kill-ch) | |
| aux-ch (launch-aux-threads! messenger pipeline-data outbox-ch seal-resp-ch completion-ch task-kill-ch) | |
| - task-lifecycle-ch (thread (run-task-lifecycle pipeline-data seal-resp-ch kill-ch ex-f))] | |
| + task-lifecycle-ch (thread (run-task-lifecycle pipeline-data seal-resp-ch kill-ch ex-f)) | |
| + peer-link-gc-thread (future (gc-peer-links pipeline-data state opts))] | |
| (assoc component | |
| :pipeline-data pipeline-data | |
| :seal-ch seal-resp-ch | |
| :task-kill-ch task-kill-ch | |
| :task-lifecycle-ch task-lifecycle-ch | |
| :input-retry-messages-ch input-retry-messages-ch | |
| - :aux-ch aux-ch))) | |
| + :aux-ch aux-ch | |
| + :peer-link-gc-thread peer-link-gc-thread))) | |
| (catch Throwable e | |
| (handle-exception e restart-ch outbox-ch job-id) | |
| component))) | |
| @@ -502,19 +537,22 @@ | |
| (stop [component] | |
| (taoensso.timbre/info (format "[%s] Stopping Task LifeCycle for %s" id (:onyx.core/task (:pipeline-data component)))) | |
| (when-let [event (:pipeline-data component)] | |
| - ((:onyx.core/compiled-after-task-fn event) event) | |
| - | |
| ;; Ensure task operations are finished before closing peer connections | |
| (close! (:seal-ch component)) | |
| (<!! (:task-lifecycle-ch component)) | |
| - | |
| (close! (:task-kill-ch component)) | |
| + | |
| (<!! (:input-retry-messages-ch component)) | |
| (<!! (:aux-ch component)) | |
| + ((:onyx.core/compiled-after-task-fn event) event) | |
| + | |
| (let [state @(:onyx.core/state event)] | |
| - (doseq [[_ link] (:links state)] | |
| - (extensions/close-peer-connection (:onyx.core/messenger event) event link)))) | |
| + (doseq [[_ link-map] (:links state)] | |
| + (extensions/close-peer-connection (:onyx.core/messenger event) event (:link link-map))))) | |
| + | |
| + (when-let [t (:peer-link-gc-thread component)] | |
| + (future-cancel t)) | |
| (assoc component | |
| :pipeline-data nil | |
| @@ -522,7 +560,8 @@ | |
| :aux-ch nil | |
| :input-retry-messages-ch nil | |
| :task-lifecycle-ch nil | |
| - :task-lifecycle-ch nil))) | |
| + :task-lifecycle-ch nil | |
| + :peer-link-gc-thread nil))) | |
| (defn task-lifecycle [args {:keys [id log messenger-buffer messenger job task replica | |
| restart-ch kill-ch outbox-ch seal-ch completion-ch opts task-kill-ch]}] | |
| diff --git a/src/onyx/peer/virtual_peer.clj b/src/onyx/peer/virtual_peer.clj | |
| index 1447e52..5338016 100644 | |
| --- a/src/onyx/peer/virtual_peer.clj | |
| +++ b/src/onyx/peer/virtual_peer.clj | |
| @@ -37,10 +37,9 @@ | |
| :task-lifecycle-fn task-lifecycle} | |
| (:onyx.peer/state opts))] | |
| (let [replica @replica-atom | |
| - position (first (alts!! [kill-ch inbox-ch] :priority true))] | |
| - (if position | |
| - (let [entry (extensions/read-log-entry log position) | |
| - new-replica (extensions/apply-log-entry entry replica) | |
| + entry (first (alts!! [kill-ch inbox-ch] :priority true))] | |
| + (if entry | |
| + (let [new-replica (extensions/apply-log-entry entry replica) | |
| diff (extensions/replica-diff entry replica new-replica) | |
| reactions (extensions/reactions entry replica new-replica diff state) | |
| new-state (extensions/fire-side-effects! entry replica new-replica diff state)] | |
| @@ -85,9 +84,7 @@ | |
| restart-ch (chan 1) | |
| completion-ch (:completions-ch acking-daemon) | |
| peer-site (extensions/peer-site messenger) | |
| - entry (create-log-entry :prepare-join-cluster | |
| - {:joiner id | |
| - :peer-site peer-site}) | |
| + entry (create-log-entry :prepare-join-cluster {:joiner id :peer-site peer-site}) | |
| origin (extensions/subscribe-to-log log inbox-ch)] | |
| (extensions/register-pulse log id) | |
| (>!! outbox-ch entry) | |
| @@ -101,7 +98,7 @@ | |
| :outbox-ch outbox-ch :kill-ch kill-ch | |
| :restart-ch restart-ch))) | |
| (catch Throwable e | |
| - (taoensso.timbre/fatal (format "Error starting Virtual Peer %s" id) e) | |
| + (taoensso.timbre/fatal e (format "Error starting Virtual Peer %s" id)) | |
| (throw e))))) | |
| (stop [component] | |
| diff --git a/src/onyx/plugin/core_async.clj b/src/onyx/plugin/core_async.clj | |
| index 4b9bc63..b76634d 100644 | |
| --- a/src/onyx/plugin/core_async.clj | |
| +++ b/src/onyx/plugin/core_async.clj | |
| @@ -2,25 +2,32 @@ | |
| (:require [clojure.core.async :refer [chan >!! <!! alts!! timeout go <!]] | |
| [onyx.peer.pipeline-extensions :as p-ext] | |
| [onyx.static.default-vals :refer [defaults]] | |
| - [taoensso.timbre :refer [debug] :as timbre])) | |
| + [taoensso.timbre :refer [debug info] :as timbre])) | |
| (defn inject-reader | |
| [event lifecycle] | |
| - (assert (:core.async/chan event) ":core.async/chan not found - add it via inject-lifecycle-resources.") | |
| + (assert (:core.async/chan event) ":core.async/chan not found - add it using a :before-task-start lifecycle") | |
| {:core.async/pending-messages (atom {}) | |
| :core.async/drained? (atom false) | |
| - :core.async/retry-ch (chan 1000)}) | |
| + :core.async/retry-ch (chan 10000) | |
| + :core.async/retry-count (atom 0)}) | |
| + | |
| +(defn log-retry-count | |
| + [event lifecycle] | |
| + (info "core.async input plugin stopping. Retry count:" @(:core.async/retry-count event)) | |
| + {}) | |
| (defn inject-writer | |
| [event lifecycle] | |
| - (assert (:core.async/chan event) ":core.async/chan not found - add it via inject-lifecycle-resources.") | |
| + (assert (:core.async/chan event) ":core.async/chan not found - add it using a :before-task-start lifecycle") | |
| {}) | |
| (def reader-calls | |
| - {:lifecycle/before-task inject-reader}) | |
| + {:lifecycle/before-task-start inject-reader | |
| + :lifecycle/after-task-stop log-retry-count}) | |
| (def writer-calls | |
| - {:lifecycle/before-task inject-writer}) | |
| + {:lifecycle/before-task-start inject-writer}) | |
| (defmethod p-ext/read-batch :core.async/read-from-chan | |
| [{:keys [onyx.core/task-map core.async/chan core.async/retry-ch | |
| @@ -32,10 +39,9 @@ | |
| ms (or (:onyx/batch-timeout task-map) (:onyx/batch-timeout defaults)) | |
| step-ms (/ ms (:onyx/batch-size task-map)) | |
| timeout-ch (timeout ms) | |
| - batch (if (zero? max-segments) | |
| - (<!! timeout-ch) | |
| + batch (if (pos? max-segments) | |
| (loop [segments [] cnt 0] | |
| - (if (= cnt batch-size) | |
| + (if (= cnt max-segments) | |
| segments | |
| (if-let [message (first (alts!! [retry-ch chan timeout-ch] :priority true))] | |
| (recur (conj segments | |
| @@ -43,7 +49,8 @@ | |
| :input :core.async | |
| :message message}) | |
| (inc cnt)) | |
| - segments))))] | |
| + segments))) | |
| + (<!! timeout-ch))] | |
| (doseq [m batch] | |
| (swap! pending-messages assoc (:id m) (:message m))) | |
| (when (and (= 1 (count @pending-messages)) | |
| @@ -57,10 +64,12 @@ | |
| (swap! pending-messages dissoc message-id)) | |
| (defmethod p-ext/retry-message :core.async/read-from-chan | |
| - [{:keys [core.async/pending-messages core.async/retry-ch]} message-id] | |
| + [{:keys [core.async/pending-messages core.async/retry-count core.async/retry-ch]} message-id] | |
| (when-let [msg (get @pending-messages message-id)] | |
| - (>!! retry-ch msg) | |
| - (swap! pending-messages dissoc message-id))) | |
| + (swap! pending-messages dissoc message-id) | |
| + (when-not (= msg :done) | |
| + (swap! retry-count inc)) | |
| + (>!! retry-ch msg))) | |
| (defmethod p-ext/pending? :core.async/read-from-chan | |
| [{:keys [core.async/pending-messages]} message-id] | |
| diff --git a/src/onyx/scheduling/balanced_job_scheduler.clj b/src/onyx/scheduling/balanced_job_scheduler.clj | |
| index 3ecccec..df058e9 100644 | |
| --- a/src/onyx/scheduling/balanced_job_scheduler.clj | |
| +++ b/src/onyx/scheduling/balanced_job_scheduler.clj | |
| @@ -1,26 +1,9 @@ | |
| (ns onyx.scheduling.balanced-job-scheduler | |
| (:require [onyx.scheduling.common-job-scheduler :as cjs] | |
| [onyx.scheduling.common-task-scheduler :as cts] | |
| + [taoensso.timbre :refer [info]] | |
| [onyx.log.commands.common :as common])) | |
| -(defn job-coverable? [replica job n] | |
| - (let [tasks (get-in replica [:tasks job])] | |
| - (>= n (count tasks)))) | |
| - | |
| -(defn allocate-peers [{:keys [jobs peers] :as replica}] | |
| - (loop [results {}] | |
| - (let [j (count jobs) | |
| - p (count peers) | |
| - min-peers (int (/ p j)) | |
| - n (rem p j) | |
| - max-peers (inc min-peers) | |
| - allocations | |
| - (reduce | |
| - (fn [all [job k]] | |
| - (assoc all job (if (< k n) max-peers min-peers))) | |
| - {} | |
| - (map vector jobs (range)))]))) | |
| - | |
| (defmethod cjs/job-offer-n-peers :onyx.job-scheduler/balanced | |
| [{:keys [jobs peers] :as replica}] | |
| (if (seq jobs) | |
| @@ -36,21 +19,45 @@ | |
| (map vector jobs (range)))) | |
| {})) | |
| +(defmethod cjs/sort-job-priority :onyx.job-scheduler/balanced | |
| + [replica jobs] | |
| + (sort-by (juxt (fn [job] (apply + (map count (vals (get-in replica [:allocations job]))))) | |
| + #(.indexOf ^clojure.lang.PersistentVector (vec (:jobs replica)) %)) | |
| + (:jobs replica))) | |
| + | |
| +;; filter out saturated, then sort by is-covered? (no before yes), | |
| +;; then by number of allocated peers | |
| + | |
| +(defn select-job-requiring-peer | |
| + "Selects the next job deserving a peer. | |
| + Tries to cover job requiring the least peers to cover first, | |
| + then tries to balance by peer count" | |
| + [replica jobs] | |
| + (->> jobs | |
| + (sort-by (fn [job] | |
| + (let [peer-count (val job) | |
| + covered (max 0 (- (cjs/job-lower-bound replica (key job)) peer-count))] | |
| + (vector covered | |
| + peer-count | |
| + (.indexOf ^clojure.lang.PersistentVector (vec (:jobs replica)) job))))) | |
| + (remove (fn [job] | |
| + (let [peer-count (val job)] | |
| + (>= peer-count (cjs/job-upper-bound replica (key job)))))) | |
| + (ffirst))) | |
| + | |
| +(defmethod cjs/equivalent-allocation? :onyx.job-scheduler/balanced | |
| + [replica replica-new] | |
| + (= (sort (map (fn [[job-id _]] | |
| + (apply + (map count (vals (get-in replica [:allocations job-id]))))) | |
| + (:allocations replica))) | |
| + (sort (map (fn [[job-id _]] | |
| + (apply + (map count (vals (get-in replica-new [:allocations job-id]))))) | |
| + (:allocations replica-new))))) | |
| + | |
| (defmethod cjs/claim-spare-peers :onyx.job-scheduler/balanced | |
| [replica jobs n] | |
| - (let [ordered-jobs (sort-by (juxt #(.indexOf ^clojure.lang.PersistentVector (vec (:jobs replica)) %) | |
| - #(count (get-in replica [:tasks %]))) | |
| - (:jobs replica))] | |
| - (loop [[head & tail :as job-seq] ordered-jobs | |
| - results jobs | |
| - capacity n] | |
| - (let [tail (vec tail) | |
| - to-cover (- (count (get-in replica [:tasks head])) (get results head 0))] | |
| - (cond (or (<= capacity 0) (not (seq job-seq))) | |
| - results | |
| - (and (>= capacity to-cover) (pos? to-cover)) | |
| - (recur (conj tail head) (update-in results [head] + to-cover) (- capacity to-cover)) | |
| - (and (< (get results head) (get-in replica [:saturation head])) (pos? (- capacity to-cover))) | |
| - (recur (conj tail head) (update-in results [head] inc) (dec capacity)) | |
| - :else | |
| - (recur tail results capacity)))))) | |
| + (loop [jobs* jobs n* n] | |
| + (if (zero? n*) | |
| + jobs* | |
| + (recur (update-in jobs* [(select-job-requiring-peer replica jobs*)] inc) | |
| + (dec n*))))) | |
| diff --git a/src/onyx/scheduling/balanced_task_scheduler.clj b/src/onyx/scheduling/balanced_task_scheduler.clj | |
| index a305728..52997c6 100644 | |
| --- a/src/onyx/scheduling/balanced_task_scheduler.clj | |
| +++ b/src/onyx/scheduling/balanced_task_scheduler.clj | |
| @@ -1,5 +1,6 @@ | |
| (ns onyx.scheduling.balanced-task-scheduler | |
| (:require [onyx.scheduling.common-task-scheduler :as cts] | |
| + [onyx.scheduling.common-job-scheduler :as cjs] | |
| [onyx.log.commands.common :as common])) | |
| (defmethod cts/drop-peers :onyx.task-scheduler/balanced | |
| @@ -16,45 +17,53 @@ | |
| task-most-peers (ffirst (filter (fn [x] (= max-peers (count (second x)))) allocations))] | |
| [(conj peers-to-drop (last (allocations task-most-peers))) | |
| (update-in allocations [task-most-peers] butlast)])) | |
| - [[] (get-in replica [:allocations job])] | |
| + [[] (cts/filter-grouped-tasks replica job (get-in replica [:allocations job]))] | |
| (range n)))) | |
| -(defmethod cts/task-claim-n-peers :onyx.task-scheduler/balanced | |
| - [replica job n] | |
| - (let [tasks (get-in replica [:tasks job])] | |
| - ;; If the number of peers is less than the number of tasks, | |
| - ;; we're not covered - so we claim zero peers. Otherwise we | |
| - ;; take as much as we're allowed, depending on the saturation | |
| - ;; of the job. | |
| - (if (< n (count tasks)) | |
| - 0 | |
| - (min (get-in replica [:saturation job] Double/POSITIVE_INFINITY) n)))) | |
| - | |
| (defn reuse-spare-peers [replica job tasks spare-peers] | |
| - (loop [[head & tail :as task-seq] (get-in replica [:tasks job]) | |
| + (loop [task-seq (into #{} (get-in replica [:tasks job])) | |
| results tasks | |
| capacity spare-peers] | |
| - (let [tail (vec tail)] | |
| - (cond (or (<= capacity 0) (not (seq task-seq))) | |
| - results | |
| - (< (get results head) (or (get-in replica [:task-saturation job head] Double/POSITIVE_INFINITY))) | |
| - (recur (conj tail head) (update-in results [head] inc) (dec capacity)) | |
| - :else | |
| - (recur tail results capacity))))) | |
| + (let [least-allocated-task (first (sort-by | |
| + (juxt | |
| + #(get results %) | |
| + #(.indexOf ^clojure.lang.PersistentVector (vec (get-in replica [:tasks job])) %)) | |
| + task-seq))] | |
| + (cond | |
| + ;; If there are no more peers to give out, or no more tasks | |
| + ;; want peers, we're done. | |
| + (or (<= capacity 0) (nil? least-allocated-task)) | |
| + results | |
| + | |
| + ;; If we're underneath the saturation level for this task, and this | |
| + ;; task is allowed to be allocated to, we give it one peer and rotate it | |
| + ;; to the back to possibly get more peers later. | |
| + (and (< (get results least-allocated-task) | |
| + (or (get-in replica [:task-saturation job least-allocated-task] | |
| + Double/POSITIVE_INFINITY))) | |
| + (not (cts/preallocated-grouped-task? replica job least-allocated-task))) | |
| + (recur task-seq (update-in results [least-allocated-task] inc) (dec capacity)) | |
| + | |
| + ;; This task doesn't want more peers, throw it away from the rotating sequence. | |
| + :else | |
| + (recur (disj task-seq least-allocated-task) results capacity))))) | |
| (defmethod cts/task-distribute-peer-count :onyx.task-scheduler/balanced | |
| [replica job n] | |
| (let [tasks (get-in replica [:tasks job]) | |
| - t (count tasks) | |
| - min-peers (int (/ n t)) | |
| - r (rem n t) | |
| - max-peers (inc min-peers) | |
| - init | |
| - (reduce | |
| - (fn [all [task k]] | |
| - (assoc all task (min (get-in replica [:task-saturation job task] Double/POSITIVE_INFINITY) | |
| - (if (< k r) max-peers min-peers)))) | |
| - {} | |
| - (map vector tasks (range))) | |
| - spare-peers (- n (apply + (vals init)))] | |
| - (reuse-spare-peers replica job init spare-peers))) | |
| + t (cjs/job-lower-bound replica job)] | |
| + (if (< n t) | |
| + (zipmap tasks (repeat 0)) | |
| + (let [init | |
| + (reduce | |
| + (fn [all [task k]] | |
| + ;; If it's a grouped task that has already been allocated, | |
| + ;; we can't add more peers since that would break the hashing algorithm. | |
| + (if (cts/preallocated-grouped-task? replica job task) | |
| + (assoc all task (count (get-in replica [:allocations job task]))) | |
| + (assoc all task (min (get-in replica [:task-saturation job task] Double/POSITIVE_INFINITY) | |
| + (get-in replica [:min-required-peers job task] Double/POSITIVE_INFINITY))))) | |
| + {} | |
| + (map vector tasks (range))) | |
| + spare-peers (- n (apply + (vals init)))] | |
| + (reuse-spare-peers replica job init spare-peers))))) | |
| \ No newline at end of file | |
| diff --git a/src/onyx/scheduling/common_job_scheduler.clj b/src/onyx/scheduling/common_job_scheduler.clj | |
| index c3755ae..554ed04 100644 | |
| --- a/src/onyx/scheduling/common_job_scheduler.clj | |
| +++ b/src/onyx/scheduling/common_job_scheduler.clj | |
| @@ -8,6 +8,42 @@ | |
| [onyx.scheduling.common-task-scheduler :as cts] | |
| [taoensso.timbre :refer [info]])) | |
| +(defn job-upper-bound [replica job] | |
| + ;; We need to handle a special case here when figuring out the upper saturation limit. | |
| + ;; If this is a job with a grouped task that has already been allocated, | |
| + ;; we can't allocate to the grouped task anymore, even if it's saturation | |
| + ;; level is Infinity. | |
| + (let [tasks (get-in replica [:tasks job]) | |
| + grouped-tasks (filter (fn [task] (get-in replica [:flux-policies job task])) tasks)] | |
| + (if (seq (filter (fn [task] (seq (get-in replica [:allocations job task]))) grouped-tasks)) | |
| + (apply + (map (fn [task] | |
| + (if (some #{task} grouped-tasks) | |
| + ;; Cannot allocate anymore, you have what you have. | |
| + (count (get-in replica [:allocations job task])) | |
| + ;; Allocate as much the original task saturation allows since it hasn't | |
| + ;; been allocated yet. | |
| + (get-in replica [:task-saturation job task]))) | |
| + tasks)) | |
| + (get-in replica [:saturation job] Double/POSITIVE_INFINITY)))) | |
| + | |
| +(defn job-lower-bound [replica job] | |
| + ;; Again, we handle the special case of a grouped task that has already | |
| + ;; begun. | |
| + (let [tasks (get-in replica [:tasks job]) | |
| + grouped-tasks (filter (fn [task] (get-in replica [:flux-policies job task])) tasks)] | |
| + (if (seq (filter (fn [task] (seq (get-in replica [:allocations job task]))) grouped-tasks)) | |
| + (apply + (map (fn [task] | |
| + (if (some #{task} grouped-tasks) | |
| + ;; Cannot allocate anymore, you have what you have. | |
| + (count (get-in replica [:allocations job task])) | |
| + ;; Grab the absolute minimum for this task, no constraints. | |
| + (get-in replica [:min-required-peers job task]))) | |
| + tasks)) | |
| + (apply + (vals (get-in replica [:min-required-peers job])))))) | |
| + | |
| +(defn job-coverable? [replica job n] | |
| + (>= n (job-lower-bound replica job))) | |
| + | |
| (defmulti job-offer-n-peers | |
| (fn [replica] | |
| (:job-scheduler replica))) | |
| @@ -16,6 +52,10 @@ | |
| (fn [replica jobs n] | |
| (:job-scheduler replica))) | |
| +(defmulti sort-job-priority | |
| + (fn [replica jobs] | |
| + (:job-scheduler replica))) | |
| + | |
| (defmethod job-offer-n-peers :default | |
| [replica] | |
| (throw (ex-info (format "Job scheduler %s not recognized" (:job-scheduler replica)) | |
| @@ -26,6 +66,11 @@ | |
| (throw (ex-info (format "Job scheduler %s not recognized" (:job-scheduler replica)) | |
| {:job-scheduler (:job-scheduler replica)}))) | |
| +(defmethod sort-job-priority :default | |
| + [replica] | |
| + (throw (ex-info (format "Job scheduler %s not recognized" (:job-scheduler replica)) | |
| + {:job-scheduler (:job-scheduler replica)}))) | |
| + | |
| (defn current-job-allocations [replica] | |
| (into {} | |
| (map (fn [j] | |
| @@ -42,27 +87,31 @@ | |
| (get-in replica [:tasks j])))}) | |
| (:jobs replica)))) | |
| -(defn job->task-claims [replica job-offers] | |
| +(defn job-claim-peers [replica job-offers] | |
| (reduce-kv | |
| - (fn [all j claim] | |
| - (assoc all j (cts/task-claim-n-peers replica j claim))) | |
| + (fn [all j n] | |
| + (if (job-coverable? replica j n) | |
| + (let [sat (job-upper-bound replica j)] | |
| + (assoc all j (min sat n))) | |
| + (assoc all j 0))) | |
| {} | |
| job-offers)) | |
| (defn reallocate-peers [origin-replica displaced-peers max-utilization] | |
| (loop [peer-pool displaced-peers | |
| replica origin-replica] | |
| - (let [candidate-jobs (filter identity | |
| - (mapcat | |
| - (fn [job] | |
| - (let [current (get (current-task-allocations replica) job) | |
| - desired (cts/task-distribute-peer-count replica job (get max-utilization job)) | |
| - tasks (get-in replica [:tasks job])] | |
| - (map (fn [t] | |
| - (when (< (or (get current t) 0) (get desired t)) | |
| - [job t])) | |
| - tasks))) | |
| - (:jobs replica)))] | |
| + (let [candidate-jobs (remove | |
| + nil? | |
| + (mapcat | |
| + (fn [job] | |
| + (let [current (get (current-task-allocations replica) job) | |
| + desired (cts/task-distribute-peer-count origin-replica job (get max-utilization job)) | |
| + tasks (get-in replica [:tasks job])] | |
| + (map (fn [t] | |
| + (when (< (or (get current t) 0) (get desired t)) | |
| + [job t])) | |
| + tasks))) | |
| + (sort-job-priority replica (:jobs replica))))] | |
| (if (and (seq peer-pool) (seq candidate-jobs)) | |
| (recur (rest peer-pool) | |
| (let [removed (common/remove-peers replica (first peer-pool)) | |
| @@ -101,15 +150,43 @@ | |
| (and (get-in replica [:acker-exclude-outputs job]) | |
| (some #{task} (get-in replica [:output-tasks job]))))) | |
| -(defn choose-acker-candidates [replica peers] | |
| - (remove | |
| - (fn [p] | |
| - (let [{:keys [job task]} (common/peer->allocated-job (:allocations replica) p)] | |
| - (exempt-from-acker? replica job task))) | |
| +(defn find-physically-colocated-peers | |
| + "Takes replica and a peer. Returns a set of peers, exluding this peer, | |
| + that reside on the same physical machine." | |
| + [replica peer] | |
| + (let [peers (remove (fn [p] (= p peer)) (:peers replica)) | |
| + peer-site (extensions/get-peer-site replica peer)] | |
| + (filter | |
| + (fn [p] | |
| + (= (extensions/get-peer-site replica p) peer-site)) | |
| + peers))) | |
| + | |
| +(defn sort-acker-candidates | |
| + "We try to be smart about which ackers we pick. If we can avoid | |
| + colocating an acker and any peers executing an exempt task, | |
| + we try to. It's a best effort, though, so if it's not possible | |
| + we proceed anyway." | |
| + [replica peers] | |
| + (sort-by | |
| + (fn [peer] | |
| + (let [colocated-peers (find-physically-colocated-peers replica peer) | |
| + statuses (map | |
| + #(let [{:keys [job task]} (common/peer->allocated-job (:allocations replica) %)] | |
| + (exempt-from-acker? replica job task)) | |
| + colocated-peers)] | |
| + (some #{true} statuses))) | |
| peers)) | |
| +(defn choose-acker-candidates [replica peers] | |
| + (sort-acker-candidates | |
| + replica | |
| + (remove | |
| + (fn [p] | |
| + (let [{:keys [job task]} (common/peer->allocated-job (:allocations replica) p)] | |
| + (exempt-from-acker? replica job task))) | |
| + peers))) | |
| + | |
| (defn choose-ackers [replica] | |
| - ;; TODO: ensure this behaves consistently with respect to ordering | |
| (reduce | |
| (fn [result job] | |
| (let [peers (apply concat (vals (get-in result [:allocations job]))) | |
| @@ -120,11 +197,36 @@ | |
| replica | |
| (:jobs replica))) | |
| +(defn deallocate-starved-jobs | |
| + "Strips out allocations from jobs that no longer meet the minimum number | |
| + of peers. This can happen if a peer leaves from a running job." | |
| + [replica] | |
| + (reduce | |
| + (fn [result job] | |
| + (if (< (apply + (map count (vals (get-in result [:allocations job])))) | |
| + (apply + (vals (get-in result [:min-required-peers job])))) | |
| + (update-in result [:allocations] dissoc job) | |
| + result)) | |
| + replica | |
| + (:jobs replica))) | |
| + | |
| +(defmulti equivalent-allocation? | |
| + (fn [replica replica-new] | |
| + (:job-scheduler replica))) | |
| + | |
| +(defmethod equivalent-allocation? :default | |
| + [_ _] | |
| + false) | |
| + | |
| (defn reconfigure-cluster-workload [replica] | |
| (let [job-offers (job-offer-n-peers replica) | |
| - job-claims (job->task-claims replica job-offers) | |
| + job-claims (job-claim-peers replica job-offers) | |
| spare-peers (apply + (vals (merge-with - job-offers job-claims))) | |
| max-utilization (claim-spare-peers replica job-claims spare-peers) | |
| current-allocations (current-job-allocations replica) | |
| - peers-to-displace (find-displaced-peers replica current-allocations max-utilization)] | |
| - (choose-ackers (reallocate-peers replica peers-to-displace max-utilization)))) | |
| + peers-to-displace (find-displaced-peers replica current-allocations max-utilization) | |
| + deallocated (deallocate-starved-jobs replica) | |
| + updated-replica (choose-ackers (reallocate-peers deallocated peers-to-displace max-utilization))] | |
| + (if (equivalent-allocation? replica updated-replica) | |
| + replica | |
| + updated-replica))) | |
| diff --git a/src/onyx/scheduling/common_task_scheduler.clj b/src/onyx/scheduling/common_task_scheduler.clj | |
| index 0458655..1e17dfc 100644 | |
| --- a/src/onyx/scheduling/common_task_scheduler.clj | |
| +++ b/src/onyx/scheduling/common_task_scheduler.clj | |
| @@ -17,6 +17,18 @@ | |
| completed (get-in replica [:completions job])] | |
| (filter identity (second (diff completed tasks))))) | |
| +(defn preallocated-grouped-task? [replica job task] | |
| + (and (not (nil? (get-in replica [:flux-policies job task]))) | |
| + (> (count (get-in replica [:allocations job task])) 0))) | |
| + | |
| +(defn filter-grouped-tasks [replica job allocations] | |
| + (into | |
| + {} | |
| + (remove | |
| + (fn [[k v]] | |
| + (not (nil? (get-in replica [:flux-policies job k])))) | |
| + allocations))) | |
| + | |
| (defmulti drop-peers | |
| (fn [replica job n] | |
| (get-in replica [:task-schedulers job]))) | |
| @@ -29,20 +41,10 @@ | |
| scheduler) | |
| {:replica replica})))) | |
| -(defmulti task-claim-n-peers | |
| - (fn [replica job n] | |
| - (get-in replica [:task-schedulers job]))) | |
| - | |
| (defmulti task-distribute-peer-count | |
| (fn [replica job n] | |
| (get-in replica [:task-schedulers job]))) | |
| -(defmethod task-claim-n-peers :default | |
| - [replica job n] | |
| - (throw (ex-info (format "Task scheduler %s not recognized" (get-in replica [:task-schedulers job])) | |
| - {:task-scheduler (get-in replica [:task-schedulers job]) | |
| - :job job}))) | |
| - | |
| (defmethod task-distribute-peer-count :default | |
| [replica job n] | |
| (throw (ex-info (format "Task scheduler %s not recognized" (get-in replica [:task-schedulers job])) | |
| diff --git a/src/onyx/scheduling/greedy_job_scheduler.clj b/src/onyx/scheduling/greedy_job_scheduler.clj | |
| index 1ab52b1..7835287 100644 | |
| --- a/src/onyx/scheduling/greedy_job_scheduler.clj | |
| +++ b/src/onyx/scheduling/greedy_job_scheduler.clj | |
| @@ -4,16 +4,14 @@ | |
| [onyx.log.commands.common :as common])) | |
| (defn job-coverable? [replica job] | |
| - (let [tasks (get-in replica [:tasks job])] | |
| - (>= (count (get-in replica [:peers])) (count tasks)))) | |
| + (let [min-req (apply + (vals (get-in replica [:min-required-peers job])))] | |
| + (>= (count (get-in replica [:peers])) min-req))) | |
| (defmethod cjs/job-offer-n-peers :onyx.job-scheduler/greedy | |
| [replica] | |
| (if (seq (:jobs replica)) | |
| - (let [[active & passive] (:jobs replica) | |
| - coverable? (job-coverable? replica active) | |
| - n (if coverable? (count (:peers replica)) 0)] | |
| - (merge {active n} (zipmap passive (repeat 0)))) | |
| + (let [[active & passive] (:jobs replica)] | |
| + (merge {active (count (:peers replica))} (zipmap passive (repeat 0)))) | |
| {})) | |
| (defmethod cjs/claim-spare-peers :onyx.job-scheduler/greedy | |
| @@ -24,3 +22,10 @@ | |
| ;; them. Return the same job claims since nothing will change. | |
| ;; | |
| jobs) | |
| + | |
| +(defmethod cjs/sort-job-priority :onyx.job-scheduler/greedy | |
| + [replica jobs] | |
| + ;; We only care about the first job in a Greedy scheduler. | |
| + (if-let [x (first jobs)] | |
| + (vector x) | |
| + [])) | |
| diff --git a/src/onyx/scheduling/percentage_job_scheduler.clj b/src/onyx/scheduling/percentage_job_scheduler.clj | |
| index f0254df..e49570c 100644 | |
| --- a/src/onyx/scheduling/percentage_job_scheduler.clj | |
| +++ b/src/onyx/scheduling/percentage_job_scheduler.clj | |
| @@ -36,6 +36,12 @@ | |
| init-allocations (min-allocations jobs-to-use n-peers)] | |
| (into {} (map (fn [j] {(:job j) (:capacity j)}) init-allocations)))) | |
| +(defmethod cjs/sort-job-priority :onyx.job-scheduler/percentage | |
| + [replica jobs] | |
| + (sort-by (juxt #(.indexOf ^clojure.lang.PersistentVector (vec (:jobs replica)) %) | |
| + (fn [job] (apply + (map count (vals (get-in replica [:allocations job])))))) | |
| + (:jobs replica))) | |
| + | |
| (defmethod cjs/claim-spare-peers :onyx.job-scheduler/percentage | |
| [replica jobs n] | |
| ;; We can get away with using the exact same algorithm as the | |
| diff --git a/src/onyx/scheduling/percentage_task_scheduler.clj b/src/onyx/scheduling/percentage_task_scheduler.clj | |
| index 058650c..040d12a 100644 | |
| --- a/src/onyx/scheduling/percentage_task_scheduler.clj | |
| +++ b/src/onyx/scheduling/percentage_task_scheduler.clj | |
| @@ -1,67 +1,93 @@ | |
| (ns onyx.scheduling.percentage-task-scheduler | |
| - (:require [onyx.scheduling.common-task-scheduler :as cts] | |
| + (:require [onyx.scheduling.common-job-scheduler :as cjs] | |
| + [onyx.scheduling.common-task-scheduler :as cts] | |
| [onyx.log.commands.common :as common])) | |
| -(defn sort-tasks-by-pct [replica job tasks] | |
| - (let [indexed | |
| - (map-indexed | |
| - (fn [k t] | |
| - {:position k :task t :pct (get-in replica [:task-percentages job t])}) | |
| - (reverse tasks))] | |
| - (reverse (sort-by (juxt :pct :position) indexed)))) | |
| +(defn tasks-by-pct [replica job tasks] | |
| + (map | |
| + (fn [t] | |
| + {:task t :pct (get-in replica [:task-percentages job t])}) | |
| + tasks)) | |
| -(defn min-task-allocations [replica job tasks n-peers] | |
| - (mapv | |
| - (fn [task] | |
| - (let [n (int (Math/floor (* (* 0.01 (:pct task)) n-peers)))] | |
| - (assoc task :allocation n))) | |
| - tasks)) | |
| +(defn rescale-task-percentages | |
| + "Rescale task percentages after saturated tasks were removed" | |
| + [tasks] | |
| + (let [total (/ (apply + (map :pct tasks)) 100)] | |
| + (map (fn [task] | |
| + (update-in task [:pct] / total)) | |
| + tasks))) | |
| -(defn percentage-balanced-taskload [replica job candidate-tasks n-peers] | |
| - (let [sorted-tasks (sort-tasks-by-pct replica job candidate-tasks) | |
| - init-allocations (min-task-allocations replica job sorted-tasks n-peers) | |
| - init-usage (apply + (map :allocation init-allocations)) | |
| - left-over-peers (- n-peers init-usage) | |
| - with-leftovers (update-in init-allocations [0 :allocation] + left-over-peers)] | |
| - (into {} (map (fn [t] {(:task t) t}) with-leftovers)))) | |
| +(defn largest-remainder-allocations | |
| + "Allocates remaining peers to the tasks with the largest remainder. | |
| + e.g. 3 tasks pct allocated 3.5, 1.75, 1.75 -> 3, 2, 2" | |
| + [replica tasks n-peers job] | |
| + (let [tasks* (rescale-task-percentages tasks) | |
| + unrounded (map (fn [task] | |
| + (cond (cts/preallocated-grouped-task? replica job (:task task)) | |
| + (count (get-in replica [:allocations job (:task task)])) | |
| + (not (nil? (get-in replica [:flux-policies job (:task task)]))) | |
| + (max (get-in replica [:min-required-peers job (:task task)] Double/POSITIVE_INFINITY) | |
| + (* 0.01 (:pct task) n-peers)) | |
| + :else (* 0.01 (:pct task) n-peers))) | |
| + tasks*) | |
| + full (map int unrounded) | |
| + taken (apply + full) | |
| + remaining (- n-peers taken) | |
| + full-allocated (zipmap tasks* full) | |
| + remainders (->> (map (fn [task v] | |
| + (vector task (- v (int v)))) | |
| + tasks* | |
| + unrounded) | |
| + (sort-by second) | |
| + (reverse) | |
| + (take remaining) | |
| + (map (juxt first (constantly 1))) | |
| + (into {})) | |
| + final-allocations (merge-with + full-allocated remainders)] | |
| + (mapv (fn [[task allocation]] | |
| + (assoc task :allocation (int allocation))) | |
| + final-allocations))) | |
| + | |
| +(defn percentage-balanced-taskload | |
| + [replica job candidate-tasks n-peers] | |
| + {:post [(>= n-peers 0)]} | |
| + (let [sorted-tasks (tasks-by-pct replica job candidate-tasks) | |
| + allocations (largest-remainder-allocations replica sorted-tasks n-peers job) | |
| + oversaturated (filter (fn [{:keys [task allocation]}] | |
| + (> allocation (get-in replica [:task-saturation job task]))) | |
| + allocations) | |
| + cutoff-oversaturated (->> oversaturated | |
| + (map (fn [{:keys [task] :as t}] | |
| + [task (assoc t :allocation (get-in replica [:task-saturation job task]))])) | |
| + (into {}))] | |
| + (if (empty? cutoff-oversaturated) | |
| + (into {} (map (fn [t] {(:task t) t}) allocations)) | |
| + (let [n-peers-fully-saturated (apply + (map :allocation (vals cutoff-oversaturated))) | |
| + n-remaining-peers (- n-peers n-peers-fully-saturated) | |
| + unallocated-tasks (remove cutoff-oversaturated candidate-tasks)] | |
| + (merge (percentage-balanced-taskload replica job unallocated-tasks n-remaining-peers) | |
| + cutoff-oversaturated))))) | |
| (defmethod cts/drop-peers :onyx.task-scheduler/percentage | |
| [replica job n] | |
| (let [tasks (keys (get-in replica [:allocations job])) | |
| - balanced (percentage-balanced-taskload replica job tasks n)] | |
| + balanced (percentage-balanced-taskload replica job tasks n) | |
| + tasks (cts/filter-grouped-tasks replica job balanced)] | |
| (mapcat | |
| (fn [[task {:keys [allocation]}]] | |
| (drop-last allocation (get-in replica [:allocations job task]))) | |
| - balanced))) | |
| - | |
| -(defmethod cts/task-claim-n-peers :onyx.task-scheduler/percentage | |
| - [replica job n] | |
| - ;; We can reuse the Balanced task scheduler algorithm as is. | |
| - (cts/task-claim-n-peers | |
| - (assoc-in replica [:task-schedulers job] :onyx.task-scheduler/balanced) | |
| - job n)) | |
| - | |
| -(defn reuse-spare-peers [replica job tasks spare-peers] | |
| - (loop [[head & tail :as task-seq] (get-in replica [:tasks job]) | |
| - results tasks | |
| - capacity spare-peers] | |
| - (let [tail (vec tail)] | |
| - (cond (or (<= capacity 0) (not (seq task-seq))) | |
| - results | |
| - (< (get results head) (or (get-in replica [:task-saturation job head] Double/POSITIVE_INFINITY))) | |
| - (recur (conj tail head) (update-in results [head] inc) (dec capacity)) | |
| - :else | |
| - (recur tail results capacity))))) | |
| + (take n (cycle tasks))))) | |
| (defmethod cts/task-distribute-peer-count :onyx.task-scheduler/percentage | |
| [replica job n] | |
| (let [tasks (get-in replica [:tasks job]) | |
| - init | |
| - (reduce | |
| - (fn [all [task k]] | |
| - (assoc all task (min (get-in replica [:task-saturation job task] Double/POSITIVE_INFINITY) | |
| - (int (* n (get-in replica [:task-percentages job task]) 0.01))))) | |
| - {} | |
| - (map vector tasks (range))) | |
| - spare-peers (- n (apply + (vals init)))] | |
| - (reuse-spare-peers replica job init spare-peers))) | |
| + t (cjs/job-lower-bound replica job)] | |
| + (if (< n t) | |
| + (zipmap tasks (repeat 0)) | |
| + (let [grouped (filter (partial cts/preallocated-grouped-task? replica job) tasks) | |
| + not-grouped (remove (partial cts/preallocated-grouped-task? replica job) tasks) | |
| + init (into {} (map (fn [t] {t (count (get-in replica [:allocations job t]))}) grouped)) | |
| + spare-peers (- n (apply + (vals init))) | |
| + balanced (percentage-balanced-taskload replica job not-grouped spare-peers) | |
| + rets (into {} (map (fn [[k v]] {k (:allocation v)}) balanced))] | |
| + (merge init rets))))) | |
| \ No newline at end of file | |
| diff --git a/src/onyx/static/default_vals.clj b/src/onyx/static/default_vals.clj | |
| index 3e08a88..838c3ec 100644 | |
| --- a/src/onyx/static/default_vals.clj | |
| +++ b/src/onyx/static/default_vals.clj | |
| @@ -1,19 +1,25 @@ | |
| (ns onyx.static.default-vals) | |
| +(def default-pending-timeout 60000) | |
| + | |
| (def defaults | |
| {; input task defaults | |
| :onyx/input-retry-timeout 1000 | |
| - :onyx/pending-timeout 60000 | |
| + :onyx/pending-timeout default-pending-timeout | |
| :onyx/max-pending 10000 | |
| ; task defaults | |
| :onyx/batch-timeout 1000 | |
| + ; zookeeper defaults | |
| + :onyx.zookeeper/backoff-base-sleep-time-ms 1000 | |
| + :onyx.zookeeper/backoff-max-sleep-time-ms 30000 | |
| + :onyx.zookeeper/backoff-max-retries 5 | |
| + | |
| ; peer defaults | |
| :onyx.peer/inbox-capacity 1000 | |
| :onyx.peer/outbox-capacity 1000 | |
| :onyx.peer/drained-back-off 400 | |
| - :onyx.peer/sequential-back-off 2000 | |
| :onyx.peer/job-not-ready-back-off 500 | |
| ; messaging defaults | |
| @@ -21,6 +27,16 @@ | |
| :onyx.messaging.netty/thread-pool-sizes 1 | |
| :onyx.messaging.netty/connect-timeout-millis 1000 | |
| :onyx.messaging.netty/pending-buffer-size 10000 | |
| + :onyx.messaging/inbound-buffer-size 200000 | |
| :onyx.messaging/completion-buffer-size 50000 | |
| :onyx.messaging/release-ch-buffer-size 10000 | |
| - :onyx.messaging/retry-ch-buffer-size 10000}) | |
| + :onyx.messaging/retry-ch-buffer-size 10000 | |
| + :onyx.messaging/max-downstream-links 10 | |
| + :onyx.messaging/max-acker-links 5 | |
| + :onyx.messaging/peer-link-gc-interval 90000 | |
| + :onyx.messaging/peer-link-idle-timeout 60000 | |
| + :onyx.messaging/ack-daemon-timeout default-pending-timeout | |
| + :onyx.messaging/ack-daemon-clear-interval 15000}) | |
| + | |
| +(defn arg-or-default [k opts] | |
| + (or (get opts k) (get defaults k))) | |
| diff --git a/src/onyx/static/validation.clj b/src/onyx/static/validation.clj | |
| index 8a251ce..2104277 100644 | |
| --- a/src/onyx/static/validation.clj | |
| +++ b/src/onyx/static/validation.clj | |
| @@ -28,6 +28,12 @@ | |
| :else | |
| (merge base-catalog-entry-validator {:onyx/fn schema/Keyword}))) | |
| +(def group-entry-validator | |
| + {(schema/optional-key :onyx/group-by-key) schema/Keyword | |
| + (schema/optional-key :onyx/group-by-fn) schema/Keyword | |
| + :onyx/min-peers schema/Int | |
| + :onyx/flux-policy (schema/enum :continue :kill)}) | |
| + | |
| (defn task-dispatch-validator [task] | |
| (when (= (:onyx/name task) | |
| (:onyx/type task)) | |
| @@ -42,6 +48,12 @@ | |
| [catalog] | |
| (doseq [entry catalog] | |
| (schema/validate catalog-entry-validator entry) | |
| + (when (and (= (:onyx/type entry) :function) | |
| + (or (not (nil? (:onyx/group-by-key entry))) | |
| + (not (nil? (:onyx/group-by-fn entry))))) | |
| + (let [kws (select-keys entry [:onyx/group-by-fn :onyx/group-by-key | |
| + :onyx/min-peers :onyx/flux-policy])] | |
| + (schema/validate group-entry-validator kws))) | |
| (name-and-type-not-equal entry))) | |
| (defn validate-workflow-names [{:keys [workflow catalog]}] | |
| @@ -53,6 +65,12 @@ | |
| "for the following workflow keywords: " | |
| (apply str (interpose ", " missing-names))))))) | |
| +(defn validate-workflow-no-dupes [{:keys [workflow]}] | |
| + (when-not (= (count workflow) | |
| + (count (set workflow))) | |
| + (throw (ex-info "Workflows entries cannot contain duplicates" | |
| + {:workflow workflow})))) | |
| + | |
| (defn catalog->type-task-names [catalog type-pred] | |
| (set (map :onyx/name | |
| (filter (fn [task] | |
| @@ -86,7 +104,8 @@ | |
| (defn validate-workflow [job] | |
| (validate-workflow-graph job) | |
| - (validate-workflow-names job)) | |
| + (validate-workflow-names job) | |
| + (validate-workflow-no-dupes job)) | |
| (def job-validator | |
| {:catalog [(schema/pred map? 'map?)] | |
| @@ -102,9 +121,10 @@ | |
| (defn validate-lifecycles [lifecycles catalog] | |
| (doseq [lifecycle lifecycles] | |
| - (assert (or (= (:lifecycle/task lifecycle) :all) | |
| - (some #{(:lifecycle/task lifecycle)} (map :onyx/name catalog))) | |
| - (str ":lifecycle/task must either name a task in the catalog or be :all, it was: " (:lifecycle/task lifecycle))) | |
| + (when-not (or (= (:lifecycle/task lifecycle) :all) | |
| + (some #{(:lifecycle/task lifecycle)} (map :onyx/name catalog))) | |
| + (throw (ex-info (str ":lifecycle/task must name a task in the catalog. It was: " (:lifecycle/task lifecycle)) | |
| + {:lifecycle lifecycle :catalog catalog}))) | |
| (schema/validate | |
| {:lifecycle/task schema/Keyword | |
| (schema/optional-key :lifecycle/pre) schema/Keyword | |
| @@ -120,10 +140,13 @@ | |
| :lifecycle/post | |
| :lifecycle/doc])))) | |
| +(def deployment-id-schema | |
| + (schema/either schema/Uuid schema/Str)) | |
| + | |
| (defn validate-env-config [env-config] | |
| (schema/validate | |
| {:zookeeper/address schema/Str | |
| - :onyx/id schema/Uuid | |
| + :onyx/id deployment-id-schema | |
| (schema/optional-key :zookeeper/server?) schema/Bool | |
| (schema/optional-key :zookeeper.server/port) schema/Int} | |
| (select-keys env-config | |
| @@ -132,7 +155,7 @@ | |
| (defn validate-peer-config [peer-config] | |
| (schema/validate | |
| {:zookeeper/address schema/Str | |
| - :onyx/id schema/Uuid | |
| + :onyx/id deployment-id-schema | |
| :onyx.peer/job-scheduler schema/Keyword | |
| :onyx.messaging/impl (schema/enum :aeron :netty :core.async :dummy-messenger) | |
| :onyx.messaging/bind-addr schema/Str | |
| diff --git a/src/onyx/system.clj b/src/onyx/system.clj | |
| index c7cfd7b..ff2873a 100644 | |
| --- a/src/onyx/system.clj | |
| +++ b/src/onyx/system.clj | |
| @@ -14,6 +14,7 @@ | |
| [onyx.log.commands.exhaust-input] | |
| [onyx.log.commands.seal-output] | |
| [onyx.log.commands.signal-ready] | |
| + [onyx.log.commands.set-replica] | |
| [onyx.log.commands.leave-cluster] | |
| [onyx.log.commands.submit-job] | |
| [onyx.log.commands.kill-job] | |
| diff --git a/src/onyx/types.clj b/src/onyx/types.clj | |
| index aee8554..836f8f9 100644 | |
| --- a/src/onyx/types.clj | |
| +++ b/src/onyx/types.clj | |
| @@ -1,11 +1,13 @@ | |
| (ns onyx.types) | |
| (defrecord Leaf [message id acker-id completion-id ack-val ack-vals route routes hash-group]) | |
| + | |
| (defn leaf | |
| ([message] | |
| - (->Leaf message nil nil nil nil nil nil nil nil))) | |
| + (->Leaf message nil nil nil nil nil nil nil nil))) | |
| + | |
| (defrecord Route [flow exclusions post-transformation action]) | |
| -(defrecord Ack [id completion-id ack-val]) | |
| -(defrecord Result [root leaves]) | |
| +(defrecord Ack [id completion-id ack-val timestamp]) | |
| +(defrecord Result [root leaves]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment