Skip to content

Instantly share code, notes, and snippets.

@nightscape
Last active August 29, 2017 12:38
Show Gist options
  • Select an option

  • Save nightscape/ed0da08aa1af1ed618a4682de6d790e5 to your computer and use it in GitHub Desktop.

Select an option

Save nightscape/ed0da08aa1af1ed618a4682de6d790e5 to your computer and use it in GitHub Desktop.
Compares two DataFrames and creates a resulting DataFrame with diffs
import org.apache.spark.sql.DataFrame
/**
 * Compares two DataFrames row by row and returns the rows whose values differ.
 *
 * The two inputs are inner-joined on `joinFields`; every remaining column that
 * exists in both inputs is then compared with null-safe equality. In the result:
 *  - a column that differs in at least one row is replaced by `<name>_a`
 *    (value from `dfA`) and `<name>_b` (value from `dfB`);
 *  - a compared column that never differs is replaced by `<name>_common`;
 *  - a column that exists in only one of the inputs cannot be compared and is
 *    passed through under its original name.
 *
 * @param dfA        left-hand DataFrame
 * @param dfB        right-hand DataFrame
 * @param joinFields names of the key columns used to pair up rows of `dfA` and `dfB`
 * @return the DataFrame of differing rows, and the names of the columns that
 *         differ in at least one of those rows
 */
def compareDataFrames(dfA: DataFrame, dfB: DataFrame, joinFields: Array[String]): (DataFrame, Seq[String]) = {
  // Only columns present on both sides can be compared; asking a DataFrame for a
  // column it does not have throws at analysis time.
  val comparableFields =
    dfA.schema.fieldNames.intersect(dfB.schema.fieldNames).distinct.diff(joinFields)

  // Null-safe inequality: a plain `=!=` evaluates to NULL when either side is
  // NULL, which would silently drop rows where a value is missing on one side.
  def differs(field: String) = !(dfA(field) <=> dfB(field))

  val joined = dfA.join(dfB, joinFields)

  // Keep the rows in which at least one compared column differs. With nothing to
  // compare there can be no differences, so fall back to an empty result that
  // still carries the joined schema (reduce on an empty array would throw).
  val differing = comparableFields
    .map(differs)
    .reduceOption(_ || _)
    .fold(joined.limit(0))(anyDifference => joined.where(anyDifference))
    .cache()

  // A field differs if some row has unequal values on the two sides. This must be
  // checked per row: comparing the sets of values of the two columns would miss
  // values that are merely permuted across rows.
  val differingFields =
    comparableFields.filter(field => differing.where(differs(field)).head(1).nonEmpty)
  val identicalFields = comparableFields.diff(differingFields)

  // Split every differing column into its two sides.
  val withSplitColumns = differingFields.foldLeft(differing) { case (df, field) =>
    df.withColumn(s"${field}_a", dfA(field))
      .withColumn(s"${field}_b", dfB(field))
      .drop(field)
  }

  // Collapse every identical column pair into a single column.
  val comparisonDf = identicalFields.foldLeft(withSplitColumns) { case (df, field) =>
    df.withColumn(s"${field}_common", dfA(field)).drop(field)
  }

  (comparisonDf, differingFields.toSeq)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment