@marmbrus
Last active December 31, 2015 10:09
An example of using catalyst's TreeNode transform functionality to replace UnresolvedRelations with other logical plans.
[info] Running catalyst.ViewsExample
Unresolved Plan:
Project {'a,'d}
 Filter ('c > 1)
  Join Inner, Some(('a = 'c))
   UnresolvedRelation view1, None
   UnresolvedRelation view2, None

With relations:
Project {'a,'d}
 Filter ('c > 1)
  Join Inner, Some(('a = 'c))
   Project {'a}
    LocalRelation {a#0,b#1}, {(1,a),(2,b)}
   LocalRelation {c#2,d#3}, {(1,c),(2,d)}

Analyzed:
Project {a#0,d#3}
 Filter (c#2 > 1)
  Join Inner, Some((a#0 = c#2))
   Project {a#0}
    LocalRelation {a#0,b#1}, {(1,a),(2,b)}
   LocalRelation {c#2,d#3}, {(1,c),(2,d)}

Physical plan:
Project {a#0:0.0,d#3:0.2}
 Filter (c#2:0.1 > 1)
  SparkEquiInnerJoin {a#0:0.0}, {c#2:1.0}
   Project {a#0:0.0}
    LocalRelation {a#0,b#1}, {Vector(1, a),Vector(2, b)}
   LocalRelation {c#2,d#3}, {Vector(1, c),Vector(2, d)}

Answer: WrappedArray(Vector(2, d))
import catalyst.analysis.UnresolvedRelation
import catalyst.plans.Inner
import catalyst.plans.logical._
/* Implicit Conversions */
import dsl._
import shark2.TestShark._ // For .toRdd execution using locally running test Shark.

object ViewsExample {
  def main(args: Array[String]): Unit = {
    // Create a map of named views that can be substituted into logical plans.
    // The views read from local, in-memory relations with schemas
    // (a INT, b STRING) and (c INT, d STRING) respectively.
    // loadData returns a copy of the relation with the specified tuples appended to its RDD.
    // The .select on view1 uses the DSL to add a projection on top of the relation
    // that returns only the column "a".
    val views = Map(
      "view1" -> LocalRelation('a.int, 'b.string).loadData((1, "a") :: (2, "b") :: Nil).select('a),
      "view2" -> LocalRelation('c.int, 'd.string).loadData((1, "c") :: (2, "d") :: Nil)
    )

    // Construct a plan that has UnresolvedRelations in it using the DSL.
    val unresolvedPlan = (
      UnresolvedRelation("view1")
        .join(UnresolvedRelation("view2"), Inner, Some('a === 'c))
        .where('c > 1)
        .select('a, 'd))
    println(s"Unresolved Plan:\n$unresolvedPlan")

    // Replace each UnresolvedRelation with the logical plan of the same name from the views map.
    val withRelations = unresolvedPlan transform {
      case UnresolvedRelation(name, _) => views(name)
    }
    println(s"With relations:\n$withRelations")
    println(s"Analyzed:\n${withRelations.analyze}") // Print with all references resolved.
    println(s"Physical plan:\n${withRelations.executedPlan}") // Print the Shark physical plan.
    println(s"Answer: ${withRelations.toRdd.collect.toSeq}")
  }
}
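
The substitution step above relies on transform taking a partial function and rewriting only the nodes it matches. The following is a minimal, self-contained sketch of that idea, not Catalyst's actual implementation: Plan, Relation, Unresolved, Project, Filter, Join and TransformSketch are hypothetical stand-ins invented for illustration, and the pre-order traversal shown here is an assumption about how such a rewrite can be done.

```scala
// Hypothetical stand-ins for Catalyst's logical plan nodes (illustration only).
sealed trait Plan {
  def children: Seq[Plan]
  def withChildren(newChildren: Seq[Plan]): Plan

  // Pre-order rewrite: apply `rule` to this node when it matches,
  // then rewrite the children of whatever node results.
  def transform(rule: PartialFunction[Plan, Plan]): Plan = {
    val rewritten = if (rule.isDefinedAt(this)) rule(this) else this
    rewritten.withChildren(rewritten.children.map(_.transform(rule)))
  }
}

case class Relation(name: String) extends Plan {
  def children: Seq[Plan] = Nil
  def withChildren(newChildren: Seq[Plan]): Plan = this
}

case class Unresolved(name: String) extends Plan {
  def children: Seq[Plan] = Nil
  def withChildren(newChildren: Seq[Plan]): Plan = this
}

case class Project(columns: Seq[String], child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def withChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
}

case class Filter(condition: String, child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def withChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
}

case class Join(left: Plan, right: Plan, condition: String) extends Plan {
  def children: Seq[Plan] = Seq(left, right)
  def withChildren(newChildren: Seq[Plan]): Plan =
    copy(left = newChildren(0), right = newChildren(1))
}

object TransformSketch {
  // Named views, mirroring the shape of the views map in the example above.
  val views: Map[String, Plan] = Map(
    "view1" -> Project(Seq("a"), Relation("t1")),
    "view2" -> Relation("t2")
  )

  val unresolved: Plan =
    Project(Seq("a", "d"),
      Filter("c > 1",
        Join(Unresolved("view1"), Unresolved("view2"), "a = c")))

  // Only Unresolved nodes match the rule; every other node is rebuilt unchanged.
  val resolved: Plan = unresolved.transform {
    case Unresolved(name) => views(name)
  }

  def main(args: Array[String]): Unit = println(resolved)
}
```

Because the rule looks names up with views(name), a plan that mentions a view missing from the map fails with a NoSuchElementException, both in this sketch and in the example above. Using views.get(name) and leaving the node in place when the lookup returns None would be one way to make the substitution tolerant of unknown names.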