// run with "--conf spark.cleaner.referenceTracking=false"
// spin up our full set of executors
sc.parallelize(1 to 100, 100).map { x => Thread.sleep(1000); x }.collect()
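// org.apache.spark.util.Utils is private[spark], so we go through reflection to read
// each executor's configured local dirs from inside a task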
def getLocalDirs(): Array[String] = {
  val clz = Class.forName("org.apache.spark.util.Utils")
  val conf = org.apache.spark.SparkEnv.get.conf
  val method = clz.getMethod("getConfiguredLocalDirs", conf.getClass())
  method.setAccessible(true)
  method.invoke(null, conf).asInstanceOf[Array[String]]
}
val localDirsByExec = sc.parallelize(1 to 100, 100).map { x =>
  org.apache.spark.SparkEnv.get.executorId -> getLocalDirs().mkString(",")
}.collect().toMap

localDirsByExec.foreach { println }
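// on yarn these dirs typically live under the nodemanager's app cache, e.g.
// .../usercache/<user>/appcache/application_<id>/... -- exactly the dirs that get
// deleted when the app finishes, which is why we need to copy the files out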
// put your original code here ...
// ... and when you notice the issue, do this:

// choose a path which you know will exist on the executors, which yarn won't clean up,
// and which you can easily grab later on
val targetDir = new java.io.File("/tmp/copiedSparkTmpData")
def showShuffleFiles(): Unit = {
  println(sc.parallelize(1 to 100, 100).map { _ =>
    import scala.collection.JavaConverters._
    val localDirs = getLocalDirs()
    // just list the shuffle files here; don't touch them
    val found = org.apache.spark.SparkEnv.get.synchronized {
      localDirs.flatMap { d =>
        org.apache.commons.io.FileUtils.listFiles(new java.io.File(d), null, true)
          .asScala
          .filter { _.getName().contains("shuffle") }
          .map(_.toString)
      }
    }
    org.apache.spark.SparkEnv.get.executorId -> found
  }.collect().toMap)
}
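// (not in the original gist) sanity check: list the shuffle files before moving anything
showShuffleFiles()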
val shuffleFiles = sc.parallelize(1 to 100, 100).map { _ =>
  import scala.collection.JavaConverters._
  val localDirs = getLocalDirs()
  val targetDir = new java.io.File("/tmp/copiedSparkTmpData")
  // this task may get executed multiple times on each executor, or even concurrently by
  // multiple tasks within one executor. locking the SparkEnv is not safe in general, but
  // it seems I can't create a real object which is common to all tasks in the repl
  val moves = org.apache.spark.SparkEnv.get.synchronized {
    targetDir.mkdirs()
    localDirs.flatMap { d =>
      val sfs: Array[java.io.File] = org.apache.commons.io.FileUtils.listFiles(new java.io.File(d), null, true)
        .asScala
        .filter { _.getName().contains("shuffle") }
        .toArray
      sfs.map { sf =>
        // recreate the immediate parent dir under targetDir so file names don't collide
        val parentDirName = sf.getParentFile().getName()
        val target = new java.io.File(targetDir, parentDirName)
        target.mkdirs()
        val dest = new java.io.File(target, sf.getName())
        val name = sf.toString
        val success = sf.renameTo(dest)
        name -> (dest.toString, success)
      }
    }
  }
  org.apache.spark.SparkEnv.get.executorId -> moves
}.collect().toMap

println("moved shuffle files: " + shuffleFiles)
// very simple code to generate shuffle data, to put into the "original code" section
sc.parallelize(1 to 1e6.toInt, 100).map { x => (x % 10) -> x }.reduceByKey { _ + _ }.collect()
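// reduceByKey forces a shuffle, so once this job runs each executor's local dirs will
// contain shuffle_*.data and shuffle_*.index files for the copying code above to find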