Created
October 13, 2014 05:08
-
-
Save mcnamaras/040a362ca8100347e1a6 to your computer and use it in GitHub Desktop.
spark streaming broadcast variable wrapper
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.io.{ObjectInputStream, ObjectOutputStream} | |
| import org.apache.spark.broadcast.Broadcast | |
| import org.apache.spark.streaming.StreamingContext | |
| import scala.reflect.ClassTag | |
// This wrapper lets us update broadcast variables within DStreams' foreachRDD
// without running into serialization issues: only the current Broadcast handle
// is serialized (see writeObject/readObject); the StreamingContext and the
// initial value are @transient and are never shipped with closures.
//
// Because ssc is @transient, it is null in a deserialized copy, so update()
// can only be called on the instance that was constructed (i.e. on the driver).
case class BroadcastWrapper[T: ClassTag](
    @transient private val ssc: StreamingContext,
    @transient private val _v: T
) {

  // Handle to the broadcast currently in use. It is @transient because it is
  // written explicitly by writeObject rather than by default serialization.
  @transient private var v: Broadcast[T] = ssc.sparkContext.broadcast(_v)

  /** Replaces the broadcast value with `newValue`.
    *
    * The new value is broadcast before the previous broadcast is unpersisted.
    * If broadcasting `newValue` throws, `v` therefore still refers to the
    * previous broadcast, which has not been unpersisted.
    *
    * @param newValue value to broadcast from now on
    * @param blocking passed to `unpersist` on the previous broadcast
    */
  def update(newValue: T, blocking: Boolean = false): Unit = {
    val previous = v
    v = ssc.sparkContext.broadcast(newValue)
    previous.unpersist(blocking)
  }

  /** Returns the value of the current broadcast. */
  def value: T = v.value

  // Java serialization requires defaultWriteObject (or writeFields) to be
  // called once before any optional data, even though every field is transient.
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    out.writeObject(v)
  }

  // Mirror of writeObject: consume the default field data, then restore the
  // broadcast handle that was written explicitly.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    v = in.readObject().asInstanceOf[Broadcast[T]]
  }
}
The unpersist should always be blocking; otherwise, some executors could still have the old cached variable.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Can you please also share how to use this in a streaming application?