Skip to content

Instantly share code, notes, and snippets.

@emesday
Created August 31, 2016 13:40
Show Gist options
  • Save emesday/5dfafe4c28922c001d2c662327eb48ca to your computer and use it in GitHub Desktop.
Save emesday/5dfafe4c28922c001d2c662327eb48ca to your computer and use it in GitHub Desktop.
/**
 * Keeps, for every distinct value of column `key`, the `n` rows that rank highest
 * by column `orderBy`, and returns them as a DataFrame with the same schema as `df`.
 *
 * Rows whose `orderBy` value is null rank below every non-null value, so they are
 * only kept when a key has fewer than `n` non-null rows.
 *
 * NOTE(review): relies on a pair-RDD `topByKey` extension being in implicit scope
 * (presumably MLlib's `MLPairRDDFunctions`) -- confirm against the file's imports.
 *
 * @param key     name of the column whose values define the groups
 * @param orderBy name of the column used to rank rows inside each group; must be a
 *                String, Integer, Long, Float or Double column
 * @param n       maximum number of rows to keep per group; must be positive
 * @return a DataFrame holding at most `n` rows per distinct `key` value
 * @throws IllegalArgumentException if `n` is not positive or `orderBy` has an
 *                                  unsupported data type
 */
def topByKey(key: String, orderBy: String, n: Int): DataFrame = {
  require(n > 0, s"n must be positive, got $n")

  val schema = df.schema
  val keyIndex = schema.fieldIndex(key)
  val orderByIndex = schema.fieldIndex(orderBy)

  // Wraps the typed read in an Option so that a null cell is ordered below every
  // real value (None < Some(_)) instead of being passed to a primitive getter.
  def nullsLowest[T: Ordering](read: Row => T): Ordering[Row] =
    Ordering.by[Row, Option[T]] { row =>
      if (row.isNullAt(orderByIndex)) None else Some(read(row))
    }

  val ord: Ordering[Row] = schema.fields(orderByIndex).dataType match {
    case _: StringType => nullsLowest[String](_.getString(orderByIndex))
    case _: IntegerType => nullsLowest[Int](_.getInt(orderByIndex))
    case _: LongType => nullsLowest[Long](_.getLong(orderByIndex))
    case _: FloatType => nullsLowest[Float](_.getFloat(orderByIndex))
    case _: DoubleType => nullsLowest[Double](_.getDouble(orderByIndex))
    case other =>
      throw new IllegalArgumentException(
        s"Cannot order by column '$orderBy': unsupported data type $other")
  }

  // Go through the underlying RDD explicitly: mapping the DataFrame itself only
  // yields an RDD on Spark 1.x, while `df.rdd` is available on 1.x and 2.x alike.
  val rdd = df.rdd
    .map(row => (row.get(keyIndex), row))
    .topByKey(n)(ord)
    .flatMap(_._2)

  df.sqlContext.createDataFrame(rdd, schema)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment