Skip to content

Instantly share code, notes, and snippets.

@kanterov
Last active September 27, 2017 08:52
Show Gist options
  • Select an option

  • Save kanterov/49014d462b265eaf54353f09f0ac517a to your computer and use it in GitHub Desktop.

Select an option

Save kanterov/49014d462b265eaf54353f09f0ac517a to your computer and use it in GitHub Desktop.
Different kinds of corner cases with self joins in frameless (a typed API over Apache Spark Datasets)
package frameless
import shapeless.Witness
/** Demonstrates several corner cases of (self-)joins in frameless, where Spark's
  * analyzer either cannot disambiguate column references or fails to resolve
  * attributes across re-serialized lineages. The failing variants are kept
  * commented out, each annotated with the exception it produces.
  */
class AmbiguousJoinTests extends TypedDatasetSuite {

  /** Builds a `TypedColumn[T, A]` for `column` that is NOT tied to any concrete
    * dataset instance.
    *
    * Unlike `TypedDataset#col`, the underlying expression is an unresolved
    * `col(name)`, so in a self join Spark cannot tell whether it refers to the
    * left or the right side — see failure (2) below.
    *
    * @param column the field name as a `Symbol` witness
    * @param exists evidence that `T` has a field `column.T` of type `A`
    * @param encoder encoder used to type the untyped Spark column as `A`
    */
  def col[T, A](column: Witness.Lt[Symbol])(
    implicit
    exists: TypedColumn.Exists[T, column.T, A],
    encoder: TypedEncoder[A]): TypedColumn[T, A] = {
    val untypedExpr = org.apache.spark.sql.functions.col(column.value.name).as[A](TypedExpressionEncoder[A])
    new TypedColumn[T, A](untypedExpr)
  }

  test("ambigous join (1)") {
    val pairs = List(X1(1), X1(2))
    val df1 = TypedDataset.create(pairs)
    val df2 = TypedDataset.create(pairs)

    // Columns resolved against each dataset individually; they carry the
    // attribute ids of their originating dataset's plan.
    val _a1: TypedColumn[X1[Int], Int] = df1.col('a)
    val _a2: TypedColumn[X1[Int], Int] = df2.col('a)

    /*
    (1) Doesn't work:
    [info] org.apache.spark.sql.AnalysisException: resolved attribute(s) a#8,a#15 missing from a#27,a#36 in operator !Join Inner, (a#8 = a#15);;
    [info] !Join Inner, (a#8 = a#15)
    [info] :- SerializeFromObject [input[0, frameless.X1, false].a AS a#27]
    [info] : +- MapElements <function1>, class frameless.X1, [StructField(a,IntegerType,false)], obj#26: frameless.X1
    [info] : +- DeserializeToObject newInstance(class frameless.X1), obj#25: frameless.X1
    [info] : +- LocalRelation [a#8]
    [info] +- SerializeFromObject [input[0, frameless.X1, false].a AS a#36]
    [info] +- MapElements <function1>, class frameless.X1, [StructField(a,IntegerType,false)], obj#35: frameless.X1
    [info] +- DeserializeToObject newInstance(class frameless.X1), obj#34: frameless.X1
    [info] +- LocalRelation [a#8]
    */
    // Identity maps through the object layer: each produces a fresh
    // Serialize/Deserialize lineage with new attribute ids, so the columns
    // captured above (_a1/_a2) no longer resolve against these plans.
    // FIX: from_df2 previously read `df1.deserialized...` (copy/paste bug),
    // which made both lineages come from the same relation.
    val from_df1 = df1.deserialized.map(x => x)
    val from_df2 = df2.deserialized.map(x => x)

    /*
    (2) Doesn't work:
    Need to know if expression is left or right to resolve conflict
    [info] org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be: a#1, a#8.;
    */
    // Dataset-agnostic column built via the helper above — ambiguous in a join.
    val _a0: TypedColumn[X1[Int], Int] = col('a)

    //
    // _a === _a
    //
    println("_a === _a")
    df1.joinInner(df2) { _a1 === _a2 }.show().run()
    // see (1)
    // println("\nvs\n")
    // from_df1.joinInner(from_df2) { _a1 === _a2 }.show().run()
    // see (2)
    // println("\nvs\n")
    // df1.joinInner(df2) { _a0 === _a0 }.show().run()
    // works because of TypedDataset#resolveSelfJoin
    println("\nvs\n")
    df1.joinInner(df2) { _a1 === _a1 }.show().run()
    println("")

    //
    // _a >= _a
    //
    println("_a >= _a")
    df1.joinInner(df2) { _a1 >= _a2 }.show().run()
    // see (1)
    // println("\nvs\n")
    // from_df1.joinInner(from_df2) { _a1 >= _a2 }.show().run()
    // see (2)
    // println("\nvs\n")
    // df1.joinInner(df2) { _a0 >= _a0 }.show().run()
    /*
    Doesn't work (3):
    [info] org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
    [info] LocalRelation [a#1]
    [info] and
    [info] LocalRelation [a#8]
    [info] Join condition is missing or trivial.
    [info] Use the CROSS JOIN syntax to allow cartesian products between these relations.;
    */
    // println("\nvs\n")
    // df1.joinInner(df2) { _a1 >= _a1 }.show().run()
    println("")

    //
    // _a <= _a
    //
    println("_a <= _a")
    df1.joinInner(df2) { _a1 <= _a2 }.show().run()
    // see (1)
    // println("\nvs\n")
    // from_df1.joinInner(from_df2) { _a1 <= _a2 }.show().run()
    // see (2)
    // println("\nvs\n")
    // df1.joinInner(df2) { _a0 <= _a0 }.show().run()
    // see (3)
    // println("\nvs\n")
    // df1.joinInner(df2) { _a1 <= _a1 }.show().run()
    println("")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment