import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Try
// Create a streaming context with a 5-second micro-batch interval.
val ssc = new StreamingContext(sparkContext, Seconds(5))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@32d037e1
// All three extracted fields are kept as plain strings.
val customSchema = StructType(Array(
  StructField("column0", StringType, true),
  StructField("column1", StringType, true),
  StructField("column2", StringType, true)))
customSchema: org.apache.spark.sql.types.StructType = StructType(StructField(column0,StringType,true), StructField(column1,StringType,true), StructField(column2,StringType,true))
val dStream = ssc.textFileStream("/tmp/streaming/test")

// Split each line on ">" and pull out three fields; Try(...) getOrElse ""
// keeps malformed lines from failing the stream.
val rowDStream = dStream.map(line => line.split(">")).map(array => {
  val first = Try(array(0).trim.split(" ")(0)) getOrElse ""
  val second = Try(array(1).trim.split(" ")(6)) getOrElse ""
  val third = Try(array(2).trim.split(" ")(0).replace(":", "")) getOrElse ""
  Row.fromSeq(Seq(first, second, third))
})
dStream: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@3b04513c
rowDStream: org.apache.spark.streaming.dstream.DStream[org.apache.spark.sql.Row] = org.apache.spark.streaming.dstream.MappedDStream@5f03e1f2
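A quick way to sanity-check the three extractions offline is to run them on a made-up line with the same ">"-delimited shape (real packet lines will differ):

// Hypothetical sample line, built only to exercise the parsing logic above.
val sample = "a b c > t0 t1 t2 t3 t4 t5 t6 > 10:20:30 rest"
val arr = sample.split(">")
Try(arr(0).trim.split(" ")(0)) getOrElse ""                  // "a"
Try(arr(1).trim.split(" ")(6)) getOrElse ""                  // "t6"
Try(arr(2).trim.split(" ")(0).replace(":", "")) getOrElse "" // "102030"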
// ul(10) creates a Spark Notebook HTML list widget (here keeping 10 entries).
@transient val rawDstreamViz = ul(10)
rawDstreamViz
rawDstreamViz: notebook.front.widgets.HtmlList = <rendered as an HTML list widget>
res87: notebook.front.widgets.HtmlList = <rendered as an HTML list widget>
// Push every raw line into the widget. collect() brings each batch to the
// driver, which is fine for a small demo but does not scale.
dStream.foreachRDD(rdd => rdd.collect.foreach(e => rawDstreamViz.append(e.toString)))
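Note that textFileStream only picks up files that appear under /tmp/streaming/test after the context has been started, and each file should be written elsewhere first and then moved in atomically so the stream never sees a half-written file. A minimal way to generate test input (the sample line is made up):

import java.nio.file.{Files, Paths, StandardCopyOption}

// Write outside the watched directory, then move the file in atomically.
val tmp = Paths.get("/tmp/streaming/sample.txt")
Files.write(tmp, "a b c > t0 t1 t2 t3 t4 t5 t6 > 10:20:30 rest".getBytes)
Files.move(tmp, Paths.get("/tmp/streaming/test/sample.txt"),
  StandardCopyOption.ATOMIC_MOVE)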
// A second widget for the per-batch grouped counts.
@transient val dfViz = ul(10)
dfViz
dfViz: notebook.front.widgets.HtmlList = <rendered as an HTML list widget>
res91: notebook.front.widgets.HtmlList = <rendered as an HTML list widget>
rowDStream.foreachRDD { rdd =>
  // Turn each micro-batch of parsed rows into a DataFrame and append the
  // counts, grouped by the two extracted fields, to the widget.
  val trainingDF = sparkSession.createDataFrame(rdd, customSchema)
  val res = trainingDF.groupBy("column1", "column2").count().collect().map(e => e.toString)
  dfViz.appendAll(res)
}

// Build the streaming regression model once, outside foreachRDD; creating it
// inside the loop would discard and re-create it on every batch.
val numFeatures = 3
val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.zeros(numFeatures))
[Widget output: the "Raw Data" (rawDstreamViz) and "Grouped DataFrame" (dfViz) lists render here]
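The model above is only constructed; nothing feeds or trains it yet. A minimal sketch of wiring it up, assuming the three string columns can be hashed into numeric features — the featurize helper and the zero label below are placeholders, not part of the original pipeline:

import org.apache.spark.mllib.regression.LabeledPoint

// Hypothetical featurizer: hashes the three string fields into doubles.
// The 0.0 label is a placeholder; a real label must come from the data.
def featurize(r: Row): LabeledPoint =
  LabeledPoint(0.0, Vectors.dense(
    r.getString(0).hashCode.toDouble,
    r.getString(1).hashCode.toDouble,
    r.getString(2).hashCode.toDouble))

model.trainOn(rowDStream.map(featurize))

// Nothing runs until the context is started; trainOn and all foreachRDD
// calls must be registered before this point.
ssc.start()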