Skip to content

Instantly share code, notes, and snippets.

@erikdw
Last active November 14, 2017 02:56
Show Gist options
  • Save erikdw/7e35d455de871b73a155745fa4bd2f3f to your computer and use it in GitHub Desktop.
Save erikdw/7e35d455de871b73a155745fa4bd2f3f to your computer and use it in GitHub Desktop.
diff --git storm/src/main/storm/mesos/MesosSupervisor.java storm/src/main/storm/mesos/MesosSupervisor.java
index 4d3b05b..2cbb777 100644
--- storm/src/main/storm/mesos/MesosSupervisor.java
+++ storm/src/main/storm/mesos/MesosSupervisor.java
@@ -41,9 +41,13 @@ import storm.mesos.util.MesosCommon;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
public class MesosSupervisor implements ISupervisor {
public static final Logger LOG = LoggerFactory.getLogger(MesosSupervisor.class);
@@ -56,17 +60,15 @@ public class MesosSupervisor implements ISupervisor {
Map _conf;
// Store state on port assignments arriving from MesosNimbus as task-launching requests.
private static final TaskAssignments _taskAssignments = TaskAssignments.getInstance();
+ // What is the storm-core supervisor's view of the assigned ports?
+ AtomicReference<Set<Integer>> _supervisorViewOfAssignedPorts = new AtomicReference<Set<Integer>>(new HashSet<Integer>());
public static void main(String[] args) {
Map<String, Object> conf = ConfigUtils.readStormConfig();
try {
Supervisor supervisor = new Supervisor(conf, null, new MesosSupervisor());
-
- // We use reflection to call the private method, 'launchDaemon', which calls 'launch'
- Method m = Supervisor.class.getDeclaredMethod("launchDaemon");
- m.setAccessible(true);
- m.invoke(supervisor);
+ supervisor.launchDaemon();
} catch (Exception e) {
String msg = String.format("main: Exception: %s", e.getMessage());
LOG.error(msg);
@@ -74,12 +76,10 @@ public class MesosSupervisor implements ISupervisor {
}
}
- /**
- * This method is no longer called by the Supervisor since it was refactored from Clojure to Java,
- * starting in Storm 1.0.3. Now, port changes get captured in calls to 'confirmAssigned'.
- */
@Override
public void assigned(Collection<Integer> ports) {
+ if (ports == null) ports = new HashSet<>();
+ _supervisorViewOfAssignedPorts.set(new HashSet<>(ports));
}
@Override
@@ -267,7 +267,7 @@ public class MesosSupervisor implements ISupervisor {
try {
while (true) {
long now = System.currentTimeMillis();
- if (!_taskAssignments.getAssignedPorts().isEmpty()) {
+ if (!_supervisorViewOfAssignedPorts.get().isEmpty()) {
_lastTime = now;
}
if ((now - _lastTime) > 1000L * _timeoutSecs) {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment