Last active
November 14, 2017 02:56
-
-
Save erikdw/7e35d455de871b73a155745fa4bd2f3f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git storm/src/main/storm/mesos/MesosSupervisor.java storm/src/main/storm/mesos/MesosSupervisor.java | |
index 4d3b05b..2cbb777 100644 | |
--- storm/src/main/storm/mesos/MesosSupervisor.java | |
+++ storm/src/main/storm/mesos/MesosSupervisor.java | |
@@ -41,9 +41,13 @@ import storm.mesos.util.MesosCommon; | |
import java.lang.reflect.Method; | |
import java.util.ArrayList; | |
import java.util.Collection; | |
+import java.util.HashSet; | |
import java.util.List; | |
import java.util.Map; | |
+import java.util.Set; | |
import java.util.concurrent.CountDownLatch; | |
+import java.util.concurrent.atomic.AtomicReference; | |
public class MesosSupervisor implements ISupervisor { | |
public static final Logger LOG = LoggerFactory.getLogger(MesosSupervisor.class); | |
@@ -56,17 +60,15 @@ public class MesosSupervisor implements ISupervisor { | |
Map _conf; | |
// Store state on port assignments arriving from MesosNimbus as task-launching requests. | |
private static final TaskAssignments _taskAssignments = TaskAssignments.getInstance(); | |
+ // The storm-core supervisor's view of the assigned ports: the set most recently passed to assigned(). | |
+ AtomicReference<Set<Integer>> _supervisorViewOfAssignedPorts = new AtomicReference<Set<Integer>>(new HashSet<Integer>()); | |
public static void main(String[] args) { | |
Map<String, Object> conf = ConfigUtils.readStormConfig(); | |
try { | |
Supervisor supervisor = new Supervisor(conf, null, new MesosSupervisor()); | |
- | |
- // We use reflection to call the private method, 'launchDaemon', which calls 'launch' | |
- Method m = Supervisor.class.getDeclaredMethod("launchDaemon"); | |
- m.setAccessible(true); | |
- m.invoke(supervisor); | |
+ supervisor.launchDaemon(); | |
} catch (Exception e) { | |
String msg = String.format("main: Exception: %s", e.getMessage()); | |
LOG.error(msg); | |
@@ -74,12 +76,10 @@ public class MesosSupervisor implements ISupervisor { | |
} | |
} | |
- /** | |
- * This method is no longer called by the Supervisor since it was refactored from Clojure to Java, | |
- * starting in Storm 1.0.3. Now, port changes get captured in calls to 'confirmAssigned'. | |
- */ | |
@Override | |
public void assigned(Collection<Integer> ports) { | |
+ if (ports == null) ports = new HashSet<>(); | |
+ _supervisorViewOfAssignedPorts.set(new HashSet<>(ports)); | |
} | |
@Override | |
@@ -267,7 +267,7 @@ public class MesosSupervisor implements ISupervisor { | |
try { | |
while (true) { | |
long now = System.currentTimeMillis(); | |
- if (!_taskAssignments.getAssignedPorts().isEmpty()) { | |
+ if (!_supervisorViewOfAssignedPorts.get().isEmpty()) { | |
_lastTime = now; | |
} | |
if ((now - _lastTime) > 1000L * _timeoutSecs) { |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment