@MichaelDrogalis
Created May 29, 2015 17:04
diff --git a/src/onyx/api.clj b/src/onyx/api.clj
index 504fdb1..00c41fd 100644
--- a/src/onyx/api.clj
+++ b/src/onyx/api.clj
@@ -2,7 +2,7 @@
(:require [clojure.string :refer [split]]
[clojure.core.async :refer [chan alts!! >!! <!! close!]]
[com.stuartsierra.component :as component]
- [taoensso.timbre :refer [warn fatal]]
+ [taoensso.timbre :refer [warn fatal error]]
[onyx.log.entry :refer [create-log-entry]]
[onyx.system :as system]
[onyx.extensions :as extensions]
@@ -29,6 +29,23 @@
Double/POSITIVE_INFINITY)})
tasks)))
+(defn ^{:no-doc true} min-required-peers [catalog tasks]
+ (into
+ {}
+ (map
+ (fn [task]
+ {(:id task)
+ (or (:onyx/min-peers (planning/find-task catalog (:name task))) 1)})
+ tasks)))
+
+(defn ^{:no-doc true} flux-policies [catalog tasks]
+ (->> tasks
+ (map (fn [task]
+ (vector (:id task)
+ (:onyx/flux-policy (planning/find-task catalog (:name task))))))
+ (filter second)
+ (into {})))
+
(defn ^{:added "0.6.0"} map-set-workflow->workflow
"Converts a workflow in format:
{:a #{:b :c}
@@ -88,12 +105,20 @@
scheduler (:task-scheduler job)
sat (saturation (:catalog job))
task-saturation (task-saturation (:catalog job) tasks)
+ min-reqs (min-required-peers (:catalog job) tasks)
+ task-flux-policies (flux-policies (:catalog job) tasks)
input-task-ids (find-input-tasks (:catalog job) tasks)
output-task-ids (find-output-tasks (:catalog job) tasks)
exempt-task-ids (find-exempt-tasks tasks (:acker/exempt-tasks job))
- args {:id id :tasks task-ids :task-scheduler scheduler
- :saturation sat :task-saturation task-saturation
- :inputs input-task-ids :outputs output-task-ids
+ args {:id id
+ :tasks task-ids
+ :task-scheduler scheduler
+ :saturation sat
+ :task-saturation task-saturation
+ :min-required-peers min-reqs
+ :flux-policies task-flux-policies
+ :inputs input-task-ids
+ :outputs output-task-ids
:exempt-tasks exempt-task-ids
:acker-percentage (or (:acker/percentage job) 1)
:acker-exclude-inputs (or (:acker/exempt-input-tasks? job) false)
@@ -102,11 +127,15 @@
(create-log-entry :submit-job args)))
(defn ^{:added "0.6.0"} submit-job [config job]
- (validator/validate-peer-config config)
+ (try (validator/validate-peer-config config)
+ (validator/validate-job (assoc job :workflow (:workflow job)))
+ (validator/validate-flow-conditions (:flow-conditions job) (:workflow job))
+ (validator/validate-lifecycles (:lifecycles job) (:catalog job))
+ (catch Throwable t
+ (println t)
+ (error t)
+ (throw t)))
(let [id (java.util.UUID/randomUUID)
- _ (validator/validate-job (assoc job :workflow (:workflow job)))
- _ (validator/validate-flow-conditions (:flow-conditions job) (:workflow job))
- _ (validator/validate-lifecycles (:lifecycles job) (:catalog job))
tasks (planning/discover-tasks (:catalog job) (:workflow job))
entry (create-submit-job-entry id config job tasks)
client (component/start (system/onyx-client config))]
@@ -165,8 +194,7 @@
(extensions/write-log-entry (:log client) entry)
(loop [replica (extensions/subscribe-to-log (:log client) ch)]
- (let [position (<!! ch)
- entry (extensions/read-log-entry (:log client) position)
+ (let [entry (<!! ch)
new-replica (extensions/apply-log-entry entry replica)]
(if (and (= (:fn entry) :gc) (= (:id (:args entry)) id))
(let [diff (extensions/replica-diff entry replica new-replica)]
@@ -181,8 +209,7 @@
(let [client (component/start (system/onyx-client config))
ch (chan 100)]
(loop [replica (extensions/subscribe-to-log (:log client) ch)]
- (let [position (<!! ch)
- entry (extensions/read-log-entry (:log client) position)
+ (let [entry (<!! ch)
new-replica (extensions/apply-log-entry entry replica)]
(if-not (some #{job-id} (:completed-jobs new-replica))
(recur new-replica)
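Note: the two helpers added above produce per-task lookup maps that the :submit-job entry later stores in the replica under :min-required-peers and :flux-policies. A rough sketch of their return shapes, assuming planning/find-task resolves a catalog entry by :onyx/name; the catalog entries and task ids below are placeholders, not part of the patch:

(comment
  (min-required-peers
   [{:onyx/name :in :onyx/min-peers 2} {:onyx/name :out}]
   [{:id :in-task-id :name :in} {:id :out-task-id :name :out}])
  ;; => {:in-task-id 2, :out-task-id 1}

  (flux-policies
   [{:onyx/name :in :onyx/flux-policy :kill} {:onyx/name :out}]
   [{:id :in-task-id :name :in} {:id :out-task-id :name :out}])
  ;; => {:in-task-id :kill} (tasks without a flux policy are filtered out)
  )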
diff --git a/src/onyx/extensions.clj b/src/onyx/extensions.clj
index 2834c75..e3e88c3 100644
--- a/src/onyx/extensions.clj
+++ b/src/onyx/extensions.clj
@@ -41,6 +41,9 @@
(defmulti peer-site (fn [messenger] (type messenger)))
+(defmulti get-peer-site (fn [replica peer]
+ (:onyx.messaging/impl (:messaging replica))))
+
(defmulti open-peer-site (fn [messenger assigned] (type messenger)))
(defmulti connect-to-peer (fn [messenger event peer-site] (type messenger)))
@@ -57,3 +60,6 @@
(defmulti internal-complete-message (fn [messenger event id peer-link] (type messenger)))
(defmulti internal-retry-message (fn [messenger event id peer-link] (type messenger)))
+
+(defmethod open-peer-site :default
+ [_ _] "localhost")
\ No newline at end of file
diff --git a/src/onyx/log/commands/abort_join_cluster.clj b/src/onyx/log/commands/abort_join_cluster.clj
index dc8b8c6..fe34cfb 100644
--- a/src/onyx/log/commands/abort_join_cluster.clj
+++ b/src/onyx/log/commands/abort_join_cluster.clj
@@ -21,18 +21,18 @@
(when (or (seq prepared) (seq accepted))
{:aborted (or (first prepared) (first accepted))})))
-(defmethod extensions/fire-side-effects! :abort-join-cluster
- [{:keys [args]} old new diff state]
- ;; Abort back-off/retry
- (when (= (:id args) (:id state))
- (Thread/sleep (or (:onyx.peer/join-failure-back-off (:opts state)) 250)))
- state)
-
(defmethod extensions/reactions :abort-join-cluster
[{:keys [args]} old new diff peer-args]
- (when (= (:id args) (:id peer-args))
+ (when (and (= (:id args) (:id peer-args))
+ (not (:onyx.peer/try-join-once? (:peer-opts (:messenger peer-args)))))
[{:fn :prepare-join-cluster
:args {:joiner (:id peer-args)
:peer-site (extensions/peer-site (:messenger peer-args))}
:immediate? true}]))
+(defmethod extensions/fire-side-effects! :abort-join-cluster
+ [{:keys [args]} old new diff state]
+ ;; Abort back-off/retry
+ (when (= (:id args) (:id state))
+ (Thread/sleep (or (:onyx.peer/join-failure-back-off (:opts state)) 250)))
+ state)
diff --git a/src/onyx/log/commands/accept_join_cluster.clj b/src/onyx/log/commands/accept_join_cluster.clj
index ed7ed1a..95632bd 100644
--- a/src/onyx/log/commands/accept_join_cluster.clj
+++ b/src/onyx/log/commands/accept_join_cluster.clj
@@ -1,6 +1,7 @@
(ns onyx.log.commands.accept-join-cluster
(:require [clojure.core.async :refer [chan go >! <! >!! close!]]
[clojure.data :refer [diff]]
+ [clojure.set :refer [union difference map-invert]]
[taoensso.timbre :refer [info] :as timbre]
[onyx.extensions :as extensions]
[onyx.log.commands.common :as common]
@@ -10,23 +11,27 @@
[{:keys [args]} replica]
(let [{:keys [accepted-joiner accepted-observer]} args
target (or (get-in replica [:pairs accepted-observer])
- accepted-observer)]
- (-> replica
- (update-in [:pairs] merge {accepted-observer accepted-joiner})
- (update-in [:pairs] merge {accepted-joiner target})
- (update-in [:accepted] dissoc accepted-observer)
- (update-in [:peers] vec)
- (update-in [:peers] conj accepted-joiner)
- (assoc-in [:peer-state accepted-joiner] :idle)
- (reconfigure-cluster-workload))))
+ accepted-observer)
+ still-joining? (get (map-invert (:accepted replica)) accepted-joiner)]
+ (if still-joining?
+ (-> replica
+ (update-in [:pairs] merge {accepted-observer accepted-joiner})
+ (update-in [:pairs] merge {accepted-joiner target})
+ (update-in [:accepted] dissoc accepted-observer)
+ (update-in [:peers] vec)
+ (update-in [:peers] conj accepted-joiner)
+ (assoc-in [:peer-state accepted-joiner] :idle)
+ (reconfigure-cluster-workload))
+ replica)))
(defmethod extensions/replica-diff :accept-join-cluster
[entry old new]
- (let [rets (first (diff (:accepted old) (:accepted new)))]
- (assert (<= (count rets) 1))
- (when (seq rets)
- {:observer (first (keys rets))
- :subject (first (vals rets))})))
+ (if-not (= old new)
+ (let [rets (first (diff (:accepted old) (:accepted new)))]
+ (assert (<= (count rets) 1))
+ (when (seq rets)
+ {:observer (first (keys rets))
+ :subject (first (vals rets))}))))
(defmethod extensions/reactions :accept-join-cluster
[entry old new diff state]
@@ -43,5 +48,6 @@
(defmethod extensions/fire-side-effects! :accept-join-cluster
[entry old new diff state]
- (let [next-state (unbuffer-messages state diff new)]
- (common/start-new-lifecycle old new diff next-state)))
+ (if-not (= old new)
+ (let [next-state (unbuffer-messages state diff new)]
+ (common/start-new-lifecycle old new diff next-state))))
diff --git a/src/onyx/log/commands/gc.clj b/src/onyx/log/commands/gc.clj
index 1525d3e..808be9d 100644
--- a/src/onyx/log/commands/gc.clj
+++ b/src/onyx/log/commands/gc.clj
@@ -2,24 +2,29 @@
(:require [clojure.set :refer [difference]]
[clojure.data :refer [diff]]
[onyx.log.commands.common :as common]
- [onyx.extensions :as extensions]))
+ [onyx.extensions :as extensions]
+ [taoensso.timbre :refer [warn]]))
(defmethod extensions/apply-log-entry :gc
[{:keys [args message-id]} replica]
- (let [completed (:completed-jobs replica)
- killed (:killed-jobs replica)
- jobs (concat completed killed)]
- (as-> replica x
- (assoc x :killed-jobs [])
- (assoc x :completed-jobs [])
- (reduce (fn [new job] (update-in new [:tasks] dissoc job)) x jobs)
- (reduce (fn [new job] (update-in new [:allocations] dissoc job)) x jobs)
- (reduce (fn [new job] (update-in new [:task-schedulers] dissoc job)) x jobs)
- (reduce (fn [new job] (update-in new [:percentages] dissoc job)) x jobs)
- (reduce (fn [new job] (update-in new [:task-percentages] dissoc job)) x jobs)
- (reduce (fn [new job] (update-in new [:saturation] dissoc job)) x jobs)
- (reduce (fn [new job] (update-in new [:input-tasks] dissoc job)) x jobs)
- (reduce (fn [new job] (update-in new [:output-tasks] dissoc job)) x jobs))))
+ (try
+ (let [completed (:completed-jobs replica)
+ killed (:killed-jobs replica)
+ jobs (concat completed killed)]
+ (as-> replica x
+ (assoc x :killed-jobs [])
+ (assoc x :completed-jobs [])
+ (reduce (fn [new job] (update-in new [:tasks] dissoc job)) x jobs)
+ (reduce (fn [new job] (update-in new [:allocations] dissoc job)) x jobs)
+ (reduce (fn [new job] (update-in new [:task-schedulers] dissoc job)) x jobs)
+ (reduce (fn [new job] (update-in new [:percentages] dissoc job)) x jobs)
+ (reduce (fn [new job] (update-in new [:task-percentages] dissoc job)) x jobs)
+ (reduce (fn [new job] (update-in new [:saturation] dissoc job)) x jobs)
+ (reduce (fn [new job] (update-in new [:input-tasks] dissoc job)) x jobs)
+ (reduce (fn [new job] (update-in new [:output-tasks] dissoc job)) x jobs)))
+ (catch Throwable e
+ (warn e)
+ replica)))
(defmethod extensions/replica-diff :gc
[entry old new]
@@ -39,4 +44,3 @@
(doseq [k (range 0 message-id)]
(extensions/gc-log-entry (:log state) k))))
state)
-
diff --git a/src/onyx/log/commands/kill_job.clj b/src/onyx/log/commands/kill_job.clj
index e66cf7d..dfb7158 100644
--- a/src/onyx/log/commands/kill_job.clj
+++ b/src/onyx/log/commands/kill_job.clj
@@ -4,23 +4,32 @@
[onyx.log.commands.common :refer [peer->allocated-job] :as common]
[onyx.scheduling.common-job-scheduler :as cjs]
[onyx.extensions :as extensions]
- [onyx.scheduling.common-job-scheduler :refer [reconfigure-cluster-workload]]))
+ [onyx.scheduling.common-job-scheduler :refer [reconfigure-cluster-workload]]
+ [taoensso.timbre :refer [warn]]))
-(defmethod extensions/apply-log-entry :kill-job
- [{:keys [args]} replica]
- (if-not (some #{(:job args)} (:killed-jobs replica))
- (let [peers (mapcat identity (vals (get-in replica [:allocations (:job args)])))]
+;; Pulled this out of the defmethod because it's reused across other log entries.
+(defn apply-kill-job [replica job-id]
+ (if-not (some #{job-id} (:killed-jobs replica))
+ (let [peers (mapcat identity (vals (get-in replica [:allocations job-id])))]
(-> replica
- (update-in [:jobs] (fn [coll] (remove (partial = (:job args)) coll)))
+ (update-in [:jobs] (fn [coll] (remove (partial = job-id) coll)))
(update-in [:jobs] vec)
- (update-in [:killed-jobs] conj (:job args))
+ (update-in [:killed-jobs] conj job-id)
(update-in [:killed-jobs] vec)
- (update-in [:allocations] dissoc (:job args))
- (update-in [:ackers] dissoc (:job args))
+ (update-in [:allocations] dissoc job-id)
+ (update-in [:ackers] dissoc job-id)
(merge {:peer-state (into {} (map (fn [p] {p :idle}) peers))})
(reconfigure-cluster-workload)))
replica))
+(defmethod extensions/apply-log-entry :kill-job
+ [{:keys [args]} replica]
+ (try
+ (apply-kill-job replica (:job args))
+ (catch Throwable e
+ (warn e)
+ replica)))
+
(defmethod extensions/replica-diff :kill-job
[entry old new]
(second (diff (into #{} (:killed-jobs old)) (into #{} (:killed-jobs new)))))
diff --git a/src/onyx/log/commands/leave_cluster.clj b/src/onyx/log/commands/leave_cluster.clj
index 02b8cf3..fa551d8 100644
--- a/src/onyx/log/commands/leave_cluster.clj
+++ b/src/onyx/log/commands/leave_cluster.clj
@@ -5,8 +5,15 @@
[com.stuartsierra.component :as component]
[onyx.extensions :as extensions]
[onyx.log.commands.common :as common]
+ [onyx.log.commands.kill-job :refer [apply-kill-job]]
[onyx.scheduling.common-job-scheduler :refer [reconfigure-cluster-workload]]))
+(defn enforce-flux-policy [replica id]
+ (let [allocation (common/peer->allocated-job (:allocations replica) id)]
+ (if (= (get-in replica [:flux-policies (:job allocation) (:task allocation)]) :kill)
+ (apply-kill-job replica (:job allocation))
+ replica)))
+
(defmethod extensions/apply-log-entry :leave-cluster
[{:keys [args]} replica]
(let [{:keys [id]} args
@@ -16,6 +23,7 @@
prep-observer (get (map-invert (:prepared replica)) id)
accep-observer (get (map-invert (:accepted replica)) id)]
(-> replica
+ (enforce-flux-policy id)
(update-in [:peers] (partial remove #(= % id)))
(update-in [:peers] vec)
(update-in [:prepared] dissoc id)
diff --git a/src/onyx/log/commands/seal_output.clj b/src/onyx/log/commands/seal_output.clj
index 5bdc55c..6df3aee 100644
--- a/src/onyx/log/commands/seal_output.clj
+++ b/src/onyx/log/commands/seal_output.clj
@@ -23,6 +23,7 @@
(update-in [:jobs] (fn [coll] (remove (partial = (:job args)) coll)))
(update-in [:jobs] vec)
(update-in [:completed-jobs] conj (:job args))
+ (update-in [:completed-jobs] vec)
(update-in [:allocations] dissoc (:job args))
(update-in [:peer-state] merge (into {} (map (fn [p] {p :idle}) peers)))
(reconfigure-cluster-workload)))
diff --git a/src/onyx/log/commands/set_replica.clj b/src/onyx/log/commands/set_replica.clj
new file mode 100644
index 0000000..3ead7b9
--- /dev/null
+++ b/src/onyx/log/commands/set_replica.clj
@@ -0,0 +1,19 @@
+(ns onyx.log.commands.set-replica
+ (:require [clojure.data :refer [diff]]
+ [onyx.extensions :as extensions]))
+
+(defmethod extensions/apply-log-entry :set-replica!
+ [{:keys [args message-id]} replica]
+ (:replica args))
+
+(defmethod extensions/replica-diff :set-replica!
+ [entry old new]
+ {:diff (diff old new)})
+
+(defmethod extensions/reactions :set-replica!
+ [{:keys [args]} old new diff peer-args]
+ [])
+
+(defmethod extensions/fire-side-effects! :set-replica!
+ [{:keys [args]} old new diff state]
+ state)
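Note: :set-replica! is a new log command that lets a subscriber replace its local replica wholesale. It is emitted by seek-to-new-origin! in onyx.log.zookeeper (further down in this diff) when the entry a subscriber wants to read has already been GC'd. A rough sketch of the entry, assuming create-log-entry builds the usual {:fn ... :args ...} map:

(comment
  (create-log-entry :set-replica! {:replica origin})
  ;; => {:fn :set-replica! :args {:replica origin}}
  ;; apply-log-entry for :set-replica! simply returns (:replica args),
  ;; so the subscriber jumps straight to the current origin.
  )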
diff --git a/src/onyx/log/commands/submit_job.clj b/src/onyx/log/commands/submit_job.clj
index ee301dc..01e6fad 100644
--- a/src/onyx/log/commands/submit_job.clj
+++ b/src/onyx/log/commands/submit_job.clj
@@ -6,7 +6,8 @@
[onyx.scheduling.common-job-scheduler :as cjs]
[onyx.scheduling.common-task-scheduler :as cts]
[onyx.extensions :as extensions]
- [onyx.scheduling.common-job-scheduler :refer [reconfigure-cluster-workload]]))
+ [onyx.scheduling.common-job-scheduler :refer [reconfigure-cluster-workload]]
+ [taoensso.timbre :refer [warn]]))
(defmulti job-scheduler-replica-update
(fn [replica entry]
@@ -34,23 +35,29 @@
(defmethod extensions/apply-log-entry :submit-job
[{:keys [args] :as entry} replica]
- (-> replica
- (update-in [:jobs] conj (:id args))
- (update-in [:jobs] vec)
- (assoc-in [:task-schedulers (:id args)] (:task-scheduler args))
- (assoc-in [:tasks (:id args)] (vec (:tasks args)))
- (assoc-in [:allocations (:id args)] {})
- (assoc-in [:saturation (:id args)] (:saturation args))
- (assoc-in [:task-saturation (:id args)] (:task-saturation args))
- (assoc-in [:input-tasks (:id args)] (vec (:inputs args)))
- (assoc-in [:output-tasks (:id args)] (vec (:outputs args)))
- (assoc-in [:exempt-tasks (:id args)] (vec (:exempt-tasks args)))
- (assoc-in [:acker-percentage (:id args)] (:acker-percentage args))
- (assoc-in [:acker-exclude-inputs (:id args)] (:acker-exclude-inputs args))
- (assoc-in [:acker-exclude-outputs (:id args)] (:acker-exclude-outputs args))
- (job-scheduler-replica-update entry)
- (task-scheduler-replica-update entry)
- (reconfigure-cluster-workload)))
+ (try
+ (-> replica
+ (update-in [:jobs] conj (:id args))
+ (update-in [:jobs] vec)
+ (assoc-in [:task-schedulers (:id args)] (:task-scheduler args))
+ (assoc-in [:tasks (:id args)] (vec (:tasks args)))
+ (assoc-in [:allocations (:id args)] {})
+ (assoc-in [:saturation (:id args)] (:saturation args))
+ (assoc-in [:task-saturation (:id args)] (:task-saturation args))
+ (assoc-in [:flux-policies (:id args)] (:flux-policies args))
+ (assoc-in [:min-required-peers (:id args)] (:min-required-peers args))
+ (assoc-in [:input-tasks (:id args)] (vec (:inputs args)))
+ (assoc-in [:output-tasks (:id args)] (vec (:outputs args)))
+ (assoc-in [:exempt-tasks (:id args)] (vec (:exempt-tasks args)))
+ (assoc-in [:acker-percentage (:id args)] (:acker-percentage args))
+ (assoc-in [:acker-exclude-inputs (:id args)] (:acker-exclude-inputs args))
+ (assoc-in [:acker-exclude-outputs (:id args)] (:acker-exclude-outputs args))
+ (job-scheduler-replica-update entry)
+ (task-scheduler-replica-update entry)
+ (reconfigure-cluster-workload))
+ (catch Throwable e
+ (warn e)
+ replica)))
(defmethod extensions/replica-diff :submit-job
[{:keys [args]} old new]
diff --git a/src/onyx/log/curator.clj b/src/onyx/log/curator.clj
new file mode 100644
index 0000000..3a87701
--- /dev/null
+++ b/src/onyx/log/curator.clj
@@ -0,0 +1,138 @@
+(ns onyx.log.curator
+ (:require [clojure.core.async :refer [chan >!! <!! close! thread]]
+ [taoensso.timbre :refer [fatal warn trace]]
+ [onyx.static.default-vals :refer [defaults]])
+ (:import [org.apache.zookeeper CreateMode]
+ [org.apache.zookeeper KeeperException$NoNodeException KeeperException$NodeExistsException Watcher]
+ [org.apache.curator.test TestingServer]
+ [org.apache.zookeeper.data Stat]
+ [org.apache.curator.framework CuratorFrameworkFactory CuratorFramework]
+ [org.apache.curator.framework.api CuratorWatcher PathAndBytesable Versionable GetDataBuilder
+ SetDataBuilder DeleteBuilder ExistsBuilder GetChildrenBuilder Pathable Watchable]
+ [org.apache.curator.framework.state ConnectionStateListener ConnectionState]
+ [org.apache.curator.framework.imps CuratorFrameworkState]
+ [org.apache.curator RetryPolicy]
+ [org.apache.curator.retry BoundedExponentialBackoffRetry]))
+
+;; Thanks to zookeeper-clj
+(defn stat-to-map
+ ([^org.apache.zookeeper.data.Stat stat]
+ ;;(long czxid, long mzxid, long ctime, long mtime, int version, int cversion, int aversion, long ephemeralOwner, int dataLength, int numChildren, long pzxid)
+ (when stat
+ {:czxid (.getCzxid stat)
+ :mzxid (.getMzxid stat)
+ :ctime (.getCtime stat)
+ :mtime (.getMtime stat)
+ :version (.getVersion stat)
+ :cversion (.getCversion stat)
+ :aversion (.getAversion stat)
+ :ephemeralOwner (.getEphemeralOwner stat)
+ :dataLength (.getDataLength stat)
+ :numChildren (.getNumChildren stat)
+ :pzxid (.getPzxid stat)})))
+
+(defn event-to-map
+ ([^org.apache.zookeeper.WatchedEvent event]
+ (when event
+ {:event-type (keyword (.name (.getType event)))
+ :keeper-state (keyword (.name (.getState event)))
+ :path (.getPath event)})))
+
+;; Watcher
+
+(defn make-watcher
+ ([handler]
+ (reify Watcher
+ (process [this event]
+ (handler (event-to-map event))))))
+
+(defn ^CuratorFramework connect
+ ([connection-string]
+ (connect connection-string ""))
+ ([connection-string ns]
+ (connect connection-string ns
+ (BoundedExponentialBackoffRetry.
+ (:onyx.zookeeper/backoff-base-sleep-time-ms defaults)
+ (:onyx.zookeeper/backoff-max-sleep-time-ms defaults)
+ (:onyx.zookeeper/backoff-max-retries defaults))))
+ ([connection-string ns ^RetryPolicy retry-policy]
+ (doto
+ (.. (CuratorFrameworkFactory/builder)
+ (namespace ns)
+ (connectString connection-string)
+ (retryPolicy retry-policy)
+ (build))
+ .start)))
+
+(defn close
+ "Closes the connection to the ZooKeeper server."
+ [^CuratorFramework client]
+ (.close client))
+
+(defn create-mode [opts]
+ (cond (and (:persistent? opts) (:sequential? opts))
+ CreateMode/PERSISTENT_SEQUENTIAL
+ (:persistent? opts)
+ CreateMode/PERSISTENT
+ (:sequential? opts)
+ CreateMode/EPHEMERAL_SEQUENTIAL
+ :else
+ CreateMode/EPHEMERAL))
+
+(defn create
+ [^CuratorFramework client path & {:keys [data] :as opts}]
+ (try
+ (let [cr ^SetDataBuilder (.. client
+ create
+ (withMode (create-mode opts)))]
+ (if data
+ (.forPath ^SetDataBuilder cr path data)
+ (.forPath ^SetDataBuilder cr path)))
+ (catch org.apache.zookeeper.KeeperException$NodeExistsException e
+ false)))
+
+(defn create-all
+ [^CuratorFramework client path & {:keys [data] :as opts}]
+ (try
+ (let [cr (.. client
+ create
+ creatingParentsIfNeeded
+ (withMode (create-mode opts)))]
+ (if data
+ (.forPath ^SetDataBuilder cr path data)
+ (.forPath ^SetDataBuilder cr path)))
+ (catch org.apache.zookeeper.KeeperException$NodeExistsException e
+ false)))
+
+(defn delete
+ "Deletes the given node if it exists. Otherwise returns false."
+ [^CuratorFramework client path]
+ (try
+ (.forPath ^DeleteBuilder (.delete client) path)
+ (catch KeeperException$NoNodeException e false)))
+
+(defn children
+ ([^CuratorFramework client path & {:keys [watcher]}]
+ (let [children-builder ^GetChildrenBuilder (.getChildren client)]
+ (if watcher
+ (.forPath ^GetChildrenBuilder (.usingWatcher children-builder ^Watcher (make-watcher watcher)) path)
+ (.forPath ^GetChildrenBuilder children-builder path)))))
+
+(defn data [^CuratorFramework client path]
+ (let [stat ^Stat (Stat.)
+ data (.forPath ^GetDataBuilder (.storingStatIn ^GetDataBuilder (.getData client) stat) path)]
+ {:data data
+ :stat (stat-to-map stat)}))
+
+(defn set-data [^CuratorFramework client path data version]
+ (.forPath ^SetDataBuilder (.withVersion ^SetDataBuilder (.setData client)
+ version)
+ path
+ data))
+
+(defn exists [^CuratorFramework client path & {:keys [watcher]}]
+ (stat-to-map
+ (let [builder ^ExistsBuilder (.. client checkExists)]
+ (if watcher
+ (.forPath ^ExistsBuilder (.usingWatcher builder ^Watcher (make-watcher watcher)) path)
+ (.forPath builder path)))))
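Note: onyx.log.curator is a new namespace that wraps Apache Curator behind a zookeeper-clj-style API, so onyx.log.zookeeper can switch from the zookeeper library to Curator without changing its call sites. A hedged usage sketch of the functions defined above; the connection string and paths are placeholders:

(comment
  (def client (connect "127.0.0.1:2181" "onyx"))
  (create-all client "/example/chunk" :persistent? true :data (.getBytes "hello"))
  (String. ^bytes (:data (data client "/example/chunk")))   ;; => "hello"
  (children client "/example")                              ;; => ("chunk")
  (close client)
  )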
diff --git a/src/onyx/log/zookeeper.clj b/src/onyx/log/zookeeper.clj
index d5dbf32..33e4040 100644
--- a/src/onyx/log/zookeeper.clj
+++ b/src/onyx/log/zookeeper.clj
@@ -1,11 +1,14 @@
(ns onyx.log.zookeeper
(:require [clojure.core.async :refer [chan >!! <!! close! thread]]
[com.stuartsierra.component :as component]
- [taoensso.timbre :refer [fatal warn]]
- [zookeeper :as zk]
+ [taoensso.timbre :refer [fatal warn trace]]
+ [onyx.log.curator :as zk]
[onyx.extensions :as extensions]
- [onyx.compression.nippy :refer [compress decompress]])
- (:import [org.apache.curator.test TestingServer]))
+ [onyx.static.default-vals :refer [defaults]]
+ [onyx.compression.nippy :refer [compress decompress]]
+ [onyx.log.entry :refer [create-log-entry]])
+ (:import [org.apache.curator.test TestingServer]
+ [org.apache.zookeeper KeeperException$NoNodeException KeeperException$NodeExistsException]))
(def root-path "/onyx")
@@ -55,8 +58,10 @@
(try
(f)
(catch org.apache.zookeeper.KeeperException$ConnectionLossException e
+ (trace e)
(throw-subscriber-closed))
(catch org.apache.zookeeper.KeeperException$SessionExpiredException e
+ (trace e)
(throw-subscriber-closed))))
(defn initialize-origin! [conn config prefix]
@@ -147,6 +152,7 @@
(when-not (zk/exists conn (str (pulse-path prefix) "/" id) :watcher f)
(>!! ch true))
(catch Throwable e
+ (trace e)
;; Node doesn't exist.
(>!! ch true)))))
@@ -176,6 +182,22 @@
(do (Thread/sleep 500)
(recur)))))
+(defn seek-to-new-origin! [log ch]
+ (let [origin (extensions/read-chunk log :origin nil)
+ starting-position (inc (:message-id origin))
+ entry (create-log-entry :set-replica! {:replica origin})]
+ (>!! ch entry)
+ starting-position))
+
+(defn seek-and-put-entry! [log position ch]
+ (try
+ (let [entry (extensions/read-log-entry log position)]
+ (>!! ch entry))
+ (catch KeeperException$NoNodeException e
+ (seek-to-new-origin! log ch))
+ (catch KeeperException$NodeExistsException e
+ (seek-to-new-origin! log ch))))
+
(defmethod extensions/subscribe-to-log ZooKeeper
[{:keys [conn opts prefix] :as log} ch]
(let [rets (chan)]
@@ -192,7 +214,7 @@
(loop [position starting-position]
(let [path (str (log-path prefix) "/entry-" (pad-sequential-id position))]
(if (zk/exists conn path)
- (>!! ch position)
+ (seek-and-put-entry! log position ch)
(loop []
(let [read-ch (chan 2)]
(zk/children conn (log-path prefix) :watcher (fn [_] (>!! read-ch true)))
@@ -205,13 +227,19 @@
;; Requires one more check. Watch may have been triggered by a delete
;; from a GC call.
(if (zk/exists conn path)
- (>!! ch position)
+ (seek-and-put-entry! log position ch)
(recur)))))
(recur (inc position)))))
+ (catch java.lang.IllegalStateException e
+ (trace e)
+ ;; Curator client has been shutdown, close the subscriber cleanly.
+ (close! ch))
(catch org.apache.zookeeper.KeeperException$ConnectionLossException e
+ (trace e)
;; ZooKeeper has been shutdown, close the subscriber cleanly.
(close! ch))
(catch org.apache.zookeeper.KeeperException$SessionExpiredException e
+ (trace e)
(close! ch))
(catch Throwable e
(fatal e))))
@@ -269,19 +297,10 @@
[{:keys [conn opts prefix] :as log} kw chunk id]
(clean-up-broken-connections
(fn []
- (let [unique-id (java.util.UUID/randomUUID)
- node (str (chunk-path prefix) "/" id "/chunk-" unique-id)
+ (let [node (str (chunk-path prefix) "/" id "/chunk")
bytes (compress chunk)]
(zk/create-all conn node :persistent? true :data bytes)
- unique-id))))
-
-(defmethod extensions/write-chunk [ZooKeeper :chunk-index]
- [{:keys [conn opts prefix] :as log} kw chunk id]
- (clean-up-broken-connections
- (fn []
- (let [node (str (chunk-path prefix) "/" id "/index")
- bytes (compress chunk)]
- (zk/create-all conn node :persistent? true :data bytes)))))
+ id))))
(defmethod extensions/write-chunk [ZooKeeper :job-scheduler]
[{:keys [conn opts prefix] :as log} kw chunk id]
@@ -303,10 +322,12 @@
[{:keys [conn opts prefix] :as log} kw chunk id]
(clean-up-broken-connections
(fn []
- (let [node (str (chunk-path prefix) "/" id "/chunk-" (:id chunk))
+ (let [node (str (chunk-path prefix) "/" id "/chunk")
version (:version (zk/exists conn node))
bytes (compress chunk)]
- (zk/set-data conn node bytes version)))))
+ (if (nil? version)
+ (zk/create-all conn node :persistent? true :data bytes)
+ (zk/set-data conn node bytes version))))))
(defmethod extensions/read-chunk [ZooKeeper :catalog]
[{:keys [conn opts prefix] :as log} kw id & _]
@@ -351,17 +372,10 @@
(decompress (:data (zk/data conn node)))))))
(defmethod extensions/read-chunk [ZooKeeper :chunk]
- [{:keys [conn opts prefix] :as log} kw id & {:keys [task-id]}]
- (clean-up-broken-connections
- (fn []
- (let [node (str (chunk-path prefix) "/" task-id "/chunk-" id)]
- (decompress (:data (zk/data conn node)))))))
-
-(defmethod extensions/read-chunk [ZooKeeper :chunk-index]
- [{:keys [conn opts prefix] :as log} kw id & {:keys [task-id]}]
+ [{:keys [conn opts prefix] :as log} kw id & _]
(clean-up-broken-connections
(fn []
- (let [node (str (chunk-path prefix) "/" task-id "/index")]
+ (let [node (str (chunk-path prefix) "/" id "/chunk")]
(decompress (:data (zk/data conn node)))))))
(defmethod extensions/read-chunk [ZooKeeper :origin]
diff --git a/src/onyx/messaging/acking_daemon.clj b/src/onyx/messaging/acking_daemon.clj
index d36342a..2784a6f 100644
--- a/src/onyx/messaging/acking_daemon.clj
+++ b/src/onyx/messaging/acking_daemon.clj
@@ -1,23 +1,48 @@
(ns ^:no-doc onyx.messaging.acking-daemon
(:require [clojure.core.async :refer [chan >!! close! sliding-buffer]]
[com.stuartsierra.component :as component]
- [onyx.static.default-vals :refer [defaults]]
+ [onyx.static.default-vals :refer [arg-or-default]]
[onyx.types :refer [->Ack]]
[taoensso.timbre :as timbre]))
+(defn now []
+ (System/currentTimeMillis))
+
+(defn clear-messages-loop [state opts]
+ (let [timeout (arg-or-default :onyx.messaging/ack-daemon-timeout opts)
+ interval (arg-or-default :onyx.messaging/ack-daemon-clear-interval opts)]
+ (loop []
+ (try
+ (Thread/sleep interval)
+ (let [t (now)
+ snapshot @state
+ dead (map first (filter (fn [[k v]] (>= (- t (:timestamp v)) timeout)) snapshot))]
+ (doseq [k dead]
+ (swap! state dissoc k)))
+ (catch InterruptedException e
+ (throw e))
+ (catch Throwable e
+ (timbre/fatal e)))
+ (recur))))
+
(defrecord AckingDaemon [opts]
component/Lifecycle
(start [component]
(taoensso.timbre/info "Starting Acking Daemon")
- (let [buffer-size (or (:onyx.messaging/completion-buffer-size opts)
- (:onyx.messaging/completion-buffer-size defaults))]
- (assoc component :ack-state (atom {}) :completions-ch (chan (sliding-buffer buffer-size)))))
+ (let [buffer-size (arg-or-default :onyx.messaging/completion-buffer-size opts)
+ state (atom {})
+ timeout-fut (future (clear-messages-loop state opts))]
+ (assoc component
+ :ack-state state
+ :completions-ch (chan (sliding-buffer buffer-size))
+ :timeout-fut timeout-fut)))
(stop [component]
(taoensso.timbre/info "Stopping Acking Daemon")
(close! (:completions-ch component))
- (assoc component :ack-state nil :completions-ch nil)))
+ (future-cancel (:timeout-fut component))
+ (assoc component :ack-state nil :completions-ch nil :timeout-fut nil)))
(defn acking-daemon [config]
(map->AckingDaemon {:opts config}))
@@ -34,10 +59,10 @@
(assoc state message-id (assoc ack :ack-val updated-ack-val))))
(if (zero? ack-val)
state
- (assoc state message-id (->Ack nil completion-id ack-val))))))]
+ (assoc state message-id (->Ack nil completion-id ack-val (now)))))))]
(when-not (get rets message-id)
- (>!! (:completions-ch daemon) {:id message-id
- :peer-id completion-id}))))
+ (>!! (:completions-ch daemon)
+ {:id message-id :peer-id completion-id}))))
(defn gen-message-id
"Generates a unique ID for a message - acts as the root id."
@@ -58,5 +83,3 @@
(persistent! coll)
(recur (inc n)
(conj! coll (.nextLong rng)))))))
-
-
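Note: ->Ack now takes a timestamp as its last argument, and the new clear-messages-loop sweeps the ack state every :onyx.messaging/ack-daemon-clear-interval ms, dropping entries older than :onyx.messaging/ack-daemon-timeout. A rough sketch of the eviction test with placeholder values (not part of the patch):

(comment
  (let [timeout 60000                  ;; ack-daemon-timeout
        ack     {:timestamp 1432920000000}
        t       1432920070000]         ;; "now", 70 seconds later
    (>= (- t (:timestamp ack)) timeout))
  ;; => true, so this ack would be dissoc'ed on the next sweep
  )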
diff --git a/src/onyx/messaging/aeron.clj b/src/onyx/messaging/aeron.clj
index 4d6535b..7497923 100644
--- a/src/onyx/messaging/aeron.clj
+++ b/src/onyx/messaging/aeron.clj
@@ -1,5 +1,5 @@
(ns ^:no-doc onyx.messaging.aeron
- (:require [clojure.core.async :refer [chan >!! <!! alts!! timeout close! dropping-buffer]]
+ (:require [clojure.core.async :refer [chan >!! <!! alts!! timeout close! sliding-buffer]]
[com.stuartsierra.component :as component]
[taoensso.timbre :refer [fatal] :as timbre]
[onyx.messaging.protocol-aeron :as protocol]
@@ -8,7 +8,7 @@
[onyx.extensions :as extensions]
[onyx.compression.nippy :refer [compress decompress]]
[onyx.static.default-vals :refer [defaults]])
- (:import [uk.co.real_logic.aeron Aeron FragmentAssemblyAdapter]
+ #_(:import [uk.co.real_logic.aeron Aeron FragmentAssemblyAdapter]
[uk.co.real_logic.aeron Aeron$Context]
[uk.co.real_logic.aeron.driver MediaDriver MediaDriver$Context ThreadingMode]
[uk.co.real_logic.aeron.common.concurrent.logbuffer DataHandler]
@@ -17,7 +17,7 @@
[uk.co.real_logic.agrona.concurrent IdleStrategy BackoffIdleStrategy]
[java.util.function Consumer]
[java.util.concurrent TimeUnit]))
-
+(comment
(defrecord AeronPeerGroup [opts]
component/Lifecycle
(start [component]
@@ -54,6 +54,10 @@
(assert port "Couldn't assign port - ran out of available ports.")
{:aeron/port port}))
+(defmethod extensions/get-peer-site :aeron
+ [replica peer]
+ (get-in replica [:peer-sites peer :aeron/external-addr]))
+
(defn handle-sent-message [inbound-ch decompress-f ^UnsafeBuffer buffer offset length header]
(let [messages (protocol/read-messages-buf decompress-f buffer offset length)]
(doseq [message messages]
@@ -110,8 +114,8 @@
(start [component]
(taoensso.timbre/info "Starting Aeron")
(let [config (:config peer-group)
- release-ch (chan (dropping-buffer (:onyx.messaging/release-ch-buffer-size defaults)))
- retry-ch (chan (dropping-buffer (:onyx.messaging/retry-ch-buffer-size defaults)))
+ release-ch (chan (sliding-buffer (:onyx.messaging/release-ch-buffer-size defaults)))
+ retry-ch (chan (sliding-buffer (:onyx.messaging/retry-ch-buffer-size defaults)))
bind-addr (bind-addr config)
external-addr (external-addr config)
ports (allowable-ports config)
@@ -266,3 +270,4 @@
(reset! pub nil))
(.close ^uk.co.real_logic.aeron.Aeron (:conn peer-link))
{})
+)
diff --git a/src/onyx/messaging/aeron_media_driver.clj b/src/onyx/messaging/aeron_media_driver.clj
index 5a6b496..7bdd5d9 100644
--- a/src/onyx/messaging/aeron_media_driver.clj
+++ b/src/onyx/messaging/aeron_media_driver.clj
@@ -1,10 +1,10 @@
(ns onyx.messaging.aeron-media-driver
(:require [clojure.core.async :refer [chan <!!]])
- (:import [uk.co.real_logic.aeron Aeron FragmentAssemblyAdapter]
+ #_(:import [uk.co.real_logic.aeron Aeron FragmentAssemblyAdapter]
[uk.co.real_logic.aeron Aeron$Context]
[uk.co.real_logic.aeron.driver MediaDriver MediaDriver$Context ThreadingMode]))
-(defn -main [& args]
+#_(defn -main [& args]
(let [ctx (doto (MediaDriver$Context.)
(.threadingMode ThreadingMode/SHARED)
;(.threadingMode ThreadingMode/DEDICATED)
diff --git a/src/onyx/messaging/core_async.clj b/src/onyx/messaging/core_async.clj
index b4ec374..8d6c0d0 100644
--- a/src/onyx/messaging/core_async.clj
+++ b/src/onyx/messaging/core_async.clj
@@ -1,5 +1,5 @@
(ns ^:no-doc onyx.messaging.core-async
- (:require [clojure.core.async :refer [chan >!! <!! alts!! dropping-buffer timeout close!]]
+ (:require [clojure.core.async :refer [chan >!! <!! alts!! sliding-buffer timeout close!]]
[com.stuartsierra.component :as component]
[taoensso.timbre :refer [fatal] :as timbre]
[onyx.messaging.acking-daemon :as acker]
@@ -25,13 +25,17 @@
[config peer-site peer-sites]
peer-site)
+(defmethod extensions/get-peer-site :core.async
+ [replica peer]
+ "localhost")
+
(defrecord CoreAsync [peer-group]
component/Lifecycle
(start [component]
(taoensso.timbre/info "Starting core.async Messaging Channel")
- (let [release-ch (chan (dropping-buffer (:onyx.messaging/release-ch-buffer-size defaults)))
- retry-ch (chan (dropping-buffer (:onyx.messaging/retry-ch-buffer-size defaults)))]
+ (let [release-ch (chan (sliding-buffer (:onyx.messaging/release-ch-buffer-size defaults)))
+ retry-ch (chan (sliding-buffer (:onyx.messaging/retry-ch-buffer-size defaults)))]
(assoc component :release-ch release-ch :retry-ch retry-ch)))
(stop [component]
@@ -48,7 +52,7 @@
(let [chs (:channels (:messaging-group (:peer-group messenger)))
id (java.util.UUID/randomUUID)
inbound-ch (:inbound-ch (:messenger-buffer messenger))
- ch (chan (dropping-buffer 10000))]
+ ch (chan (sliding-buffer 100000))]
(future
(try
(loop []
diff --git a/src/onyx/messaging/messenger_buffer.clj b/src/onyx/messaging/messenger_buffer.clj
index 7785f38..d4db4c7 100644
--- a/src/onyx/messaging/messenger_buffer.clj
+++ b/src/onyx/messaging/messenger_buffer.clj
@@ -1,6 +1,7 @@
(ns ^:no-doc onyx.messaging.messenger-buffer
- (:require [clojure.core.async :refer [chan >!! <!! thread alts!! close!]]
+ (:require [clojure.core.async :refer [chan >!! <!! thread alts!! close! sliding-buffer]]
[com.stuartsierra.component :as component]
+ [onyx.static.default-vals :refer [defaults]]
[taoensso.timbre :as timbre]))
(defrecord MessengerBuffer [opts]
@@ -9,7 +10,8 @@
(start [component]
(taoensso.timbre/info "Starting Messenger Buffer")
- (let [inbound-ch (chan (or (:onyx.messenger/inbound-capacity opts) 20000))]
+ (let [inbound-ch (chan (sliding-buffer (or (:onyx.messaging/inbound-buffer-size opts)
+ (:onyx.messaging/inbound-buffer-size defaults))))]
(assoc component :inbound-ch inbound-ch)))
(stop [component]
diff --git a/src/onyx/messaging/netty_tcp.clj b/src/onyx/messaging/netty_tcp.clj
index 6c7d50d..fa4e208 100644
--- a/src/onyx/messaging/netty_tcp.clj
+++ b/src/onyx/messaging/netty_tcp.clj
@@ -1,6 +1,6 @@
(ns ^:no-doc onyx.messaging.netty-tcp
(:require [clojure.core.async :refer [chan >!! >! <!! alts!! timeout close!
- thread go-loop sliding-buffer dropping-buffer]]
+ thread go-loop sliding-buffer]]
[com.stuartsierra.component :as component]
[taoensso.timbre :as timbre]
[onyx.messaging.protocol-netty :as protocol]
@@ -101,6 +101,10 @@
(assert port "Couldn't assign port - ran out of available ports.")
{:netty/port port}))
+(defmethod extensions/get-peer-site :netty
+ [replica peer]
+ (get-in replica [:peer-sites peer :netty/external-addr]))
+
(defn int32-frame-decoder
[]
; Offset 0, 4 byte header, skip those 4 bytes.
@@ -226,17 +230,14 @@
(.group client-group)
(.channel channel)
(.handler (client-channel-initializer (new-client-handler))))
- ch-fut ^ChannelFuture (.awaitUninterruptibly ^ChannelFuture (.connect ^Bootstrap b ^String host ^Integer port)
- ;; TODO, add connection timeout
- ;(:onyx.messaging.netty/connect-timeout-millis defaults)
- ;TimeUnit/MILLISECONDS
- )
- ch (.channel ch-fut)]
- (if (and (.isSuccess ^ChannelFuture ch-fut)
- (established? ch))
- ch
- (do (.close ch)
- nil))))
+ ch-fut ^ChannelFuture (.connect ^Bootstrap b ^String host ^Integer port)]
+ (if (.awaitUninterruptibly ch-fut (:onyx.messaging.netty/connect-timeout-millis defaults))
+ (let [ch (.channel ch-fut)]
+ (if (and (.isSuccess ch-fut)
+ (established? ch))
+ ch
+ (do (.close ch)
+ nil))))))
(defrecord NettyTcpSockets [peer-group]
component/Lifecycle
@@ -245,8 +246,8 @@
(taoensso.timbre/info "Starting Netty TCP Sockets")
(let [{:keys [client-group worker-group boss-group]} (:messaging-group peer-group)
config (:config peer-group)
- release-ch (chan (dropping-buffer (:onyx.messaging/release-ch-buffer-size defaults)))
- retry-ch (chan (dropping-buffer (:onyx.messaging/retry-ch-buffer-size defaults)))
+ release-ch (chan (sliding-buffer (:onyx.messaging/release-ch-buffer-size defaults)))
+ retry-ch (chan (sliding-buffer (:onyx.messaging/retry-ch-buffer-size defaults)))
bind-addr (bind-addr config)
external-addr (external-addr config)
ports (allowable-ports config)
@@ -332,7 +333,7 @@
(.addListener f (reify GenericFutureListener
(operationComplete [_ _]
(when-not (.isSuccess f)
- (timbre/error "Message failed to send: " (.cause f))
+ (timbre/trace (ex-info "Message failed to send" {:cause (.cause f)}))
(reset-connection connection))))))
(defn make-pending-chan [messenger]
diff --git a/src/onyx/messaging/protocol_aeron.clj b/src/onyx/messaging/protocol_aeron.clj
index bed72d9..b7df1a9 100644
--- a/src/onyx/messaging/protocol_aeron.clj
+++ b/src/onyx/messaging/protocol_aeron.clj
@@ -1,10 +1,11 @@
(ns ^:no-doc onyx.messaging.protocol-aeron
(:require [taoensso.timbre :as timbre]
[onyx.types :refer [->Leaf]])
- (:import [java.util UUID]
+ #_(:import [java.util UUID]
[uk.co.real_logic.agrona.concurrent UnsafeBuffer]
[uk.co.real_logic.agrona DirectBuffer MutableDirectBuffer]))
+(comment
;;;;;;
;; Constants
@@ -137,3 +138,4 @@
(next payloads)
(+ offset message-base-length))
(persistent! messages)))))
+)
diff --git a/src/onyx/peer/function.clj b/src/onyx/peer/function.clj
index 8f4eec1..560bdba 100644
--- a/src/onyx/peer/function.clj
+++ b/src/onyx/peer/function.clj
@@ -59,17 +59,17 @@
ack-vals))))
fast-concat))
-(defn pick-peer [active-peers hash-group]
+(defn pick-peer [id active-peers hash-group max-downstream-links]
(when-not (empty? active-peers)
(if hash-group
(nth active-peers
(mod (hash hash-group)
(count active-peers)))
- (rand-nth active-peers))))
+ (rand-nth (operation/select-n-peers id active-peers max-downstream-links)))))
;; Needs a performance boost
(defmethod p-ext/write-batch :default
- [{:keys [onyx.core/results onyx.core/messenger onyx.core/job-id] :as event}]
+ [{:keys [onyx.core/id onyx.core/results onyx.core/messenger onyx.core/job-id onyx.core/max-downstream-links] :as event}]
(let [leaves (fast-concat (map :leaves results))
egress-tasks (:egress-ids (:onyx.core/serialized-task event))]
(when-not (empty? leaves)
@@ -80,7 +80,7 @@
(doseq [[[route hash-group] segs] groups]
(let [peers (get allocations (get egress-tasks route))
active-peers (filter #(= (get-in replica [:peer-state %]) :active) peers)
- target (pick-peer active-peers hash-group)]
+ target (pick-peer id active-peers hash-group max-downstream-links)]
(when target
(let [link (operation/peer-link event target)]
(onyx.extensions/send-messages messenger event link segs)))))
diff --git a/src/onyx/peer/operation.clj b/src/onyx/peer/operation.clj
index 086890c..1c49a56 100644
--- a/src/onyx/peer/operation.clj
+++ b/src/onyx/peer/operation.clj
@@ -23,9 +23,22 @@
[{:keys [onyx.core/queue onyx.core/ingress-queues onyx.core/task-map]}]
true)
+;; TODO: may want to consider memoizing this
+;; must be careful about ensuring we don't bloat memory wise
+;; use clojure.core.memoize with LRU
+(defn select-n-peers
+ "Stably select n peers using our id and the downstream task ids.
+ If a peer is added or removed, the set can only change by one value at max"
+ [id all-peers n]
+ (if (<= (count all-peers) n)
+ all-peers
+ (take n
+ (sort-by (fn [peer-id] (hash [id peer-id]))
+ all-peers))))
+
(defn peer-link
[{:keys [onyx.core/state] :as event} peer-id]
- (if-let [link (get (:links @state) peer-id)]
+ (if-let [link (:link (get (:links @state) peer-id))]
link
(let [site (-> @(:onyx.core/replica event)
:peer-sites
@@ -35,6 +48,8 @@
[:links peer-id]
(fn [link]
(or link
- (extensions/connect-to-peer (:onyx.core/messenger event) event site))))
+ {:link (extensions/connect-to-peer (:onyx.core/messenger event) event site)
+ :timestamp (System/currentTimeMillis)})))
:links
- (get peer-id)))))
+ (get peer-id)
+ :link))))
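Note: select-n-peers ranks candidates by (hash [id peer-id]), which is fixed per peer pair, so the chosen subset is stable: adding or removing one peer changes the selection by at most one element. A small sketch with placeholder peer ids (not part of the patch):

(comment
  (select-n-peers :my-peer-id [:p1 :p2 :p3 :p4 :p5] 3)
  ;; => some fixed 3-element subset, identical on every call
  (select-n-peers :my-peer-id [:p1 :p2 :p3 :p4 :p5 :p6] 3)
  ;; => differs from the previous result by at most one peer, because the
  ;;    existing peers keep their relative order under the stable sort key
  )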
diff --git a/src/onyx/peer/task_lifecycle.clj b/src/onyx/peer/task_lifecycle.clj
index d089b53..98cf696 100644
--- a/src/onyx/peer/task_lifecycle.clj
+++ b/src/onyx/peer/task_lifecycle.clj
@@ -1,5 +1,5 @@
(ns ^:no-doc onyx.peer.task-lifecycle
- (:require [clojure.core.async :refer [alts!! <!! >!! <! >! timeout chan close! thread go dropping-buffer]]
+ (:require [clojure.core.async :refer [alts!! <!! >!! <! >! timeout chan close! thread go]]
[com.stuartsierra.component :as component]
[dire.core :as dire]
[taoensso.timbre :refer [info warn trace fatal level-compile-time] :as timbre]
@@ -14,7 +14,7 @@
[onyx.extensions :as extensions]
[onyx.compression.nippy]
[onyx.types :refer [->Leaf leaf ->Route ->Ack ->Result]]
- [onyx.static.default-vals :refer [defaults]])
+ [onyx.static.default-vals :refer [defaults arg-or-default]])
(:import [java.security MessageDigest]))
;; TODO: Are there any exceptions that a peer should autoreboot itself?
@@ -38,13 +38,14 @@
(defn munge-start-lifecycle [event]
((:onyx.core/compiled-start-task-fn event) event))
-(defn add-acker-id [event m]
+(defn add-acker-id [{:keys [onyx.core/id onyx.core/max-acker-links] :as event} m]
(let [peers (get-in @(:onyx.core/replica event) [:ackers (:onyx.core/job-id event)])]
(if-not (seq peers)
(do (warn (format "[%s] This job no longer has peers capable of acking. This job will now pause execution." (:onyx.core/id event)))
(throw (ex-info "Not enough acker peers" {:peers peers})))
- (let [n (mod (hash (:message m)) (count peers))]
- (assoc m :acker-id (nth peers n))))))
+ (let [candidates (operation/select-n-peers id peers max-acker-links)
+ n (mod (hash (:message m)) (count candidates))]
+ (assoc m :acker-id (nth candidates n))))))
(defn add-completion-id [event m]
(assoc m :completion-id (:onyx.core/id event)))
@@ -92,16 +93,12 @@
(choose-output-paths event compiled-ex-fcs result leaf serialized-task downstream)
(choose-output-paths event compiled-norm-fcs result leaf serialized-task downstream)))
-(defn hash-value [x]
- (let [md5 (MessageDigest/getInstance "MD5")]
- (apply str (.digest md5 (.getBytes (pr-str x) "UTF-8")))))
-
(defn group-message [segment catalog task]
(let [t (find-task-fast catalog task)]
(if-let [k (:onyx/group-by-key t)]
- (hash-value (get segment k))
+ (hash (get segment k))
(when-let [f (:onyx/group-by-fn t)]
- (hash-value ((operation/resolve-fn {:onyx/fn f}) segment))))))
+ (hash ((operation/resolve-fn {:onyx/fn f}) segment))))))
(defn group-segments [leaf next-tasks catalog event]
(let [post-transformation (:post-transformation (:routes leaf))
@@ -128,20 +125,21 @@
(defn build-new-segments
[{:keys [onyx.core/results onyx.core/serialized-task onyx.core/catalog] :as event}]
(let [downstream (keys (:egress-ids serialized-task))
- results (map (fn [{:keys [root leaves] :as result}]
- (let [{:keys [id acker-id completion-id]} root]
- (assoc result
- :leaves
- (map (fn [leaf]
- (-> leaf
- (assoc :routes (add-route-data event result leaf downstream))
- (assoc :id id)
- (assoc :acker-id acker-id)
- (assoc :completion-id completion-id)
- (group-segments downstream catalog event)
- (add-ack-vals)))
- leaves))))
- results)]
+ results (doall
+ (map (fn [{:keys [root leaves] :as result}]
+ (let [{:keys [id acker-id completion-id]} root]
+ (assoc result
+ :leaves
+ (map (fn [leaf]
+ (-> leaf
+ (assoc :routes (add-route-data event result leaf downstream))
+ (assoc :id id)
+ (assoc :acker-id acker-id)
+ (assoc :completion-id completion-id)
+ (group-segments downstream catalog event)
+ (add-ack-vals)))
+ leaves))))
+ results))]
(assoc event :onyx.core/results results)))
(defn ack-routes? [routes]
@@ -164,16 +162,18 @@
(defn ack-messages [{:keys [onyx.core/results onyx.core/task-map] :as event}]
(doseq [[acker-id results-by-acker] (group-by (comp :acker-id :root) results)]
(let [link (operation/peer-link event acker-id)
- acks (map (fn [result] (let [fused-leaf-vals (gen-ack-fusion-vals task-map (:leaves result))
- fused-vals (if-let [ack-val (:ack-val (:root result))]
- (bit-xor fused-leaf-vals ack-val)
- fused-leaf-vals)]
- (->Ack (:id (:root result))
- (:completion-id (:root result))
- ;; or'ing by zero covers the case of flow conditions where an
- ;; input task produces a segment that goes nowhere.
- (or fused-vals 0))))
- results-by-acker)]
+ acks (doall
+ (map (fn [result] (let [fused-leaf-vals (gen-ack-fusion-vals task-map (:leaves result))
+ fused-vals (if-let [ack-val (:ack-val (:root result))]
+ (bit-xor fused-leaf-vals ack-val)
+ fused-leaf-vals)]
+ (->Ack (:id (:root result))
+ (:completion-id (:root result))
+ ;; or'ing by zero covers the case of flow conditions where an
+ ;; input task produces a segment that goes nowhere.
+ (or fused-vals 0)
+ (System/currentTimeMillis))))
+ results-by-acker))]
(extensions/internal-ack-messages (:onyx.core/messenger event) event link acks)))
event)
@@ -241,12 +241,13 @@
(assoc
event
:onyx.core/results
- (map
- (fn [segment]
- (let [segments (collect-next-segments event (:message segment))
- leaves (map leaf segments)]
- (->Result segment leaves)))
- batch)))
+ (doall
+ (map
+ (fn [segment]
+ (let [segments (collect-next-segments event (:message segment))
+ leaves (map leaf segments)]
+ (->Result segment leaves)))
+ batch))))
(defn apply-fn-bulk [{:keys [onyx.core/batch] :as event}]
;; Bulk functions intentionally ignore their outputs.
@@ -255,11 +256,11 @@
(assoc
event
:onyx.core/results
- (map
- (fn [segment]
- (let [leaves (map leaf segments)]
- (->Result segment leaves)))
- batch))))
+ (doall
+ (map
+ (fn [segment]
+ (->Result segment (list (leaf (:message segment)))))
+ batch)))))
(defn apply-fn [event]
(if (:onyx/bulk? (:onyx.core/task-map event))
@@ -310,7 +311,7 @@
(let [tail (last (get-in @(:onyx.core/state event) [:timeout-pool]))]
(doseq [m tail]
(when (p-ext/pending? event m)
- (taoensso.timbre/info (format "Input retry message %s" m))
+ (taoensso.timbre/trace (format "Input retry message %s" m))
(p-ext/retry-message event m)))
(swap! (:onyx.core/state event) update-in [:timeout-pool] rsc/expire-bucket)
(recur))))))))
@@ -353,8 +354,8 @@
(apply-fn)
(build-new-segments)
(write-batch)
- (ack-messages)
(flow-retry-messages)
+ (ack-messages)
(close-batch-resources)))
(catch Throwable e
(ex-f e))))
@@ -370,6 +371,28 @@
(operation/resolve-fn f)
onyx.compression.nippy/compress)))
+(defn gc-peer-links [event state opts]
+ (let [interval (or (:onyx.messaging/peer-link-gc-interval opts)
+ (:onyx.messaging/peer-link-gc-interval defaults))
+ idle (or (:onyx.messaging/peer-link-idle-timeout opts)
+ (:onyx.messaging/peer-link-idle-timeout defaults))]
+ (loop []
+ (try
+ (Thread/sleep interval)
+ (let [t (System/currentTimeMillis)]
+ (swap! state
+ (fn [x]
+ (let [to-remove (map first (filter (fn [[k v]] (>= (- t (:timestamp v)) idle)) (:links x)))
+ result (into {} (remove (fn [[k v]] (some #{k} to-remove)) (:links x)))]
+ (doseq [p to-remove]
+ (extensions/close-peer-connection (:onyx.core/messenger event) event (:link (get (:links x) p))))
+ (assoc x :links result)))))
+ (catch InterruptedException e
+ (throw e))
+ (catch Throwable e
+ (fatal e)))
+ (recur))))
+
(defn any-ackers? [replica job-id]
(> (count (get-in replica [:ackers job-id])) 0))
@@ -400,8 +423,8 @@
identity
matched)))
-(defn compile-before-task-functions [lifecycles task-name]
- (compile-lifecycle-functions lifecycles task-name :lifecycle/before-task))
+(defn compile-before-task-start-functions [lifecycles task-name]
+ (compile-lifecycle-functions lifecycles task-name :lifecycle/before-task-start))
(defn compile-before-batch-task-functions [lifecycles task-name]
(compile-lifecycle-functions lifecycles task-name :lifecycle/before-batch))
@@ -410,12 +433,17 @@
(compile-lifecycle-functions lifecycles task-name :lifecycle/after-batch))
(defn compile-after-task-functions [lifecycles task-name]
- (compile-lifecycle-functions lifecycles task-name :lifecycle/after-task))
+ (compile-lifecycle-functions lifecycles task-name :lifecycle/after-task-stop))
(defn resolve-task-fn [entry]
(when (= (:onyx/type entry) :function)
(operation/kw->fn (:onyx/fn entry))))
+(defn validate-pending-timeout [pending-timeout opts]
+ (when (> pending-timeout (arg-or-default :onyx.messaging/ack-daemon-timeout opts))
+ (throw (ex-info "Pending timeout cannot be greater than acking daemon timeout"
+ {:opts opts :pending-timeout pending-timeout}))))
+
(defrecord TaskLifeCycle
[id log messenger-buffer messenger job-id task-id replica restart-ch
kill-ch outbox-ch seal-resp-ch completion-ch opts task-kill-ch]
@@ -435,8 +463,10 @@
pending-timeout (or (:onyx/pending-timeout catalog-entry)
(:onyx/pending-timeout defaults))
r-seq (rsc/create-r-seq pending-timeout input-retry-timeout)
+ state (atom {:timeout-pool r-seq})
_ (taoensso.timbre/info (format "[%s] Warming up Task LifeCycle for job %s, task %s" id job-id (:name task)))
+ _ (validate-pending-timeout pending-timeout opts)
pipeline-data {:onyx.core/id id
:onyx.core/job-id job-id
@@ -446,7 +476,7 @@
:onyx.core/workflow (extensions/read-chunk log :workflow job-id)
:onyx.core/flow-conditions flow-conditions
:onyx.core/compiled-start-task-fn (compile-start-task-functions lifecycles (:name task))
- :onyx.core/compiled-before-task-fn (compile-before-task-functions lifecycles (:name task))
+ :onyx.core/compiled-before-task-start-fn (compile-before-task-start-functions lifecycles (:name task))
:onyx.core/compiled-before-batch-fn (compile-before-batch-task-functions lifecycles (:name task))
:onyx.core/compiled-after-batch-fn (compile-after-batch-task-functions lifecycles (:name task))
:onyx.core/compiled-after-task-fn (compile-after-task-functions lifecycles (:name task))
@@ -462,16 +492,19 @@
:onyx.core/outbox-ch outbox-ch
:onyx.core/seal-ch seal-resp-ch
:onyx.core/peer-opts (resolve-compression-fn-impls opts)
+ :onyx.core/max-downstream-links (or (:onyx.messaging/max-downstream-links opts)
+ (:onyx.messaging/max-downstream-links defaults))
+ :onyx.core/max-acker-links (or (:onyx.messaging/max-acker-links opts)
+ (:onyx.messaging/max-acker-links defaults))
:onyx.core/fn (resolve-task-fn catalog-entry)
:onyx.core/replica replica
- :onyx.core/state (atom {:timeout-pool r-seq})}
+ :onyx.core/state state}
ex-f (fn [e] (handle-exception e restart-ch outbox-ch job-id))
- pipeline-data (merge pipeline-data ((:onyx.core/compiled-before-task-fn pipeline-data) pipeline-data))]
-
- (while (and (first (alts!! [kill-ch task-kill-ch] :default true))
- (not (munge-start-lifecycle pipeline-data)))
- (Thread/sleep (or (:onyx.peer/sequential-back-off opts) 2000)))
+ _ (while (and (first (alts!! [kill-ch task-kill-ch] :default true))
+ (not (munge-start-lifecycle pipeline-data)))
+ (Thread/sleep (or (:onyx.peer/peer-not-ready-back-off opts) 2000)))
+ pipeline-data (merge pipeline-data ((:onyx.core/compiled-before-task-start-fn pipeline-data) pipeline-data))]
(>!! outbox-ch (entry/create-log-entry :signal-ready {:id id}))
@@ -487,14 +520,16 @@
(let [input-retry-messages-ch (input-retry-messages! messenger pipeline-data input-retry-timeout task-kill-ch)
aux-ch (launch-aux-threads! messenger pipeline-data outbox-ch seal-resp-ch completion-ch task-kill-ch)
- task-lifecycle-ch (thread (run-task-lifecycle pipeline-data seal-resp-ch kill-ch ex-f))]
+ task-lifecycle-ch (thread (run-task-lifecycle pipeline-data seal-resp-ch kill-ch ex-f))
+ peer-link-gc-thread (future (gc-peer-links pipeline-data state opts))]
(assoc component
:pipeline-data pipeline-data
:seal-ch seal-resp-ch
:task-kill-ch task-kill-ch
:task-lifecycle-ch task-lifecycle-ch
:input-retry-messages-ch input-retry-messages-ch
- :aux-ch aux-ch)))
+ :aux-ch aux-ch
+ :peer-link-gc-thread peer-link-gc-thread)))
(catch Throwable e
(handle-exception e restart-ch outbox-ch job-id)
component)))
@@ -502,19 +537,22 @@
(stop [component]
(taoensso.timbre/info (format "[%s] Stopping Task LifeCycle for %s" id (:onyx.core/task (:pipeline-data component))))
(when-let [event (:pipeline-data component)]
- ((:onyx.core/compiled-after-task-fn event) event)
-
;; Ensure task operations are finished before closing peer connections
(close! (:seal-ch component))
(<!! (:task-lifecycle-ch component))
-
(close! (:task-kill-ch component))
+
(<!! (:input-retry-messages-ch component))
(<!! (:aux-ch component))
+ ((:onyx.core/compiled-after-task-fn event) event)
+
(let [state @(:onyx.core/state event)]
- (doseq [[_ link] (:links state)]
- (extensions/close-peer-connection (:onyx.core/messenger event) event link))))
+ (doseq [[_ link-map] (:links state)]
+ (extensions/close-peer-connection (:onyx.core/messenger event) event (:link link-map)))))
+
+ (when-let [t (:peer-link-gc-thread component)]
+ (future-cancel t))
(assoc component
:pipeline-data nil
@@ -522,7 +560,8 @@
:aux-ch nil
:input-retry-messages-ch nil
:task-lifecycle-ch nil
- :task-lifecycle-ch nil)))
+ :task-lifecycle-ch nil
+ :peer-link-gc-thread nil)))
(defn task-lifecycle [args {:keys [id log messenger-buffer messenger job task replica
restart-ch kill-ch outbox-ch seal-ch completion-ch opts task-kill-ch]}]
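Note: peer links are now stored as maps of {:link <connection> :timestamp <ms>} (see onyx.peer.operation above), and gc-peer-links periodically closes links that have sat idle longer than :onyx.messaging/peer-link-idle-timeout. A rough sketch of the state it sweeps, with placeholder values (not part of the patch):

(comment
  ;; the :onyx.core/state atom now holds roughly:
  {:timeout-pool r-seq
   :links {peer-id {:link connection
                    :timestamp 1432920000000}}}
  ;; a link is closed and removed once (- now timestamp) exceeds the idle timeout
  )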
diff --git a/src/onyx/peer/virtual_peer.clj b/src/onyx/peer/virtual_peer.clj
index 1447e52..5338016 100644
--- a/src/onyx/peer/virtual_peer.clj
+++ b/src/onyx/peer/virtual_peer.clj
@@ -37,10 +37,9 @@
:task-lifecycle-fn task-lifecycle}
(:onyx.peer/state opts))]
(let [replica @replica-atom
- position (first (alts!! [kill-ch inbox-ch] :priority true))]
- (if position
- (let [entry (extensions/read-log-entry log position)
- new-replica (extensions/apply-log-entry entry replica)
+ entry (first (alts!! [kill-ch inbox-ch] :priority true))]
+ (if entry
+ (let [new-replica (extensions/apply-log-entry entry replica)
diff (extensions/replica-diff entry replica new-replica)
reactions (extensions/reactions entry replica new-replica diff state)
new-state (extensions/fire-side-effects! entry replica new-replica diff state)]
@@ -85,9 +84,7 @@
restart-ch (chan 1)
completion-ch (:completions-ch acking-daemon)
peer-site (extensions/peer-site messenger)
- entry (create-log-entry :prepare-join-cluster
- {:joiner id
- :peer-site peer-site})
+ entry (create-log-entry :prepare-join-cluster {:joiner id :peer-site peer-site})
origin (extensions/subscribe-to-log log inbox-ch)]
(extensions/register-pulse log id)
(>!! outbox-ch entry)
@@ -101,7 +98,7 @@
:outbox-ch outbox-ch :kill-ch kill-ch
:restart-ch restart-ch)))
(catch Throwable e
- (taoensso.timbre/fatal (format "Error starting Virtual Peer %s" id) e)
+ (taoensso.timbre/fatal e (format "Error starting Virtual Peer %s" id))
(throw e)))))
(stop [component]
diff --git a/src/onyx/plugin/core_async.clj b/src/onyx/plugin/core_async.clj
index 4b9bc63..b76634d 100644
--- a/src/onyx/plugin/core_async.clj
+++ b/src/onyx/plugin/core_async.clj
@@ -2,25 +2,32 @@
(:require [clojure.core.async :refer [chan >!! <!! alts!! timeout go <!]]
[onyx.peer.pipeline-extensions :as p-ext]
[onyx.static.default-vals :refer [defaults]]
- [taoensso.timbre :refer [debug] :as timbre]))
+ [taoensso.timbre :refer [debug info] :as timbre]))
(defn inject-reader
[event lifecycle]
- (assert (:core.async/chan event) ":core.async/chan not found - add it via inject-lifecycle-resources.")
+ (assert (:core.async/chan event) ":core.async/chan not found - add it using a :before-task-start lifecycle")
{:core.async/pending-messages (atom {})
:core.async/drained? (atom false)
- :core.async/retry-ch (chan 1000)})
+ :core.async/retry-ch (chan 10000)
+ :core.async/retry-count (atom 0)})
+
+(defn log-retry-count
+ [event lifecycle]
+ (info "core.async input plugin stopping. Retry count:" @(:core.async/retry-count event))
+ {})
(defn inject-writer
[event lifecycle]
- (assert (:core.async/chan event) ":core.async/chan not found - add it via inject-lifecycle-resources.")
+ (assert (:core.async/chan event) ":core.async/chan not found - add it using a :before-task-start lifecycle")
{})
(def reader-calls
- {:lifecycle/before-task inject-reader})
+ {:lifecycle/before-task-start inject-reader
+ :lifecycle/after-task-stop log-retry-count})
(def writer-calls
- {:lifecycle/before-task inject-writer})
+ {:lifecycle/before-task-start inject-writer})
(defmethod p-ext/read-batch :core.async/read-from-chan
[{:keys [onyx.core/task-map core.async/chan core.async/retry-ch
@@ -32,10 +39,9 @@
ms (or (:onyx/batch-timeout task-map) (:onyx/batch-timeout defaults))
step-ms (/ ms (:onyx/batch-size task-map))
timeout-ch (timeout ms)
- batch (if (zero? max-segments)
- (<!! timeout-ch)
+ batch (if (pos? max-segments)
(loop [segments [] cnt 0]
- (if (= cnt batch-size)
+ (if (= cnt max-segments)
segments
(if-let [message (first (alts!! [retry-ch chan timeout-ch] :priority true))]
(recur (conj segments
@@ -43,7 +49,8 @@
:input :core.async
:message message})
(inc cnt))
- segments))))]
+ segments)))
+ (<!! timeout-ch))]
(doseq [m batch]
(swap! pending-messages assoc (:id m) (:message m)))
(when (and (= 1 (count @pending-messages))
@@ -57,10 +64,12 @@
(swap! pending-messages dissoc message-id))
(defmethod p-ext/retry-message :core.async/read-from-chan
- [{:keys [core.async/pending-messages core.async/retry-ch]} message-id]
+ [{:keys [core.async/pending-messages core.async/retry-count core.async/retry-ch]} message-id]
(when-let [msg (get @pending-messages message-id)]
- (>!! retry-ch msg)
- (swap! pending-messages dissoc message-id)))
+ (swap! pending-messages dissoc message-id)
+ (when-not (= msg :done)
+ (swap! retry-count inc))
+ (>!! retry-ch msg)))
(defmethod p-ext/pending? :core.async/read-from-chan
[{:keys [core.async/pending-messages]} message-id]
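
For reference, here is a standalone sketch of the read loop in read-batch above, outside of Onyx: pull at most batch-size segments from the retry channel or the input channel (retries win via :priority), and stop early once the shared timeout fires. Channel and parameter names are illustrative.

(require '[clojure.core.async :refer [chan >!! alts!! timeout]])

(defn read-up-to
  "Reads at most batch-size values from retry-ch or ch, preferring
  retry-ch, and returns whatever was read once ms milliseconds elapse."
  [retry-ch ch batch-size ms]
  (let [timeout-ch (timeout ms)]
    (loop [segments [] cnt 0]
      (if (= cnt batch-size)
        segments
        (if-let [message (first (alts!! [retry-ch ch timeout-ch] :priority true))]
          (recur (conj segments message) (inc cnt))
          segments)))))

;; (def c (chan 10))
;; (>!! c :a) (>!! c :b)
;; (read-up-to (chan) c 5 50) ;=> [:a :b]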
diff --git a/src/onyx/scheduling/balanced_job_scheduler.clj b/src/onyx/scheduling/balanced_job_scheduler.clj
index 3ecccec..df058e9 100644
--- a/src/onyx/scheduling/balanced_job_scheduler.clj
+++ b/src/onyx/scheduling/balanced_job_scheduler.clj
@@ -1,26 +1,9 @@
(ns onyx.scheduling.balanced-job-scheduler
(:require [onyx.scheduling.common-job-scheduler :as cjs]
[onyx.scheduling.common-task-scheduler :as cts]
+ [taoensso.timbre :refer [info]]
[onyx.log.commands.common :as common]))
-(defn job-coverable? [replica job n]
- (let [tasks (get-in replica [:tasks job])]
- (>= n (count tasks))))
-
-(defn allocate-peers [{:keys [jobs peers] :as replica}]
- (loop [results {}]
- (let [j (count jobs)
- p (count peers)
- min-peers (int (/ p j))
- n (rem p j)
- max-peers (inc min-peers)
- allocations
- (reduce
- (fn [all [job k]]
- (assoc all job (if (< k n) max-peers min-peers)))
- {}
- (map vector jobs (range)))])))
-
(defmethod cjs/job-offer-n-peers :onyx.job-scheduler/balanced
[{:keys [jobs peers] :as replica}]
(if (seq jobs)
@@ -36,21 +19,45 @@
(map vector jobs (range))))
{}))
+(defmethod cjs/sort-job-priority :onyx.job-scheduler/balanced
+ [replica jobs]
+ (sort-by (juxt (fn [job] (apply + (map count (vals (get-in replica [:allocations job])))))
+ #(.indexOf ^clojure.lang.PersistentVector (vec (:jobs replica)) %))
+ (:jobs replica)))
+
+;; filter out saturated, then sort by is-covered? (no before yes),
+;; then by number of allocated peers
+
+(defn select-job-requiring-peer
+ "Selects the next job deserving a peer.
+ Tries first to cover the job that needs the fewest peers to be covered,
+ then balances by peer count."
+ [replica jobs]
+ (->> jobs
+ (sort-by (fn [job]
+ (let [peer-count (val job)
+ covered (max 0 (- (cjs/job-lower-bound replica (key job)) peer-count))]
+ (vector covered
+ peer-count
+ (.indexOf ^clojure.lang.PersistentVector (vec (:jobs replica)) job)))))
+ (remove (fn [job]
+ (let [peer-count (val job)]
+ (>= peer-count (cjs/job-upper-bound replica (key job))))))
+ (ffirst)))
+
+(defmethod cjs/equivalent-allocation? :onyx.job-scheduler/balanced
+ [replica replica-new]
+ (= (sort (map (fn [[job-id _]]
+ (apply + (map count (vals (get-in replica [:allocations job-id])))))
+ (:allocations replica)))
+ (sort (map (fn [[job-id _]]
+ (apply + (map count (vals (get-in replica-new [:allocations job-id])))))
+ (:allocations replica-new)))))
+
(defmethod cjs/claim-spare-peers :onyx.job-scheduler/balanced
[replica jobs n]
- (let [ordered-jobs (sort-by (juxt #(.indexOf ^clojure.lang.PersistentVector (vec (:jobs replica)) %)
- #(count (get-in replica [:tasks %])))
- (:jobs replica))]
- (loop [[head & tail :as job-seq] ordered-jobs
- results jobs
- capacity n]
- (let [tail (vec tail)
- to-cover (- (count (get-in replica [:tasks head])) (get results head 0))]
- (cond (or (<= capacity 0) (not (seq job-seq)))
- results
- (and (>= capacity to-cover) (pos? to-cover))
- (recur (conj tail head) (update-in results [head] + to-cover) (- capacity to-cover))
- (and (< (get results head) (get-in replica [:saturation head])) (pos? (- capacity to-cover)))
- (recur (conj tail head) (update-in results [head] inc) (dec capacity))
- :else
- (recur tail results capacity))))))
+ (loop [jobs* jobs n* n]
+ (if (zero? n*)
+ jobs*
+ (recur (update-in jobs* [(select-job-requiring-peer replica jobs*)] inc)
+ (dec n*)))))
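
The balanced claim-spare-peers above now hands out spare peers one at a time to whichever job select-job-requiring-peer picks. A simplified, self-contained version of that loop is below; the selector here just feeds the least-allocated job, whereas the real one also weighs coverage and job submission order.

(defn hand-out-spares
  "Gives n spare peers to jobs one at a time, re-running select-job
  after every hand-out so allocations stay balanced."
  [select-job claims n]
  (loop [claims* claims n* n]
    (if (zero? n*)
      claims*
      (recur (update-in claims* [(select-job claims*)] inc)
             (dec n*)))))

;; (hand-out-spares (fn [m] (key (apply min-key val m)))
;;                  {:job-1 1 :job-2 3}
;;                  4)
;; ;=> {:job-1 4, :job-2 4}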
diff --git a/src/onyx/scheduling/balanced_task_scheduler.clj b/src/onyx/scheduling/balanced_task_scheduler.clj
index a305728..52997c6 100644
--- a/src/onyx/scheduling/balanced_task_scheduler.clj
+++ b/src/onyx/scheduling/balanced_task_scheduler.clj
@@ -1,5 +1,6 @@
(ns onyx.scheduling.balanced-task-scheduler
(:require [onyx.scheduling.common-task-scheduler :as cts]
+ [onyx.scheduling.common-job-scheduler :as cjs]
[onyx.log.commands.common :as common]))
(defmethod cts/drop-peers :onyx.task-scheduler/balanced
@@ -16,45 +17,53 @@
task-most-peers (ffirst (filter (fn [x] (= max-peers (count (second x)))) allocations))]
[(conj peers-to-drop (last (allocations task-most-peers)))
(update-in allocations [task-most-peers] butlast)]))
- [[] (get-in replica [:allocations job])]
+ [[] (cts/filter-grouped-tasks replica job (get-in replica [:allocations job]))]
(range n))))
-(defmethod cts/task-claim-n-peers :onyx.task-scheduler/balanced
- [replica job n]
- (let [tasks (get-in replica [:tasks job])]
- ;; If the number of peers is less than the number of tasks,
- ;; we're not covered - so we claim zero peers. Otherwise we
- ;; take as much as we're allowed, depending on the saturation
- ;; of the job.
- (if (< n (count tasks))
- 0
- (min (get-in replica [:saturation job] Double/POSITIVE_INFINITY) n))))
-
(defn reuse-spare-peers [replica job tasks spare-peers]
- (loop [[head & tail :as task-seq] (get-in replica [:tasks job])
+ (loop [task-seq (into #{} (get-in replica [:tasks job]))
results tasks
capacity spare-peers]
- (let [tail (vec tail)]
- (cond (or (<= capacity 0) (not (seq task-seq)))
- results
- (< (get results head) (or (get-in replica [:task-saturation job head] Double/POSITIVE_INFINITY)))
- (recur (conj tail head) (update-in results [head] inc) (dec capacity))
- :else
- (recur tail results capacity)))))
+ (let [least-allocated-task (first (sort-by
+ (juxt
+ #(get results %)
+ #(.indexOf ^clojure.lang.PersistentVector (vec (get-in replica [:tasks job])) %))
+ task-seq))]
+ (cond
+ ;; If there are no more peers to give out, or no more tasks
+ ;; want peers, we're done.
+ (or (<= capacity 0) (nil? least-allocated-task))
+ results
+
+ ;; If we're underneath the saturation level for this task, and this
+ ;; task is allowed to be allocated to, we give it one peer and rotate it
+ ;; to the back to possibly get more peers later.
+ (and (< (get results least-allocated-task)
+ (or (get-in replica [:task-saturation job least-allocated-task]
+ Double/POSITIVE_INFINITY)))
+ (not (cts/preallocated-grouped-task? replica job least-allocated-task)))
+ (recur task-seq (update-in results [least-allocated-task] inc) (dec capacity))
+
+ ;; This task doesn't want more peers, throw it away from the rotating sequence.
+ :else
+ (recur (disj task-seq least-allocated-task) results capacity)))))
(defmethod cts/task-distribute-peer-count :onyx.task-scheduler/balanced
[replica job n]
(let [tasks (get-in replica [:tasks job])
- t (count tasks)
- min-peers (int (/ n t))
- r (rem n t)
- max-peers (inc min-peers)
- init
- (reduce
- (fn [all [task k]]
- (assoc all task (min (get-in replica [:task-saturation job task] Double/POSITIVE_INFINITY)
- (if (< k r) max-peers min-peers))))
- {}
- (map vector tasks (range)))
- spare-peers (- n (apply + (vals init)))]
- (reuse-spare-peers replica job init spare-peers)))
+ t (cjs/job-lower-bound replica job)]
+ (if (< n t)
+ (zipmap tasks (repeat 0))
+ (let [init
+ (reduce
+ (fn [all [task k]]
+ ;; If it's a grouped task that has already been allocated,
+ ;; we can't add more peers since that would break the hashing algorithm.
+ (if (cts/preallocated-grouped-task? replica job task)
+ (assoc all task (count (get-in replica [:allocations job task])))
+ (assoc all task (min (get-in replica [:task-saturation job task] Double/POSITIVE_INFINITY)
+ (get-in replica [:min-required-peers job task] Double/POSITIVE_INFINITY)))))
+ {}
+ (map vector tasks (range)))
+ spare-peers (- n (apply + (vals init)))]
+ (reuse-spare-peers replica job init spare-peers)))))
\ No newline at end of file
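
reuse-spare-peers above now repeatedly gives one peer to the least-allocated task that is still under its saturation, rather than walking a rotating vector. A plain-map sketch of that idea, with made-up task names and saturation caps:

(defn spread-spares
  "Hands spare peers to the least-allocated task that is still below
  its saturation cap, one peer per iteration."
  [allocations saturation spare]
  (loop [alloc allocations capacity spare]
    (let [open (filter (fn [[task n]]
                         (< n (get saturation task Double/POSITIVE_INFINITY)))
                       alloc)]
      (if (or (zero? capacity) (empty? open))
        alloc
        (recur (update-in alloc [(key (apply min-key val open))] inc)
               (dec capacity))))))

;; (spread-spares {:a 1 :b 1 :c 2} {:a 2} 4) ;=> {:a 2, :b 3, :c 3}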
diff --git a/src/onyx/scheduling/common_job_scheduler.clj b/src/onyx/scheduling/common_job_scheduler.clj
index c3755ae..554ed04 100644
--- a/src/onyx/scheduling/common_job_scheduler.clj
+++ b/src/onyx/scheduling/common_job_scheduler.clj
@@ -8,6 +8,42 @@
[onyx.scheduling.common-task-scheduler :as cts]
[taoensso.timbre :refer [info]]))
+(defn job-upper-bound [replica job]
+ ;; We need to handle a special case here when figuring out the upper saturation limit.
+ ;; If this is a job with a grouped task that has already been allocated,
+ ;; we can't allocate to the grouped task anymore, even if it's saturation
+ ;; level is Infinity.
+ (let [tasks (get-in replica [:tasks job])
+ grouped-tasks (filter (fn [task] (get-in replica [:flux-policies job task])) tasks)]
+ (if (seq (filter (fn [task] (seq (get-in replica [:allocations job task]))) grouped-tasks))
+ (apply + (map (fn [task]
+ (if (some #{task} grouped-tasks)
+ ;; Cannot allocate anymore, you have what you have.
+ (count (get-in replica [:allocations job task]))
+ ;; Allocate as much the original task saturation allows since it hasn't
+ ;; been allocated yet.
+ (get-in replica [:task-saturation job task])))
+ tasks))
+ (get-in replica [:saturation job] Double/POSITIVE_INFINITY))))
+
+(defn job-lower-bound [replica job]
+ ;; Again, we handle the special case of a grouped task that has already
+ ;; begun.
+ (let [tasks (get-in replica [:tasks job])
+ grouped-tasks (filter (fn [task] (get-in replica [:flux-policies job task])) tasks)]
+ (if (seq (filter (fn [task] (seq (get-in replica [:allocations job task]))) grouped-tasks))
+ (apply + (map (fn [task]
+ (if (some #{task} grouped-tasks)
+ ;; Cannot allocate anymore, you have what you have.
+ (count (get-in replica [:allocations job task]))
+ ;; Grab the absolute minimum for this task, no constraints.
+ (get-in replica [:min-required-peers job task])))
+ tasks))
+ (apply + (vals (get-in replica [:min-required-peers job]))))))
+
+(defn job-coverable? [replica job n]
+ (>= n (job-lower-bound replica job)))
+
(defmulti job-offer-n-peers
(fn [replica]
(:job-scheduler replica)))
@@ -16,6 +52,10 @@
(fn [replica jobs n]
(:job-scheduler replica)))
+(defmulti sort-job-priority
+ (fn [replica jobs]
+ (:job-scheduler replica)))
+
(defmethod job-offer-n-peers :default
[replica]
(throw (ex-info (format "Job scheduler %s not recognized" (:job-scheduler replica))
@@ -26,6 +66,11 @@
(throw (ex-info (format "Job scheduler %s not recognized" (:job-scheduler replica))
{:job-scheduler (:job-scheduler replica)})))
+(defmethod sort-job-priority :default
+ [replica]
+ (throw (ex-info (format "Job scheduler %s not recognized" (:job-scheduler replica))
+ {:job-scheduler (:job-scheduler replica)})))
+
(defn current-job-allocations [replica]
(into {}
(map (fn [j]
@@ -42,27 +87,31 @@
(get-in replica [:tasks j])))})
(:jobs replica))))
-(defn job->task-claims [replica job-offers]
+(defn job-claim-peers [replica job-offers]
(reduce-kv
- (fn [all j claim]
- (assoc all j (cts/task-claim-n-peers replica j claim)))
+ (fn [all j n]
+ (if (job-coverable? replica j n)
+ (let [sat (job-upper-bound replica j)]
+ (assoc all j (min sat n)))
+ (assoc all j 0)))
{}
job-offers))
(defn reallocate-peers [origin-replica displaced-peers max-utilization]
(loop [peer-pool displaced-peers
replica origin-replica]
- (let [candidate-jobs (filter identity
- (mapcat
- (fn [job]
- (let [current (get (current-task-allocations replica) job)
- desired (cts/task-distribute-peer-count replica job (get max-utilization job))
- tasks (get-in replica [:tasks job])]
- (map (fn [t]
- (when (< (or (get current t) 0) (get desired t))
- [job t]))
- tasks)))
- (:jobs replica)))]
+ (let [candidate-jobs (remove
+ nil?
+ (mapcat
+ (fn [job]
+ (let [current (get (current-task-allocations replica) job)
+ desired (cts/task-distribute-peer-count origin-replica job (get max-utilization job))
+ tasks (get-in replica [:tasks job])]
+ (map (fn [t]
+ (when (< (or (get current t) 0) (get desired t))
+ [job t]))
+ tasks)))
+ (sort-job-priority replica (:jobs replica))))]
(if (and (seq peer-pool) (seq candidate-jobs))
(recur (rest peer-pool)
(let [removed (common/remove-peers replica (first peer-pool))
@@ -101,15 +150,43 @@
(and (get-in replica [:acker-exclude-outputs job])
(some #{task} (get-in replica [:output-tasks job])))))
-(defn choose-acker-candidates [replica peers]
- (remove
- (fn [p]
- (let [{:keys [job task]} (common/peer->allocated-job (:allocations replica) p)]
- (exempt-from-acker? replica job task)))
+(defn find-physically-colocated-peers
+ "Takes replica and a peer. Returns a set of peers, exluding this peer,
+ that reside on the same physical machine."
+ [replica peer]
+ (let [peers (remove (fn [p] (= p peer)) (:peers replica))
+ peer-site (extensions/get-peer-site replica peer)]
+ (filter
+ (fn [p]
+ (= (extensions/get-peer-site replica p) peer-site))
+ peers)))
+
+(defn sort-acker-candidates
+ "We try to be smart about which ackers we pick. If we can avoid
+ colocating an acker and any peers executing an exempt task,
+ we try to. It's a best effort, though, so if it's not possible
+ we proceed anyway."
+ [replica peers]
+ (sort-by
+ (fn [peer]
+ (let [colocated-peers (find-physically-colocated-peers replica peer)
+ statuses (map
+ #(let [{:keys [job task]} (common/peer->allocated-job (:allocations replica) %)]
+ (exempt-from-acker? replica job task))
+ colocated-peers)]
+ (some #{true} statuses)))
peers))
+(defn choose-acker-candidates [replica peers]
+ (sort-acker-candidates
+ replica
+ (remove
+ (fn [p]
+ (let [{:keys [job task]} (common/peer->allocated-job (:allocations replica) p)]
+ (exempt-from-acker? replica job task)))
+ peers)))
+
(defn choose-ackers [replica]
- ;; TODO: ensure this behaves consistently with respect to ordering
(reduce
(fn [result job]
(let [peers (apply concat (vals (get-in result [:allocations job])))
@@ -120,11 +197,36 @@
replica
(:jobs replica)))
+(defn deallocate-starved-jobs
+ "Strips out allocations from jobs that no longer meet the minimum number
+ of peers. This can happen if a peer leaves a running job."
+ [replica]
+ (reduce
+ (fn [result job]
+ (if (< (apply + (map count (vals (get-in result [:allocations job]))))
+ (apply + (vals (get-in result [:min-required-peers job]))))
+ (update-in result [:allocations] dissoc job)
+ result))
+ replica
+ (:jobs replica)))
+
+(defmulti equivalent-allocation?
+ (fn [replica replica-new]
+ (:job-scheduler replica)))
+
+(defmethod equivalent-allocation? :default
+ [_ _]
+ false)
+
(defn reconfigure-cluster-workload [replica]
(let [job-offers (job-offer-n-peers replica)
- job-claims (job->task-claims replica job-offers)
+ job-claims (job-claim-peers replica job-offers)
spare-peers (apply + (vals (merge-with - job-offers job-claims)))
max-utilization (claim-spare-peers replica job-claims spare-peers)
current-allocations (current-job-allocations replica)
- peers-to-displace (find-displaced-peers replica current-allocations max-utilization)]
- (choose-ackers (reallocate-peers replica peers-to-displace max-utilization))))
+ peers-to-displace (find-displaced-peers replica current-allocations max-utilization)
+ deallocated (deallocate-starved-jobs replica)
+ updated-replica (choose-ackers (reallocate-peers deallocated peers-to-displace max-utilization))]
+ (if (equivalent-allocation? replica updated-replica)
+ replica
+ updated-replica)))
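
To make the new bounds concrete, here is a hand-written replica snapshot reduced to just the keys job-lower-bound and job-upper-bound read (job, task, and peer names are made up; this assumes this branch of Onyx is on the classpath).

(require '[onyx.scheduling.common-job-scheduler :as cjs])

(def replica
  {:jobs [:j1]
   :tasks {:j1 [:in :group :out]}
   :flux-policies {:j1 {:group :kill}}
   :allocations {:j1 {}}
   :min-required-peers {:j1 {:in 1 :group 3 :out 1}}
   :task-saturation {:j1 {:in 1 :group 5 :out 1}}
   :saturation {:j1 7}})

(cjs/job-lower-bound replica :j1) ;=> 5  (1 + 3 + 1 minimum peers)
(cjs/job-upper-bound replica :j1) ;=> 7  (the job's :saturation)

;; Once the grouped task has peers, both bounds pin it at its current size,
;; since a grouped task cannot grow without breaking the hash routing:
(cjs/job-upper-bound (assoc-in replica [:allocations :j1 :group] [:p1 :p2]) :j1)
;=> 4  (2 for :group, plus 1 + 1 task saturation for :in and :out)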
diff --git a/src/onyx/scheduling/common_task_scheduler.clj b/src/onyx/scheduling/common_task_scheduler.clj
index 0458655..1e17dfc 100644
--- a/src/onyx/scheduling/common_task_scheduler.clj
+++ b/src/onyx/scheduling/common_task_scheduler.clj
@@ -17,6 +17,18 @@
completed (get-in replica [:completions job])]
(filter identity (second (diff completed tasks)))))
+(defn preallocated-grouped-task? [replica job task]
+ (and (not (nil? (get-in replica [:flux-policies job task])))
+ (> (count (get-in replica [:allocations job task])) 0)))
+
+(defn filter-grouped-tasks [replica job allocations]
+ (into
+ {}
+ (remove
+ (fn [[k v]]
+ (not (nil? (get-in replica [:flux-policies job k]))))
+ allocations)))
+
(defmulti drop-peers
(fn [replica job n]
(get-in replica [:task-schedulers job])))
@@ -29,20 +41,10 @@
scheduler)
{:replica replica}))))
-(defmulti task-claim-n-peers
- (fn [replica job n]
- (get-in replica [:task-schedulers job])))
-
(defmulti task-distribute-peer-count
(fn [replica job n]
(get-in replica [:task-schedulers job])))
-(defmethod task-claim-n-peers :default
- [replica job n]
- (throw (ex-info (format "Task scheduler %s not recognized" (get-in replica [:task-schedulers job]))
- {:task-scheduler (get-in replica [:task-schedulers job])
- :job job})))
-
(defmethod task-distribute-peer-count :default
[replica job n]
(throw (ex-info (format "Task scheduler %s not recognized" (get-in replica [:task-schedulers job]))
diff --git a/src/onyx/scheduling/greedy_job_scheduler.clj b/src/onyx/scheduling/greedy_job_scheduler.clj
index 1ab52b1..7835287 100644
--- a/src/onyx/scheduling/greedy_job_scheduler.clj
+++ b/src/onyx/scheduling/greedy_job_scheduler.clj
@@ -4,16 +4,14 @@
[onyx.log.commands.common :as common]))
(defn job-coverable? [replica job]
- (let [tasks (get-in replica [:tasks job])]
- (>= (count (get-in replica [:peers])) (count tasks))))
+ (let [min-req (apply + (vals (get-in replica [:min-required-peers job])))]
+ (>= (count (get-in replica [:peers])) min-req)))
(defmethod cjs/job-offer-n-peers :onyx.job-scheduler/greedy
[replica]
(if (seq (:jobs replica))
- (let [[active & passive] (:jobs replica)
- coverable? (job-coverable? replica active)
- n (if coverable? (count (:peers replica)) 0)]
- (merge {active n} (zipmap passive (repeat 0))))
+ (let [[active & passive] (:jobs replica)]
+ (merge {active (count (:peers replica))} (zipmap passive (repeat 0))))
{}))
(defmethod cjs/claim-spare-peers :onyx.job-scheduler/greedy
@@ -24,3 +22,10 @@
;; them. Return the same job claims since nothing will change.
;;
jobs)
+
+(defmethod cjs/sort-job-priority :onyx.job-scheduler/greedy
+ [replica jobs]
+ ;; We only care about the first job in a Greedy scheduler.
+ (if-let [x (first jobs)]
+ (vector x)
+ []))
diff --git a/src/onyx/scheduling/percentage_job_scheduler.clj b/src/onyx/scheduling/percentage_job_scheduler.clj
index f0254df..e49570c 100644
--- a/src/onyx/scheduling/percentage_job_scheduler.clj
+++ b/src/onyx/scheduling/percentage_job_scheduler.clj
@@ -36,6 +36,12 @@
init-allocations (min-allocations jobs-to-use n-peers)]
(into {} (map (fn [j] {(:job j) (:capacity j)}) init-allocations))))
+(defmethod cjs/sort-job-priority :onyx.job-scheduler/percentage
+ [replica jobs]
+ (sort-by (juxt #(.indexOf ^clojure.lang.PersistentVector (vec (:jobs replica)) %)
+ (fn [job] (apply + (map count (vals (get-in replica [:allocations job]))))))
+ (:jobs replica)))
+
(defmethod cjs/claim-spare-peers :onyx.job-scheduler/percentage
[replica jobs n]
;; We can get away with using the exact same algorithm as the
diff --git a/src/onyx/scheduling/percentage_task_scheduler.clj b/src/onyx/scheduling/percentage_task_scheduler.clj
index 058650c..040d12a 100644
--- a/src/onyx/scheduling/percentage_task_scheduler.clj
+++ b/src/onyx/scheduling/percentage_task_scheduler.clj
@@ -1,67 +1,93 @@
(ns onyx.scheduling.percentage-task-scheduler
- (:require [onyx.scheduling.common-task-scheduler :as cts]
+ (:require [onyx.scheduling.common-job-scheduler :as cjs]
+ [onyx.scheduling.common-task-scheduler :as cts]
[onyx.log.commands.common :as common]))
-(defn sort-tasks-by-pct [replica job tasks]
- (let [indexed
- (map-indexed
- (fn [k t]
- {:position k :task t :pct (get-in replica [:task-percentages job t])})
- (reverse tasks))]
- (reverse (sort-by (juxt :pct :position) indexed))))
+(defn tasks-by-pct [replica job tasks]
+ (map
+ (fn [t]
+ {:task t :pct (get-in replica [:task-percentages job t])})
+ tasks))
-(defn min-task-allocations [replica job tasks n-peers]
- (mapv
- (fn [task]
- (let [n (int (Math/floor (* (* 0.01 (:pct task)) n-peers)))]
- (assoc task :allocation n)))
- tasks))
+(defn rescale-task-percentages
+ "Rescale task percentages after saturated tasks were removed"
+ [tasks]
+ (let [total (/ (apply + (map :pct tasks)) 100)]
+ (map (fn [task]
+ (update-in task [:pct] / total))
+ tasks)))
-(defn percentage-balanced-taskload [replica job candidate-tasks n-peers]
- (let [sorted-tasks (sort-tasks-by-pct replica job candidate-tasks)
- init-allocations (min-task-allocations replica job sorted-tasks n-peers)
- init-usage (apply + (map :allocation init-allocations))
- left-over-peers (- n-peers init-usage)
- with-leftovers (update-in init-allocations [0 :allocation] + left-over-peers)]
- (into {} (map (fn [t] {(:task t) t}) with-leftovers))))
+(defn largest-remainder-allocations
+ "Allocates remaining peers to the tasks with the largest remainder.
+ e.g. 3 tasks pct allocated 3.5, 1.75, 1.75 -> 3, 2, 2"
+ [replica tasks n-peers job]
+ (let [tasks* (rescale-task-percentages tasks)
+ unrounded (map (fn [task]
+ (cond (cts/preallocated-grouped-task? replica job (:task task))
+ (count (get-in replica [:allocations job (:task task)]))
+ (not (nil? (get-in replica [:flux-policies job (:task task)])))
+ (max (get-in replica [:min-required-peers job (:task task)] Double/POSITIVE_INFINITY)
+ (* 0.01 (:pct task) n-peers))
+ :else (* 0.01 (:pct task) n-peers)))
+ tasks*)
+ full (map int unrounded)
+ taken (apply + full)
+ remaining (- n-peers taken)
+ full-allocated (zipmap tasks* full)
+ remainders (->> (map (fn [task v]
+ (vector task (- v (int v))))
+ tasks*
+ unrounded)
+ (sort-by second)
+ (reverse)
+ (take remaining)
+ (map (juxt first (constantly 1)))
+ (into {}))
+ final-allocations (merge-with + full-allocated remainders)]
+ (mapv (fn [[task allocation]]
+ (assoc task :allocation (int allocation)))
+ final-allocations)))
+
+(defn percentage-balanced-taskload
+ [replica job candidate-tasks n-peers]
+ {:post [(>= n-peers 0)]}
+ (let [sorted-tasks (tasks-by-pct replica job candidate-tasks)
+ allocations (largest-remainder-allocations replica sorted-tasks n-peers job)
+ oversaturated (filter (fn [{:keys [task allocation]}]
+ (> allocation (get-in replica [:task-saturation job task])))
+ allocations)
+ cutoff-oversaturated (->> oversaturated
+ (map (fn [{:keys [task] :as t}]
+ [task (assoc t :allocation (get-in replica [:task-saturation job task]))]))
+ (into {}))]
+ (if (empty? cutoff-oversaturated)
+ (into {} (map (fn [t] {(:task t) t}) allocations))
+ (let [n-peers-fully-saturated (apply + (map :allocation (vals cutoff-oversaturated)))
+ n-remaining-peers (- n-peers n-peers-fully-saturated)
+ unallocated-tasks (remove cutoff-oversaturated candidate-tasks)]
+ (merge (percentage-balanced-taskload replica job unallocated-tasks n-remaining-peers)
+ cutoff-oversaturated)))))
(defmethod cts/drop-peers :onyx.task-scheduler/percentage
[replica job n]
(let [tasks (keys (get-in replica [:allocations job]))
- balanced (percentage-balanced-taskload replica job tasks n)]
+ balanced (percentage-balanced-taskload replica job tasks n)
+ tasks (cts/filter-grouped-tasks replica job balanced)]
(mapcat
(fn [[task {:keys [allocation]}]]
(drop-last allocation (get-in replica [:allocations job task])))
- balanced)))
-
-(defmethod cts/task-claim-n-peers :onyx.task-scheduler/percentage
- [replica job n]
- ;; We can reuse the Balanced task scheduler algorithm as is.
- (cts/task-claim-n-peers
- (assoc-in replica [:task-schedulers job] :onyx.task-scheduler/balanced)
- job n))
-
-(defn reuse-spare-peers [replica job tasks spare-peers]
- (loop [[head & tail :as task-seq] (get-in replica [:tasks job])
- results tasks
- capacity spare-peers]
- (let [tail (vec tail)]
- (cond (or (<= capacity 0) (not (seq task-seq)))
- results
- (< (get results head) (or (get-in replica [:task-saturation job head] Double/POSITIVE_INFINITY)))
- (recur (conj tail head) (update-in results [head] inc) (dec capacity))
- :else
- (recur tail results capacity)))))
+ (take n (cycle tasks)))))
(defmethod cts/task-distribute-peer-count :onyx.task-scheduler/percentage
[replica job n]
(let [tasks (get-in replica [:tasks job])
- init
- (reduce
- (fn [all [task k]]
- (assoc all task (min (get-in replica [:task-saturation job task] Double/POSITIVE_INFINITY)
- (int (* n (get-in replica [:task-percentages job task]) 0.01)))))
- {}
- (map vector tasks (range)))
- spare-peers (- n (apply + (vals init)))]
- (reuse-spare-peers replica job init spare-peers)))
+ t (cjs/job-lower-bound replica job)]
+ (if (< n t)
+ (zipmap tasks (repeat 0))
+ (let [grouped (filter (partial cts/preallocated-grouped-task? replica job) tasks)
+ not-grouped (remove (partial cts/preallocated-grouped-task? replica job) tasks)
+ init (into {} (map (fn [t] {t (count (get-in replica [:allocations job t]))}) grouped))
+ spare-peers (- n (apply + (vals init)))
+ balanced (percentage-balanced-taskload replica job not-grouped spare-peers)
+ rets (into {} (map (fn [[k v]] {k (:allocation v)}) balanced))]
+ (merge init rets)))))
\ No newline at end of file
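
The docstring example above (3.5, 1.75, 1.75 -> 3, 2, 2) is plain largest-remainder rounding. Here is that rounding step on its own, separated from the replica bookkeeping, as a self-contained sketch:

(defn largest-remainder
  "Rounds a seq of ideal (fractional) peer counts so they sum to n-peers:
  every task gets the floor of its share, and the leftover peers go to
  the tasks with the largest fractional remainders."
  [ideals n-peers]
  (let [floors   (mapv int ideals)
        leftover (- n-peers (apply + floors))
        winners  (->> (map-indexed (fn [i x] [i (- x (int x))]) ideals)
                      (sort-by second >)
                      (map first)
                      (take leftover)
                      (set))]
    (map-indexed (fn [i f] (if (winners i) (inc f) f)) floors)))

;; (largest-remainder [3.5 1.75 1.75] 7) ;=> (3 2 2)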
diff --git a/src/onyx/static/default_vals.clj b/src/onyx/static/default_vals.clj
index 3e08a88..838c3ec 100644
--- a/src/onyx/static/default_vals.clj
+++ b/src/onyx/static/default_vals.clj
@@ -1,19 +1,25 @@
(ns onyx.static.default-vals)
+(def default-pending-timeout 60000)
+
(def defaults
{; input task defaults
:onyx/input-retry-timeout 1000
- :onyx/pending-timeout 60000
+ :onyx/pending-timeout default-pending-timeout
:onyx/max-pending 10000
; task defaults
:onyx/batch-timeout 1000
+ ; zookeeper defaults
+ :onyx.zookeeper/backoff-base-sleep-time-ms 1000
+ :onyx.zookeeper/backoff-max-sleep-time-ms 30000
+ :onyx.zookeeper/backoff-max-retries 5
+
; peer defaults
:onyx.peer/inbox-capacity 1000
:onyx.peer/outbox-capacity 1000
:onyx.peer/drained-back-off 400
- :onyx.peer/sequential-back-off 2000
:onyx.peer/job-not-ready-back-off 500
; messaging defaults
@@ -21,6 +27,16 @@
:onyx.messaging.netty/thread-pool-sizes 1
:onyx.messaging.netty/connect-timeout-millis 1000
:onyx.messaging.netty/pending-buffer-size 10000
+ :onyx.messaging/inbound-buffer-size 200000
:onyx.messaging/completion-buffer-size 50000
:onyx.messaging/release-ch-buffer-size 10000
- :onyx.messaging/retry-ch-buffer-size 10000})
+ :onyx.messaging/retry-ch-buffer-size 10000
+ :onyx.messaging/max-downstream-links 10
+ :onyx.messaging/max-acker-links 5
+ :onyx.messaging/peer-link-gc-interval 90000
+ :onyx.messaging/peer-link-idle-timeout 60000
+ :onyx.messaging/ack-daemon-timeout default-pending-timeout
+ :onyx.messaging/ack-daemon-clear-interval 15000})
+
+(defn arg-or-default [k opts]
+ (or (get opts k) (get defaults k)))
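
arg-or-default above is a small lookup helper the rest of the code can use instead of repeating (or (get opts k) (get defaults k)). For example (assuming this branch is on the classpath):

(require '[onyx.static.default-vals :refer [arg-or-default]])

(arg-or-default :onyx/pending-timeout {})
;=> 60000  (falls back to the default)

(arg-or-default :onyx/pending-timeout {:onyx/pending-timeout 5000})
;=> 5000   (an explicit option wins)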
diff --git a/src/onyx/static/validation.clj b/src/onyx/static/validation.clj
index 8a251ce..2104277 100644
--- a/src/onyx/static/validation.clj
+++ b/src/onyx/static/validation.clj
@@ -28,6 +28,12 @@
:else
(merge base-catalog-entry-validator {:onyx/fn schema/Keyword})))
+(def group-entry-validator
+ {(schema/optional-key :onyx/group-by-key) schema/Keyword
+ (schema/optional-key :onyx/group-by-fn) schema/Keyword
+ :onyx/min-peers schema/Int
+ :onyx/flux-policy (schema/enum :continue :kill)})
+
(defn task-dispatch-validator [task]
(when (= (:onyx/name task)
(:onyx/type task))
@@ -42,6 +48,12 @@
[catalog]
(doseq [entry catalog]
(schema/validate catalog-entry-validator entry)
+ (when (and (= (:onyx/type entry) :function)
+ (or (not (nil? (:onyx/group-by-key entry)))
+ (not (nil? (:onyx/group-by-fn entry)))))
+ (let [kws (select-keys entry [:onyx/group-by-fn :onyx/group-by-key
+ :onyx/min-peers :onyx/flux-policy])]
+ (schema/validate group-entry-validator kws)))
(name-and-type-not-equal entry)))
(defn validate-workflow-names [{:keys [workflow catalog]}]
@@ -53,6 +65,12 @@
"for the following workflow keywords: "
(apply str (interpose ", " missing-names)))))))
+(defn validate-workflow-no-dupes [{:keys [workflow]}]
+ (when-not (= (count workflow)
+ (count (set workflow)))
+ (throw (ex-info "Workflows entries cannot contain duplicates"
+ {:workflow workflow}))))
+
(defn catalog->type-task-names [catalog type-pred]
(set (map :onyx/name
(filter (fn [task]
@@ -86,7 +104,8 @@
(defn validate-workflow [job]
(validate-workflow-graph job)
- (validate-workflow-names job))
+ (validate-workflow-names job)
+ (validate-workflow-no-dupes job))
(def job-validator
{:catalog [(schema/pred map? 'map?)]
@@ -102,9 +121,10 @@
(defn validate-lifecycles [lifecycles catalog]
(doseq [lifecycle lifecycles]
- (assert (or (= (:lifecycle/task lifecycle) :all)
- (some #{(:lifecycle/task lifecycle)} (map :onyx/name catalog)))
- (str ":lifecycle/task must either name a task in the catalog or be :all, it was: " (:lifecycle/task lifecycle)))
+ (when-not (or (= (:lifecycle/task lifecycle) :all)
+ (some #{(:lifecycle/task lifecycle)} (map :onyx/name catalog)))
+ (throw (ex-info (str ":lifecycle/task must name a task in the catalog. It was: " (:lifecycle/task lifecycle))
+ {:lifecycle lifecycle :catalog catalog})))
(schema/validate
{:lifecycle/task schema/Keyword
(schema/optional-key :lifecycle/pre) schema/Keyword
@@ -120,10 +140,13 @@
:lifecycle/post
:lifecycle/doc]))))
+(def deployment-id-schema
+ (schema/either schema/Uuid schema/Str))
+
(defn validate-env-config [env-config]
(schema/validate
{:zookeeper/address schema/Str
- :onyx/id schema/Uuid
+ :onyx/id deployment-id-schema
(schema/optional-key :zookeeper/server?) schema/Bool
(schema/optional-key :zookeeper.server/port) schema/Int}
(select-keys env-config
@@ -132,7 +155,7 @@
(defn validate-peer-config [peer-config]
(schema/validate
{:zookeeper/address schema/Str
- :onyx/id schema/Uuid
+ :onyx/id deployment-id-schema
:onyx.peer/job-scheduler schema/Keyword
:onyx.messaging/impl (schema/enum :aeron :netty :core.async :dummy-messenger)
:onyx.messaging/bind-addr schema/Str
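
With the new group-entry-validator above, a grouped :function catalog entry must also declare :onyx/min-peers and :onyx/flux-policy. An illustrative entry that passes the check (task, key, and function names are made up):

{:onyx/name :sum-balances
 :onyx/fn :my.app/sum-balances
 :onyx/type :function
 :onyx/group-by-key :account-id
 :onyx/min-peers 3
 :onyx/flux-policy :kill
 :onyx/batch-size 20}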
diff --git a/src/onyx/system.clj b/src/onyx/system.clj
index c7cfd7b..ff2873a 100644
--- a/src/onyx/system.clj
+++ b/src/onyx/system.clj
@@ -14,6 +14,7 @@
[onyx.log.commands.exhaust-input]
[onyx.log.commands.seal-output]
[onyx.log.commands.signal-ready]
+ [onyx.log.commands.set-replica]
[onyx.log.commands.leave-cluster]
[onyx.log.commands.submit-job]
[onyx.log.commands.kill-job]
diff --git a/src/onyx/types.clj b/src/onyx/types.clj
index aee8554..836f8f9 100644
--- a/src/onyx/types.clj
+++ b/src/onyx/types.clj
@@ -1,11 +1,13 @@
(ns onyx.types)
(defrecord Leaf [message id acker-id completion-id ack-val ack-vals route routes hash-group])
+
(defn leaf
([message]
- (->Leaf message nil nil nil nil nil nil nil nil)))
+ (->Leaf message nil nil nil nil nil nil nil nil)))
+
(defrecord Route [flow exclusions post-transformation action])
-(defrecord Ack [id completion-id ack-val])
-(defrecord Result [root leaves])
+(defrecord Ack [id completion-id ack-val timestamp])
+(defrecord Result [root leaves])