Inserting documents in MongoDB with the Spark Connector (DataFrame vs Spark Structured Streaming)
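// Minimal setup sketch (assumption, not in the original gist): the snippet below
// presumes the MongoDB Spark Connector 2.x on the classpath and a SparkSession
// configured with an output URI. The URI and database name here are placeholders.
import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("mongo-spark-example") // placeholder app name
  .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.trends") // placeholder URI
  .getOrCreate()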
// DataFrame (supported) - read 1 file, no streaming

// Step 1: create the DataFrame source
val fileDF = spark
  .read // no streaming
  .csv("file/file1.csv")
  .selectExpr(
    "CAST(key AS STRING)"
    // ...more columns with other castings
  )
// Out [1]: fileDF: org.apache.spark.sql.package.DataFrame = [key: string, country: string ... 6 more fields]

// Step 2: insert the DataFrame into MongoDB
MongoSpark.save(fileDF.write.option("collection", "trends").mode("append"))
// Inserts all the documents contained in the DataFrame with the correct structure, no problem
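// Equivalent sketch (assumption, not from the gist): with connector 2.x the same
// insert can also go through the DataFrameWriter API directly, without MongoSpark.save.
fileDF.write
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("collection", "trends")
  .mode("append")
  .save()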
//////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////
// Spark Structured Streaming (not supported) - read multiple files, streaming

// Step 1: create the streaming DataFrame source
// Note: streaming file sources normally need an explicit schema
// (or spark.sql.streaming.schemaInference=true)
val filesDS = spark
  .readStream // streaming
  .csv("trendFiles/*.csv")
  .selectExpr(
    "CAST(key AS STRING)"
    // ...more columns with other castings
  )
// Out [2]: filesDS: org.apache.spark.sql.package.DataFrame = [key: string, country: string ... 6 more fields]

// Step 2: insert the DataFrame into MongoDB
// I cannot see a MongoSpark.save method for Structured Streaming.
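//////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////

// Workaround sketch (not from the original gist): from Spark 2.4 onwards,
// foreachBatch exposes each micro-batch as a plain DataFrame, so the batch
// MongoSpark.save call above can be reused. The checkpoint path is a placeholder.
// (Later MongoDB Spark Connector releases, 10.x, also ship a native streaming
// sink via .writeStream.format("mongodb"), which postdates this gist.)
val query = filesDS.writeStream
  .foreachBatch { (batchDF: org.apache.spark.sql.DataFrame, batchId: Long) =>
    MongoSpark.save(batchDF.write.option("collection", "trends").mode("append"))
  }
  .option("checkpointLocation", "/tmp/trends-checkpoint") // placeholder path
  .start()
query.awaitTermination()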
No, sorry, I did not code structured streaming using Python.