Created November 4, 2015 21:06
-
-
Save tzachz/0b0a0e6ea3bfddb36557 to your computer and use it in GitHub Desktop.
Creating a Metrics Counter backed by a Spark Accumulator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters.
package com.kenshoo.kripke.core | |
import com.yammer.metrics.Metrics | |
import com.yammer.metrics.core.{MetricName, Counter} | |
import org.apache.spark.Accumulator | |
import org.apache.spark.rdd.RDD | |
import scala.reflect.ClassTag | |
object CounterBackedAccumulatorUtil {

  /**
   * Wraps `rdd` so that every element passing through it increments a Spark accumulator, and
   * returns a callback that pushes the accumulated count into a Yammer Metrics [[Counter]].
   *
   * Usage: run an action on the returned RDD first, then invoke the callback on the driver.
   * The accumulator is only updated when an action actually executes the counting `map`, so
   * invoking the callback before that reports nothing.
   *
   * Caveat: the increment happens inside a transformation (`map`), not inside an action. Spark
   * only guarantees that accumulator updates are applied once per task for actions. If tasks or
   * stages are re-executed, or the returned RDD is computed more than once (e.g. several actions
   * without caching), elements may be counted more than once.
   *
   * @param rdd        the RDD whose elements should be counted
   * @param metricName name of the counter metric
   * @param clazz      class used to build the counter's [[MetricName]]
   * @tparam V         element type of the RDD
   * @return a pair of:
   *         - an RDD with the same elements as `rdd` that counts them as they are computed
   *         - a driver-side callback that adds the not-yet-reported part of the accumulated
   *           count to the counter. The callback may be invoked repeatedly; each call only adds
   *           what has accumulated since the previous call.
   */
  def countSilently[V: ClassTag](rdd: RDD[V], metricName: String, clazz: Class[_]): (RDD[V], Unit => Unit) = {
    val counter: Counter = Metrics.newCounter(new MetricName(clazz, metricName))
    // Explicit Long zero so the accumulator's element type matches its declared type.
    val accumulator: Accumulator[Long] = rdd.sparkContext.accumulator(0L)
    // Only `accumulator` is captured here, so nothing driver-only is shipped to executors.
    val countedRdd = rdd.map { v => accumulator += 1L; v }

    // Driver-side record of how much of the accumulator has already been added to the counter.
    // Without it, every invocation of the callback would add the full running total again.
    var reported = 0L
    val callback: Unit => Unit = _ => {
      val total = accumulator.value
      counter.inc(total - reported)
      reported = total
    }
    (countedRdd, callback)
  }
}
class MyClass {
  import CounterBackedAccumulatorUtil._

  /** Placeholder for the real processing logic; not implemented in this example. */
  def someTransformations(input: RDD[String]): RDD[Long] = ??? // some logic...

  /**
   * Example pipeline that counts both its input and its output records via
   * `countSilently`, saves the result, and then reports both counts.
   *
   * @param rdd        input records
   * @param outputPath destination passed to `saveAsTextFile`; defaults to the
   *                   previously hard-coded `/output/file`
   */
  def doSomeProcessing(rdd: RDD[String], outputPath: String = "/output/file"): Unit = {
    // count input:
    val (countedInput, reportInputCount) = countSilently(rdd, "inputRecords", classOf[MyClass])
    // do some work:
    val processed: RDD[Long] = someTransformations(countedInput)
    // count output:
    val (countedOutput, reportOutputCount) = countSilently(processed, "outputRecords", classOf[MyClass])
    // some action on result (in this case - save)
    countedOutput.saveAsTextFile(outputPath)
    // Callbacks must be called AFTER the RDD was acted on - that's when the accumulators get updated.
    // Pass `()` explicitly: `callback.apply()` relies on deprecated auto-insertion of the Unit argument.
    reportInputCount(())
    reportOutputCount(())
  }
}
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.