Created
August 3, 2018 14:03
-
-
Save mimiyan/031f3c25455792d92a01f83ac415a94f to your computer and use it in GitHub Desktop.
This test code triggers the Spark warning "Managed memory leak detected".
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package test | |
import com.holdenkarau.spark.testing.{ DataFrameSuiteBase } | |
import org.apache.spark.sql.{ DataFrame } | |
import org.apache.spark.sql.SparkSession | |
import org.apache.spark.sql.functions.{ array, col, collect_set, udf } | |
import org.graphframes.GraphFrame | |
import org.graphframes.lib.AggregateMessages | |
import org.scalatest.FunSuite | |
/**
 * Minimal reproduction of the Spark warning "Managed memory leak detected"
 * using GraphFrames message aggregation on a small hard-coded graph.
 */
object TestMemoryTest {

  /** Local session with 8 shuffle partitions. */
  // Alternative that picks up an externally configured session:
  //   SparkSession.builder.getOrCreate()
  val spark: SparkSession = SparkSession.builder
    .master("local")
    .config("spark.sql.shuffle.partitions", 8)
    .getOrCreate()

  val sparkContext: org.apache.spark.SparkContext = spark.sparkContext

  import spark.implicits._

  /**
   * Builds the edges DataFrame of the sample graph.
   *
   * @return a DataFrame with two string columns, "src" and "dst", one row per directed edge
   */
  def createEdgesDF(): DataFrame = {
    val edges = Seq(
      ("u1", "u2"),
      ("u1", "u3"),
      ("u3", "u6"),
      ("u6", "u7"),
      ("u4", "u5"),
      ("u5", "u7"),
      ("u5", "u6"),
      ("u6", "u1")
    )
    edges.toDF("src", "dst")
  }

  /**
   * Derives a GraphFrame from an edge list.
   *
   * The vertex set is the distinct union of all "src" and "dst" values, exposed as
   * column "id". Every vertex starts with an "egoSubgraph" array holding only its own id.
   *
   * @param edgesDF edges with columns "src" and "dst"
   * @return a GraphFrame over the derived vertices and the given edges
   */
  def getGraphFrame(edgesDF: DataFrame): GraphFrame = {
    val sourceIds = edgesDF.withColumnRenamed("src", "id").drop("dst")
    val targetIds = edgesDF.withColumnRenamed("dst", "id").drop("src")
    val vertices = sourceIds
      .union(targetIds)
      .distinct
      .withColumn("egoSubgraph", array(col("id")))
    GraphFrame(vertices, edgesDF)
  }

  /**
   * Runs two rounds of message aggregation over the sample graph and returns the
   * resulting vertices.
   *
   * In each round the destination's "egoSubgraph" is sent to the edge's source, the
   * received arrays are collected, flattened and de-duplicated, and the aggregated
   * DataFrame becomes the vertex set of the next round's graph (edges are kept as-is).
   *
   * Per the original author's observation: if there is an empty partition in the edges
   * or vertices, this code leads to the Spark warning
   * `org.apache.spark.executor.Executor....[Executor task launch worker for task 100]
   * Managed memory leak detected; size = 262144 bytes, TID = 100`.
   *
   * @return the vertices DataFrame of the graph produced by the last round
   */
  def getEgo(): DataFrame = {
    val initialGraph = getGraphFrame(createEdgesDF())
    // Merges the collected per-neighbour arrays into one array without duplicates.
    val flattenUdf = udf((nested: Seq[Seq[String]]) => nested.flatten.distinct)
    val msgToSrc = AggregateMessages.dst("egoSubgraph")

    // Fold instead of a var/null accumulator; the round index itself is not needed.
    val finalGraph = (0 to 1).foldLeft(initialGraph) { (graph, _) =>
      val aggregated = graph.aggregateMessages
        .sendToSrc(msgToSrc)
        .agg(flattenUdf(collect_set(AggregateMessages.msg)).as("egoSubgraph"))
      GraphFrame(aggregated, graph.edges)
    }
    finalGraph.vertices
  }
}
/**
 * Test harness for the reproduction: it materialises the result of
 * `TestMemoryTest.getEgo()` so that the Spark job actually runs.
 * It makes no assertion on the result; the count only forces evaluation.
 */
class TestMemoryTest extends FunSuite with DataFrameSuiteBase {

  test("test managed memory leakage") {
    // count is an action, so this executes the whole lazily built plan.
    TestMemoryTest.getEgo().count
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment