Skip to content

Instantly share code, notes, and snippets.

@d4rkc0de
Last active October 26, 2022 12:34
Show Gist options
  • Save d4rkc0de/159f68b16cd89441cf7cc3160fe9f714 to your computer and use it in GitHub Desktop.
Save d4rkc0de/159f68b16cd89441cf7cc3160fe9f714 to your computer and use it in GitHub Desktop.
/**
 * Joins `df` with `buildDf` using key salting, intended for joins whose keys are skewed.
 *
 * Every row of `buildDf` is replicated `salt` times, once for each salt value in `[0, salt)`.
 * Every row of `df` is tagged with a single salt value computed as
 * `monotonically_increasing_id() % salt`. The salt equality is AND-ed with `joinExpression`,
 * so a `df` row can only match the one replica of a `buildDf` row that carries the same salt.
 * Both helper columns (`slt_ratio`, `slt_ratio_s`) are dropped from the result.
 *
 * Because `buildDf` is replicated, join types that preserve unmatched rows of the build side
 * (right outer / full outer) would emit the unmatched replicas as spurious null-padded rows;
 * they are rejected up front instead of silently returning wrong data.
 *
 * @param df             the side whose rows are NOT replicated (typically the large, skewed one)
 * @param buildDf        the side that is replicated `salt` times
 * @param joinExpression the original join condition between `df` and `buildDf`
 * @param joinType       Spark join type string (e.g. "inner", "left", "left_semi", "left_anti")
 * @param salt           number of salt buckets; must be strictly positive
 * @return the joined DataFrame without the salt helper columns
 * @throws IllegalArgumentException if `salt <= 0` or `joinType` is a right/full outer join
 */
def saltedJoin(df: DataFrame, buildDf: DataFrame, joinExpression: Column, joinType: String, salt: Int): DataFrame = {
  import org.apache.spark.sql.functions._

  // salt <= 0 would explode an empty array (dropping every buildDf row) and compute `id % 0`.
  require(salt > 0, s"salt must be a positive number of buckets, got $salt")

  // Normalise the same way Spark parses join type names (case-insensitive, underscores ignored).
  val normalizedJoinType = joinType.toLowerCase(java.util.Locale.ROOT).replace("_", "")
  val buildSidePreserving = Set("right", "rightouter", "outer", "full", "fullouter")
  require(
    !buildSidePreserving.contains(normalizedJoinType),
    s"saltedJoin replicates buildDf, so join type '$joinType' would produce duplicated unmatched rows"
  )

  // Replicate the build side: one copy of each row per salt value.
  val saltValues = array((0 until salt).map(lit): _*)
  val tableDf = buildDf.withColumn("slt_ratio_s", explode(saltValues))

  // Tag each row of the other side with exactly one salt value.
  val streamDf = df.withColumn("slt_ratio", monotonically_increasing_id() % salt)

  val saltedExpr = streamDf("slt_ratio") === tableDf("slt_ratio_s") && joinExpression
  streamDf.join(tableDf, saltedExpr, joinType).drop("slt_ratio_s", "slt_ratio")
}
// Example usage: chain three salted left joins (200 salt buckets each) and write the result
// as Parquet. `saltedJoin` is a plain function taking the DataFrame as its first argument, so
// it is chained through `Dataset.transform` rather than called as a method on `df`.
df
  .transform(saltedJoin(_, geoDataDf, exprGeo, "left", 200))
  .transform(saltedJoin(_, userAgentDf, exprUserAgent, "left", 200))
  .transform(saltedJoin(_, ownerMetadataDf, exprOwnerMetadata, "left", 200))
  .write
  .parquet("s3://...")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment