@raghothams
Last active September 19, 2017 03:01
Spark 2.X MongoDB
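# Requires the MongoDB Spark connector on the classpath
# (for example via spark-submit --packages).
from pyspark.sql import SparkSession

# Create the session, or reuse one that already exists.
spark = SparkSession \
    .builder \
    .appName("demography mapper") \
    .getOrCreate()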

# Load the raw.user collection into a DataFrame.
df_user = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("spark.mongodb.input.uri", "mongodb://localhost:27017/raw.user") \
    .load()
df_user.printSchema()  # prints the schema itself and returns None
df_user = df_user.repartition(2)
print(df_user.count())

# Load the raw.demography collection into a DataFrame.
df_demography = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("spark.mongodb.input.uri", "mongodb://localhost:27017/raw.demography") \
    .load()
df_demography.printSchema()
df_demography = df_demography.repartition(2)
print(df_demography.count())

# Keep every user; demography columns are null where no match exists.
df_result = df_user.join(df_demography, on="user", how="left_outer").cache()
df_result.printSchema()
print(df_result.count())

# Write the joined rows to the raw.user_demography collection.
df_result.write.format("com.mongodb.spark.sql.DefaultSource") \
    .option("spark.mongodb.output.uri", "mongodb://localhost:27017/raw.user_demography") \
    .save()
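
The join in the script uses how="left_outer" on the "user" column. The following is a simplified plain-Python model of that join type, not Spark's implementation. It may help when checking the expected row count of the result. The helper name `left_outer_join` and the sample rows are hypothetical and are not taken from the gist's collections. The model also assumes the two sides share no column names other than the key.

```python
def left_outer_join(left, right, key):
    """Plain-Python model of a left outer equi-join on `key`."""
    # Index the right-hand rows by key; one key may map to several rows.
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)

    # All right-hand columns except the join key.
    right_cols = sorted({c for row in right for c in row if c != key})

    result = []
    for row in left:
        matches = index.get(row[key])
        if matches:
            # One output row per matching right-hand row.
            for m in matches:
                result.append({**row, **{c: m.get(c) for c in right_cols}})
        else:
            # No match: keep the left row, fill right-hand columns with None.
            result.append({**row, **{c: None for c in right_cols}})
    return result


# Hypothetical sample rows for illustration only.
users = [{"user": "a", "name": "Asha"}, {"user": "b", "name": "Bala"}]
demography = [{"user": "a", "age": 31}]

joined = left_outer_join(users, demography, "user")
```

Every left-hand row survives the join, so the result has at least as many rows as the left side, and more when a key matches several right-hand rows.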