Skip to content

Instantly share code, notes, and snippets.

@pezon
Created May 3, 2019 16:12
Show Gist options
  • Save pezon/b71e30d0f30afca15fab6495f688fccd to your computer and use it in GitHub Desktop.
Save pezon/b71e30d0f30afca15fab6495f688fccd to your computer and use it in GitHub Desktop.
Applying a UDF to multiple columns of different types. The UDF used here (it simply replaces every value with null) is deliberately trivial.
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
/**
 * Replaces the values of selected DataFrame columns with nulls, keeping each
 * column's name, position and data type.
 *
 * One zero-argument UDF is defined per supported type; each always returns
 * `None`, which Spark stores as a null of the matching SQL type.
 */
object NullTransformer {

  // One null-producing UDF per supported column type.
  val nullStringColumnUDF = udf(() => None: Option[String])
  val nullLongColumnUDF = udf(() => None: Option[Long])
  // Option[Int] (rather than the boxed java.lang.Integer) for consistency with
  // the other primitive UDFs; it still maps to IntegerType.
  val nullIntegerColumnUDF = udf(() => None: Option[Int])
  val nullFloatColumnUDF = udf(() => None: Option[Float])
  val nullDoubleColumnUDF = udf(() => None: Option[Double])
  val nullBooleanColumnUDF = udf(() => None: Option[Boolean])

  /**
   * Returns `df` with every column listed in `columns` replaced by an all-null
   * column of the same name and type.
   *
   * Only String, Long, Integer, Float, Double and Boolean columns are nulled.
   * Listed columns of any other type, and names that do not exist in `df`,
   * are left untouched. Column order is preserved.
   *
   * @param columns names of the columns to null out (matched exactly)
   * @param df      the DataFrame to transform
   * @return a DataFrame with the same columns as `df`, the selected ones nulled
   */
  def withNullColumns(columns: Seq[String])(df: DataFrame): DataFrame = {
    // Set gives constant-time membership checks instead of a linear scan per column.
    val targets = columns.toSet

    val projection = df.schema.fields.map { field =>
      val replacement =
        if (targets.contains(field.name)) nullColumnFor(field.dataType) else None
      replacement.fold(col(field.name))(_.as(field.name))
    }

    // A single select instead of one withColumn per column: every withColumn
    // adds another projection to the plan, which gets expensive on wide frames.
    df.select(projection: _*)
  }

  /** Null-producing column for `dataType`, or `None` when the type is unsupported. */
  private def nullColumnFor(dataType: DataType): Option[Column] = dataType match {
    case StringType  => Some(nullStringColumnUDF())
    case LongType    => Some(nullLongColumnUDF())
    case IntegerType => Some(nullIntegerColumnUDF())
    case FloatType   => Some(nullFloatColumnUDF())
    case DoubleType  => Some(nullDoubleColumnUDF())
    case BooleanType => Some(nullBooleanColumnUDF())
    case _           => None
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment