Skip to content

Instantly share code, notes, and snippets.

@joshisa
Created November 4, 2016 19:26
Show Gist options
  • Select an option

  • Save joshisa/19f7caa1bce068fe9975716f1d482244 to your computer and use it in GitHub Desktop.

Select an option

Save joshisa/19f7caa1bce068fe9975716f1d482244 to your computer and use it in GitHub Desktop.
//==================================================================
// SPARK INSTRUMENTATION
//==================================================================
import com.codahale.metrics.{MetricRegistry, Meter, Gauge}
import org.apache.spark.{SparkEnv, Accumulator}
import org.apache.spark.metrics.source.Source
import org.joda.time.DateTime
import scala.collection.mutable
/** Instrumentation for Spark based on accumulators.
  *
  * Usage:
  * {{{
  * val instrumentation = new SparkInstrumentation("example.metrics")
  * val numReqs = sc.accumulator(0L)
  * instrumentation.source.registerDailyAccumulator(numReqs, "numReqs")
  * instrumentation.register()
  * }}}
  *
  * Will create and report the following metrics:
  *  - Gauge `numReqs` with the accumulator's current value (reset daily)
  *  - Meter `numReqs-rate` marked with the accumulator's increase between gauge polls
  *
  * NOTE(review): Spark appears to declare `org.apache.spark.metrics.source.Source` as
  * `private[spark]`, so this file presumably has to live under an `org.apache.spark` package
  * to compile outside a REPL — confirm.
  *
  * @param prefix prefix (used as the metrics source name) for all metrics reported by this
  *               instrumentation
  */
class SparkInstrumentation(prefix: String) extends Serializable {

  /** Not read or written by this class itself. */
  val accumulators = mutable.Set[Accumulator[Long]]()

  /** Metrics source holding one gauge and one `<name>-rate` meter per registered accumulator.
    *
    * Public so that callers can reach `registerAccumulator` / `registerDailyAccumulator`
    * through [[SparkInstrumentation.source]], as the usage example does.
    */
  class InstrumentationSource(prefix: String) extends Source {
    val metricRegistry = new MetricRegistry
    val sourceName = prefix
    /** Accumulator value recorded at the previous gauge poll, per metric name. */
    val oldgauges = mutable.Map[String, Long]()
    /** Time of the previous gauge poll, per daily metric name. */
    val oldtimes = mutable.Map[String, DateTime]()
    /** Rate meter (registered as `<name>-rate`), per metric name. */
    val meters = mutable.Map[String, Meter]()

    /** Computes metrics based on accumulator. Gauge never resets.
      *
      * Each poll of the gauge returns the accumulator's value and marks the meter with the
      * increase since the previous poll.
      *
      * @param a    Metrics will be derived from this accumulator
      * @param name Name of the metrics
      * @throws IllegalArgumentException if a metric called `name` is already registered
      */
    def registerAccumulator(a: Accumulator[Long], name: String): Unit = {
      initMetricState(name)
      metricRegistry.register(MetricRegistry.name(name),
        new Gauge[Long] {
          override def getValue: Long = {
            // Read the accumulator once: it can change between reads, and using different
            // values for the meter delta and the new baseline would drop increments.
            val current = a.value
            meters(name).mark(current - oldgauges(name))
            oldgauges(name) = current
            current
          }
        })
    }

    /** Computes metrics based on accumulator. Gauge resets once per day.
      *
      * Each poll of the gauge marks the meter with the increase since the previous poll. On the
      * first poll whose date (from `DateTime.now`) differs from the previous poll's date, the
      * accumulator is set to 0 and the gauge reports 0.
      *
      * @param a    Metrics will be derived from this accumulator
      * @param name Name of the metrics
      * @throws IllegalArgumentException if a metric called `name` is already registered
      */
    def registerDailyAccumulator(a: Accumulator[Long], name: String): Unit = {
      initMetricState(name)
      oldtimes += (name -> DateTime.now)
      metricRegistry.register(MetricRegistry.name(name),
        new Gauge[Long] {
          override def getValue: Long = {
            // Single read, so the meter delta and the stored baseline agree.
            val current = a.value
            // Increments up to this poll still count towards the rate, even on a reset poll.
            meters(name).mark(current - oldgauges(name))
            val now = DateTime.now
            // Compare whole dates rather than only the day of month, so a gap between polls
            // that spans whole months still triggers the reset.
            val reported =
              if (now.toLocalDate != oldtimes(name).toLocalDate) {
                // NOTE(review): value and setValue are separate calls, so increments merged in
                // between them are lost.
                a.setValue(0L)
                0L
              } else {
                current
              }
            oldtimes(name) = now
            oldgauges(name) = reported
            reported
          }
        })
    }

    /** Fails fast on a duplicate `name`, then seeds the poll baseline and rate meter for it. */
    private def initMetricState(name: String): Unit = {
      // Check before touching any state: otherwise a duplicate registration would zero the
      // existing gauge's baseline before MetricRegistry.register rejects the name.
      require(!metricRegistry.getNames.contains(name), s"A metric named $name already exists")
      oldgauges += (name -> 0L)
      meters += (name -> metricRegistry.meter(name + "-rate"))
    }
  }

  /** The metrics source that `register()` hands to Spark.
    *
    * Transient and lazy: InstrumentationSource does not extend Serializable, so a plain val
    * would make serializing this (Serializable) class fail. After deserialization a fresh,
    * unregistered source is built on first access.
    */
  @transient lazy val source: InstrumentationSource = new InstrumentationSource(prefix)

  /** Register the Instrumentation with Spark so the metrics are reported to any provided Sink.
    *
    * @throws IllegalStateException if `SparkEnv.get` returns null
    */
  def register(): Unit = {
    val env = Option(SparkEnv.get).getOrElse(
      throw new IllegalStateException(
        "SparkEnv is not initialized; create a SparkContext before calling register()"))
    env.metricsSystem.registerSource(source)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment