@marcintustin
Created August 17, 2019 18:38
Using Aggregator in Spark
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Dataset, Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.TypedColumn

case class Food(key: String, date: String, numeric: Long, text: String)

object AggregateLatestFoods {

  /** Load the data elsewhere. Encoders are passed explicitly so this
    * compiles without `import spark.implicits._` in scope. */
  def aggregateLatestFoods(in: Dataset[Food]): Dataset[Food] =
    in.groupByKey(_.key)(Encoders.STRING)
      .agg(latestFood)
      .map(_._2)(Encoders.product[Food])

  // We're going to extract the date to a Long for easier comparison
  type CmpFood = (Long, Food)

  // numericDate was not defined in the original gist; assuming ISO dates
  // (yyyy-MM-dd), stripping the dashes yields a Long (e.g. 20190817L) that
  // orders chronologically and always beats the -1 sentinel used in zero.
  def numericDate(date: String): Long = date.replace("-", "").toLong

  val latestFood: TypedColumn[Food, Food] =
    new Aggregator[/* input type */ Food,
                   /* "buffer" type */ CmpFood,
                   /* output type */ Food] {

      def keyForCmp(f: Food): CmpFood = (numericDate(f.date), f)

      // We're expecting every key to have at least one value, by construction
      // of the groupByKey, so this value doesn't need to make sense and never
      // reaches the output. Hence the nulls.
      def zero: CmpFood = (-1L, Food(null, null, -1L, null))

      // Lift to the buffer type and perform aggregation - used on the map side
      def reduce(b: CmpFood, a: Food): CmpFood = merge(b, keyForCmp(a))

      // Merge results of running on partitions: keep the later date
      def merge(b1: CmpFood, b2: CmpFood): CmpFood = Seq(b1, b2).maxBy(_._1)

      // Map the buffer to the output type
      def finish(b: CmpFood): Food = b._2

      def bufferEncoder: Encoder[CmpFood] = Encoders.product[CmpFood]
      def outputEncoder: Encoder[Food] = Encoders.product[Food]
    }.toColumn
}
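
A minimal sketch of how this might be invoked, assuming a local SparkSession; the example data and the names AggregateLatestFoodsExample and foods are illustrative, not from the original gist:

import org.apache.spark.sql.SparkSession

object AggregateLatestFoodsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("latest-foods")
      .getOrCreate()
    import spark.implicits._

    // Illustrative rows: two dates for "apple", one for "pear"
    val foods = Seq(
      Food("apple", "2019-08-01", 1L, "old"),
      Food("apple", "2019-08-17", 2L, "new"),
      Food("pear",  "2019-08-10", 3L, "only")
    ).toDS()

    // Expect one row per key, carrying the full record with the latest date:
    // Food(apple,2019-08-17,2,new) and Food(pear,2019-08-10,3,only)
    AggregateLatestFoods.aggregateLatestFoods(foods).show(false)

    spark.stop()
  }
}

Because reduce and merge both boil down to "keep the entry with the larger date", Spark can combine partial results on the map side before shuffling, which is the performance point of the post linked below.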
@marcintustin
Gist illustrating use of org.apache.spark.sql.expressions.Aggregator for my post "Spark: Aggregating your data the fast way": https://medium.com/p/e37b53314fad
