Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save YordanGeorgiev/661ad3fc606206f11980fe7ccccef327 to your computer and use it in GitHub Desktop.
Save YordanGeorgiev/661ad3fc606206f11980fe7ccccef327 to your computer and use it in GitHub Desktop.
[double interpolation for UDF calls in Spark] How to use the double-interpolation technique to call a UDF on Spark DataFrames #spark #dataframe #udf #interpolation
/**
 * Demonstrates "double interpolation" for UDF calls: a Scala string interpolator builds a
 * Spark SQL expression string (which itself embeds a generated `Array(...)` column list)
 * that invokes a registered UDF, and that string is then evaluated with `expr`.
 *
 * Constructing an instance registers the `org_getHeight` UDF on `spark`, a SparkSession
 * that must be in scope where this class is defined.
 *
 * The class is `Serializable` because the registered UDF closure calls the instance method
 * `getHeight`, so Spark must serialize `this` together with the UDF when shipping it to
 * executors; without it the job fails with "Task not serializable".
 * NOTE(review): if this class is ever nested inside a non-serializable outer class, the
 * outer reference would be captured too — keep it top-level.
 */
class DoubleInterpolationTechniqueForUDFcallInDataFrame extends Serializable {

  /** Number of `frqncy_<n>` columns (n = 1 .. colsNum) the UDF reads and `process` rewrites. */
  val colsNum = 100

  /** Name under which the height UDF is registered and referenced in generated SQL. */
  private val heightUdfName = "org_getHeight"

  /**
   * Body of the `org_getHeight` UDF.
   *
   * @param index 0-based position in `freqs` to read
   * @param freqs values of the `frqncy_1 .. frqncy_<colsNum>` columns of one row
   * @return the element at `index` as a Double, or None when `freqs` is null, `index` is
   *         out of range, or the element itself is null (SQL NULL)
   */
  def getHeight(index: Int, freqs: Seq[Integer]): Option[Double] =
    // placeholder: per-column logic over (0 until colsNum) would go here
    Option(freqs)
      .flatMap(_.lift(index))
      .flatMap(value => Option(value)) // boxed Integer may be null for SQL NULL cells
      .map(_.doubleValue)

  /**
   * Generates the Spark SQL expression that calls the height UDF, e.g.
   * `org_getHeight(0,Array(frqncy_1,frqncy_2))` for `index = 0` and `colNum = 2`.
   *
   * @param index  0-based index passed to the UDF as a literal
   * @param colNum number of `frqncy_` columns packed into the array argument
   * @return the SQL expression string, ready for `functions.expr`
   */
  def genExprForHeight(index: Int, colNum: Int): String = {
    val freqSeq = genSeq("frqncy_", colNum)
    s"$heightUdfName($index,$freqSeq)"
  }

  /**
   * Builds the SQL array constructor listing `colPrefix1 .. colPrefix<arrSize>`, e.g.
   * `Array(col_1,col_2)` for prefix `col_` and size 2; yields `Array()` when `arrSize <= 0`.
   *
   * @param colPrefix column-name prefix the 1-based index is appended to
   * @param arrSize   number of columns to list
   */
  def genSeq(colPrefix: String, arrSize: Int): String =
    (1 to arrSize).map(i => s"$colPrefix$i").mkString("Array(", ",", ")")

  // Registered at construction time. The closure calls the instance method getHeight and so
  // captures `this` (hence `extends Serializable`). A null index (SQL NULL) yields None
  // instead of a NullPointerException from unboxing the Integer.
  spark.udf.register(
    heightUdfName,
    (index: Integer, freqs: Seq[Integer]) =>
      Option(index).flatMap(i => getHeight(i.intValue, freqs))
  )

  /**
   * Rewrites each `frqncy_<n>` column (n = 1 .. colsNum) with the height UDF's result for
   * index n - 1, then restores `df`'s original column list and order.
   *
   * Columns are rewritten one at a time on the accumulated DataFrame, so step n sees the
   * already-rewritten values of columns 1 .. n-1.
   * NOTE(review): the generated expression reads every `frqncy_*` column, so `df` presumably
   * must already contain `frqncy_1 .. frqncy_<colsNum>`; any column not in `df` is dropped by
   * the final select. Spark warns that calling `withColumn` in a loop grows the query plan —
   * consider a single `select` if the sequential semantics above are not needed.
   *
   * @param df input DataFrame containing the `frqncy_*` columns
   * @return a DataFrame with the same columns as `df`, `frqncy_*` values replaced
   */
  def process(df: DataFrame): DataFrame = {
    import org.apache.spark.sql.functions.expr

    val rewritten = (1 to colsNum).foldLeft(df) { (acc, n) =>
      acc.withColumn(s"frqncy_$n", expr(genExprForHeight(n - 1, colsNum)))
    }
    rewritten.select(df.columns.head, df.columns.tail: _*)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment