@adekunleba
Created October 8, 2018 16:13
Streaming KMeans with Kafka multiple topic consumer
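A Spark Streaming job that keeps a StreamingKMeans model up to date from Kafka. It consumes three topics: EmbeddingUpdate carries Base64-encoded embedding vectors that update the cluster centers; idtopics carries enrollment IDs whose stored embeddings (read from Redis) are assigned a cluster and written back to Redis; and ClassifyEmbeddings carries image IDs whose embeddings are classified against the current clusters to identify the enrolled member.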
package com.sparkserver.sparkstream

// java.util.Base64 replaces the internal sun.misc.BASE64Decoder, which was removed in JDK 9
import java.util.Base64

import org.apache.spark.mllib.clustering.{ StreamingKMeans, StreamingKMeansModel }
import org.apache.spark.mllib.linalg.{ Vector, Vectors }
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import redis.clients.jedis.Jedis

import collection.JavaConverters._
object SparkStreamingOnEmbeddings extends App {

  // On Windows, point HADOOP_HOME (or hadoop.home.dir) at a winutils install:
  // System.setProperty("hadoop.home.dir", "C:\\winutils")
  // StreamUtility.setStreamingLogLevels()

  val spark = SparkSession.builder
    .master("local[*]") // Remove the local master before packaging the jar for a cluster
    .appName("StreamEmbeddings")
    .getOrCreate()
  val sc = spark.sparkContext

  // The batch interval is how often the application checks the streams for new data
  val ssc = new StreamingContext(sc, Seconds(10))
  // Streaming k-means over 512-dimensional embeddings with 1000 clusters.
  // A decay factor of 1.0 weights all batches equally (nothing is forgotten).
  val model = new StreamingKMeans()
    .setK(1000)
    .setDecayFactor(1.0)
    .setRandomCenters(512, 0.0)

  // Mutable handle on the current model; each training batch folds into it below
  var checkModel = new StreamingKMeansModel(model.latestModel().clusterCenters, model.latestModel().clusterWeights)
  // Loan-pattern helper: open a Jedis connection, run f, and always close the connection
  def withRedis[T](f: Jedis => T): T = {
    val jedis = new Jedis("localhost") // or the "redis" service name inside Docker
    try {
      f(jedis)
    } finally {
      jedis.close()
    }
  }
  // Kafka topics: training embeddings, classification requests,
  // and enrollment IDs whose cluster assignments should be persisted
  val topic = "EmbeddingUpdate"
  val testTopic = "ClassifyEmbeddings"
  val saveTopic = "idtopics"

  // Broker is localhost:9092 locally, kafka:29092 inside Docker
  val kafkaParamsTopic = KafkaSupport.getKafkaConsumerConfig("localhost:9092", "EmbeddingUpdateTopics")
  val kafkaParamsTest = KafkaSupport.getKafkaConsumerConfig("localhost:9092", "ClassifiyTopic")
  val kafkaParamsSaveUserPred = KafkaSupport.getKafkaConsumerConfig("localhost:9092", "SaveUserPredictionTopic")

  val dataStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,
    Subscribe[String, String](Set(topic), kafkaParamsTopic))
  val testDataStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,
    Subscribe[String, String](Set(testTopic), kafkaParamsTest))
  val toSaveTrainingStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,
    Subscribe[String, String](Set(saveTopic), kafkaParamsSaveUserPred))
  // Training stream from enrollment embeddings: each record value is a Base64-encoded
  // 512-byte embedding, decoded byte-by-byte and scaled into a dense vector
  val data: DStream[Vector] = dataStream.map { r =>
    val features = Base64.getMimeDecoder
      .decode(r.value)
      .map(_.toDouble)
      .map(_ / 1000)
    Vectors.dense(features)
  }
  data.print()
  // Fold each training batch into the model manually instead of calling model.trainOn(data),
  // so the updated StreamingKMeansModel stays available to the predictions below
  data.foreachRDD { (rdd, time) =>
    checkModel = checkModel.update(rdd, model.decayFactor, model.timeUnit)
    val weights = checkModel.clusterWeights // updated weights and centers after this batch
    val centers = checkModel.clusterCenters
  }
  // Map each enrollment ID to a cluster: look up its stored embedding in Redis,
  // decode it, and predict against the batch-updated model
  val predictionStream: DStream[(String, Int)] = toSaveTrainingStream.map { a =>
    println(s"Value of ids in topic $saveTopic is ${a.value}")
    val image: String = withRedis { jedis =>
      jedis.hmget(a.value, "enrollmentEmbedding").asScala.toList.head
    }
    val features = Base64.getMimeDecoder
      .decode(image)
      .map(_.toDouble)
      .map(_ / 1000)
    val vecFeatures = Vectors.dense(features)
    // Predict with checkModel, which holds the updated centers;
    // model.latestModel() would still be the initial random centers
    val trainingPrediction = checkModel.predict(vecFeatures)
    (a.value, trainingPrediction)
  }
  predictionStream.print()
  // Side effect: persist cluster -> enrollment ID mappings in Redis
  predictionStream.foreachRDD { rdd =>
    rdd.foreachPartition { partitionOfRecords =>
      withRedis { jedis =>
        // Currently assumes one prediction per enrollment; this may change
        partitionOfRecords.foreach {
          case (ids, idPrediction) =>
            jedis.hset(idPrediction.toString, "memberID", ids)
        }
      }
    }
  }
  // Classification stream for new uploads from a client's gallery: each record carries
  // only an image ID, so its embeddings are fetched from Redis before predicting,
  // and results go back to the service via Redis
  val testPrediction: DStream[(String, List[Int])] = testDataStream.map { t =>
    print(s"Identified an image ID sent ${t.value}")
    // Fetch the stored embeddings for this ID from Redis
    val embeddings = withRedis { jedis =>
      jedis.lrange(t.value + "_", 0, 1).asScala.toList
    }
    val predictions = embeddings.map { emb =>
      val features = Base64.getMimeDecoder
        .decode(emb)
        .map(_.toDouble)
        .map(_ / 1000)
      checkModel.predict(Vectors.dense(features))
    }
    (t.value, predictions)
  }
  testPrediction.print()
  // Side effect: for each predicted cluster, look up the enrolled member and
  // record the match in Redis under a time-based key
  testPrediction.foreachRDD { testRdd =>
    testRdd.foreachPartition { partitionOfRecords =>
      withRedis { jedis =>
        // Currently assumes one prediction per enrollment; this may change
        partitionOfRecords.foreach {
          case (ids, idPredictions) =>
            idPredictions.foreach { pred =>
              // Keep nanoTime as a Long: truncating to Int can overflow to a negative key
              val getTime = math.abs(System.nanoTime()).toString
              val emailID = Option(jedis.hget(pred.toString, "memberID"))
              emailID match {
                case Some(i) =>
                  jedis.hmset(getTime + i, Map("prediction" -> ids, "emailIdentified" -> i).asJava)
                case _ =>
                  println("No result found for a recent object checked")
              }
            }
        }
      }
    }
  }
  // Start the streaming job and block until termination
  ssc.start()
  ssc.awaitTermination()
}
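
The gist calls a KafkaSupport.getKafkaConsumerConfig helper that is not included here. Below is a minimal sketch of what it could look like, assuming it only assembles the standard kafka-clients consumer properties for the given bootstrap servers and group ID. The object name and signature come from the call sites above; the specific settings (String deserializers, latest offset reset, auto-commit disabled) are assumptions.

package com.sparkserver.sparkstream

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

object KafkaSupport {
  // Hypothetical reconstruction: builds the consumer config map that
  // Subscribe[String, String] expects; only the signature is known from the call sites
  def getKafkaConsumerConfig(bootstrapServers: String, groupId: String): Map[String, Object] =
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )
}

To exercise the training stream, a toy producer is sketched below. It illustrates the wire format the stream expects: one raw byte per embedding dimension, so a 512-dimensional embedding is 512 bytes, Base64-encoded into the record value. The topic name comes from the gist; the object name, key, and random payload are illustrative only.

object EmbeddingProducerSketch extends App {
  import java.util.{ Base64, Properties }
  import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord }

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)

  // 512 random bytes stand in for a real embedding
  val embedding = new Array[Byte](512)
  scala.util.Random.nextBytes(embedding)
  producer.send(new ProducerRecord("EmbeddingUpdate", "enrollment-1", Base64.getEncoder.encodeToString(embedding)))
  producer.close()
}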