Using Aggregator in Spark
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Dataset, Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.TypedColumn

case class Food(key: String, date: String, numeric: Long, text: String)

object AggregateLatestFoods {
  // Encoders needed by groupByKey and map; alternatively import spark.implicits._
  implicit val stringEncoder: Encoder[String] = Encoders.STRING
  implicit val foodEncoder: Encoder[Food] = Encoders.product[Food]

  /** Load the data elsewhere **/
  def aggregateLatestFoods(in: Dataset[Food]): Dataset[Food] =
    in.groupByKey(_.key).agg(latestFood).map[Food](_._2)

  // We're going to extract the date to a Long for easier comparison
  type CmpFood = (Long, Food)

  val latestFood: TypedColumn[Food, Food] = new Aggregator[/* input type */ Food,
      /* "buffer" type */ CmpFood,
      /* output type */ Food] with Serializable {
    // numericDate (defined elsewhere) turns the date string into a sortable Long
    def keyForCmp(f: Food): CmpFood = (numericDate(f.date), f)
    // We're expecting every key to have at least one value, by construction of the groupByKey,
    // so this value doesn't need to make sense. Hence the nulls.
    def zero: CmpFood = (-1L, Food(null, null, -1, null))
    // Lift to buffer type and perform aggregation - used on the map side
    def reduce(b: CmpFood, a: Food): CmpFood = merge(b, keyForCmp(a))
    // Merge results of running on partitions
    def merge(b1: CmpFood, b2: CmpFood): CmpFood = Seq(b1, b2).maxBy(_._1)
    // Map the buffer to the output type
    def finish(b: CmpFood): Food = b._2
    def bufferEncoder: Encoder[CmpFood] = Encoders.product[CmpFood]
    def outputEncoder: Encoder[Food] = Encoders.product[Food]
  }.toColumn
}
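The aggregator relies on a numericDate helper that the gist does not define. A minimal sketch of what such a helper could look like, assuming the date strings are ISO-8601 ("yyyy-MM-dd"); the name comes from the code above, but the format and implementation here are assumptions:

import java.time.LocalDate

// Hypothetical helper (assumed ISO-8601 input): parse the date string and return days
// since the epoch, so later dates map to larger Longs and maxBy picks the most recent Food.
def numericDate(date: String): Long = LocalDate.parse(date).toEpochDay

It would need to be in scope where latestFood is defined, e.g. inside AggregateLatestFoods.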
Gist illustrating use of org.apache.spark.sql.expressions.Aggregator for my post "Spark: Aggregating your data the fast way": https://medium.com/p/e37b53314fad
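For reference, a sketch of how the aggregation might be driven end to end; the SparkSession setup and sample rows are illustrative assumptions, not part of the gist or the post:

import org.apache.spark.sql.SparkSession

object LatestFoodsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("latest-foods").getOrCreate()
    import spark.implicits._

    val foods = Seq(
      Food("apple", "2019-08-01", 1L, "old"),
      Food("apple", "2019-08-15", 2L, "new"),
      Food("pear",  "2019-08-10", 3L, "only")
    ).toDS()

    // Expect one row per key: the Food whose date converts to the largest Long
    AggregateLatestFoods.aggregateLatestFoods(foods).show()

    spark.stop()
  }
}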