Select the latest record per id when merging deltas or snapshots
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell

// sample data: duplicate and out-of-order records per id
val df = Seq(
  (1, "foo", 123L),
  (2, "foo", 123L),
  (3, "foo", 123L),
  (4, "foo", 123L),
  (3, "foo", 124L),
  (2, "foo", 123L),
  (1, "foo", 122L),
  (1, "foo", 120L)
).toDF("id", "meta", "time_stamp").
  repartition(5)
// option 1: window function. Inefficient under significant data skew,
// since every row for a given id is shuffled to a single partition
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"id").orderBy($"time_stamp".desc)
val dfTopRows1 = df.withColumn("rn", row_number().over(w)).where($"rn" === 1).drop("rn")
// option 2: aggregate then join. Compute the max time_stamp per id and join it
// back; broadcast keeps the join map-side since dfMax has one row per id
val dfMax = df.groupBy($"id").agg(max($"time_stamp").as("max_time_stamp"))
val dfTopRows2 = df.join(broadcast(dfMax), Seq("id"), "inner").
  where($"time_stamp" === $"max_time_stamp").
  drop("max_time_stamp").
  dropDuplicates("id") // an id can have several rows at the max time_stamp
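A third option, not in the original gist but a natural sketch given the same df: a single aggregation with max over a struct. Spark orders structs field by field, so putting time_stamp first makes max pick the latest row per id without a window or a join; ties on time_stamp are broken arbitrarily by the remaining fields.
// option 3 (sketch): max over a struct keyed by time_stamp
val dfTopRows3 = df.
  groupBy($"id").
  agg(max(struct($"time_stamp", $"meta")).as("latest")).
  select($"id", $"latest.meta".as("meta"), $"latest.time_stamp".as("time_stamp"))
On this sample all approaches agree: ids 1, 2, and 4 keep time_stamp 123, and id 3 keeps 124.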