Created May 3, 2019 16:12
-
-
Save pezon/b71e30d0f30afca15fab6495f688fccd to your computer and use it in GitHub Desktop.
Applying a UDF to multiple columns of different types. The UDF here (a null operation) is trivial.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.sql._ | |
import org.apache.spark.sql.functions._ | |
import org.apache.spark.sql.types._ | |
/** Helpers for replacing selected DataFrame columns with typed null columns.
  *
  * The `null*ColumnUDF` values are retained for callers that reference them directly.
  * [[withNullColumns]] itself does not use them: `lit(null).cast(dataType)` produces a typed
  * null for every Spark data type, not just the six types covered by these UDFs.
  */
object NullTransformer {
  val nullStringColumnUDF = udf(() => None: Option[String])
  val nullLongColumnUDF = udf(() => None: Option[Long])
  val nullIntegerColumnUDF = udf(() => None: Option[Integer])
  val nullFloatColumnUDF = udf(() => None: Option[Float])
  val nullDoubleColumnUDF = udf(() => None: Option[Double])
  val nullBooleanColumnUDF = udf(() => None: Option[Boolean])

  /** Returns `df` with every column named in `columns` replaced by a null column of the same type.
    *
    * Column order is preserved. Columns not named in `columns` are passed through unchanged.
    * Names in `columns` that do not exist in `df` are ignored. Names are matched exactly
    * (case-sensitive string equality).
    *
    * Intended for use with `Dataset.transform`, e.g.
    * `df.transform(NullTransformer.withNullColumns(Seq("a", "b")))`.
    *
    * @param columns names of the columns to null out
    * @param df      the input DataFrame
    * @return a DataFrame with the same column names and types. The replaced columns contain
    *         only nulls and are nullable.
    */
  def withNullColumns(columns: Seq[String])(df: DataFrame): DataFrame = {
    val targets = columns.toSet

    val projection = df.schema.fields.map { field =>
      if (targets.contains(field.name)) {
        // A cast of a null literal yields a null of any requested type, so no per-type UDF is needed.
        lit(null).cast(field.dataType).as(field.name)
      } else {
        // Backtick-quote the name so one containing dots resolves as a single top-level column
        // rather than as struct-field access. Embedded backticks are escaped by doubling them.
        col("`" + field.name.replace("`", "``") + "`")
      }
    }

    // One projection for all columns. Chaining withColumn per column grows the logical plan
    // with one projection per call.
    df.select(projection: _*)
  }
}
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment