Skip to content

Instantly share code, notes, and snippets.

@TomLous
Created April 14, 2018 11:50
Show Gist options
  • Save TomLous/92e7f3faa9a09c0b8a713159b49785b9 to your computer and use it in GitHub Desktop.
Save TomLous/92e7f3faa9a09c0b8a713159b49785b9 to your computer and use it in GitHub Desktop.
import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
// Demo script: fold a small (word, count) DataFrame into ONE row whose single
// column "words" is an array of {word, count} records, then store that row as
// a document in the MongoDB collection "wordcountagg".

// Local session with two worker threads. The spark.mongodb.* settings point
// the connector at the "dbname" database on the local MongoDB instance.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("test")
  .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/dbname")
  .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/dbname")
  .getOrCreate()

// Needed for the .toDF conversion on local Scala collections.
import spark.implicits._

// In-memory sample data: one row per word with its count.
val df = List(
  ("abc", 1),
  ("def", 4),
  ("xyz", 23)
).toDF("Word", "Count")

// Aggregate without grouping, so every input row ends up in a single output
// row. Each element is a struct rather than a map: a Spark map column has one
// value type, so putting the string Word and the integer Count side by side in
// map(...) makes Spark coerce the values to a common type and the count loses
// its numeric type. A struct keeps a separate type per field and is still
// written as a sub-document with the keys "word" and "count".
val aggregated = df.agg(
  collect_list(
    struct(col("Word").as("word"), col("Count").as("count"))
  ).as("words")
)

// Write the aggregated row to the "wordcountagg" collection; "overwrite"
// replaces whatever the collection held before instead of appending to it.
MongoSpark.save(
  aggregated.write
    .option("collection", "wordcountagg")
    .mode("overwrite")
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment