Created
August 7, 2015 06:36
-
-
Save frsyuki/7b57e9c246b938f8496c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff -urp hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java | |
--- hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java 2014-08-27 20:05:43.000000000 -0700 | |
+++ hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java 2015-08-06 15:46:40.000000000 -0700 | |
@@ -92,6 +92,20 @@ public class AppSchedulable extends Sche | |
} | |
} | |
+ public Resource getOutstandingDemand() { | |
+ // Add up outstanding resource requests | |
+ Resource outstandingDemand = Resources.createResource(0); | |
+ synchronized (app) { | |
+ for (Priority p : app.getPriorities()) { | |
+ for (ResourceRequest r : app.getResourceRequests(p).values()) { | |
+ Resource total = Resources.multiply(r.getCapability(), r.getNumContainers()); | |
+ Resources.addTo(outstandingDemand, total); | |
+ } | |
+ } | |
+ } | |
+ return outstandingDemand; | |
+ } | |
+ | |
@Override | |
public Resource getDemand() { | |
return demand; | |
diff -urp hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java | |
--- hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java 2014-08-27 20:05:43.000000000 -0700 | |
+++ hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java 2015-08-06 15:42:59.000000000 -0700 | |
@@ -182,6 +182,20 @@ public class FSLeafQueue extends FSQueue | |
demand = Resources.componentwiseMin(demand, maxRes); | |
} | |
+ public Resource getOutstandingDemandOfRunnableApps() { | |
+ Resource maxRes = scheduler.getAllocationConfiguration() | |
+ .getMaxResources(getName()); | |
+ Resource outstandingDemand = Resources.createResource(0); | |
+ for (AppSchedulable sched : runnableAppScheds) { | |
+ if (Resources.equals(outstandingDemand, maxRes)) { | |
+ break; | |
+ } | |
+ outstandingDemand = Resources.add(outstandingDemand, sched.getOutstandingDemand()); | |
+ outstandingDemand = Resources.componentwiseMin(outstandingDemand, maxRes); | |
+ } | |
+ return outstandingDemand; | |
+ } | |
+ | |
@Override | |
public Resource assignContainer(FSSchedulerNode node) { | |
Resource assigned = Resources.none(); | |
diff -urp hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java | |
--- hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java 2015-08-06 23:35:31.000000000 -0700 | |
+++ hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java 2015-08-06 17:36:34.000000000 -0700 | |
@@ -799,8 +799,8 @@ public class FairScheduler extends Abstr | |
Container container = rmContainer.getContainer(); | |
- // Get the application for the finished container | |
- FSSchedulerApp application = | |
+ // Get the application for the finished container | |
+ FSSchedulerApp application = | |
getCurrentAttemptForContainer(container.getId()); | |
ApplicationId appId = | |
container.getId().getApplicationAttemptId().getApplicationId(); | |
@@ -823,6 +823,9 @@ public class FairScheduler extends Abstr | |
updateRootQueueMetrics(); | |
} | |
+ LOG.error("updateRunnabilityOnAppRemoval: "+application+" "); | |
+ maxRunningEnforcer.updateRunnabilityOnAppRemoval(application, application.getQueue()); | |
+ | |
LOG.info("Application attempt " + application.getApplicationAttemptId() | |
+ " released container " + container.getId() + " on node: " + node | |
+ " with event: " + event); | |
@@ -1024,6 +1027,11 @@ public class FairScheduler extends Abstr | |
": " + ex.toString(), ex); | |
} | |
} | |
+ | |
+ synchronized (this) { | |
+ LOG.error("updateRunnabilityUnderQueue of root"); | |
+ maxRunningEnforcer.updateRunnabilityUnderQueue(queueMgr.getRootQueue()); | |
+ } | |
} | |
/** Sort nodes by available resource */ | |
@@ -1239,6 +1247,8 @@ public class FairScheduler extends Abstr | |
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); | |
continuousSchedulingSleepMs = | |
this.conf.getContinuousSchedulingSleepMs(); | |
+ continuousSchedulingEnabled = true; | |
+ continuousSchedulingSleepMs = 1000; | |
nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); | |
rackLocalityThreshold = this.conf.getLocalityThresholdRack(); | |
nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); | |
diff -urp hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java | |
--- hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java 2014-08-27 20:05:43.000000000 -0700 | |
+++ hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java 2015-08-06 18:51:01.000000000 -0700 | |
@@ -30,6 +30,9 @@ import org.apache.commons.logging.LogFac | |
import com.google.common.annotations.VisibleForTesting; | |
import com.google.common.collect.ArrayListMultimap; | |
import com.google.common.collect.ListMultimap; | |
+import org.apache.hadoop.yarn.util.resource.Resources; | |
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; | |
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; | |
/** | |
* Handles tracking and enforcement for user and queue maxRunningApps | |
@@ -64,6 +67,14 @@ public class MaxRunningAppsEnforcer { | |
if (userNumRunnable >= allocConf.getUserMaxApps(user)) { | |
return false; | |
} | |
+ | |
+ LOG.error("checking demand for queue "+queue); | |
+ if (isDemandBlockingApp(queue)) { | |
+ LOG.error("Demand is blocking app"); | |
+ return false; | |
+ } | |
+ LOG.error("No demand. Run app."); | |
+ | |
// Check queue and all parent queues | |
while (queue != null) { | |
int queueMaxApps = allocConf.getQueueMaxApps(queue.getName()); | |
@@ -76,6 +87,68 @@ public class MaxRunningAppsEnforcer { | |
return true; | |
} | |
+ private boolean isDemandBlockingApp(FSQueue queue) { | |
+ ResourceWeights targetWeight = queue.getWeights(); | |
+ LOG.error("target weight: "+targetWeight); | |
+ | |
+ AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); | |
+ while (allocConf.getMaxResources(queue.getName()).equals(Resources.unbounded())) { | |
+ queue = queue.getParent(); | |
+ LOG.error("Going upto parent queue because getMaxResources is none: "+queue.getMaxShare()); | |
+ if (queue == null) { | |
+ LOG.error("root queue?"); | |
+ return false; | |
+ } | |
+ } | |
+ LOG.error("Using this queue to check deman: "+queue); | |
+ | |
+ return hasHigherWeightOutstandingDemand(queue, targetWeight); | |
+ } | |
+ | |
+ //private boolean hasHigherWeightWaitingApp(FSQueue queue, ResourceWeights targetWeight) { | |
+ // if (queue instanceof FSLeafQueue) { | |
+ // ResourceWeights weight = queue.getWeights(); | |
+ // if (weight.getWeight(ResourceType.MEMORY) < targetWeight.getWeight(ResourceType.MEMORY)) { | |
+ // return false; | |
+ // } | |
+ // return !((FSLeafQueue) queue).getNonRunnableAppSchedulables().isEmpty(); | |
+ // } else { | |
+ // while (FSQueue childQueue : queue.getChildQueues()) { | |
+ // if (!hasHigherWeightWaitingApp(childQueue)) { | |
+ // return true; | |
+ // } | |
+ // } | |
+ // return false; | |
+ // } | |
+ //} | |
+ | |
+ private boolean hasHigherWeightOutstandingDemand(FSQueue queue, ResourceWeights targetWeight) { | |
+ if (queue instanceof FSLeafQueue) { | |
+ ResourceWeights weight = queue.getWeights(); | |
+ LOG.error("Found leaf queue. weight: "+weight+ " < targetWeight("+targetWeight+")?"+(weight.getWeight(ResourceType.MEMORY) < targetWeight.getWeight(ResourceType.MEMORY))); | |
+ if (weight.getWeight(ResourceType.MEMORY) < targetWeight.getWeight(ResourceType.MEMORY)) { | |
+ return false; | |
+ } | |
+ FSLeafQueue leaf = (FSLeafQueue) queue; | |
+ // Demand is current consumption plus outstanding requests | |
+ LOG.error("Checking demand: "+leaf.getDemand()); | |
+ LOG.error("Checking resource usage: "+leaf.getResourceUsage()); | |
+ LOG.error("Checking outstanding demand: "+leaf.getOutstandingDemandOfRunnableApps()); | |
+ if (leaf.getOutstandingDemandOfRunnableApps().equals(Resources.none())) { | |
+ return false; | |
+ } | |
+ return true; | |
+ } else { | |
+ LOG.error("Found parent queue. Traversing child queues: "+queue.getChildQueues().size()); | |
+ for (FSQueue childQueue : queue.getChildQueues()) { | |
+ if (hasHigherWeightOutstandingDemand(childQueue, targetWeight)) { | |
+ return true; | |
+ } | |
+ } | |
+ return false; | |
+ } | |
+ } | |
+ | |
/** | |
* Tracks the given new runnable app for purposes of maintaining max running | |
* app limits. | |
@@ -156,6 +229,19 @@ public class MaxRunningAppsEnforcer { | |
} | |
} | |
+ tryToRunApps(appsNowMaybeRunnable); | |
+ } | |
+ | |
+ public void updateRunnabilityUnderQueue(FSQueue queue) { | |
+ List<List<AppSchedulable>> appsNowMaybeRunnable = | |
+ new ArrayList<List<AppSchedulable>>(); | |
+ | |
+ gatherPossiblyRunnableAppLists(queue, appsNowMaybeRunnable); | |
+ | |
+ tryToRunApps(appsNowMaybeRunnable); | |
+ } | |
+ | |
+ private void tryToRunApps(List<List<AppSchedulable>> appsNowMaybeRunnable) { | |
// Scan through and check whether this means that any apps are now runnable | |
Iterator<FSSchedulerApp> iter = new MultiListStartTimeIterator( | |
appsNowMaybeRunnable); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment