@frsyuki
Last active August 29, 2015 14:25
Problem:
HistoryFileManager.addIfAbsent produces a large amount of log output when the
number of cached entries newer than mapreduce.jobhistory.max-age-ms far
exceeds mapreduce.jobhistory.joblist.cache.size.
Example:
If the cache contains 50,000 entries in total, 10,000 of them newer than
mapreduce.jobhistory.max-age-ms, and mapreduce.jobhistory.joblist.cache.size
is 20,000, then HistoryFileManager.addIfAbsent produces 50,000 - 20,000 =
30,000 lines of the "Waiting to remove <key> from JobListCache because it is
not in done yet" message.
Here is a stack trace: https://gist.github.com/frsyuki/6d269dd52c9b8b3cc190
Impact:
In addition to the large disk consumption, this issue blocks JobHistory.getJob
for a long time and slows job execution down significantly, because getJob is
called by RPC handlers such as
HistoryClientService.HSClientProtocolHandler.getJobReport.
This happens because HistoryFileManager.UserLogDir.scanIfNeeded eventually
calls HistoryFileManager.addIfAbsent in a synchronized block. When multiple
threads call scanIfNeeded simultaneously, one of them acquires the lock and
the other threads are blocked until the first thread completes the
long-running HistoryFileManager.addIfAbsent call.
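As a minimal standalone illustration (hypothetical class and method names, not
Hadoop code), a synchronized method serializes all callers, while
ReentrantLock.tryLock lets threads that arrive mid-scan return immediately
instead of blocking:

import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: with "synchronized", every caller queues up behind
// the thread that is currently scanning; with tryLock, callers that arrive
// while a scan is in progress simply return instead of waiting.
class ScanGuard {
  private final ReentrantLock lock = new ReentrantLock();

  void scanBlocking() {       // analogous to the original synchronized method
    synchronized (this) {
      doLongScan();           // all other callers wait here
    }
  }

  void scanOrSkip() {         // analogous to the patched tryLock version
    if (lock.tryLock()) {     // returns false immediately if already held
      try {
        doLongScan();
      } finally {
        lock.unlock();
      }
    }                         // else: another thread is scanning; skip
  }

  private void doLongScan() { /* stand-in for scanIntermediateDirectory */ }
}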
Solution:
* Reduce the amount of logging so that HistoryFileManager.addIfAbsent doesn't
  take too long.
* Good to have if possible: make HistoryFileManager.UserLogDir.scanIfNeeded
  skip scanning if another thread is already scanning. This changes the
  semantics of some HistoryFileManager methods (such as getAllFileInfo and
  getFileInfo) because a skipped scanIfNeeded may leave them reading
  outdated state.
* Good to have if possible: make scanIfNeeded asynchronous so that RPC calls
  are not blocked by a loop over tens of thousands of entries (see the sketch
  after the patch).
The patch below implements the first item; its tryLock change to scanIfNeeded
also implements the second.
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
index f0786da..90333c1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
@@ -38,6 +38,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -219,6 +220,7 @@ public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) {
// keeping the cache size exactly at the maximum.
Iterator<JobId> keys = cache.navigableKeySet().iterator();
long cutoff = System.currentTimeMillis() - maxAge;
+ int waitingCount = 0;
while(cache.size() > maxSize && keys.hasNext()) {
JobId key = keys.next();
HistoryFileInfo firstValue = cache.get(key);
@@ -236,8 +238,7 @@ public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) {
" that could not be moved to done.", e);
}
} else {
- LOG.warn("Waiting to remove " + key
- + " from JobListCache because it is not in done yet.");
+ waitingCount++;
}
} else {
cache.remove(key);
@@ -245,6 +246,10 @@ public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) {
}
}
}
+ if (waitingCount > 0) {
+ LOG.warn("Waiting to remove " + waitingCount
+ + " files from JobListCache because they are not in done yet.");
+ }
}
return old;
}
@@ -275,22 +280,30 @@ public boolean isFull() {
*/
private class UserLogDir {
long modTime = 0;
+
+ private final ReentrantLock lock = new ReentrantLock();
- public synchronized void scanIfNeeded(FileStatus fs) {
- long newModTime = fs.getModificationTime();
- if (modTime != newModTime) {
- Path p = fs.getPath();
+ public void scanIfNeeded(FileStatus fs) {
+ if (lock.tryLock()) {
try {
- scanIntermediateDirectory(p);
- //If scanning fails, we will scan again. We assume the failure is
- // temporary.
- modTime = newModTime;
- } catch (IOException e) {
- LOG.error("Error while trying to scan the directory " + p, e);
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Scan not needed of " + fs.getPath());
+ long newModTime = fs.getModificationTime();
+ if (modTime != newModTime) {
+ Path p = fs.getPath();
+ try {
+ scanIntermediateDirectory(p);
+ //If scanning fails, we will scan again. We assume the failure is
+ // temporary.
+ modTime = newModTime;
+ } catch (IOException e) {
+ LOG.error("Error while trying to scan the directory " + p, e);
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Scan not needed of " + fs.getPath());
+ }
+ }
+ } finally {
+ lock.unlock();
}
}
}
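The third item is not implemented by this patch. A rough sketch of what it
could look like, assuming a single-threaded executor and hypothetical names:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the third item: hand the scan off to a background
// thread so RPC handlers never wait on it. Callers may observe slightly
// stale state until the scan completes, as noted in the second item above.
class AsyncScanner {
  private final ExecutorService scanExecutor =
      Executors.newSingleThreadExecutor();
  private final AtomicBoolean scanScheduled = new AtomicBoolean(false);

  void scanIfNeededAsync(Runnable scan) {
    // Schedule at most one pending scan at a time; later callers return
    // immediately while a scan is queued or running.
    if (scanScheduled.compareAndSet(false, true)) {
      scanExecutor.execute(() -> {
        try {
          scan.run();         // e.g. the existing scanIntermediateDirectory
        } finally {
          scanScheduled.set(false);
        }
      });
    }
  }
}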