-
-
Save skp33/c872699fa120a1ffee8e078cdf45b208 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
implicit class DataFrameExtended(df: DataFrame) { | |
import df.sqlContext.implicits._ | |
def anyNull(cols: Seq[Column]): Column = cols.map(_.isNull).reduce (_ || _) | |
/** | |
* LEFT JOIN should not join anything when join-key contains a NULL (but usually this | |
* would result in shuffling NULL keyed items into single or few reducers). | |
* This can be easily fixed by adding an additional temporary join condition that: | |
* - is a random seed when any of the keys is null, thus addressing the NULL skew | |
* by distributing rows evenly/randomly among the nodes | |
* - or a constant to both join sides otherwise, where join will be executed as usually | |
* @param rightDf | |
* @param joinConditions | |
* @param leftNullableCols - nullable columns on left DF | |
* @param rightNullableCols - nullable columns on right DF | |
*/ | |
def nullSafeLeftJoin(rightDf: DataFrame, | |
joinConditions: Column, | |
leftNullableCols: Seq[Column], | |
rightNullableCols: Seq[Column], | |
joinType: String, | |
alias1 = "tbl1", | |
alias2 = "tbl2") = { | |
val leftNullSafe = df.withColumn("sprayNullsKey1", | |
when(anyNull(leftNullableCols), (rand() * 100000).cast(LongType)) | |
.otherwise(0)) | |
// null items on the right are thrown out, so filter them out | |
val rightNullSafe = rightDf | |
.filter(not(anyNull(rightNullableCols))) | |
.withColumn("sprayNullsKey2", lit(0)) | |
leftNullSafe.as(alias1).join( | |
rightNullSafe.as(alias2), | |
$"sprayNullsKey1" === $"sprayNullsKey2" && joinConditions, | |
joinType | |
) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment