This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| input | |
| { | |
| beats { | |
| port => 5044 | |
| } | |
| } | |
| filter | |
| { |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| val client = MongoClient.apply("mongodb.uri") | |
| val database = client.getDatabase("mongodb.database") | |
| val collection = database.getCollection("mongodb.collection") | |
| collection.watch() | |
| .fullDocument(FullDocument.UPDATE_LOOKUP) | |
| .subscribe(new Observer[ChangeStreamDocument[Document]] { | |
| override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = process(changeDocument) | |
| override def onError(e: Throwable): Unit = println(s"Error: $e") | |
| override def onComplete(): Unit = println("Completed") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Streaming context with a 60-second batch interval.
val ssc = new StreamingContext(sparkConf, Seconds(60))
// Mutable buffer of pending RDDs that feeds the queue-backed stream below.
val rddQueue: collection.mutable.Queue[RDD[String]] = collection.mutable.Queue.empty[RDD[String]]
// For each batch pulled from the queue, ship it to S3; empty batches are skipped.
ssc.queueStream(rddQueue).foreachRDD { batch =>
  if (!batch.isEmpty()) {
    sendS3(batch)
  }
}
ssc.start()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Writes a batch of JSON records to S3 as Snappy-compressed Parquet.
 *
 * Each element of `rdd` is read as one JSON document; no schema is supplied, so it is
 * left to Spark's JSON reader. The result is appended under `s3://bucket/folder`.
 *
 * @param rdd JSON strings, one document per element
 */
def sendS3(rdd: RDD[String]): Unit = {
  // DataFrameReader.json(RDD[String]) is deprecated since Spark 2.2.0; use the
  // Dataset[String] overload instead. The encoder is fully qualified so that no
  // additional import is required.
  val jsonLines = spark.createDataset(rdd)(org.apache.spark.sql.Encoders.STRING)
  spark.read
    .json(jsonLines)
    .coalesce(1) // collapse to one partition so each call writes at most one part file
    .write
    .mode("append")
    .option("compression", "snappy")
    .parquet("s3://bucket/folder")
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Serializes one change-stream event to JSON and folds it into the pending batch.
 *
 * The event is mapped with `mapRecord`, rendered with `write` (presumably json4s
 * serialization, given the implicit `Formats` -- confirm against the imports) and
 * wrapped in a single-element RDD. If `rddQueue` is empty the RDD is enqueued as is;
 * otherwise the queue's head is replaced by its union with the new RDD.
 *
 * @param changeDocument the change-stream event to enqueue
 */
def process(changeDocument: ChangeStreamDocument[Document]): Unit = {
  implicit val formats: DefaultFormats.type = DefaultFormats
  val rdd = spark.sparkContext.makeRDD(List(write(mapRecord(changeDocument))))
  // This method is invoked from the change-stream observer callback while the streaming
  // job consumes rddQueue. Spark's queue-backed input stream locks the queue object
  // itself when draining it, so lock on the same monitor to keep the check-then-act
  // below atomic (otherwise dequeue() can hit an already-drained queue or an event can
  // be lost).
  rddQueue.synchronized {
    if (rddQueue.isEmpty)
      rddQueue.enqueue(rdd)
    else
      rddQueue.enqueue(rddQueue.dequeue().union(rdd))
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Spark configuration: local master using all available cores, plus the committer and
// write-ahead-log settings applied as a single batch of key/value pairs.
val sparkConf: SparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAll(Seq(
    "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version" -> "2",
    "spark.streaming.driver.writeAheadLog.closeFileAfterWrite" -> "true",
    "spark.streaming.receiver.writeAheadLog.closeFileAfterWrite" -> "true"
  ))
// Session created from (or attached to an existing one with) the configuration above.
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| name := "streaming_mongodb" | |
| version := "0.1" | |
| scalaVersion := "2.12.9" | |
| libraryDependencies ++= Seq( | |
| "org.mongodb.scala" %% "mongo-scala-driver" % "2.7.0", | |
| "org.apache.spark" %% "spark-streaming" % "2.4.3", | |
| "org.apache.spark" %% "spark-sql" % "2.4.3", | |
| "org.apache.hadoop" % "hadoop-aws" % "3.2.0", | |
| "org.apache.hadoop" % "hadoop-common" % "3.2.0", |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import org.apache.spark.SparkConf | |
| import org.apache.spark.sql.SparkSession | |
| import org.apache.spark.sql.types._ | |
| import org.slf4j.{Logger, LoggerFactory} | |
| object Main { | |
| val logger: Logger = LoggerFactory.getLogger(this.getClass) | |
| private lazy val sparkConf: SparkConf = new SparkConf() | |
| .setMaster("local[*]") |