Skip to content

Instantly share code, notes, and snippets.

@dmateusp
Last active August 12, 2022 11:00
Show Gist options
  • Save dmateusp/e738a9647ffe2fa432457460d1c0c445 to your computer and use it in GitHub Desktop.
DataFrame.transform - Spark Function Composition
// load me with :load AfterRefactor.scala
// A Transform is an endomorphism on DataFrame, so transforms compose
// naturally with `andThen` / `compose` / `Function.chain`.
type Transform = DataFrame => DataFrame

/** Returns a Transform that groups by the given columns and aggregates
  * the sum of the "amount" column.
  *
  * @param by grouping columns (varargs)
  */
def sumAmounts(by: Column*): Transform = { df =>
  df.groupBy(by: _*).agg(sum(col("amount")))
}
/** Returns a Transform that adds two columns derived from `columnName`:
  *
  *  - s"${columnName}_payer":       capture group 1 of "paid by ([A-Z])"
  *  - s"${columnName}_beneficiary": capture group 1 of "to ([A-Z])"
  *
  * NOTE(review): the patterns capture a single uppercase letter — matches
  * the single-letter names in the sample data; confirm before reusing on
  * real names.
  */
def extractPayerBeneficiary(columnName: String): Transform = { df =>
  val source      = col(columnName)
  val payer       = regexp_extract(source, "paid by ([A-Z])", 1)
  val beneficiary = regexp_extract(source, "to ([A-Z])", 1)

  df.withColumn(s"${columnName}_payer", payer)
    .withColumn(s"${columnName}_beneficiary", beneficiary)
}
// load me with :load BeforeRefactor.scala
/** Pre-refactor version: groups `df` by the given columns and sums the
  * "amount" column. Takes the DataFrame directly instead of returning a
  * composable Transform.
  */
def sumAmounts(df: DataFrame, by: Column*): DataFrame = {
  val grouped = df.groupBy(by: _*)
  grouped.agg(sum(col("amount")))
}
/** Pre-refactor version: adds s"${columnName}_payer" and
  * s"${columnName}_beneficiary" columns extracted from `columnName` via
  * "paid by ([A-Z])" and "to ([A-Z])" (capture group 1). Takes the
  * DataFrame directly instead of returning a composable Transform.
  */
def extractPayerBeneficiary(columnName: String, df: DataFrame): DataFrame = {
  val payerCol       = regexp_extract(col(columnName), "paid by ([A-Z])", 1)
  val beneficiaryCol = regexp_extract(col(columnName), "to ([A-Z])", 1)

  df.withColumn(s"${columnName}_payer", payerCol)
    .withColumn(s"${columnName}_beneficiary", beneficiaryCol)
}
// sbt build definition for a local Spark 2.4.3 / Scala 2.12 playground.
name := "spark-playground"
version := "1.0"
// Scala 2.12 to match the Spark 2.4.x artifacts below.
scalaVersion := "2.12.8"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.4.3",
"org.apache.spark" %% "spark-sql" % "2.4.3")
// Runs on entering `sbt console`: imports Spark SQL names and starts a
// local SparkSession so the gist's .scala files can be `:load`-ed directly.
initialCommands in console :=
"""
|import org.apache.spark.sql.functions._
|import org.apache.spark.sql.{DataFrame, Dataset, SparkSession, Column}
|
|val spark =
| SparkSession
| .builder()
| .appName("Local Spark")
| .master("local[*]")
| .getOrCreate()
|
|import spark.implicits._
|
|
""".stripMargin
// Stops the SparkSession cleanly when the console session ends.
cleanupCommands in console := "spark.stop()"
// Four equivalent ways to apply extractPayerBeneficiary then sumAmounts.

// 1. Chain DataFrame.transform calls — each call applies one Transform.
dfTransactions
.transform(extractPayerBeneficiary("details"))
.transform(sumAmounts(date_trunc("day", col("ts")), col("details_beneficiary")))
// 2. andThen: compose the Transforms left-to-right, then apply once.
dfTransactions
.transform(extractPayerBeneficiary("details") andThen sumAmounts(date_trunc("day", col("ts")), col("details_beneficiary")))
// 3. compose: same pipeline, written right-to-left.
dfTransactions
.transform(sumAmounts(date_trunc("day", col("ts")), col("details_beneficiary")) compose extractPayerBeneficiary("details"))
// 4. Function.chain: fold a List of Transforms into one (applied in list order).
dfTransactions
.transform(Function.chain(List(extractPayerBeneficiary("details"), sumAmounts(date_trunc("day", col("ts")), col("details_beneficiary")))))
// load me with :load Transactions.scala
import java.sql.Timestamp
import org.apache.spark.sql.DataFrame

// One row of the toy dataset: a free-text description, an amount, and a
// transaction timestamp.
case class Transaction(details: String, amount: Int, ts: Timestamp)

// Sample transactions; payer/beneficiary are single uppercase letters
// embedded in `details`, matching the extraction regexes above.
val dfTransactions: DataFrame =
  List(
    ("paid by A to X", 100, "2018-01-05 08:00:00"),
    ("paid by B to X", 10, "2018-01-05 11:00:00"),
    ("paid by C to Y", 15, "2018-01-06 12:00:00"),
    ("paid by D to Z", 50, "2018-01-06 15:00:00")
  ).map { case (details, amount, ts) =>
    Transaction(details, amount, Timestamp.valueOf(ts))
  }.toDF
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment