Last active
September 27, 2017 08:52
-
-
Save kanterov/49014d462b265eaf54353f09f0ac517a to your computer and use it in GitHub Desktop.
Different kinds of corner cases with join in frameless
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package frameless | |
| import shapeless.Witness | |
/**
 * Exploratory suite that records how `joinInner` behaves when the join
 * condition is built from columns that Spark may consider ambiguous.
 *
 * Every active statement only prints the joined dataset; nothing is asserted.
 * The combinations that are known to fail are kept commented out, next to the
 * `AnalysisException` they produce, labelled (1), (2) and (3).
 */
class AmbiguousJoinTests extends TypedDatasetSuite {

  /**
   * Builds a [[TypedColumn]] from a bare column name.
   *
   * The underlying expression comes from `org.apache.spark.sql.functions.col`,
   * so it carries only the column name and is not obtained from a particular
   * dataset the way `df1.col('a)` is. Case (2) in the test below shows the
   * consequence when such a column is used in a join of two datasets that
   * both expose the same name.
   *
   * @param column  singleton-typed symbol naming the column
   * @param exists  evidence tying `T`, the column name and `A` together
   *                (presumably "T has a column of this name with type A")
   * @param encoder encoder for the column type `A`
   * @tparam T type of the dataset the column is declared against
   * @tparam A type of the column
   * @return a typed column wrapping the name-only Spark column
   */
  def col[T, A](column: Witness.Lt[Symbol])(
    implicit
    exists: TypedColumn.Exists[T, column.T, A],
    encoder: TypedEncoder[A]): TypedColumn[T, A] = {
    val untypedExpr =
      org.apache.spark.sql.functions.col(column.value.name).as[A](TypedExpressionEncoder[A])

    new TypedColumn[T, A](untypedExpr)
  }

  test("ambigous join (1)") {
    val pairs = List(X1(1), X1(2))

    // Two independent datasets created from the same data.
    val df1 = TypedDataset.create(pairs)
    val df2 = TypedDataset.create(pairs)

    // Columns obtained from each dataset.
    val _a1: TypedColumn[X1[Int], Int] = df1.col('a)
    val _a2: TypedColumn[X1[Int], Int] = df2.col('a)

    /*
      (1) Doesn't work:

      [info] org.apache.spark.sql.AnalysisException: resolved attribute(s) a#8,a#15 missing from a#27,a#36 in operator !Join Inner, (a#8 = a#15);;
      [info] !Join Inner, (a#8 = a#15)
      [info] :- SerializeFromObject [input[0, frameless.X1, false].a AS a#27]
      [info] : +- MapElements <function1>, class frameless.X1, [StructField(a,IntegerType,false)], obj#26: frameless.X1
      [info] : +- DeserializeToObject newInstance(class frameless.X1), obj#25: frameless.X1
      [info] : +- LocalRelation [a#8]
      [info] +- SerializeFromObject [input[0, frameless.X1, false].a AS a#36]
      [info] +- MapElements <function1>, class frameless.X1, [StructField(a,IntegerType,false)], obj#35: frameless.X1
      [info] +- DeserializeToObject newInstance(class frameless.X1), obj#34: frameless.X1
      [info] +- LocalRelation [a#8]

      NOTE(review): both sides of this trace read LocalRelation [a#8], so it was
      presumably captured while both mapped datasets were derived from df1.
    */
    // Identity `map` over the deserialized view of each dataset; per the trace
    // above the resulting plans expose new attribute ids (a#27, a#36), so
    // `_a1` / `_a2` no longer resolve against them.
    val from_df1 = df1.deserialized.map(x => x)
    // Derived from df2 (not df1) so that the two sides of the commented-out
    // join in case (1) really are different datasets.
    val from_df2 = df2.deserialized.map(x => x)

    /*
      (2) Doesn't work:

      Need to know if expression is left or right to resolve conflict

      [info] org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be: a#1, a#8.;
    */
    // Name-only column built by the helper above; the target type fixes T and A.
    val _a0: TypedColumn[X1[Int], Int] = col('a)

    //
    // _a === _a
    //
    println("_a === _a")
    df1.joinInner(df2) { _a1 === _a2 }.show().run()

    // see (1)
    // println("\nvs\n")
    // from_df1.joinInner(from_df2) { _a1 === _a2 }.show().run()

    // see (2)
    // println("\nvs\n")
    // df1.joinInner(df2) { _a0 === _a0 }.show().run()

    // works because of TypedDataset#resolveSelfJoin
    println("\nvs\n")
    df1.joinInner(df2) { _a1 === _a1 }.show().run()
    println("")

    //
    // _a >= _a
    //
    println("_a >= _a")
    df1.joinInner(df2) { _a1 >= _a2 }.show().run()

    // see (1)
    // println("\nvs\n")
    // from_df1.joinInner(from_df2) { _a1 >= _a2 }.show().run()

    // see (2)
    // println("\nvs\n")
    // df1.joinInner(df2) { _a0 >= _a0 }.show().run()

    /*
      Doesn't work (3):

      [info] org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
      [info] LocalRelation [a#1]
      [info] and
      [info] LocalRelation [a#8]
      [info] Join condition is missing or trivial.
      [info] Use the CROSS JOIN syntax to allow cartesian products between these relations.;
    */
    // println("\nvs\n")
    // df1.joinInner(df2) { _a1 >= _a1 }.show().run()
    println("")

    //
    // _a <= _a
    //
    println("_a <= _a")
    df1.joinInner(df2) { _a1 <= _a2 }.show().run()

    // see (1)
    // println("\nvs\n")
    // from_df1.joinInner(from_df2) { _a1 <= _a2 }.show().run()

    // see (2)
    // println("\nvs\n")
    // df1.joinInner(df2) { _a0 <= _a0 }.show().run()

    // see (3)
    // println("\nvs\n")
    // df1.joinInner(df2) { _a1 <= _a1 }.show().run()
    println("")
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment