Skip to content

Instantly share code, notes, and snippets.

@knight76
Created November 20, 2019 07:09
Show Gist options
  • Save knight76/74d2abf0b247736d2070ba5137f5f777 to your computer and use it in GitHub Desktop.
Save knight76/74d2abf0b247736d2070ba5137f5f777 to your computer and use it in GitHub Desktop.
Spark - TupleAccumulator.scala example
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable
/**
 * Spark accumulator that sums `Long` counts per `(String, String)` key.
 *
 * `add` takes `((k1, k2), count)` pairs and sums counts for equal keys; `merge`
 * folds another [[TupleAccumulator]] into this one; `value` returns an immutable
 * snapshot of the current totals.
 */
class TupleAccumulator extends AccumulatorV2[((String, String), Long), Map[(String, String), Long]] {

  /** Running totals per key; mutated in place by `add`, `merge` and `reset`. */
  private val underlying: mutable.HashMap[(String, String), Long] = mutable.HashMap.empty

  /** True when no key has been recorded (since construction or the last `reset()`). */
  override def isZero: Boolean = underlying.isEmpty

  /** Returns a new accumulator holding an independent copy of the current totals. */
  override def copy(): AccumulatorV2[((String, String), Long), Map[(String, String), Long]] = {
    val newAcc = new TupleAccumulator()
    // The fresh accumulator is empty, so copying entries is equivalent to re-adding each pair.
    newAcc.underlying ++= underlying
    newAcc
  }

  /** Clears all recorded totals, making the accumulator zero again. */
  override def reset(): Unit = underlying.clear()

  /** Immutable snapshot of the current totals; later updates do not affect it. */
  override def value: Map[(String, String), Long] = underlying.toMap

  /**
   * Adds `count` to the running total for `key`, starting from 0 for unseen keys.
   *
   * @param kv a `(key, count)` pair
   */
  override def add(kv: ((String, String), Long)): Unit = {
    val (key, count) = kv
    underlying(key) = underlying.getOrElse(key, 0L) + count
  }

  /**
   * Adds every total recorded in `other` into this accumulator.
   *
   * @param other accumulator to fold in; left unchanged
   * @throws UnsupportedOperationException if `other` is not a [[TupleAccumulator]]
   */
  override def merge(other: AccumulatorV2[((String, String), Long), Map[(String, String), Long]]): Unit =
    other match {
      case ta: TupleAccumulator =>
        // Iterate the immutable snapshot from `value`, so merging an accumulator into itself
        // never mutates the collection being traversed. Entries go straight to `add`: the
        // former typed pattern `(k1: String, k2: String)` threw MatchError for keys with a
        // null component, because type patterns never match null.
        ta.value.foreach(add)
      case _ =>
        throw new UnsupportedOperationException(s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
    }
}
// test
/** Unit tests for TupleAccumulator: per-key summing, merging, copying and resetting. */
class TupleAccumulatorTest extends FunSuite with Matchers {

  test("adding tuple accumulator.") {
    val acc = new TupleAccumulator
    acc.add((("samuel", "a"), 1L))
    acc.add((("samuel", "a"), 3L))
    acc.add((("mac", "c"), 1L))
    // Counts for the same key are summed; distinct keys are tracked independently.
    acc.value.get(("samuel", "a")) should be(Some(4L))
    acc.value.get(("mac", "c")) should be(Some(1L))
  }

  test("merge sums counts from another accumulator") {
    val left = new TupleAccumulator
    left.add((("samuel", "a"), 2L))
    val right = new TupleAccumulator
    right.add((("samuel", "a"), 5L))
    right.add((("mac", "c"), 1L))
    left.merge(right)
    left.value should be(Map(("samuel", "a") -> 7L, ("mac", "c") -> 1L))
    // The accumulator merged from must be left untouched.
    right.value should be(Map(("samuel", "a") -> 5L, ("mac", "c") -> 1L))
  }

  test("copy is independent and reset clears totals") {
    val acc = new TupleAccumulator
    acc.add((("samuel", "a"), 1L))
    val dup = acc.copy()
    acc.reset()
    acc.isZero should be(true)
    dup.isZero should be(false)
    dup.value should be(Map(("samuel", "a") -> 1L))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment