@saswata-dutta · Created February 26, 2021 12:34
Select the latest record per id while merging deltas or snapshots.
// Assumes a spark-shell / SparkSession context where spark.implicits._ is in scope
// (needed for toDF and the $"..." column syntax).
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{broadcast, max, row_number}

// Sample data: several versions of each id, distinguished by time_stamp.
val df = Seq(
  (1, "foo", 123L),
  (2, "foo", 123L),
  (3, "foo", 123L),
  (4, "foo", 123L),
  (3, "foo", 124L),
  (2, "foo", 123L),
  (1, "foo", 122L),
  (1, "foo", 120L)
).toDF("id", "meta", "time_stamp")
  .repartition(5)
// Approach 1: window function. All rows for an id land in one window partition,
// so this is inefficient when the data is significantly skewed towards a few ids.
val w = Window.partitionBy($"id").orderBy($"time_stamp".desc)
val dfTopRows1 = df
  .withColumn("rn", row_number().over(w))
  .where($"rn" === 1)
  .drop("rn")
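For the sample data above this should keep exactly one row per id at its latest time_stamp. The expected output below is worked out by hand from the input rows, not captured from a run:

dfTopRows1.orderBy($"id").show()
// +---+----+----------+
// | id|meta|time_stamp|
// +---+----+----------+
// |  1| foo|       123|
// |  2| foo|       123|
// |  3| foo|       124|
// |  4| foo|       123|
// +---+----+----------+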
// Approach 2: use joins and aggregates. Compute the max time_stamp per id, then
// join it back to pick the matching rows. The broadcast hint avoids shuffling df
// when the aggregated side is small; dropDuplicates keeps one row per id when
// several rows share the same max time_stamp.
val dfMax = df.groupBy($"id").agg(max($"time_stamp").as("max_time_stamp"))
val dfTopRows2 = df
  .join(broadcast(dfMax), Seq("id"), "inner")
  .where($"time_stamp" === $"max_time_stamp")
  .drop("max_time_stamp")
  .dropDuplicates("id")
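A common third variant, not part of the original gist, avoids the join entirely: take the max of a struct whose first field is time_stamp, since Spark orders structs field by field. A minimal sketch, assuming every payload column can be packed into the struct (dfTopRows3 is a hypothetical name):

import org.apache.spark.sql.functions.struct

// Single aggregation: the struct with the largest time_stamp wins,
// carrying its meta value along with it.
val dfTopRows3 = df
  .groupBy($"id")
  .agg(max(struct($"time_stamp", $"meta")).as("latest"))
  .select($"id", $"latest.meta".as("meta"), $"latest.time_stamp".as("time_stamp"))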