@mcnamaras
Created October 13, 2014 05:08
spark streaming broadcast variable wrapper
import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext

import scala.reflect.ClassTag

// This wrapper lets us update broadcast variables within DStreams' foreachRDD
// without running into serialization issues
case class BroadcastWrapper[T: ClassTag](
    @transient private val ssc: StreamingContext,
    @transient private val _v: T
) {

  // Current broadcast handle; replaced on every update
  @transient private var v = ssc.sparkContext.broadcast(_v)

  // Drop the old broadcast's cached copies, then broadcast the new value (driver only)
  def update(newValue: T, blocking: Boolean = false): Unit = {
    v.unpersist(blocking)
    v = ssc.sparkContext.broadcast(newValue)
  }

  def value: T = v.value

  // Custom serialization: only the Broadcast handle travels with the closure
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeObject(v)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    v = in.readObject().asInstanceOf[Broadcast[T]]
  }
}
@kaushalp-ai

Can you please also share how to use this in a streaming application?
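
A minimal sketch of one possible usage, not the gist author's own answer. It assumes the `BroadcastWrapper` class above is compiled alongside this file and that Spark Streaming is on the classpath. The `BroadcastWrapperExample` object, the `loadBlacklist` helper, the local master, and the queue-backed input stream are all hypothetical and only there to make the example runnable.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

// Assumes the BroadcastWrapper case class from this gist is on the classpath.
object BroadcastWrapperExample {

  // Hypothetical loader; stands in for reading a file, a database, or a config service.
  def loadBlacklist(): Set[String] = Set("spam", "junk")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("broadcast-wrapper-example")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Test input: a queue of RDDs, consumed one per batch.
    val queue = mutable.Queue[RDD[String]](
      ssc.sparkContext.parallelize(Seq("hello", "spam", "world"))
    )
    val words = ssc.queueStream(queue)

    // Created on the driver; holds the current Broadcast handle.
    val blacklist = BroadcastWrapper[Set[String]](ssc, loadBlacklist())

    words.foreachRDD { rdd =>
      // The body of foreachRDD runs on the driver once per batch,
      // so the broadcast can be refreshed here.
      blacklist.update(loadBlacklist())

      // The filter closure runs on the executors and reads the broadcast value.
      val kept = rdd.filter(w => !blacklist.value.contains(w)).collect()
      println(kept.mkString(", "))
    }

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000) // run for a few batches, then shut down
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```

The refresh happens inside `foreachRDD` but outside any RDD transformation, because `update` needs the `StreamingContext`, which is transient and therefore only available on the driver.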

@matteuan

The unpersist should always be blocking; otherwise, some executors could still hold the old cached variable.
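
A sketch of what this suggestion could look like on a plain `Broadcast`, independent of the wrapper. `Broadcast.unpersist(blocking = true)` waits until the cached copies on the executors have been removed before returning. The `BlockingRebroadcast` object and its `rebroadcast` method are hypothetical names used only for illustration.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

import scala.reflect.ClassTag

object BlockingRebroadcast {

  // Hypothetical helper: removes the old broadcast's cached copies and waits
  // for that to finish, then broadcasts the new value and returns its handle.
  def rebroadcast[T: ClassTag](sc: SparkContext, old: Broadcast[T], newValue: T): Broadcast[T] = {
    old.unpersist(blocking = true)
    sc.broadcast(newValue)
  }
}
```

With the wrapper above, the same effect should be available by calling `update(newValue, blocking = true)`, at the cost of the driver waiting on the executors during each refresh.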