Skip to content

Instantly share code, notes, and snippets.

@frsyuki
Created August 7, 2015 06:36
Show Gist options
  • Save frsyuki/7b57e9c246b938f8496c to your computer and use it in GitHub Desktop.
Save frsyuki/7b57e9c246b938f8496c to your computer and use it in GitHub Desktop.
diff -urp hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
--- hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java 2014-08-27 20:05:43.000000000 -0700
+++ hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java 2015-08-06 15:46:40.000000000 -0700
@@ -92,6 +92,33 @@ public class AppSchedulable extends Sche
}
}
+  /**
+   * Returns the resources still requested by this app's outstanding
+   * container requests; resources the app already holds are not included.
+   *
+   * <p>Only the {@link ResourceRequest#ANY} request of each priority is
+   * counted. The other entries of {@code app.getResourceRequests(p)} are
+   * node- and rack-level requests for the same containers, so summing every
+   * entry would count a container once per location it was requested at.
+   *
+   * @return a newly created {@link Resource} holding the outstanding demand
+   */
+  public Resource getOutstandingDemand() {
+    Resource outstandingDemand = Resources.createResource(0);
+    // Hold the app lock so the requests of all priorities are read together.
+    synchronized (app) {
+      for (Priority p : app.getPriorities()) {
+        ResourceRequest anyRequest =
+            app.getResourceRequest(p, ResourceRequest.ANY);
+        if (anyRequest != null) {
+          Resources.addTo(outstandingDemand, Resources.multiply(
+              anyRequest.getCapability(), anyRequest.getNumContainers()));
+        }
+      }
+    }
+    return outstandingDemand;
+  }
+
@Override
public Resource getDemand() {
return demand;
diff -urp hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
--- hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java 2014-08-27 20:05:43.000000000 -0700
+++ hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java 2015-08-06 15:42:59.000000000 -0700
@@ -182,6 +182,28 @@ public class FSLeafQueue extends FSQueue
demand = Resources.componentwiseMin(demand, maxRes);
}
+  /**
+   * Sums {@link AppSchedulable#getOutstandingDemand()} over the runnable apps
+   * of this queue, capping the running total component-wise at the queue's
+   * configured max resources.
+   *
+   * @return the capped outstanding demand of this queue's runnable apps
+   */
+  public Resource getOutstandingDemandOfRunnableApps() {
+    Resource cap = scheduler.getAllocationConfiguration()
+        .getMaxResources(getName());
+    Resource total = Resources.createResource(0);
+    for (AppSchedulable appSched : runnableAppScheds) {
+      // Once the total equals the cap, further additions cannot raise it.
+      if (Resources.equals(total, cap)) {
+        break;
+      }
+      total = Resources.componentwiseMin(
+          Resources.add(total, appSched.getOutstandingDemand()), cap);
+    }
+    return total;
+  }
+
@Override
public Resource assignContainer(FSSchedulerNode node) {
Resource assigned = Resources.none();
diff -urp hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
--- hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java 2015-08-06 23:35:31.000000000 -0700
+++ hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java 2015-08-06 17:36:34.000000000 -0700
@@ -799,8 +799,8 @@ public class FairScheduler extends Abstr
Container container = rmContainer.getContainer();
- // Get the application for the finished container
- FSSchedulerApp application =
+ // Get the application for the finished container
+ FSSchedulerApp application =
getCurrentAttemptForContainer(container.getId());
ApplicationId appId =
container.getId().getApplicationAttemptId().getApplicationId();
@@ -823,6 +823,9 @@ public class FairScheduler extends Abstr
updateRootQueueMetrics();
}
+ LOG.error("updateRunnabilityOnAppRemoval: "+application+" ");
+ maxRunningEnforcer.updateRunnabilityOnAppRemoval(application, application.getQueue());
+
LOG.info("Application attempt " + application.getApplicationAttemptId()
+ " released container " + container.getId() + " on node: " + node
+ " with event: " + event);
@@ -1024,6 +1027,11 @@ public class FairScheduler extends Abstr
": " + ex.toString(), ex);
}
}
+
+ synchronized (this) {
+ LOG.error("updateRunnabilityUnderQueue of root");
+ maxRunningEnforcer.updateRunnabilityUnderQueue(queueMgr.getRootQueue());
+ }
}
/** Sort nodes by available resource */
@@ -1239,6 +1247,8 @@ public class FairScheduler extends Abstr
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
continuousSchedulingSleepMs =
this.conf.getContinuousSchedulingSleepMs();
+ continuousSchedulingEnabled = true;
+ continuousSchedulingSleepMs = 1000;
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
diff -urp hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
--- hadoop-2.4.0.2.1.5.0-695-src-orig/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java 2014-08-27 20:05:43.000000000 -0700
+++ hadoop-2.4.0.2.1.5.0-695-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java 2015-08-06 18:51:01.000000000 -0700
@@ -30,6 +30,9 @@ import org.apache.commons.logging.LogFac
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
/**
* Handles tracking and enforcement for user and queue maxRunningApps
@@ -64,6 +67,14 @@ public class MaxRunningAppsEnforcer {
if (userNumRunnable >= allocConf.getUserMaxApps(user)) {
return false;
}
+
+ LOG.error("checking demand for queue "+queue);
+ if (isDemandBlockingApp(queue)) {
+ LOG.error("Demand is blocking app");
+ return false;
+ }
+ LOG.error("No demand. Run app.");
+
// Check queue and all parent queues
while (queue != null) {
int queueMaxApps = allocConf.getQueueMaxApps(queue.getName());
@@ -76,6 +87,79 @@ public class MaxRunningAppsEnforcer {
return true;
}
+  /**
+   * Returns whether a new app in {@code queue} should be held back because
+   * runnable apps in related queues still have outstanding resource requests.
+   *
+   * <p>The check is scoped to the nearest queue, starting at {@code queue}
+   * itself and walking up through its ancestors, whose configured max
+   * resources is not {@link Resources#unbounded()}.
+   *
+   * @param queue the queue the candidate app would run in
+   * @return false if no queue on the path to the root has bounded max
+   *     resources; otherwise whether
+   *     {@link #hasHigherWeightOutstandingDemand} holds for that queue
+   */
+  private boolean isDemandBlockingApp(FSQueue queue) {
+    ResourceWeights targetWeight = queue.getWeights();
+    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
+
+    FSQueue limitingQueue = queue;
+    while (allocConf.getMaxResources(limitingQueue.getName())
+        .equals(Resources.unbounded())) {
+      limitingQueue = limitingQueue.getParent();
+      // Check for null before dereferencing: the root queue has no parent.
+      if (limitingQueue == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("No queue from " + queue.getName()
+              + " up to the root has max resources set");
+        }
+        return false;
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Checking outstanding demand under queue "
+          + limitingQueue.getName() + " for an app in " + queue.getName());
+    }
+    return hasHigherWeightOutstandingDemand(limitingQueue, targetWeight);
+  }
+
+  /**
+   * Returns whether some leaf queue in the subtree rooted at {@code queue}
+   * has a memory weight at least as large as that of {@code targetWeight}
+   * and runnable apps with non-zero outstanding demand.
+   *
+   * @param queue root of the subtree to search; may itself be a leaf queue
+   * @param targetWeight weights of the candidate app's queue; only the
+   *     {@link ResourceType#MEMORY} weight is compared
+   * @return true as soon as one such leaf queue is found, false otherwise
+   */
+  private boolean hasHigherWeightOutstandingDemand(FSQueue queue,
+      ResourceWeights targetWeight) {
+    if (!(queue instanceof FSLeafQueue)) {
+      for (FSQueue childQueue : queue.getChildQueues()) {
+        if (hasHigherWeightOutstandingDemand(childQueue, targetWeight)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    // Equal weights count too, so the candidate's own queue can block it.
+    if (queue.getWeights().getWeight(ResourceType.MEMORY)
+        < targetWeight.getWeight(ResourceType.MEMORY)) {
+      return false;
+    }
+    // Computed once: it iterates over every runnable app of the leaf queue.
+    boolean hasDemand = !((FSLeafQueue) queue)
+        .getOutstandingDemandOfRunnableApps().equals(Resources.none());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Leaf queue " + queue.getName() + " (weights "
+          + queue.getWeights() + ") has outstanding demand: " + hasDemand);
+    }
+    return hasDemand;
+  }
+
/**
* Tracks the given new runnable app for purposes of maintaining max running
* app limits.
@@ -156,6 +229,23 @@ public class MaxRunningAppsEnforcer {
}
}
+ tryToRunApps(appsNowMaybeRunnable);
+ }
+
+  /**
+   * Gathers the app lists under {@code queue} that may hold apps able to run
+   * now, then passes them to {@code tryToRunApps} to check which can run.
+   *
+   * @param queue the queue under which waiting apps are re-evaluated
+   */
+  public void updateRunnabilityUnderQueue(FSQueue queue) {
+    List<List<AppSchedulable>> candidateLists =
+        new ArrayList<List<AppSchedulable>>();
+    gatherPossiblyRunnableAppLists(queue, candidateLists);
+    tryToRunApps(candidateLists);
+  }
+
+ private void tryToRunApps(List<List<AppSchedulable>> appsNowMaybeRunnable) {
// Scan through and check whether this means that any apps are now runnable
Iterator<FSSchedulerApp> iter = new MultiListStartTimeIterator(
appsNowMaybeRunnable);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment