This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.SparkConf | |
import org.apache.spark.sql.SparkSession | |
import org.apache.spark.sql.types._ | |
import org.slf4j.{Logger, LoggerFactory} | |
object Main { | |
val logger: Logger = LoggerFactory.getLogger(this.getClass) | |
private lazy val sparkConf: SparkConf = new SparkConf() | |
.setMaster("local[*]") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
name := "streaming_mongodb" | |
version := "0.1" | |
scalaVersion := "2.12.9" | |
libraryDependencies ++= Seq( | |
"org.mongodb.scala" %% "mongo-scala-driver" % "2.7.0", | |
"org.apache.spark" %% "spark-streaming" % "2.4.3", | |
"org.apache.spark" %% "spark-sql" % "2.4.3", | |
"org.apache.hadoop" % "hadoop-aws" % "3.2.0", | |
"org.apache.hadoop" % "hadoop-common" % "3.2.0", |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Spark configuration: local master plus the committer and write-ahead-log settings below.
val sparkConf: SparkConf = {
  val settings = Seq(
    "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version" -> "2",
    "spark.streaming.driver.writeAheadLog.closeFileAfterWrite" -> "true",
    "spark.streaming.receiver.writeAheadLog.closeFileAfterWrite" -> "true"
  )
  new SparkConf().setMaster("local[*]").setAll(settings)
}

// Session obtained (or created) from the configuration above.
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Serializes one change-stream event and queues it for the streaming job.
 *
 * The event is mapped with `mapRecord`, written to a string with `write`, wrapped in a
 * single-element RDD and merged into whatever RDD is currently waiting in `rddQueue`, so
 * pending events collapse into a single queued RDD.
 *
 * @param changeDocument the change event delivered by the collection watcher
 */
def process(changeDocument: ChangeStreamDocument[Document]): Unit = {
  implicit val formats: DefaultFormats.type = DefaultFormats
  val rdd = spark.sparkContext.makeRDD(List(write(mapRecord(changeDocument))))
  // This runs on the observer's callback thread while the queue stream consumes the same
  // mutable queue. Without the lock, the isEmpty check and the dequeue can interleave with
  // the consumer and either throw on an empty queue or corrupt it. Spark's queue stream
  // locks on the queue object itself, so the same monitor is used here.
  rddQueue.synchronized {
    val merged = if (rddQueue.isEmpty) rdd else rddQueue.dequeue().union(rdd)
    rddQueue.enqueue(merged)
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Reads an RDD of JSON strings as a DataFrame and appends it to the target path as
 * snappy-compressed Parquet.
 *
 * @param rdd JSON documents, one per element
 */
def sendS3(rdd: RDD[String]) = {
  val parsed = spark.read.json(rdd)
  // Collapse to one partition before writing (presumably to get a single part file per call).
  val singlePartition = parsed.coalesce(1)
  val writer = singlePartition.write
    .mode("append")
    .option("compression", "snappy")
  writer.parquet("s3://bucket/folder")
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Build the streaming context on the SparkContext that `spark` already owns. Constructing it
// from a SparkConf would create a second SparkContext, which Spark rejects when one is
// already running in the JVM. Batches are 60 seconds long.
val ssc = new StreamingContext(spark.sparkContext, Seconds(60))

// Hand-off queue between the code that produces RDDs and the streaming job below.
// The package is fully qualified so that a value named `collection` in the same scope
// cannot shadow `scala.collection`.
val rddQueue: scala.collection.mutable.Queue[RDD[String]] = scala.collection.mutable.Queue()

// Each batch takes an RDD from the queue and ships it with sendS3, skipping empty ones.
ssc.queueStream(rddQueue)
  .foreachRDD { rdd =>
    if (!rdd.isEmpty()) sendS3(rdd)
  }

ssc.start()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
val client = MongoClient.apply("mongodb.uri") | |
val database = client.getDatabase("mongodb.database") | |
val collection = database.getCollection("mongodb.collection") | |
collection.watch() | |
.fullDocument(FullDocument.UPDATE_LOOKUP) | |
.subscribe(new Observer[ChangeStreamDocument[Document]] { | |
override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = process(changeDocument) | |
override def onError(e: Throwable): Unit = println(s"Error: $e") | |
override def onComplete(): Unit = println("Completed") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
input | |
{ | |
beats { | |
port => 5044 | |
} | |
} | |
filter | |
{ |