Last active
August 29, 2015 14:25
-
-
Save frsyuki/a3416e36d4d70b30f48d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Problem:
HistoryFileManager.addIfAbsent produces a large amount of log output if the
number of cached entries whose age is less than mapreduce.jobhistory.max-age-ms
becomes far larger than mapreduce.jobhistory.joblist.cache.size.
Example:
For example, if the cache contains 50000 entries in total, 10000 of which are
newer than mapreduce.jobhistory.max-age-ms, and
mapreduce.jobhistory.joblist.cache.size is 20000, the
HistoryFileManager.addIfAbsent method produces 50000 - 20000 = 30000 lines of
the "Waiting to remove <key> from JobListCache because it is not in done yet"
message.
Here is a stack trace: https://gist.github.com/frsyuki/6d269dd52c9b8b3cc190
Impact:
In addition to consuming a large amount of disk space, this issue blocks
JobHistory.getJob for a long time and slows job execution significantly,
because getJob is called by RPCs such as
HistoryClientService.HSClientProtocolHandler.getJobReport.
This happens because HistoryFileManager.UserLogDir.scanIfNeeded eventually
calls HistoryFileManager.addIfAbsent inside a synchronized block. When
multiple threads call scanIfNeeded simultaneously, one of them acquires the
lock and the other threads are blocked until the first thread completes its
long-running HistoryFileManager.addIfAbsent call.
Solution:
* Reduce the amount of logging so that HistoryFileManager.addIfAbsent doesn't
  take too long.
* Good to have if possible: HistoryFileManager.UserLogDir.scanIfNeeded skips
  scanning if another thread is already scanning. This changes the semantics
  of some HistoryFileManager methods (such as getAllFileInfo and getFileInfo),
  because a skipped scanIfNeeded call leaves their view of the state outdated.
* Good to have if possible: Make scanIfNeeded asynchronous so that RPC calls
  are not blocked by a loop over tens of thousands of entries.
The first patch below implements the first item. The second patch implements
the first item and also the second item: scanIfNeeded uses
ReentrantLock.tryLock() and skips the scan when another thread holds the lock.
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java | |
index f0786da..90333c1 100644 | |
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java | |
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java | |
@@ -219,6 +220,7 @@ public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) { | |
// keeping the cache size exactly at the maximum. | |
Iterator<JobId> keys = cache.navigableKeySet().iterator(); | |
long cutoff = System.currentTimeMillis() - maxAge; | |
+ int waitingCount = 0; | |
while(cache.size() > maxSize && keys.hasNext()) { | |
JobId key = keys.next(); | |
HistoryFileInfo firstValue = cache.get(key); | |
@@ -236,8 +238,7 @@ public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) { | |
" that could not be moved to done.", e); | |
} | |
} else { | |
- LOG.warn("Waiting to remove " + key | |
- + " from JobListCache because it is not in done yet."); | |
+ waitingCount++; | |
} | |
} else { | |
cache.remove(key); | |
@@ -245,6 +246,10 @@ public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) { | |
} | |
} | |
} | |
+ if (waitingCount > 0) { | |
+ LOG.warn("Waiting to remove " + waitingCount | |
+ + " files from JobListCache because they are not in done yet."); | |
+ } | |
} | |
return old; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java | |
index f0786da..90333c1 100644 | |
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java | |
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java | |
@@ -38,6 +38,7 @@ | |
import java.util.concurrent.ThreadFactory; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
+import java.util.concurrent.locks.ReentrantLock; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import org.apache.commons.logging.Log; | |
@@ -219,6 +220,7 @@ public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) { | |
// keeping the cache size exactly at the maximum. | |
Iterator<JobId> keys = cache.navigableKeySet().iterator(); | |
long cutoff = System.currentTimeMillis() - maxAge; | |
+ int waitingCount = 0; | |
while(cache.size() > maxSize && keys.hasNext()) { | |
JobId key = keys.next(); | |
HistoryFileInfo firstValue = cache.get(key); | |
@@ -236,8 +238,7 @@ public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) { | |
" that could not be moved to done.", e); | |
} | |
} else { | |
- LOG.warn("Waiting to remove " + key | |
- + " from JobListCache because it is not in done yet."); | |
+ waitingCount++; | |
} | |
} else { | |
cache.remove(key); | |
@@ -245,6 +246,10 @@ public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) { | |
} | |
} | |
} | |
+ if (waitingCount > 0) { | |
+ LOG.warn("Waiting to remove " + waitingCount | |
+ + " files from JobListCache because they are not in done yet."); | |
+ } | |
} | |
return old; | |
} | |
@@ -275,22 +280,30 @@ public boolean isFull() { | |
*/ | |
private class UserLogDir { | |
long modTime = 0; | |
+ | |
+ private final ReentrantLock lock = new ReentrantLock(); | |
- public synchronized void scanIfNeeded(FileStatus fs) { | |
- long newModTime = fs.getModificationTime(); | |
- if (modTime != newModTime) { | |
- Path p = fs.getPath(); | |
+ public void scanIfNeeded(FileStatus fs) { | |
+ if (lock.tryLock()) { | |
try { | |
- scanIntermediateDirectory(p); | |
- //If scanning fails, we will scan again. We assume the failure is | |
- // temporary. | |
- modTime = newModTime; | |
- } catch (IOException e) { | |
- LOG.error("Error while trying to scan the directory " + p, e); | |
- } | |
- } else { | |
- if (LOG.isDebugEnabled()) { | |
- LOG.debug("Scan not needed of " + fs.getPath()); | |
+ long newModTime = fs.getModificationTime(); | |
+ if (modTime != newModTime) { | |
+ Path p = fs.getPath(); | |
+ try { | |
+ scanIntermediateDirectory(p); | |
+ //If scanning fails, we will scan again. We assume the failure is | |
+ // temporary. | |
+ modTime = newModTime; | |
+ } catch (IOException e) { | |
+ LOG.error("Error while trying to scan the directory " + p, e); | |
+ } | |
+ } else { | |
+ if (LOG.isDebugEnabled()) { | |
+ LOG.debug("Scan not needed of " + fs.getPath()); | |
+ } | |
+ } | |
+ } finally { | |
+ lock.unlock(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment