Created
October 16, 2018 17:12
-
-
Save adekunleba/5058c0227f19f009046cf359ad79719b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Streaming k-means over embedding vectors read from Kafka.
 *
 * Two direct Kafka streams are consumed from the broker at `localhost:9092`:
 *  - `signatures`: every record value is converted to a feature vector and fed to
 *    [[StreamingKMeans]] so the cluster centres keep updating batch by batch.
 *  - `predictionSignatures`: every record value is converted the same way and assigned
 *    to a cluster using the most recently trained model.
 *
 * `process_stream_to_features` and `KafkaSupport` are defined outside this block.
 * The former presumably parses a record payload into an `Array[Double]` (its result is
 * handed straight to `Vectors.dense`) -- TODO confirm the produced length matches the
 * 512 dimensions the model is initialised with.
 *
 * NOTE(review): Spark's documentation recommends defining an explicit `main` method rather
 * than extending `scala.App`, because `App`'s delayed initialisation may not work correctly
 * with Spark closures. `App` is kept here so the object's members stay unchanged.
 */
object SparkStreamingOnEmbeddings extends App {
  val spark = SparkSession.builder
    .master("local[*]")
    .appName("StreamEmbeddings")
    .getOrCreate()
  val sc = spark.sparkContext
  // Micro-batches are cut every 10 seconds.
  val ssc = new StreamingContext(sc, Seconds(10))

  // 1000 clusters over 512-dimensional vectors, starting from random centres with weight 0.0.
  val model = new StreamingKMeans()
    .setK(1000)
    .setDecayFactor(1.0)
    .setRandomCenters(512, 0.0)

  val topic = "signatures"
  val testTopic = "predictionSignatures"
  // Each stream uses its own consumer group id (second argument).
  val kafkaParamsTopic = KafkaSupport.getKafkaConsumerConfig("localhost:9092", "EmbeddingUpdateTopics")
  val kafkaParamsTest = KafkaSupport.getKafkaConsumerConfig("localhost:9092", "ClassifiyTopic")

  val dataStream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](Set(topic), kafkaParamsTopic)
  )
  val testDataStream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](Set(testTopic), kafkaParamsTest)
  )

  // Training input: one dense vector per Kafka record.
  // A ConsumerRecord exposes its payload through `value` (there is no `values` member).
  val data: DStream[Vector] = dataStream.map { record =>
    // val label = get_label(record.value) // Useful for supervised learning
    Vectors.dense(process_stream_to_features(record.value))
  }
  model.trainOn(data)

  // Prediction output: (original record payload, assigned cluster index).
  val predictionStream: DStream[(String, Int)] = testDataStream.map { record =>
    val vecFeatures = Vectors.dense(process_stream_to_features(record.value))
    val trainingPredictions = model.latestModel().predict(vecFeatures)
    (record.value, trainingPredictions)
  }

  predictionStream.foreachRDD { rdd =>
    rdd.foreachPartition { partitionOfRecords =>
      partitionOfRecords.foreach {
        // Placeholder sink: the string below is evaluated and discarded.
        case (ids, idPredictions) => "Do Something with predictions E.G. Save to DB for use"
      }
    }
  }

  // Nothing above executes until the streaming context is started; block the main thread
  // afterwards so the application keeps processing batches until it is stopped or fails.
  ssc.start()
  ssc.awaitTermination()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment