Last active
August 12, 2022 11:00
-
-
Save dmateusp/e738a9647ffe2fa432457460d1c0c445 to your computer and use it in GitHub Desktop.
DataFrame.transform - Spark Function Composition
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Load into the Spark console with `:load AfterRefactor.scala`
/** A single DataFrame-to-DataFrame step, suitable for `Dataset.transform` and ordinary function composition. */
type Transform = Function1[DataFrame, DataFrame]
/**
 * Builds a transformation that groups a DataFrame by the given columns and
 * sums the "amount" column within each group.
 *
 * @param by columns to group on
 * @return a function taking a DataFrame to its grouped-and-summed result
 */
def sumAmounts(by: Column*): Transform = { input =>
  val totalAmount = sum(col("amount"))
  input.groupBy(by: _*).agg(totalAmount)
}
/**
 * Builds a transformation that adds two columns derived from `columnName`:
 * `<columnName>_payer` (first capture group of "paid by ([A-Z])") and
 * `<columnName>_beneficiary` (first capture group of "to ([A-Z])").
 *
 * @param columnName name of the text column to parse
 * @return a function taking a DataFrame to the same DataFrame plus the two new columns
 */
def extractPayerBeneficiary(columnName: String): Transform = { input =>
  val source = col(columnName)
  val payer = regexp_extract(source, "paid by ([A-Z])", 1)
  val beneficiary = regexp_extract(source, "to ([A-Z])", 1)

  input
    .withColumn(s"${columnName}_payer", payer)
    .withColumn(s"${columnName}_beneficiary", beneficiary)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Load into the Spark console with `:load BeforeRefactor.scala`
/**
 * Groups `df` by the given columns and sums the "amount" column per group.
 *
 * @param df DataFrame to aggregate
 * @param by columns to group on
 * @return the grouped DataFrame with the summed amount
 */
def sumAmounts(df: DataFrame, by: Column*): DataFrame = {
  val grouped = df.groupBy(by: _*)
  grouped.agg(sum(col("amount")))
}
/**
 * Adds `<columnName>_payer` and `<columnName>_beneficiary` columns to `df`,
 * each holding the first capture group of its pattern applied to `columnName`.
 *
 * @param columnName name of the text column to parse
 * @param df         DataFrame containing that column
 * @return `df` with the two extracted columns appended
 */
def extractPayerBeneficiary(columnName: String, df: DataFrame): DataFrame = {
  // First capture group of `pattern`, matched against the source column.
  def firstGroup(pattern: String): Column =
    regexp_extract(col(columnName), pattern, 1)

  val withPayer = df.withColumn(s"${columnName}_payer", firstGroup("paid by ([A-Z])"))
  withPayer.withColumn(s"${columnName}_beneficiary", firstGroup("to ([A-Z])"))
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// sbt build definition for the Spark playground console.
name := "spark-playground"
version := "1.0"
scalaVersion := "2.12.8"

// Both Spark modules share this version.
val sparkVersion = "2.4.3"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion
)

// Run when `sbt console` starts: imports the Spark SQL API and creates a
// local SparkSession named `spark`, then brings its implicits into scope.
initialCommands in console :=
  """
    |import org.apache.spark.sql.functions._
    |import org.apache.spark.sql.{DataFrame, Dataset, SparkSession, Column}
    |
    |val spark =
    | SparkSession
    | .builder()
    | .appName("Local Spark")
    | .master("local[*]")
    | .getOrCreate()
    |
    |import spark.implicits._
    |
    |
""".stripMargin

// Run when the console exits: stops the session created above.
cleanupCommands in console := "spark.stop()"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Four ways to run extractPayerBeneficiary followed by sumAmounts on dfTransactions.

// 1. Two consecutive `transform` calls.
dfTransactions
  .transform(extractPayerBeneficiary("details"))
  .transform(sumAmounts(date_trunc("day", col("ts")), col("details_beneficiary")))

// 2. `andThen`: the left-hand function runs first.
dfTransactions.transform(
  extractPayerBeneficiary("details")
    .andThen(sumAmounts(date_trunc("day", col("ts")), col("details_beneficiary"))))

// 3. `compose`: the argument runs first, so the order is written right-to-left.
dfTransactions.transform(
  sumAmounts(date_trunc("day", col("ts")), col("details_beneficiary"))
    .compose(extractPayerBeneficiary("details")))

// 4. `Function.chain`: applies the listed functions in order.
dfTransactions.transform(
  Function.chain(
    Seq(
      extractPayerBeneficiary("details"),
      sumAmounts(date_trunc("day", col("ts")), col("details_beneficiary")))))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Load into the Spark console with `:load Transactions.scala`
import java.sql.Timestamp | |
import org.apache.spark.sql.DataFrame | |
/**
 * One payment record.
 *
 * @param details free-text description, e.g. "paid by A to X"
 * @param amount  amount paid
 * @param ts      timestamp of the transaction
 */
case class Transaction(
  details: String,
  amount: Int,
  ts: Timestamp
)
// Sample data set: four transactions spread over two days.
val dfTransactions: DataFrame = {
  // (details, amount, timestamp text) for each sample row.
  val rows = Seq(
    ("paid by A to X", 100, "2018-01-05 08:00:00"),
    ("paid by B to X", 10, "2018-01-05 11:00:00"),
    ("paid by C to Y", 15, "2018-01-06 12:00:00"),
    ("paid by D to Z", 50, "2018-01-06 15:00:00")
  )
  rows.map { case (details, amount, ts) =>
    Transaction(details, amount, Timestamp.valueOf(ts))
  }.toDF
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment