Last active
December 31, 2015 10:09
-
-
Save marmbrus/7971431 to your computer and use it in GitHub Desktop.
An example of using catalyst's TreeNode transform functionality to replace UnresolvedRelations with other logical plans.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[info] Running catalyst.ViewsExample
Unresolved Plan:
Project {'a,'d}
Filter ('c > 1)
Join Inner, Some(('a = 'c))
UnresolvedRelation view1, None
UnresolvedRelation view2, None
With relations:
Project {'a,'d}
Filter ('c > 1)
Join Inner, Some(('a = 'c))
Project {'a}
LocalRelation {a#0,b#1}, {(1,a),(2,b)}
LocalRelation {c#2,d#3}, {(1,c),(2,d)}
Analyzed:
Project {a#0,d#3}
Filter (c#2 > 1)
Join Inner, Some((a#0 = c#2))
Project {a#0}
LocalRelation {a#0,b#1}, {(1,a),(2,b)}
LocalRelation {c#2,d#3}, {(1,c),(2,d)}
Physical plan:
Project {a#0:0.0,d#3:0.2}
Filter (c#2:0.1 > 1)
SparkEquiInnerJoin {a#0:0.0}, {c#2:1.0}
Project {a#0:0.0}
LocalRelation {a#0,b#1}, {Vector(1, a),Vector(2, b)}
LocalRelation {c#2,d#3}, {Vector(1, c),Vector(2, d)}
Answer: WrappedArray(Vector(2, d))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import catalyst.analysis.UnresolvedRelation | |
import catalyst.plans.Inner | |
import catalyst.plans.logical._ | |
/* Implicit Conversions */ | |
import dsl._ | |
import shark2.TestShark._ // For .toRdd execution using locally running test Shark. | |
/**
 * Example of using a tree `transform` to swap every `UnresolvedRelation` in a logical plan
 * for a concrete logical plan looked up by view name.
 *
 * Running `main` prints the plan at each stage (unresolved, with relations substituted,
 * analyzed, physical) followed by the collected answer.
 */
object ViewsExample {
  def main(args: Array[String]): Unit = {
    // Two local, in-memory relations with schemas (a INT, b STRING) and (c INT, d STRING).
    // loadData returns a copy of the relation with the given tuples appended.
    val relation1 = LocalRelation('a.int, 'b.string).loadData(List((1, "a"), (2, "b")))
    val relation2 = LocalRelation('c.int, 'd.string).loadData(List((1, "c"), (2, "d")))

    // Named views that can be substituted into logical plans.
    // "view1" adds a DSL projection on top of its relation so that only column "a" is returned.
    val views = Map(
      "view1" -> relation1.select('a),
      "view2" -> relation2
    )

    // Build, with the DSL, a plan that refers to the views only by name.
    val joined =
      UnresolvedRelation("view1").join(UnresolvedRelation("view2"), Inner, Some('a === 'c))
    val unresolvedPlan = joined.where('c > 1).select('a, 'd)
    println(s"Unresolved Plan:\n$unresolvedPlan")

    // Swap each UnresolvedRelation for the logical plan registered under its name.
    // A name that is missing from `views` makes the Map lookup throw.
    val withRelations = unresolvedPlan.transform {
      case UnresolvedRelation(viewName, _) => views(viewName)
    }
    println(s"With relations:\n$withRelations ")

    // Same plan with all references resolved.
    println(s"Analyzed:\n${withRelations.analyze}")

    // The shark plan that will be executed.
    println(s"Physical plan:\n${withRelations.executedPlan}")

    // Execute the plan and show the resulting rows.
    println(s"Answer: ${withRelations.toRdd.collect().toSeq}")
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment