Created
November 4, 2016 19:26
-
-
Save joshisa/19f7caa1bce068fe9975716f1d482244 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| //================================================================== | |
| // SPARK INSTRUMENTATION | |
| //================================================================== | |
| import com.codahale.metrics.{MetricRegistry, Meter, Gauge} | |
| import org.apache.spark.{SparkEnv, Accumulator} | |
| import org.apache.spark.metrics.source.Source | |
| import org.joda.time.DateTime | |
| import scala.collection.mutable | |
| /** Instrumentation for Spark based on accumulators. | |
| * | |
| * Usage: | |
| * val instrumentation = new SparkInstrumentation("example.metrics") | |
| * val numReqs = sc.accumulator(0L) | |
| * instrumentation.source.registerDailyAccumulator(numReqs, "numReqs") | |
| * instrumentation.register() | |
| * | |
| * Will create and report the following metrics: | |
| * - Gauge with total number of requests (daily) | |
| * - Meter with rate of requests | |
| * | |
| * @param prefix prefix for all metrics that will be reported by this Instrumentation | |
| */ | |
| class SparkInstrumentation(prefix: String) extends Serializable { | |
| val accumulators = mutable.Set[Accumulator[Long]]() | |
| private class InstrumentationSource(prefix: String) extends Source { | |
| val metricRegistry = new MetricRegistry | |
| val sourceName = prefix | |
| val oldgauges = mutable.Map[String,Long]() | |
| val oldtimes = mutable.Map[String, DateTime]() | |
| val meters = mutable.Map[String,Meter]() | |
| /** Computes metrics based on accumulator. Gauge never resets. | |
| * | |
| * @param a Metrics will be derived from this accumulator | |
| * @param name Name of the metrics | |
| */ | |
| def registerAccumulator(a: Accumulator[Long], name: String){ | |
| oldgauges += (name -> 0L) | |
| meters += (name -> metricRegistry.meter(name + "-rate")) | |
| metricRegistry.register(MetricRegistry.name(name), | |
| new Gauge[Long] { | |
| override def getValue: Long = { | |
| meters(name).mark(a.value - oldgauges(name)) | |
| oldgauges(name) = a.value | |
| return a.value | |
| } | |
| }) | |
| } | |
| /** Computes metrics based on accumulator. Gauge resets at the end of the day. | |
| * | |
| * @param a Metrics will be derived from this accumulator | |
| * @param name Name of the metrics | |
| */ | |
| def registerDailyAccumulator(a: Accumulator[Long], name: String){ | |
| oldgauges += (name -> 0L) | |
| meters += (name -> metricRegistry.meter(name + "-rate")) | |
| oldtimes += (name -> DateTime.now) | |
| metricRegistry.register(MetricRegistry.name(name), | |
| new Gauge[Long] { | |
| override def getValue: Long = { | |
| meters(name).mark(a.value - oldgauges(name)) | |
| val now = DateTime.now | |
| if (now.getDayOfMonth != oldtimes(name).getDayOfMonth){ | |
| a.setValue(0L) | |
| } | |
| oldtimes(name) = now | |
| oldgauges(name) = a.value | |
| return a.value | |
| } | |
| }) | |
| } | |
| } | |
| val source = new InstrumentationSource(prefix) | |
| /** Register the Instrumentation with Spark so the metrics are reported to any provided Sink. */ | |
| def register(){ | |
| SparkEnv.get.metricsSystem.registerSource(source) | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment