Skip to content

Instantly share code, notes, and snippets.

@adekunleba
Created October 16, 2018 17:12
Show Gist options
  • Select an option

  • Save adekunleba/5058c0227f19f009046cf359ad79719b to your computer and use it in GitHub Desktop.

Select an option

Save adekunleba/5058c0227f19f009046cf359ad79719b to your computer and use it in GitHub Desktop.
/**
 * Streaming k-means over embedding vectors consumed from Kafka.
 *
 * Two Kafka topics are read:
 *  - `signatures`           – records used to continuously train the clustering model;
 *  - `predictionSignatures` – records that are assigned to a cluster by the current model.
 *
 * The entry point is an explicit `main` method rather than `scala.App`: the delayed
 * initialisation performed by `App` can leave fields uninitialised when they are
 * referenced from closures that Spark ships to executors.
 */
object SparkStreamingOnEmbeddings {

  /**
   * Builds the streaming graph, starts it and blocks until the streaming context terminates.
   *
   * @param args command line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("StreamEmbeddings")
      .getOrCreate()

    // Micro-batches are cut every 10 seconds.
    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    // 1000 clusters, random initial centers of dimension 512 with an initial weight of 0.0.
    val model = new StreamingKMeans()
      .setK(1000)
      .setDecayFactor(1.0)
      .setRandomCenters(512, 0.0)

    val topic = "signatures"
    val testTopic = "predictionSignatures"

    // Each stream uses its own consumer group id.
    val kafkaParamsTopic =
      KafkaSupport.getKafkaConsumerConfig("localhost:9092", "EmbeddingUpdateTopics")
    val kafkaParamsTest =
      KafkaSupport.getKafkaConsumerConfig("localhost:9092", "ClassifiyTopic")

    val dataStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Set(topic), kafkaParamsTopic)
    )
    val testDataStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Set(testTopic), kafkaParamsTest)
    )

    // Training input: the record payload (`value`) converted to a dense feature vector.
    // A label could additionally be derived from `record.value` here for supervised learning.
    val data: DStream[Vector] = dataStream.map { record =>
      Vectors.dense(process_stream_to_features(record.value))
    }
    model.trainOn(data)

    // Prediction input is keyed by the raw payload so that every predicted cluster index
    // can be traced back to the record it was computed from.
    val keyedTestVectors: DStream[(String, Vector)] = testDataStream.map { record =>
      (record.value, Vectors.dense(process_stream_to_features(record.value)))
    }
    val predictionStream: DStream[(String, Int)] = model.predictOnValues(keyedTestVectors)

    predictionStream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        partitionOfRecords.foreach {
          // Placeholder: the string below is evaluated and discarded. Replace it with the
          // real sink logic (e.g. persisting the prediction).
          case (ids, idPredictions) => "Do Something with predictions E.G. Save to DB for use"
        }
      }
    }

    // Nothing is processed until the context is started; block so the driver stays alive.
    ssc.start()
    ssc.awaitTermination()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment