Last active
August 29, 2015 14:27
-
-
Save frsyuki/adc1eb457a82f1a68e1a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff -urp hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java | |
--- hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java 2014-08-27 20:05:43.000000000 -0700 | |
+++ hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java 2015-08-12 18:17:22.000000000 -0700 | |
@@ -82,14 +82,18 @@ public class AppSchedulable extends Sche | |
Resources.addTo(demand, app.getCurrentConsumption()); | |
// Add up outstanding resource requests | |
- synchronized (app) { | |
- for (Priority p : app.getPriorities()) { | |
- for (ResourceRequest r : app.getResourceRequests(p).values()) { | |
- Resource total = Resources.multiply(r.getCapability(), r.getNumContainers()); | |
- Resources.addTo(demand, total); | |
- } | |
+ Resources.addTo(demand, getOutstandingDemand()); | |
+ } | |
+ | |
+ public synchronized Resource getOutstandingDemand() { | |
+ Resource outstandingDemand = Resources.createResource(0); | |
+ for (Priority p : app.getPriorities()) { | |
+ for (ResourceRequest r : app.getResourceRequests(p).values()) { | |
+ Resource total = Resources.multiply(r.getCapability(), r.getNumContainers()); | |
+ Resources.addTo(outstandingDemand, total); | |
} | |
} | |
+ return outstandingDemand; | |
} | |
@Override | |
diff -urp hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java | |
--- hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java 2014-08-27 20:05:43.000000000 -0700 | |
+++ hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java 2015-08-12 18:17:22.000000000 -0700 | |
@@ -182,6 +182,20 @@ public class FSLeafQueue extends FSQueue | |
demand = Resources.componentwiseMin(demand, maxRes); | |
} | |
+ public Resource getOutstandingDemandOfRunnableApps() { | |
+ Resource maxRes = scheduler.getAllocationConfiguration() | |
+ .getMaxResources(getName()); | |
+ Resource outstandingDemand = Resources.createResource(0); | |
+ for (AppSchedulable sched : runnableAppScheds) { | |
+ if (Resources.equals(outstandingDemand, maxRes)) { | |
+ break; | |
+ } | |
+ outstandingDemand = Resources.add(outstandingDemand, sched.getOutstandingDemand()); | |
+ outstandingDemand = Resources.componentwiseMin(outstandingDemand, maxRes); | |
+ } | |
+ return outstandingDemand; | |
+ } | |
+ | |
@Override | |
public Resource assignContainer(FSSchedulerNode node) { | |
Resource assigned = Resources.none(); | |
diff -urp hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java | |
--- hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java 2015-08-12 18:23:20.000000000 -0700 | |
+++ hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java 2015-08-12 18:54:39.000000000 -0700 | |
@@ -174,6 +174,7 @@ public class FairScheduler extends Abstr | |
protected boolean sizeBasedWeight; // Give larger weights to larger jobs | |
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster | |
protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not | |
+ protected boolean demandBlocksAmEnabled; // AM is blocked if queue has demand | |
protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling | |
private Comparator nodeAvailableResourceComparator = | |
new NodeAvailableResourceComparator(); // Node available resource comparator | |
@@ -236,6 +237,16 @@ public class FairScheduler extends Abstr | |
+ "=" + maxVcores + ", min should equal greater than 0" | |
+ ", max should be no smaller than min."); | |
} | |
+ | |
+ boolean demandBlocksAmEnabled = conf.getBoolean( | |
+ FairSchedulerConfiguration.DEMAND_BLOCKS_AM_ENABLED, | |
+ FairSchedulerConfiguration.DEFAULT_DEMAND_BLOCKS_AM_ENABLED); | |
+ boolean continuousSchedulingEnabled = conf.getBoolean( | |
+ FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, | |
+ FairSchedulerConfiguration.DEFAULT_CONTINUOUS_SCHEDULING_ENABLED); | |
+ if (demandBlocksAmEnabled && !continuousSchedulingEnabled) { | |
+ throw new YarnRuntimeException("continuous-scheduling-enabled must be true if demand-block-am-enabled is true"); | |
+ } | |
} | |
public FairSchedulerConfiguration getConf() { | |
@@ -577,6 +588,10 @@ public class FairScheduler extends Abstr | |
return continuousSchedulingEnabled; | |
} | |
+ public boolean isDemandBlocksAmEnabled() { | |
+ return demandBlocksAmEnabled; | |
+ } | |
+ | |
public synchronized int getContinuousSchedulingSleepMs() { | |
return continuousSchedulingSleepMs; | |
} | |
@@ -665,7 +680,12 @@ public class FairScheduler extends Abstr | |
} | |
application.setCurrentAppAttempt(attempt); | |
- boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user); | |
+ boolean runnable; | |
+ if (demandBlocksAmEnabled) { | |
+ runnable = false; | |
+ } else { | |
+ runnable = maxRunningEnforcer.canAppBeRunnable(queue, user); | |
+ } | |
queue.addApp(attempt, runnable); | |
if (runnable) { | |
maxRunningEnforcer.trackRunnableApp(attempt); | |
@@ -1024,6 +1044,12 @@ public class FairScheduler extends Abstr | |
": " + ex.toString(), ex); | |
} | |
} | |
+ | |
+ if (demandBlocksAmEnabled) { | |
+ synchronized (this) { | |
+ maxRunningEnforcer.updateRunnabilityUnderQueue(queueMgr.getRootQueue()); | |
+ } | |
+ } | |
} | |
/** Sort nodes by available resource */ | |
@@ -1239,6 +1265,8 @@ public class FairScheduler extends Abstr | |
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); | |
continuousSchedulingSleepMs = | |
this.conf.getContinuousSchedulingSleepMs(); | |
+ demandBlocksAmEnabled = this.conf.isDemandBlocksAmEnabled(); | |
+ maxRunningEnforcer.setDemandBlocksAmEnabled(demandBlocksAmEnabled); | |
nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); | |
rackLocalityThreshold = this.conf.getLocalityThresholdRack(); | |
nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); | |
diff -urp hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java | |
--- hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java 2015-08-12 18:23:20.000000000 -0700 | |
+++ hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java 2015-08-12 18:25:42.000000000 -0700 | |
@@ -94,6 +94,10 @@ public class FairSchedulerConfiguration | |
protected static final String CONTINUOUS_SCHEDULING_ENABLED = CONF_PREFIX + "continuous-scheduling-enabled"; | |
protected static final boolean DEFAULT_CONTINUOUS_SCHEDULING_ENABLED = false; | |
+ /** Enable blocking ApplicationMaster by demand. */ | |
+ protected static final String DEMAND_BLOCKS_AM_ENABLED = CONF_PREFIX + "demand-blocks-am-enabled"; | |
+ protected static final boolean DEFAULT_DEMAND_BLOCKS_AM_ENABLED = false; | |
+ | |
/** Sleep time of each pass in continuous scheduling (5ms in default) */ | |
protected static final String CONTINUOUS_SCHEDULING_SLEEP_MS = CONF_PREFIX + "continuous-scheduling-sleep-ms"; | |
protected static final int DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS = 5; | |
@@ -174,6 +178,10 @@ public class FairSchedulerConfiguration | |
return getBoolean(CONTINUOUS_SCHEDULING_ENABLED, DEFAULT_CONTINUOUS_SCHEDULING_ENABLED); | |
} | |
+ public boolean isDemandBlocksAmEnabled() { | |
+ return getBoolean(DEMAND_BLOCKS_AM_ENABLED, DEFAULT_DEMAND_BLOCKS_AM_ENABLED); | |
+ } | |
+ | |
public int getContinuousSchedulingSleepMs() { | |
return getInt(CONTINUOUS_SCHEDULING_SLEEP_MS, DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS); | |
} | |
diff -urp hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java | |
--- hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java 2014-08-27 20:05:43.000000000 -0700 | |
+++ hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java 2015-08-12 18:30:03.000000000 -0700 | |
@@ -30,6 +30,9 @@ import org.apache.commons.logging.LogFac | |
import com.google.common.annotations.VisibleForTesting; | |
import com.google.common.collect.ArrayListMultimap; | |
import com.google.common.collect.ListMultimap; | |
+import org.apache.hadoop.yarn.util.resource.Resources; | |
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; | |
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; | |
/** | |
* Handles tracking and enforcement for user and queue maxRunningApps | |
@@ -39,6 +42,7 @@ public class MaxRunningAppsEnforcer { | |
private static final Log LOG = LogFactory.getLog(FairScheduler.class); | |
private final FairScheduler scheduler; | |
+ private boolean demandBlocksAmEnabled; | |
// Tracks the number of running applications by user. | |
private final Map<String, Integer> usersNumRunnableApps; | |
@@ -51,6 +55,10 @@ public class MaxRunningAppsEnforcer { | |
this.usersNonRunnableApps = ArrayListMultimap.create(); | |
} | |
+ public void setDemandBlocksAmEnabled(boolean enabled) { | |
+ demandBlocksAmEnabled = enabled; | |
+ } | |
+ | |
/** | |
* Checks whether making the application runnable would exceed any | |
* maxRunningApps limits. | |
@@ -64,6 +72,9 @@ public class MaxRunningAppsEnforcer { | |
if (userNumRunnable >= allocConf.getUserMaxApps(user)) { | |
return false; | |
} | |
+ | |
+ FSQueue checkingQueue = queue; | |
+ | |
// Check queue and all parent queues | |
while (queue != null) { | |
int queueMaxApps = allocConf.getQueueMaxApps(queue.getName()); | |
@@ -73,9 +84,53 @@ public class MaxRunningAppsEnforcer { | |
queue = queue.getParent(); | |
} | |
+ if (demandBlocksAmEnabled) { | |
+ if (isDemandBlockingApp(checkingQueue)) { | |
+ if (LOG.isDebugEnabled()) { | |
+ LOG.debug("Demand is blocking ApplicationMaster"); | |
+ } | |
+ return false; | |
+ } | |
+ } | |
+ | |
return true; | |
} | |
+ private boolean isDemandBlockingApp(FSQueue queue) { | |
+ ResourceWeights targetWeight = queue.getWeights(); | |
+ | |
+ AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); | |
+ while (allocConf.getMaxResources(queue.getName()).equals(Resources.unbounded())) { | |
+ queue = queue.getParent(); | |
+ if (queue == null) { | |
+ return false; | |
+ } | |
+ } | |
+ | |
+ return hasHigherWeightOutstandingDemand(queue, targetWeight); | |
+ } | |
+ | |
+ private boolean hasHigherWeightOutstandingDemand(FSQueue queue, ResourceWeights targetWeight) { | |
+ if (queue instanceof FSLeafQueue) { | |
+ ResourceWeights weight = queue.getWeights(); | |
+ if (weight.getWeight(ResourceType.MEMORY) < targetWeight.getWeight(ResourceType.MEMORY)) { | |
+ return false; | |
+ } | |
+ FSLeafQueue leaf = (FSLeafQueue) queue; | |
+ if (!leaf.getOutstandingDemandOfRunnableApps().equals(Resources.none())) { | |
+ return true; | |
+ } | |
+ return false; | |
+ } else { | |
+ for (FSQueue childQueue : queue.getChildQueues()) { | |
+ if (hasHigherWeightOutstandingDemand(childQueue, targetWeight)) { | |
+ return true; | |
+ } | |
+ } | |
+ return false; | |
+ } | |
+ } | |
+ | |
/** | |
* Tracks the given new runnable app for purposes of maintaining max running | |
* app limits. | |
@@ -156,6 +211,19 @@ public class MaxRunningAppsEnforcer { | |
} | |
} | |
+ tryToRunApps(appsNowMaybeRunnable); | |
+ } | |
+ | |
+ public void updateRunnabilityUnderQueue(FSQueue queue) { | |
+ List<List<AppSchedulable>> appsNowMaybeRunnable = | |
+ new ArrayList<List<AppSchedulable>>(); | |
+ | |
+ gatherPossiblyRunnableAppLists(queue, appsNowMaybeRunnable); | |
+ | |
+ tryToRunApps(appsNowMaybeRunnable); | |
+ } | |
+ | |
+ private void tryToRunApps(List<List<AppSchedulable>> appsNowMaybeRunnable) { | |
// Scan through and check whether this means that any apps are now runnable | |
Iterator<FSSchedulerApp> iter = new MultiListStartTimeIterator( | |
appsNowMaybeRunnable); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java | |
index cfec915..4fc831e 100644 | |
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java | |
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java | |
@@ -798,14 +798,18 @@ public void updateDemand() { | |
Resources.addTo(demand, getCurrentConsumption()); | |
// Add up outstanding resource requests | |
- synchronized (this) { | |
- for (Priority p : getPriorities()) { | |
- for (ResourceRequest r : getResourceRequests(p).values()) { | |
- Resources.multiplyAndAddTo(demand, | |
- r.getCapability(), r.getNumContainers()); | |
- } | |
+ Resources.addTo(demand, getOutstandingDemand()); | |
+ } | |
+ | |
+ public synchronized Resource getOutstandingDemand() { | |
+ Resource outstandingDemand = Resources.createResource(0); | |
+ for (Priority p : getPriorities()) { | |
+ for (ResourceRequest r : getResourceRequests(p).values()) { | |
+ Resources.multiplyAndAddTo(outstandingDemand, | |
+ r.getCapability(), r.getNumContainers()); | |
} | |
} | |
+ return outstandingDemand; | |
} | |
@Override | |
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java | |
index f90a198..e86a9f3 100644 | |
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java | |
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java | |
@@ -303,6 +303,25 @@ private void updateDemandForApp(FSAppAttempt sched, Resource maxRes) { | |
demand = Resources.componentwiseMin(demand, maxRes); | |
} | |
+ public Resource getOutstandingDemandOfRunnableApps() { | |
+ Resource maxRes = scheduler.getAllocationConfiguration() | |
+ .getMaxResources(getName()); | |
+ Resource outstandingDemand = Resources.createResource(0); | |
+ readLock.lock(); | |
+ try { | |
+ for (FSAppAttempt app : runnableApps) { | |
+ if (Resources.equals(outstandingDemand, maxRes)) { | |
+ break; | |
+ } | |
+ outstandingDemand = Resources.add(outstandingDemand, app.getOutstandingDemand()); | |
+ outstandingDemand = Resources.componentwiseMin(outstandingDemand, maxRes); | |
+ } | |
+ } finally { | |
+ readLock.unlock(); | |
+ } | |
+ return outstandingDemand; | |
+ } | |
+ | |
@Override | |
public Resource assignContainer(FSSchedulerNode node) { | |
Resource assigned = Resources.none(); | |
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java | |
index 3eefb8f..2c1d903 100644 | |
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java | |
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java | |
@@ -176,6 +176,7 @@ | |
protected boolean sizeBasedWeight; // Give larger weights to larger jobs | |
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster | |
protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not | |
+ protected boolean demandBlocksAmEnabled; // AM is blocked if queue has demand | |
protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling | |
private Comparator<NodeId> nodeAvailableResourceComparator = | |
new NodeAvailableResourceComparator(); // Node available resource comparator | |
@@ -239,6 +240,16 @@ private void validateConf(Configuration conf) { | |
+ "=" + maxVcores + ", min should equal greater than 0" | |
+ ", max should be no smaller than min."); | |
} | |
+ | |
+ boolean demandBlocksAmEnabled = conf.getBoolean( | |
+ FairSchedulerConfiguration.DEMAND_BLOCKS_AM_ENABLED, | |
+ FairSchedulerConfiguration.DEFAULT_DEMAND_BLOCKS_AM_ENABLED); | |
+ boolean continuousSchedulingEnabled = conf.getBoolean( | |
+ FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, | |
+ FairSchedulerConfiguration.DEFAULT_CONTINUOUS_SCHEDULING_ENABLED); | |
+ if (demandBlocksAmEnabled && !continuousSchedulingEnabled) { | |
+ throw new YarnRuntimeException("continuous-scheduling-enabled must be true if demand-blocks-am-enabled is true"); | |
+ } | |
} | |
public FairSchedulerConfiguration getConf() { | |
@@ -566,6 +577,10 @@ public boolean isContinuousSchedulingEnabled() { | |
return continuousSchedulingEnabled; | |
} | |
+ public boolean isDemandBlocksAmEnabled() { | |
+ return demandBlocksAmEnabled; | |
+ } | |
+ | |
public synchronized int getContinuousSchedulingSleepMs() { | |
return continuousSchedulingSleepMs; | |
} | |
@@ -669,7 +684,12 @@ protected synchronized void addApplicationAttempt( | |
} | |
application.setCurrentAppAttempt(attempt); | |
- boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user); | |
+ boolean runnable; | |
+ if (demandBlocksAmEnabled) { | |
+ runnable = false; | |
+ } else { | |
+ runnable = maxRunningEnforcer.canAppBeRunnable(queue, user); | |
+ } | |
queue.addApp(attempt, runnable); | |
if (runnable) { | |
maxRunningEnforcer.trackRunnableApp(attempt); | |
@@ -1041,6 +1061,12 @@ void continuousSchedulingAttempt() throws InterruptedException { | |
long duration = getClock().getTime() - start; | |
fsOpDurations.addContinuousSchedulingRunDuration(duration); | |
+ | |
+ if (demandBlocksAmEnabled) { | |
+ synchronized (this) { | |
+ maxRunningEnforcer.updateRunnabilityOnReload(); | |
+ } | |
+ } | |
} | |
/** Sort nodes by available resource */ | |
@@ -1314,6 +1340,8 @@ private void initScheduler(Configuration conf) throws IOException { | |
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); | |
continuousSchedulingSleepMs = | |
this.conf.getContinuousSchedulingSleepMs(); | |
+ demandBlocksAmEnabled = this.conf.isDemandBlocksAmEnabled(); | |
+ maxRunningEnforcer.setDemandBlockAmEnabled(demandBlocksAmEnabled); | |
nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); | |
rackLocalityThreshold = this.conf.getLocalityThresholdRack(); | |
nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); | |
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java | |
index e477e6e..002409c 100644 | |
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java | |
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java | |
@@ -95,6 +95,10 @@ | |
protected static final String CONTINUOUS_SCHEDULING_ENABLED = CONF_PREFIX + "continuous-scheduling-enabled"; | |
protected static final boolean DEFAULT_CONTINUOUS_SCHEDULING_ENABLED = false; | |
+ /** Enable blocking ApplicationMaster by demand. */ | |
+ protected static final String DEMAND_BLOCKS_AM_ENABLED = CONF_PREFIX + "demand-blocks-am-enabled"; | |
+ protected static final boolean DEFAULT_DEMAND_BLOCKS_AM_ENABLED = false; | |
+ | |
/** Sleep time of each pass in continuous scheduling (5ms in default) */ | |
protected static final String CONTINUOUS_SCHEDULING_SLEEP_MS = CONF_PREFIX + "continuous-scheduling-sleep-ms"; | |
protected static final int DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS = 5; | |
@@ -179,6 +183,10 @@ public boolean isContinuousSchedulingEnabled() { | |
return getBoolean(CONTINUOUS_SCHEDULING_ENABLED, DEFAULT_CONTINUOUS_SCHEDULING_ENABLED); | |
} | |
+ public boolean isDemandBlocksAmEnabled() { | |
+ return getBoolean(DEMAND_BLOCKS_AM_ENABLED, DEFAULT_DEMAND_BLOCKS_AM_ENABLED); | |
+ } | |
+ | |
public int getContinuousSchedulingSleepMs() { | |
return getInt(CONTINUOUS_SCHEDULING_SLEEP_MS, DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS); | |
} | |
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java | |
index f750438..d745f19 100644 | |
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java | |
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java | |
@@ -30,6 +30,10 @@ | |
import com.google.common.annotations.VisibleForTesting; | |
import com.google.common.collect.ArrayListMultimap; | |
import com.google.common.collect.ListMultimap; | |
+import org.apache.hadoop.yarn.api.records.Resource; | |
+import org.apache.hadoop.yarn.util.resource.Resources; | |
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; | |
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; | |
/** | |
* Handles tracking and enforcement for user and queue maxRunningApps | |
@@ -39,6 +43,7 @@ | |
private static final Log LOG = LogFactory.getLog(FairScheduler.class); | |
private final FairScheduler scheduler; | |
+ private boolean demandBlocksAmEnabled; | |
// Tracks the number of running applications by user. | |
private final Map<String, Integer> usersNumRunnableApps; | |
@@ -51,6 +56,10 @@ public MaxRunningAppsEnforcer(FairScheduler scheduler) { | |
this.usersNonRunnableApps = ArrayListMultimap.create(); | |
} | |
+ public void setDemandBlockAmEnabled(boolean enabled) { | |
+ demandBlocksAmEnabled = enabled; | |
+ } | |
+ | |
/** | |
* Checks whether making the application runnable would exceed any | |
* maxRunningApps limits. | |
@@ -64,6 +73,8 @@ public boolean canAppBeRunnable(FSQueue queue, String user) { | |
if (userNumRunnable >= allocConf.getUserMaxApps(user)) { | |
return false; | |
} | |
+ FSQueue checkingQueue = queue; | |
+ | |
// Check queue and all parent queues | |
while (queue != null) { | |
int queueMaxApps = allocConf.getQueueMaxApps(queue.getName()); | |
@@ -73,9 +84,55 @@ public boolean canAppBeRunnable(FSQueue queue, String user) { | |
queue = queue.getParent(); | |
} | |
+ if (demandBlocksAmEnabled) { | |
+ if (isDemandBlockingApp(checkingQueue)) { | |
+ if (LOG.isDebugEnabled()) { | |
+ LOG.debug("Demand is blocking ApplicationMaster"); | |
+ } | |
+ return false; | |
+ } | |
+ } | |
+ | |
return true; | |
} | |
+ private boolean isDemandBlockingApp(FSQueue queue) { | |
+ ResourceWeights targetWeight = queue.getWeights(); | |
+ | |
+ AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); | |
+ while (allocConf.getMaxResources(queue.getName()).equals(Resources.unbounded())) { | |
+ queue = queue.getParent(); | |
+ if (queue == null) { | |
+ return false; | |
+ } | |
+ } | |
+ | |
+ return hasHigherWeightOutstandingDemand(queue, targetWeight); | |
+ } | |
+ | |
+ private boolean hasHigherWeightOutstandingDemand(FSQueue queue, ResourceWeights targetWeight) { | |
+ if (queue instanceof FSLeafQueue) { | |
+ ResourceWeights weight = queue.getWeights(); | |
+ // here compares only MEMORY even with DominantResourceFairnessPolicy because | |
+ // FairScheduler never sets CPU weight. | |
+ if (weight.getWeight(ResourceType.MEMORY) < targetWeight.getWeight(ResourceType.MEMORY)) { | |
+ return false; | |
+ } | |
+ FSLeafQueue leaf = (FSLeafQueue) queue; | |
+ if (!leaf.getOutstandingDemandOfRunnableApps().equals(Resources.none())) { | |
+ return true; | |
+ } | |
+ return false; | |
+ } else { | |
+ for (FSQueue childQueue : queue.getChildQueues()) { | |
+ if (hasHigherWeightOutstandingDemand(childQueue, targetWeight)) { | |
+ return true; | |
+ } | |
+ } | |
+ return false; | |
+ } | |
+ } | |
+ | |
/** | |
* Tracks the given new runnable app for purposes of maintaining max running | |
* app limits. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment