Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save kumar-de/e657e8308d06d65bd1c8cb4a98616cb0 to your computer and use it in GitHub Desktop.
Save kumar-de/e657e8308d06d65bd1c8cb4a98616cb0 to your computer and use it in GitHub Desktop.
Create a custom accumulator using Scala for Spark 2.x
/**
 * Mutable, serializable pair of integer components.
 *
 * @param x first component
 * @param y second component
 */
class MyComplex(var x: Int, var y: Int) extends Serializable {

  /** Sets both components back to zero. */
  def reset(): Unit = {
    x = 0
    y = 0
  }

  /**
   * Adds the components of `p` to this instance, in place.
   *
   * @param p value whose `x` and `y` are added to this instance's `x` and `y`
   * @return this same (mutated) instance, so calls can be chained
   */
  def add(p: MyComplex): MyComplex = {
    x += p.x
    y += p.y
    this
  }
}
import org.apache.spark.util.AccumulatorV2
/**
 * Accumulator that sums [[MyComplex]] values component-wise.
 *
 * The implementation lives in a class (rather than directly in the singleton
 * below) so that `copy()` can hand out a genuinely new instance.
 */
class ComplexAccumulator extends AccumulatorV2[MyComplex, MyComplex] {
  // Running total; mutated in place by add/merge/reset.
  private val myc: MyComplex = new MyComplex(0, 0)

  /** Clears the running total back to (0, 0). */
  override def reset(): Unit = myc.reset()

  /** Adds `v` component-wise to the running total. */
  override def add(v: MyComplex): Unit = myc.add(v)

  /** Returns the running total (the internal mutable instance, not a snapshot). */
  override def value(): MyComplex = myc

  /** True when both components of the running total are zero. */
  override def isZero(): Boolean = myc.x == 0 && myc.y == 0

  /**
   * Creates a new accumulator holding the same running total.
   *
   * Returning `this` here would make any reset of the "copy" wipe the
   * original accumulator as well, so a separate instance is built.
   */
  override def copy(): AccumulatorV2[MyComplex, MyComplex] = {
    val duplicate = new ComplexAccumulator
    duplicate.add(myc)
    duplicate
  }

  /** Folds the running total of `other` into this accumulator. */
  override def merge(other: AccumulatorV2[MyComplex, MyComplex]): Unit =
    myc.add(other.value)
}

/** Singleton accumulator instance, kept under its original name for existing callers. */
object ComplexAccumulatorV2 extends ComplexAccumulator
// Register the accumulator with the SparkContext under a display name.
// NOTE(review): `sc` is assumed to be the SparkContext provided by the Spark shell.
sc.register(ComplexAccumulatorV2, "mycomplexacc")

//using custom accumulator
val ca = ComplexAccumulatorV2
val rdd = sc.parallelize(1 to 10)
// `map` is a lazy transformation: the accumulator is only updated once an
// action forces the RDD to be evaluated.
val res = rdd.map(x => ca.add(new MyComplex(x, x)))
// Trigger evaluation; afterwards `ca.value` exposes the accumulated components.
res.count()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment