Last active
March 15, 2019 06:31
-
-
Save squito/788abf5f55a96deeea65 to your computer and use it in GitHub Desktop.
get paths of a spark-sql query
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.lang.reflect.Method | |
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan | |
import org.apache.spark.sql.sources.{HadoopFsRelation, BaseRelation} | |
import org.apache.spark.sql.DataFrame | |
/**
 * Returns the input paths backing a relation.
 *
 * Only `HadoopFsRelation` is understood; its `paths` are exposed as an iterator.
 *
 * @param relation the relation to inspect
 * @return an iterator over the relation's paths
 * @throws IllegalArgumentException if the relation is any other kind of `BaseRelation`
 */
def getPaths(relation: BaseRelation): Iterator[String] = relation match {
  case fsRelation: HadoopFsRelation =>
    fsRelation.paths.iterator
  case unexpected: BaseRelation =>
    throw new IllegalArgumentException(s"wasn't expecting relation ${unexpected}")
}
/**
 * Collects the input paths of every logical relation found in a plan tree.
 *
 * A node whose runtime class is `LogicalRelation` contributes the paths of its
 * underlying `BaseRelation`; any other node contributes the concatenation of
 * the paths found under its children. A non-relation node without children
 * therefore yields an empty iterator.
 *
 * @param plan the logical plan to walk
 * @return an iterator over all paths found in the plan
 * @throws IllegalArgumentException if a relation of an unsupported kind is found
 */
def getPaths(plan: LogicalPlan): Iterator[String] = {
  plan match {
    // LogicalRelation will be public in the next release, but for now, need to use a reflection hack
    // https://issues.apache.org/jira/browse/SPARK-7275
    case lr if lr.getClass() == Class.forName("org.apache.spark.sql.execution.datasources.LogicalRelation") =>
      getPaths(get("relation", lr).asInstanceOf[BaseRelation])
    case other =>
      // Fold from an empty iterator rather than reduce: reduce throws on an
      // empty `children`, which is exactly what every non-relation leaf has.
      other.children
        .map(child => getPaths(child))
        .foldLeft(Iterator.empty: Iterator[String])(_ ++ _)
  }
}
/**
 * Returns the input paths read by a DataFrame's query.
 *
 * Delegates to the `LogicalPlan` overload using the DataFrame's analyzed plan.
 *
 * @param df the DataFrame whose query should be inspected
 * @return an iterator over all paths found in the analyzed plan
 */
def getPaths(df: DataFrame): Iterator[String] = {
  val analyzedPlan = df.queryExecution.analyzed
  getPaths(analyzedPlan)
}
// some reflection helpers. Overkill for this example, but I use them in general so putting them here | |
// note that you won't need reflection at all after SPARK-7275 | |
/**
 * Lists the names of the methods declared directly on an object's runtime class.
 *
 * @param obj the object to inspect
 * @return the declared method names in sorted order (overloads appear once per overload)
 */
def methods(obj: Any): Seq[String] = {
  val declared = obj.getClass.getDeclaredMethods
  val names = declared.map(m => m.getName)
  names.sorted
}
/**
 * Looks up a method by name among those declared directly on an object's
 * runtime class and makes it accessible for reflective invocation.
 *
 * If several declared methods share the name, the first one returned by
 * `getDeclaredMethods` is used.
 *
 * @param name the method name to look for
 * @param obj  the object whose runtime class is searched
 * @return the matching method, with `setAccessible(true)` already applied
 * @throws NoSuchElementException if the class declares no method with that name
 */
def findMethod(name: String, obj: Any): Method = {
  val clz = obj.getClass
  val method = clz.getDeclaredMethods.find(_.getName == name).getOrElse {
    // Same exception type as the former Option.get, but with a usable message.
    throw new NoSuchElementException(s"no declared method named '$name' on ${clz.getName}")
  }
  method.setAccessible(true)
  method
}
/**
 * Reads a member of an object reflectively, by name.
 *
 * A field declared on the object's runtime class with the given name is
 * preferred; when there is none, a declared method of that name is invoked
 * with no arguments instead.
 *
 * @param name the field or zero-argument method name
 * @param obj  the object to read from
 * @return the field value, or the result of invoking the method
 */
def get(name: String, obj: Any): Any = {
  val declaredField = obj.getClass.getDeclaredFields.find(field => field.getName == name)
  declaredField.fold(findMethod(name, obj).invoke(obj)) { field =>
    field.setAccessible(true)
    field.get(obj)
  }
}
// end reflection helpers | |
// Example usage: run a count query through sqlContext, then count the paths getPaths finds for it.
// NOTE(review): sqlContext is not defined in this file — presumably the one spark-shell provides; confirm.
val df = sqlContext.sql("select count(1) from catalog_sales") | |
// The two lines below are a REPL transcript (prompt and printed result), not compilable source.
scala> getPaths(df).length | |
res4: Int = 1836 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment