Skip to content

Instantly share code, notes, and snippets.

@rmarsch
Last active June 1, 2016 18:46
Show Gist options
  • Save rmarsch/3ea78b3a9a8a0e83ce162ed947fcab06 to your computer and use it in GitHub Desktop.
Save rmarsch/3ea78b3a9a8a0e83ce162ed947fcab06 to your computer and use it in GitHub Desktop.
Dataset with enriched joins
/**
 * Enrichment for Spark `Dataset`s that exposes typed outer and semi joins.
 *
 * Spark's `joinWith` returns a `Dataset[(T, U)]` in which the unmatched side of an outer join is
 * `null`; the outer-join methods here wrap the possibly-missing side in an `Option` instead.
 *
 * NOTE(review): `ct` is not referenced by any method below (the Kryo encoders only need class tags
 * for `Tuple2`/`Option`, which the compiler synthesizes); it is kept so that explicit
 * constructions of this class continue to compile.
 *
 * @param ds the left-hand side of every join
 * @tparam T element type of `ds`
 */
implicit class DatasetEnrichedJoinWiths[T](ds: Dataset[T])(implicit ct: ClassTag[T]) {
  // Join-type names passed to Spark's `join` / `joinWith`.
  val Inner: String = "inner"
  val Outer: String = "outer"
  val LeftOuter: String = "left_outer"
  val RightOuter: String = "right_outer"
  val LeftSemi: String = "leftsemi"

  /**
   * Full outer join of `ds` with `other` on `expr`.
   *
   * @param other right-hand dataset
   * @param expr  join condition
   * @return one pair per joined row; a side is `None` when `joinWith` produced `null` for it
   */
  def outerJoin[U](other: Dataset[U], expr: Column): Dataset[(Option[T], Option[U])] = {
    // Ideally use a better encoder: Kryo stores each row as a single binary field, so the
    // result's fields cannot be addressed as columns.
    implicit val enc: Encoder[(Option[T], Option[U])] = Encoders.kryo[(Option[T], Option[U])]
    // Option(...) maps the null that joinWith emits for an unmatched side to None.
    ds.joinWith(other, expr, Outer).map { case (t, u) => (Option(t), Option(u)) }
  }

  /**
   * Left outer join of `ds` with `other` on `expr`.
   *
   * @param other right-hand dataset
   * @param expr  join condition
   * @return one pair per joined row; the right side is `None` when it had no match
   */
  def leftOuterJoin[U](other: Dataset[U], expr: Column): Dataset[(T, Option[U])] = {
    // Ideally use a better encoder: Kryo stores each row as a single binary field.
    implicit val enc: Encoder[(T, Option[U])] = Encoders.kryo[(T, Option[U])]
    ds.joinWith(other, expr, LeftOuter).map { case (t, u) => (t, Option(u)) }
  }

  /**
   * Right outer join of `ds` with `other` on `expr`.
   *
   * @param other right-hand dataset
   * @param expr  join condition
   * @return one pair per joined row; the left side is `None` when it had no match
   */
  def rightOuterJoin[U](other: Dataset[U], expr: Column): Dataset[(Option[T], U)] = {
    // Ideally use a better encoder: Kryo stores each row as a single binary field.
    implicit val enc: Encoder[(Option[T], U)] = Encoders.kryo[(Option[T], U)]
    ds.joinWith(other, expr, RightOuter).map { case (t, u) => (Option(t), u) }
  }

  /**
   * Left semi join: keeps the rows of `ds` that have at least one match in `other` under `expr`.
   *
   * `joinWith` cannot be used here. A semi join emits only the left side's columns, so there is
   * no `U` value to pair with, and Spark rejects semi joins in `joinWith`. The untyped `join` is
   * used instead, and the surviving rows are decoded back into `T`.
   *
   * @param other right-hand dataset, used only for matching
   * @param expr  join condition
   * @param tEnc  encoder used to decode the result; it must match the schema of `ds`
   *              (normally the encoder `ds` was created with)
   * @return the matching rows of `ds`
   */
  def leftSemiJoin[U](other: Dataset[U], expr: Column)(implicit tEnc: Encoder[T]): Dataset[T] =
    ds.join(other, expr, LeftSemi).as[T](tEnc)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment