Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save skp33/c872699fa120a1ffee8e078cdf45b208 to your computer and use it in GitHub Desktop.
Save skp33/c872699fa120a1ffee8e078cdf45b208 to your computer and use it in GitHub Desktop.
implicit class DataFrameExtended(df: DataFrame) {
import df.sqlContext.implicits._
def anyNull(cols: Seq[Column]): Column = cols.map(_.isNull).reduce (_ || _)
/**
* LEFT JOIN should not join anything when join-key contains a NULL (but usually this
* would result in shuffling NULL keyed items into single or few reducers).
* This can be easily fixed by adding an additional temporary join condition that:
* - is a random seed when any of the keys is null, thus addressing the NULL skew
* by distributing rows evenly/randomly among the nodes
* - or a constant to both join sides otherwise, where join will be executed as usually
* @param rightDf
* @param joinConditions
* @param leftNullableCols - nullable columns on left DF
* @param rightNullableCols - nullable columns on right DF
*/
def nullSafeLeftJoin(rightDf: DataFrame,
joinConditions: Column,
leftNullableCols: Seq[Column],
rightNullableCols: Seq[Column],
joinType: String,
alias1 = "tbl1",
alias2 = "tbl2") = {
val leftNullSafe = df.withColumn("sprayNullsKey1",
when(anyNull(leftNullableCols), (rand() * 100000).cast(LongType))
.otherwise(0))
// null items on the right are thrown out, so filter them out
val rightNullSafe = rightDf
.filter(not(anyNull(rightNullableCols)))
.withColumn("sprayNullsKey2", lit(0))
leftNullSafe.as(alias1).join(
rightNullSafe.as(alias2),
$"sprayNullsKey1" === $"sprayNullsKey2" && joinConditions,
joinType
)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment