@freeman-lab
Last active February 26, 2019 07:13
Spark Streaming + MLlib integration examples
package thunder.streaming
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.mllib.clustering.KMeansModel
import scala.util.Random.nextDouble
import scala.util.Random.nextGaussian
/**
* Extends clustering model for K-means with the current counts of each cluster (for streaming algorithms)
*/
class StreamingKMeansModel(override val clusterCenters: Array[Array[Double]],
val clusterCounts: Array[Int] = Array(1)) extends KMeansModel(clusterCenters)
/**
* K-means clustering on streaming data with support for
* mini batch and forgetful algorithms.
*
* The underlying assumption is that all streaming data points
* belong to one of several clusters, and we want to
* learn the identity of those clusters (the "KMeans Model")
* as new data arrive. Given this assumption, all data
* MUST have the same dimensionality.
*
* For mini batch algorithms, we update the underlying
* cluster identities for each batch of data, and keep
* a running count of the number of data points per cluster,
* so that all data points are treated equally. The
* number of data points per batch can be arbitrary.
*
* For forgetful algorithms, each new batch of data
* is weighted in its contribution, so that
* more recent data is weighted more heavily.
* The weighting is per batch (i.e. per time window),
* rather than per data point, so for meaningful
* interpretation, the number of data points per batch
* should be approximately constant.
*
*/
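//
// Concretely, for a cluster with current center c and count n, given a batch
// that contributes m new points with sum s, the update() method below computes
//
//   mini batch (a = 1):  c := (c * n + s) / (n + m)  and  n := n + m
//   forgetful  (a < 1):  c := c + a * (s / m - c)
//
// e.g. with c = 0.0 and n = 10, a batch of m = 5 points summing to s = 10.0
// gives c = (0 * 10 + 10) / 15 = 0.67 under the mini batch rule, but
// c = 0 + 0.5 * (10 / 5 - 0) = 1.0 under the forgetful rule with a = 0.5.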
class StreamingKMeans (
var k: Int,
var d: Int,
var a: Double,
var maxIterations: Int,
var initializationMode: String)
extends Serializable with Logging
{
private type ClusterCentersAndCounts = Array[(Array[Double], Int)]
/** Construct a StreamingKMeans object with default parameters */
def this() = this(2, 5, 1.0, 1, "gauss")
/** Set the number of clusters to create (k). Default: 2. */
def setK(k: Int): StreamingKMeans = {
this.k = k
this
}
/** Set the dimensionality of the data (d). Default: 5
* TODO: if possible, set this automatically based on first data point
*/
def setD(d: Int): StreamingKMeans = {
this.d = d
this
}
/**
* Set the parameter alpha to determine the update rule.
* If alpha = 1, perform "mini batch" KMeans, which treats all data
* points equivalently. If alpha < 1, perform forgetful KMeans,
* which weights older data less strongly (with exponential
* decay), e.g. 0.9 weights recent batches heavily, whereas
* 0.1 updates slowly.
* Weighting over time is per batch, so this algorithm implicitly
* assumes an approximately constant number of data points per batch.
* Default: 1 (mini batch)
*/
def setAlpha(a: Double): StreamingKMeans = {
this.a = a
this
}
/** Set the number of iterations per batch of data */
def setMaxIterations(maxIterations: Int): StreamingKMeans = {
this.maxIterations = maxIterations
this
}
/**
* Set the initialization algorithm. Unlike batch KMeans, we
* initialize randomly before we have seen any data. Options are "gauss"
* for random Gaussian centers, and "pos" for random positive uniform centers.
* Default: gauss
*/
def setInitializationMode(initializationMode: String): StreamingKMeans = {
if (initializationMode != "gauss" && initializationMode != "pos") {
throw new IllegalArgumentException("Invalid initialization mode: " + initializationMode)
}
this.initializationMode = initializationMode
this
}
/** Initialize random points for KMeans clustering */
def initRandom(): StreamingKMeansModel = {
val clusters = new Array[(Array[Double], Int)](k)
for (ik <- 0 until k) {
clusters(ik) = initializationMode match {
case "gauss" => (Array.fill(d)(nextGaussian()), 0)
case "pos" => (Array.fill(d)(nextDouble()), 0)
}
}
new StreamingKMeansModel(clusters.map(_._1), clusters.map(_._2))
}
/** Update KMeans clusters by doing training passes over an RDD
* TODO: stop iterating if clusters have converged
*/
def update(data: RDD[Array[Double]], model: StreamingKMeansModel): StreamingKMeansModel = {
val centers = model.clusterCenters
val counts = model.clusterCounts
// do iterative KMeans updates on a batch of data
for (i <- Range(0, maxIterations)) {
// find nearest cluster to each point
val closest = data.map(point => (model.predict(point), (point, 1)))
// get sums and counts for updating each cluster
val pointStats = closest.reduceByKey{
case ((x1, y1), (x2, y2)) => (x1.zip(x2).map{case (x, y) => x + y}, y1 + y2)}
val newPoints = pointStats.collectAsMap()
a match {
case 1.0 => for (newP <- newPoints) {
// remove previous count scaling
centers(newP._1) = centers(newP._1).map(x => x * counts(newP._1))
// update sums
centers(newP._1) = centers(newP._1).zip(newP._2._1).map{case (x, y) => x + y}
// update counts
counts(newP._1) += newP._2._2
// rescale to compute new means (of both old and new points)
centers(newP._1) = centers(newP._1).map(x => x / counts(newP._1))
}
case _ => for (newP <- newPoints) {
// update centers with forgetting factor a
centers(newP._1) = centers(newP._1).zip(newP._2._1.map(x => x / newP._2._2)).map{
case (x, y) => x + a * (y - x)}
}
}
// centers is the same array object as model.clusterCenters, so predict() in the next iteration sees these updates
}
// log the cluster centers
centers.zipWithIndex.foreach{
case (x, ix) => logInfo("Cluster center " + ix.toString + ": " + x.mkString(", "))}
new StreamingKMeansModel(centers, counts)
}
/** Main streaming operation: initialize the KMeans model
* and then update it based on new data from the stream.
*/
def runStreaming(data: DStream[Array[Double]]): DStream[Int] = {
var model = initRandom()
data.foreachRDD(rdd => model = update(rdd, model))
data.map(point => model.predict(point))
}
}
/** Top-level methods for calling Streaming KMeans clustering. */
object StreamingKMeans {
/**
* Train a Streaming KMeans model. We initialize a set of
* cluster centers randomly and then update them
* after receiving each batch of data from the stream.
* If a = 1 this is equivalent to mini-batch KMeans,
* where each batch of data from the stream is a different
* mini-batch. If a < 1, perform forgetful KMeans, which
* weights more recent data points more heavily.
*
* @param input Input DStream of (Array[Double]) data points
* @param k Number of clusters to estimate.
* @param d Number of dimensions per data point.
* @param a Update rule (1 mini batch, < 1 forgetful).
* @param maxIterations Maximum number of iterations per batch.
* @param initializationMode How to initialize cluster centers ("gauss" or "pos").
* @return Output DStream of (Int) assignments of data points to clusters.
*/
def trainStreaming(input: DStream[Array[Double]],
k: Int,
d: Int,
a: Double,
maxIterations: Int,
initializationMode: String)
: DStream[Int] =
{
new StreamingKMeans().setK(k)
.setD(d)
.setAlpha(a)
.setMaxIterations(maxIterations)
.setInitializationMode(initializationMode)
.runStreaming(input)
}
def main(args: Array[String]) {
if (args.length != 8) {
System.err.println("Usage: StreamingKMeans <master> <directory> <batchTime> <k> <d> <a> <maxIterations> <initializationMode>")
System.exit(1)
}
val (master, directory, batchTime, k, d, a, maxIterations, initializationMode) = (
args(0), args(1), args(2).toLong, args(3).toInt, args(4).toInt, args(5).toDouble, args(6).toInt, args(7))
val conf = new SparkConf().setMaster(master).setAppName("StreamingKMeans")
if (!master.contains("local")) {
conf.setSparkHome(System.getenv("SPARK_HOME"))
.setJars(List("target/scala-2.10/thunder_2.10-0.1.0.jar"))
.set("spark.executor.memory", "100G")
}
// Create Streaming Context
val ssc = new StreamingContext(conf, Seconds(batchTime))
// Train KMeans model
val data = ssc.textFileStream(directory).map(x => x.split(' ').map(_.toDouble))
val assignments = StreamingKMeans.trainStreaming(data, k, d, a, maxIterations, initializationMode)
// Print assignments (for testing)
assignments.print()
ssc.start()
ssc.awaitTermination()
}
}
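A rough local test sketch (not part of the original gist): instead of reading text files from a directory as in main() above, it drives StreamingKMeans with a queueStream of synthetic 2-D points drawn around two fixed centers. The object name, the centers, and the batch sizes are made up for illustration, and it assumes the same Spark 0.9-era APIs used above.

package thunder.streaming

import scala.collection.mutable.Queue
import scala.util.Random.{nextGaussian, nextInt}

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingKMeansLocalExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingKMeansLocalExample")
    val ssc = new StreamingContext(conf, Seconds(1))

    // queue up batches of noisy points around two known centers
    val trueCenters = Array(Array(-5.0, -5.0), Array(5.0, 5.0))
    val rddQueue = new Queue[RDD[Array[Double]]]()
    for (i <- 0 until 10) {
      val points = Array.fill(100) {
        val center = trueCenters(nextInt(2))
        center.map(x => x + nextGaussian())
      }
      rddQueue += ssc.sparkContext.makeRDD(points)
    }

    // one queued RDD is consumed per batch interval
    val data = ssc.queueStream(rddQueue)
    val assignments = StreamingKMeans.trainStreaming(data, k = 2, d = 2, a = 1.0,
      maxIterations = 5, initializationMode = "gauss")
    assignments.print()

    ssc.start()
    ssc.awaitTermination()
  }
}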
package thunder.streaming
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.mllib.regression.{LinearRegressionModel, GeneralizedLinearAlgorithm}
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.SparkConf
import scala.util.Random._
/**
* Linear Regression on streaming data.
*
* The underlying assumption is that every data point
* in the stream is a random label-feature pair, and
* there is a single set of weights and coefficients
* (the "linear model") that can predict each label
* given the features. Given this assumption,
* all streaming data points MUST have the same
* number of features.
*
* We update the weights by performing several
* iterations of gradient descent on each batch
* of streaming data. The number of data points
* per batch can be arbitrary. Compared to single
* batch algorithms, it should be OK to use
* fewer iterations of gradient descent because
* new updates will be performed for each batch.
*
*/
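//
// Each batch update below reuses MLlib's GradientDescent with SquaredGradient:
// for a labeled point (x, y) the squared error is (w . x - y)^2, and its
// gradient with respect to w is proportional to (w . x - y) * x. Each of the
// numIterations steps moves w against the average gradient over the batch,
// scaled by stepSize, starting from the weights learned on earlier batches.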
class StreamingLinearRegression (
var d: Int,
var stepSize: Double,
var numIterations: Int,
var initializationMode: String) extends GeneralizedLinearAlgorithm[LinearRegressionModel] with Serializable {
val gradient = new SquaredGradient()
val updater = new SimpleUpdater()
@transient val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
.setNumIterations(numIterations)
.setMiniBatchFraction(1.0)
/** Construct a StreamingLinearRegression object with default parameters */
def this() = this(5, 1.0, 10, "fixed")
/** Set the number of features per data point (d). Default: 5
* TODO: if possible, set this automatically based on first data point
*/
def setD(d: Int): StreamingLinearRegression = {
this.d = d
this
}
/**
* Set the initialization mode, either random (gaussian) or fixed.
* Default: fixed
*/
def setInitializationMode(initializationMode: String): StreamingLinearRegression = {
if (initializationMode != "random" && initializationMode != "fixed") {
throw new IllegalArgumentException("Invalid initialization mode: " + initializationMode)
}
this.initializationMode = initializationMode
this
}
/** Create a Linear Regression model */
def createModel(weights: Array[Double], intercept: Double) = {
new LinearRegressionModel(weights, intercept)
}
/** Initialize a Linear Regression model with fixed weights */
def initFixed(): LinearRegressionModel = {
val weights = Array.fill(d)(1.0)
val intercept = 0.0
createModel(weights, intercept)
}
/** Initialize a Linear Regression model with random weights */
def initRandom(): LinearRegressionModel = {
val weights = Array.fill(d)(nextGaussian())
val intercept = nextGaussian()
createModel(weights, intercept)
}
/** Update a Linear Regression model by running a gradient update */
def update(rdd: RDD[LabeledPoint], model: LinearRegressionModel): LinearRegressionModel = {
if (rdd.count() != 0) {
run(rdd, model.weights)
} else {
model
}
}
/** Main streaming operation: initialize the Linear Regression model
* and then update it based on new data from the stream.
*/
def runStreaming(data: DStream[LabeledPoint]): DStream[Double] = {
var model = if (initializationMode == "random") initRandom() else initFixed()
data.foreachRDD(rdd => model = update(rdd, model))
data.map(x => model.predict(x.features))
}
}
/** Top-level methods for calling Streaming Linear Regression.*/
object StreamingLinearRegression {
/**
* Train a Streaming Linear Regression model. We initialize a model
* and then perform gradient descent updates on each batch of
* received data in the data stream (akin to mini-batch gradient descent
* where each new batch from the stream is a different mini-batch).
*
* @param input Input DStream of (label, features) pairs.
* @param d Number of features per data point.
* @param stepSize Step size to be used for each iteration of Gradient Descent.
* @param numIterations Number of iterations of gradient descent to run per batch.
* @param initializationMode How to initialize model parameters (random or fixed).
* @return DStream of (double) model predictions for data points.
*/
def trainStreaming(
input: DStream[LabeledPoint],
d: Int,
stepSize: Double,
numIterations: Int,
initializationMode: String)
: DStream[Double] =
{
new StreamingLinearRegression(d, stepSize, numIterations, initializationMode).runStreaming(input)
}
def main(args: Array[String]) {
if (args.length != 7) {
System.err.println("Usage: StreamingLinearRegression <master> <directory> <batchTime> <d> <stepSize> <numIterations> <initializationMode>")
System.exit(1)
}
val (master, directory, batchTime, d, stepSize, numIterations, initializationMode) = (
args(0), args(1), args(2).toLong, args(3).toInt, args(4).toDouble, args(5).toInt, args(6).toString)
val conf = new SparkConf().setMaster(master).setAppName("StreamingLinearRegression")
if (!master.contains("local")) {
conf.setSparkHome(System.getenv("SPARK_HOME"))
.setJars(List("target/scala-2.10/thunder_2.10-0.1.0.jar"))
.set("spark.executor.memory", "100G")
}
// Create Streaming Context
val ssc = new StreamingContext(conf, Seconds(batchTime))
// Train Streaming Linear Regression model
val data = ssc.textFileStream(directory).map{x =>
val parts = x.split(',')
val label = parts(0).toDouble
val features = parts(1).trim().split(' ').map(_.toDouble)
LabeledPoint(label, features)}
val predictions = StreamingLinearRegression.trainStreaming(data, d, stepSize, numIterations, initializationMode)
// Print predictions (for testing)
predictions.print()
ssc.start()
ssc.awaitTermination()
}
}
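A companion sketch for the regression model (again not part of the original gist), mirroring the KMeans example above: it feeds StreamingLinearRegression a queueStream of synthetic LabeledPoints generated from a fixed weight vector, so the learned weights should approach that vector as batches arrive. All names and constants are illustrative.

package thunder.streaming

import scala.collection.mutable.Queue
import scala.util.Random.nextGaussian

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingLinearRegressionLocalExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingLinearRegressionLocalExample")
    val ssc = new StreamingContext(conf, Seconds(1))

    // queue up batches of labeled points drawn from a known linear model plus noise
    val trueWeights = Array(2.0, -1.0, 0.5, 3.0, 1.5)
    val rddQueue = new Queue[RDD[LabeledPoint]]()
    for (i <- 0 until 10) {
      val points = Array.fill(200) {
        val features = Array.fill(trueWeights.length)(nextGaussian())
        val label = features.zip(trueWeights).map { case (x, w) => x * w }.sum + 0.1 * nextGaussian()
        LabeledPoint(label, features)
      }
      rddQueue += ssc.sparkContext.makeRDD(points)
    }

    val data = ssc.queueStream(rddQueue)
    val predictions = StreamingLinearRegression.trainStreaming(data, d = trueWeights.length,
      stepSize = 0.1, numIterations = 10, initializationMode = "fixed")
    predictions.print()

    ssc.start()
    ssc.awaitTermination()
  }
}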