Skip to content

Instantly share code, notes, and snippets.

@maasg
Created May 30, 2017 16:02
Show Gist options
  • Save maasg/54d6da2d9e27edf5913c1b7946af2e79 to your computer and use it in GitHub Desktop.
Minimalistic SparkStreaming-FileStream project
// Minimal sbt build for the Spark Streaming file-watching example.
name := "sparkstreamfile"

version := "0.1"

scalaVersion := "2.11.8"

// Spark 2.1.0; %% appends the Scala binary version (_2.11) to each artifact.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "2.1.0",
  "org.apache.spark" %% "spark-mllib"     % "2.1.0",
  "org.apache.spark" %% "spark-streaming" % "2.1.0"
)
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Try
import java.io.File
/** Minimalistic Spark Streaming demo: watches a local directory and prints
  * the contents of any new text file dropped into it, in 30-second batches.
  */
object StreamingFileTest {

  /** Entry point.
    *
    * Creates the watched directory if needed, starts a local Spark session,
    * attaches a text-file DStream to the directory, and blocks forever
    * printing each incoming batch.
    */
  def main(args: Array[String]): Unit = {
    val tmpDir = "/tmp/streaming/test"
    val f = new File(tmpDir)
    if (!f.exists()) {
      // mkdirs, not mkdir: the path is nested, and mkdir returns false
      // (creating nothing) when the parent /tmp/streaming is missing.
      f.mkdirs()
    }

    val session = SparkSession.builder
      .master("local[*]")
      .appName("maasg_test")
      .getOrCreate()

    // 30-second micro-batch interval.
    val ssc = new StreamingContext(session.sparkContext, Seconds(30))

    // textFileStream only picks up files that appear in tmpDir after start().
    val dstream = ssc.textFileStream(tmpDir)
    dstream.print()

    ssc.start()
    println(s"Drop files in this dir for processing: $tmpDir")
    ssc.awaitTermination()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment