Skip to content

Instantly share code, notes, and snippets.

@frsyuki
Last active August 29, 2015 14:27
Show Gist options
  • Save frsyuki/adc1eb457a82f1a68e1a to your computer and use it in GitHub Desktop.
Save frsyuki/adc1eb457a82f1a68e1a to your computer and use it in GitHub Desktop.
diff -urp hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
--- hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java 2014-08-27 20:05:43.000000000 -0700
+++ hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java 2015-08-12 18:17:22.000000000 -0700
@@ -82,14 +82,27 @@ public class AppSchedulable extends Sche
Resources.addTo(demand, app.getCurrentConsumption());
// Add up outstanding resource requests
- synchronized (app) {
- for (Priority p : app.getPriorities()) {
- for (ResourceRequest r : app.getResourceRequests(p).values()) {
- Resource total = Resources.multiply(r.getCapability(), r.getNumContainers());
- Resources.addTo(demand, total);
- }
+ Resources.addTo(demand, getOutstandingDemand());
+ }
+
+ /**
+ * Adds up capability times container count over all of the app's resource requests.
+ *
+ * @return a new Resource holding the app's total outstanding demand
+ */
+ public Resource getOutstandingDemand() {
+ Resource outstandingDemand = Resources.createResource(0);
+ // Hold the app's own monitor, exactly as the inlined loop did before this
+ // refactoring; a synchronized method would lock this AppSchedulable instead.
+ synchronized (app) {
+ for (Priority p : app.getPriorities()) {
+ for (ResourceRequest r : app.getResourceRequests(p).values()) {
+ Resource total = Resources.multiply(r.getCapability(), r.getNumContainers());
+ Resources.addTo(outstandingDemand, total);
+ }
}
}
+ return outstandingDemand;
}
@Override
diff -urp hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
--- hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java 2014-08-27 20:05:43.000000000 -0700
+++ hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java 2015-08-12 18:17:22.000000000 -0700
@@ -182,6 +182,20 @@ public class FSLeafQueue extends FSQueue
demand = Resources.componentwiseMin(demand, maxRes);
}
+ public Resource getOutstandingDemandOfRunnableApps() {
+ Resource maxRes = scheduler.getAllocationConfiguration()
+ .getMaxResources(getName());
+ Resource outstandingDemand = Resources.createResource(0);
+ for (AppSchedulable sched : runnableAppScheds) {
+ if (Resources.equals(outstandingDemand, maxRes)) {
+ break;
+ }
+ outstandingDemand = Resources.add(outstandingDemand, sched.getOutstandingDemand());
+ outstandingDemand = Resources.componentwiseMin(outstandingDemand, maxRes);
+ }
+ return outstandingDemand;
+ }
+
@Override
public Resource assignContainer(FSSchedulerNode node) {
Resource assigned = Resources.none();
diff -urp hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
--- hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java 2015-08-12 18:23:20.000000000 -0700
+++ hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java 2015-08-12 18:54:39.000000000 -0700
@@ -174,6 +174,7 @@ public class FairScheduler extends Abstr
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
+ protected boolean demandBlocksAmEnabled; // AM is blocked if queue has demand
protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
private Comparator nodeAvailableResourceComparator =
new NodeAvailableResourceComparator(); // Node available resource comparator
@@ -236,6 +237,16 @@ public class FairScheduler extends Abstr
+ "=" + maxVcores + ", min should equal greater than 0"
+ ", max should be no smaller than min.");
}
+
+ boolean demandBlocksAmEnabled = conf.getBoolean(
+ FairSchedulerConfiguration.DEMAND_BLOCKS_AM_ENABLED,
+ FairSchedulerConfiguration.DEFAULT_DEMAND_BLOCKS_AM_ENABLED);
+ boolean continuousSchedulingEnabled = conf.getBoolean(
+ FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
+ FairSchedulerConfiguration.DEFAULT_CONTINUOUS_SCHEDULING_ENABLED);
+ if (demandBlocksAmEnabled && !continuousSchedulingEnabled) { // fail fast: demand-blocks-am is rejected unless continuous scheduling is on
+ throw new YarnRuntimeException("continuous-scheduling-enabled must be true if demand-blocks-am-enabled is true");
+ }
}
public FairSchedulerConfiguration getConf() {
@@ -577,6 +588,10 @@ public class FairScheduler extends Abstr
return continuousSchedulingEnabled;
}
+ public boolean isDemandBlocksAmEnabled() {
+ return demandBlocksAmEnabled;
+ }
+
public synchronized int getContinuousSchedulingSleepMs() {
return continuousSchedulingSleepMs;
}
@@ -665,7 +680,12 @@ public class FairScheduler extends Abstr
}
application.setCurrentAppAttempt(attempt);
- boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
+ boolean runnable;
+ if (demandBlocksAmEnabled) {
+ runnable = false;
+ } else {
+ runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
+ }
queue.addApp(attempt, runnable);
if (runnable) {
maxRunningEnforcer.trackRunnableApp(attempt);
@@ -1024,6 +1044,12 @@ public class FairScheduler extends Abstr
": " + ex.toString(), ex);
}
}
+
+ if (demandBlocksAmEnabled) {
+ synchronized (this) {
+ maxRunningEnforcer.updateRunnabilityUnderQueue(queueMgr.getRootQueue());
+ }
+ }
}
/** Sort nodes by available resource */
@@ -1239,6 +1265,8 @@ public class FairScheduler extends Abstr
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
continuousSchedulingSleepMs =
this.conf.getContinuousSchedulingSleepMs();
+ demandBlocksAmEnabled = this.conf.isDemandBlocksAmEnabled();
+ maxRunningEnforcer.setDemandBlocksAmEnabled(demandBlocksAmEnabled);
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
diff -urp hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
--- hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java 2015-08-12 18:23:20.000000000 -0700
+++ hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java 2015-08-12 18:25:42.000000000 -0700
@@ -94,6 +94,10 @@ public class FairSchedulerConfiguration
protected static final String CONTINUOUS_SCHEDULING_ENABLED = CONF_PREFIX + "continuous-scheduling-enabled";
protected static final boolean DEFAULT_CONTINUOUS_SCHEDULING_ENABLED = false;
+ /** Enable blocking ApplicationMaster by demand. */
+ protected static final String DEMAND_BLOCKS_AM_ENABLED = CONF_PREFIX + "demand-blocks-am-enabled";
+ protected static final boolean DEFAULT_DEMAND_BLOCKS_AM_ENABLED = false;
+
/** Sleep time of each pass in continuous scheduling (5ms in default) */
protected static final String CONTINUOUS_SCHEDULING_SLEEP_MS = CONF_PREFIX + "continuous-scheduling-sleep-ms";
protected static final int DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS = 5;
@@ -174,6 +178,10 @@ public class FairSchedulerConfiguration
return getBoolean(CONTINUOUS_SCHEDULING_ENABLED, DEFAULT_CONTINUOUS_SCHEDULING_ENABLED);
}
+ public boolean isDemandBlocksAmEnabled() {
+ return getBoolean(DEMAND_BLOCKS_AM_ENABLED, DEFAULT_DEMAND_BLOCKS_AM_ENABLED);
+ }
+
public int getContinuousSchedulingSleepMs() {
return getInt(CONTINUOUS_SCHEDULING_SLEEP_MS, DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS);
}
diff -urp hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
--- hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java 2014-08-27 20:05:43.000000000 -0700
+++ hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java 2015-08-12 18:30:03.000000000 -0700
@@ -30,6 +30,9 @@ import org.apache.commons.logging.LogFac
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
/**
* Handles tracking and enforcement for user and queue maxRunningApps
@@ -39,6 +42,7 @@ public class MaxRunningAppsEnforcer {
private static final Log LOG = LogFactory.getLog(FairScheduler.class);
private final FairScheduler scheduler;
+ private boolean demandBlocksAmEnabled;
// Tracks the number of running applications by user.
private final Map<String, Integer> usersNumRunnableApps;
@@ -51,6 +55,10 @@ public class MaxRunningAppsEnforcer {
this.usersNonRunnableApps = ArrayListMultimap.create();
}
+ public void setDemandBlocksAmEnabled(boolean enabled) {
+ demandBlocksAmEnabled = enabled;
+ }
+
/**
* Checks whether making the application runnable would exceed any
* maxRunningApps limits.
@@ -64,6 +72,9 @@ public class MaxRunningAppsEnforcer {
if (userNumRunnable >= allocConf.getUserMaxApps(user)) {
return false;
}
+
+ FSQueue checkingQueue = queue;
+
// Check queue and all parent queues
while (queue != null) {
int queueMaxApps = allocConf.getQueueMaxApps(queue.getName());
@@ -73,9 +84,53 @@ public class MaxRunningAppsEnforcer {
queue = queue.getParent();
}
+ if (demandBlocksAmEnabled) {
+ if (isDemandBlockingApp(checkingQueue)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Demand is blocking ApplicationMaster");
+ }
+ return false;
+ }
+ }
+
return true;
}
+ private boolean isDemandBlockingApp(FSQueue queue) {
+ ResourceWeights targetWeight = queue.getWeights();
+
+ AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
+ while (allocConf.getMaxResources(queue.getName()).equals(Resources.unbounded())) {
+ queue = queue.getParent();
+ if (queue == null) {
+ return false;
+ }
+ }
+
+ return hasHigherWeightOutstandingDemand(queue, targetWeight);
+ }
+
+ private boolean hasHigherWeightOutstandingDemand(FSQueue queue, ResourceWeights targetWeight) { // true if any leaf under queue, weighted at least targetWeight, has outstanding demand
+ if (queue instanceof FSLeafQueue) { // leaf: decide directly from its own weight and demand
+ ResourceWeights weight = queue.getWeights();
+ if (weight.getWeight(ResourceType.MEMORY) < targetWeight.getWeight(ResourceType.MEMORY)) { // only MEMORY is compared, even with DominantResourceFairnessPolicy, because FairScheduler never sets CPU weight
+ return false; // lighter leaves never block; note that equal-weight leaves fall through and can block
+ }
+ FSLeafQueue leaf = (FSLeafQueue) queue;
+ if (!leaf.getOutstandingDemandOfRunnableApps().equals(Resources.none())) { // any non-zero outstanding demand of runnable apps counts
+ return true;
+ }
+ return false;
+ } else { // non-leaf: recurse into every child queue
+ for (FSQueue childQueue : queue.getChildQueues()) {
+ if (hasHigherWeightOutstandingDemand(childQueue, targetWeight)) {
+ return true; // stop at the first qualifying descendant
+ }
+ }
+ return false;
+ }
+ }
+
/**
* Tracks the given new runnable app for purposes of maintaining max running
* app limits.
@@ -156,6 +211,19 @@ public class MaxRunningAppsEnforcer {
}
}
+ tryToRunApps(appsNowMaybeRunnable);
+ }
+
+ public void updateRunnabilityUnderQueue(FSQueue queue) {
+ List<List<AppSchedulable>> appsNowMaybeRunnable =
+ new ArrayList<List<AppSchedulable>>();
+
+ gatherPossiblyRunnableAppLists(queue, appsNowMaybeRunnable);
+
+ tryToRunApps(appsNowMaybeRunnable);
+ }
+
+ private void tryToRunApps(List<List<AppSchedulable>> appsNowMaybeRunnable) {
// Scan through and check whether this means that any apps are now runnable
Iterator<FSSchedulerApp> iter = new MultiListStartTimeIterator(
appsNowMaybeRunnable);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index cfec915..4fc831e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -798,14 +798,18 @@ public void updateDemand() {
Resources.addTo(demand, getCurrentConsumption());
// Add up outstanding resource requests
- synchronized (this) {
- for (Priority p : getPriorities()) {
- for (ResourceRequest r : getResourceRequests(p).values()) {
- Resources.multiplyAndAddTo(demand,
- r.getCapability(), r.getNumContainers());
- }
+ Resources.addTo(demand, getOutstandingDemand());
+ }
+
+ public synchronized Resource getOutstandingDemand() {
+ Resource outstandingDemand = Resources.createResource(0);
+ for (Priority p : getPriorities()) {
+ for (ResourceRequest r : getResourceRequests(p).values()) {
+ Resources.multiplyAndAddTo(outstandingDemand,
+ r.getCapability(), r.getNumContainers());
}
}
+ return outstandingDemand;
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index f90a198..e86a9f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -303,6 +303,25 @@ private void updateDemandForApp(FSAppAttempt sched, Resource maxRes) {
demand = Resources.componentwiseMin(demand, maxRes);
}
+ public Resource getOutstandingDemandOfRunnableApps() {
+ Resource maxRes = scheduler.getAllocationConfiguration()
+ .getMaxResources(getName());
+ Resource outstandingDemand = Resources.createResource(0);
+ readLock.lock();
+ try {
+ for (FSAppAttempt app : runnableApps) {
+ if (Resources.equals(outstandingDemand, maxRes)) {
+ break;
+ }
+ outstandingDemand = Resources.add(outstandingDemand, app.getOutstandingDemand());
+ outstandingDemand = Resources.componentwiseMin(outstandingDemand, maxRes);
+ }
+ } finally {
+ readLock.unlock();
+ }
+ return outstandingDemand;
+ }
+
@Override
public Resource assignContainer(FSSchedulerNode node) {
Resource assigned = Resources.none();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 3eefb8f..2c1d903 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -176,6 +176,7 @@
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
+ protected boolean demandBlocksAmEnabled; // AM is blocked if queue has demand
protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
private Comparator<NodeId> nodeAvailableResourceComparator =
new NodeAvailableResourceComparator(); // Node available resource comparator
@@ -239,6 +240,16 @@ private void validateConf(Configuration conf) {
+ "=" + maxVcores + ", min should equal greater than 0"
+ ", max should be no smaller than min.");
}
+
+ boolean demandBlocksAmEnabled = conf.getBoolean(
+ FairSchedulerConfiguration.DEMAND_BLOCKS_AM_ENABLED,
+ FairSchedulerConfiguration.DEFAULT_DEMAND_BLOCKS_AM_ENABLED);
+ boolean continuousSchedulingEnabled = conf.getBoolean(
+ FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
+ FairSchedulerConfiguration.DEFAULT_CONTINUOUS_SCHEDULING_ENABLED);
+ if (demandBlocksAmEnabled && !continuousSchedulingEnabled) {
+ throw new YarnRuntimeException("continuous-scheduling-enabled must be true if demand-blocks-am-enabled is true");
+ }
}
public FairSchedulerConfiguration getConf() {
@@ -566,6 +577,10 @@ public boolean isContinuousSchedulingEnabled() {
return continuousSchedulingEnabled;
}
+ public boolean isDemandBlocksAmEnabled() {
+ return demandBlocksAmEnabled;
+ }
+
public synchronized int getContinuousSchedulingSleepMs() {
return continuousSchedulingSleepMs;
}
@@ -669,7 +684,12 @@ protected synchronized void addApplicationAttempt(
}
application.setCurrentAppAttempt(attempt);
- boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
+ boolean runnable;
+ if (demandBlocksAmEnabled) {
+ runnable = false;
+ } else {
+ runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
+ }
queue.addApp(attempt, runnable);
if (runnable) {
maxRunningEnforcer.trackRunnableApp(attempt);
@@ -1041,6 +1061,12 @@ void continuousSchedulingAttempt() throws InterruptedException {
long duration = getClock().getTime() - start;
fsOpDurations.addContinuousSchedulingRunDuration(duration);
+
+ if (demandBlocksAmEnabled) {
+ synchronized (this) {
+ maxRunningEnforcer.updateRunnabilityOnReload();
+ }
+ }
}
/** Sort nodes by available resource */
@@ -1314,6 +1340,8 @@ private void initScheduler(Configuration conf) throws IOException {
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
continuousSchedulingSleepMs =
this.conf.getContinuousSchedulingSleepMs();
+ demandBlocksAmEnabled = this.conf.isDemandBlocksAmEnabled();
+ maxRunningEnforcer.setDemandBlockAmEnabled(demandBlocksAmEnabled);
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
index e477e6e..002409c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
@@ -95,6 +95,10 @@
protected static final String CONTINUOUS_SCHEDULING_ENABLED = CONF_PREFIX + "continuous-scheduling-enabled";
protected static final boolean DEFAULT_CONTINUOUS_SCHEDULING_ENABLED = false;
+ /** Enable blocking ApplicationMaster by demand. */
+ protected static final String DEMAND_BLOCKS_AM_ENABLED = CONF_PREFIX + "demand-blocks-am-enabled";
+ protected static final boolean DEFAULT_DEMAND_BLOCKS_AM_ENABLED = false;
+
/** Sleep time of each pass in continuous scheduling (5ms in default) */
protected static final String CONTINUOUS_SCHEDULING_SLEEP_MS = CONF_PREFIX + "continuous-scheduling-sleep-ms";
protected static final int DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS = 5;
@@ -179,6 +183,10 @@ public boolean isContinuousSchedulingEnabled() {
return getBoolean(CONTINUOUS_SCHEDULING_ENABLED, DEFAULT_CONTINUOUS_SCHEDULING_ENABLED);
}
+ public boolean isDemandBlocksAmEnabled() {
+ return getBoolean(DEMAND_BLOCKS_AM_ENABLED, DEFAULT_DEMAND_BLOCKS_AM_ENABLED);
+ }
+
public int getContinuousSchedulingSleepMs() {
return getInt(CONTINUOUS_SCHEDULING_SLEEP_MS, DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
index f750438..d745f19 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
@@ -30,6 +30,10 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
/**
* Handles tracking and enforcement for user and queue maxRunningApps
@@ -39,6 +43,7 @@
private static final Log LOG = LogFactory.getLog(FairScheduler.class);
private final FairScheduler scheduler;
+ private boolean demandBlocksAmEnabled;
// Tracks the number of running applications by user.
private final Map<String, Integer> usersNumRunnableApps;
@@ -51,6 +56,10 @@ public MaxRunningAppsEnforcer(FairScheduler scheduler) {
this.usersNonRunnableApps = ArrayListMultimap.create();
}
+ public void setDemandBlockAmEnabled(boolean enabled) {
+ demandBlocksAmEnabled = enabled;
+ }
+
/**
* Checks whether making the application runnable would exceed any
* maxRunningApps limits.
@@ -64,6 +73,8 @@ public boolean canAppBeRunnable(FSQueue queue, String user) {
if (userNumRunnable >= allocConf.getUserMaxApps(user)) {
return false;
}
+ FSQueue checkingQueue = queue;
+
// Check queue and all parent queues
while (queue != null) {
int queueMaxApps = allocConf.getQueueMaxApps(queue.getName());
@@ -73,9 +84,55 @@ public boolean canAppBeRunnable(FSQueue queue, String user) {
queue = queue.getParent();
}
+ if (demandBlocksAmEnabled) {
+ if (isDemandBlockingApp(checkingQueue)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Demand is blocking ApplicationMaster");
+ }
+ return false;
+ }
+ }
+
return true;
}
+ private boolean isDemandBlockingApp(FSQueue queue) {
+ ResourceWeights targetWeight = queue.getWeights();
+
+ AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
+ while (allocConf.getMaxResources(queue.getName()).equals(Resources.unbounded())) {
+ queue = queue.getParent();
+ if (queue == null) {
+ return false;
+ }
+ }
+
+ return hasHigherWeightOutstandingDemand(queue, targetWeight);
+ }
+
+ private boolean hasHigherWeightOutstandingDemand(FSQueue queue, ResourceWeights targetWeight) {
+ if (queue instanceof FSLeafQueue) {
+ ResourceWeights weight = queue.getWeights();
+ // here compares only MEMORY even with DominantResourceFairnessPolicy because
+ // FairScheduler never sets CPU weight.
+ if (weight.getWeight(ResourceType.MEMORY) < targetWeight.getWeight(ResourceType.MEMORY)) {
+ return false;
+ }
+ FSLeafQueue leaf = (FSLeafQueue) queue;
+ if (!leaf.getOutstandingDemandOfRunnableApps().equals(Resources.none())) {
+ return true;
+ }
+ return false;
+ } else {
+ for (FSQueue childQueue : queue.getChildQueues()) {
+ if (hasHigherWeightOutstandingDemand(childQueue, targetWeight)) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
/**
* Tracks the given new runnable app for purposes of maintaining max running
* app limits.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment