@gjcourt
Created February 7, 2014 21:56
diff --git a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
index 28b9202..9e10fab 100644
--- a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
+++ b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
@@ -22,15 +22,16 @@
:implements [backtype.storm.scheduler.IScheduler]))
(defn sort-slots [all-slots]
- (let [split-up (vals (group-by first all-slots))]
- (apply interleave-all split-up)
+ ; Creates a sequence of [[node, port], ...] ordered by node
+ (let [split-up (reverse (map reverse (vals (group-by first all-slots))))]
+ (reverse (apply interleave-all split-up))
))
(defn get-alive-assigned-node+port->executors [cluster topology-id]
(let [existing-assignment (.getAssignmentById cluster topology-id)
executor->slot (if existing-assignment
(.getExecutorToSlot existing-assignment)
- {})
+ {})
executor->node+port (into {} (for [[^ExecutorDetails executor ^WorkerSlot slot] executor->slot
:let [executor [(.getStartTask executor) (.getEndTask executor)]
node+port [(.getNodeId slot) (.getPort slot)]]]