Streaming KMeans with a multi-topic Kafka consumer: a Spark Streaming job that subscribes to three Kafka topics, decodes Base64 embeddings stored in Redis, keeps a StreamingKMeans model updated per batch, and writes cluster predictions back to Redis.
package com.sparkserver.sparkstream

// java.util.Base64 replaces the JDK-internal sun.misc.BASE64Decoder,
// which no longer compiles on Java 9+.
import java.util.Base64

import org.apache.spark.mllib.clustering.{ StreamingKMeans, StreamingKMeansModel }
import org.apache.spark.mllib.linalg.{ Vector, Vectors }
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import redis.clients.jedis.Jedis

import collection.JavaConverters._
object SparkStreamingOnEmbeddings extends App {

  // On Windows you may need winutils and HADOOP_HOME configured, e.g.:
  // System.setProperty("hadoop.home.dir", "C:\\winutils")

  val spark = SparkSession.builder
    .master("local[*]") // Remove the hard-coded master before packaging the jar
    .appName("StreamEmbeddings")
    .getOrCreate()
  val sc = spark.sparkContext
  // The batch interval: how often the application checks the streams for new data.
  val ssc = new StreamingContext(sc, Seconds(10))

  // Create the streaming clustering model.
  val model = new StreamingKMeans()
    .setK(1000)
    .setDecayFactor(1.0)
    .setRandomCenters(512, 0.0)

  // A mutable copy of the model, updated manually per batch and available for upload.
  var checkModel = new StreamingKMeansModel(
    model.latestModel().clusterCenters,
    model.latestModel().clusterWeights)
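  // Note on the StreamingKMeans configuration above: k = 1000 clusters over
  // 512-dimensional vectors (this must match the length of the decoded
  // embeddings below), and decayFactor = 1.0 means every batch since the
  // start is weighted equally, i.e. no forgetting of old data.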
  // Loan pattern: borrow a Jedis connection, run f, and always close the connection.
  def withRedis[T](f: Jedis => T): T = {
    val jedis = new Jedis("localhost") // Redis host
    try {
      f(jedis)
    } finally {
      jedis.close()
    }
  }
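  // Jedis connections are not serializable, so withRedis is invoked inside
  // the closures below and opens a fresh connection wherever it runs, rather
  // than sharing one connection from the driver.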
  // Kafka topics: enrollment embeddings, classification requests, and
  // enrollment ids whose predictions should be saved.
  val topic = "EmbeddingUpdate"
  val testTopic = "ClassifyEmbeddings"
  val saveTopic = "idtopics"

  // Inside a docker network the broker address may be kafka:29092 instead of localhost:9092.
  val kafkaParamsTopic = KafkaSupport.getKafkaConsumerConfig("localhost:9092", "EmbeddingUpdateTopics")
  val kafkaParamsTest = KafkaSupport.getKafkaConsumerConfig("localhost:9092", "ClassifiyTopic")
  val kafkaParamsSaveUserPred = KafkaSupport.getKafkaConsumerConfig("localhost:9092", "SaveUserPredictionTopic")

  val dataStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,
    Subscribe[String, String](Set(topic), kafkaParamsTopic))
  val testDataStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,
    Subscribe[String, String](Set(testTopic), kafkaParamsTest))
  val toSaveTrainingStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,
    Subscribe[String, String](Set(saveTopic), kafkaParamsSaveUserPred))
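  // Three independent direct streams, one per topic. The second argument to
  // the KafkaSupport helper looks like a consumer group id (the helper itself
  // is not part of this gist; see the sketch at the bottom).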
  // Extract a DStream of feature vectors from the enrollment-embedding topic.
  // Each message value is a Base64 string; the decoded bytes are divided by
  // 1000, presumably undoing a producer-side scaling.
  val data: DStream[Vector] = dataStream.map { r =>
    val features = Base64.getMimeDecoder
      .decode(r.value)
      .map(_.toDouble)
      .map(_ / 1000)
    Vectors.dense(features)
  }
  data.print()

  // Train/update: instead of model.trainOn(data) (left commented out in the
  // original), the model is updated manually per batch so the updated centers
  // and weights remain accessible on the driver between batches.
  // model.latestModel().save(sc, "models") // persist to disk if a model path exists
  data.foreachRDD { (rdd, time) =>
    checkModel = checkModel.update(rdd, model.decayFactor, model.timeUnit)
    val weights = checkModel.clusterWeights // currently unused; see the sketch below
    val centers = checkModel.clusterCenters
  }
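  // Hypothetical sketch (not in the original gist): one way to persist the
  // updated centers would be a Redis string under an assumed key, e.g.
  // withRedis { jedis =>
  //   jedis.set("kmeans:centers",
  //     checkModel.clusterCenters.map(_.toArray.mkString(",")).mkString(";"))
  // }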
  // Map each enrollment id on the save topic to the cluster its embedding falls into.
  val predictionStream: DStream[(String, Int)] = toSaveTrainingStream.map { a =>
    println(s"Value of id in topic $saveTopic is ${a.value}")
    // Look up the stored embedding in Redis by enrollment id.
    val image: String = withRedis { jedis =>
      jedis.hmget(a.value, "enrollmentEmbedding").asScala.toList.head
    }
    val features = Base64.getMimeDecoder
      .decode(image)
      .map(_.toDouble)
      .map(_ / 1000)
    val trainingPrediction = model.latestModel().predict(Vectors.dense(features))
    (a.value, trainingPrediction)
  }
  predictionStream.print()

  // Final side effect for the prediction stream: store cluster id -> member id.
  predictionStream.foreachRDD { rdd =>
    rdd.foreachPartition { partitionOfRecords =>
      withRedis { jedis =>
        // Currently assumes one prediction per enrollment; this may change.
        partitionOfRecords.foreach {
          case (id, idPrediction) =>
            jedis.hset(idPrediction.toString, "memberID", id)
        }
      }
    }
  }
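  // The hash written above maps a cluster id to a memberID; the test stream
  // below reads it back with jedis.hget to turn a predicted cluster into an
  // identified member.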
  // Prediction stream keyed by the image id alone, so results can be sent on
  // to the service.
  val testPrediction: DStream[(String, List[Int])] = testDataStream.map { t =>
    // t.value is the id of the image to classify.
    println(s"Identified an image ID sent ${t.value}")
    // Fetch the stored embeddings for this id from Redis.
    val embeddings = withRedis { jedis =>
      jedis.lrange(t.value + "_", 0, 1).asScala.toList
    }
    val predictions = embeddings.map { emb =>
      val features = Base64.getMimeDecoder
        .decode(emb)
        .map(_.toDouble)
        .map(_ / 1000)
      model.latestModel().predict(Vectors.dense(features))
    }
    (t.value, predictions)
  }
  testPrediction.print()

  testPrediction.foreachRDD { rdd =>
    rdd.foreachPartition { partitionOfRecords =>
      withRedis { jedis =>
        // Currently assumes one prediction per enrollment; this may change.
        partitionOfRecords.foreach {
          case (id, idPredictions) =>
            idPredictions.foreach { pred =>
              // Nano timestamp as a Long keeps the result key unique; the
              // original truncated it to Int, which could overflow negative.
              val getTime = math.abs(System.nanoTime()).toString
              Option(jedis.hget(pred.toString, "memberID")) match {
                case Some(i) =>
                  jedis.hmset(getTime + i, Map("prediction" -> id, "emailIdentified" -> i).asJava)
                case _ => println("No result found for a recent object checked")
              }
            }
        }
      }
    }
  }
  // Start the streaming job and block until it is stopped.
  ssc.start()
  ssc.awaitTermination()
}
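KafkaSupport.getKafkaConsumerConfig is referenced above but not included in this gist. A minimal sketch of what it might look like, assuming the second argument is the Kafka consumer group id and the standard kafka-clients String deserializers:

package com.sparkserver.sparkstream

import org.apache.kafka.common.serialization.StringDeserializer

object KafkaSupport {
  // Hypothetical reconstruction based on the call sites above: bootstrap
  // servers plus a consumer group id, with String keys and values.
  def getKafkaConsumerConfig(bootstrapServers: String, groupId: String): Map[String, Object] =
    Map[String, Object](
      "bootstrap.servers" -> bootstrapServers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))
}

The resulting Map[String, Object] is what Subscribe[String, String] expects as its kafkaParams argument.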