Last active
November 8, 2023 15:53
-
-
Save helena/319490f92d775149b37d to your computer and use it in GitHub Desktop.
JSON Integration with Spark SQL and Cassandra
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.datastax.spark.connector.cql.CassandraConnector | |
import org.apache.spark.{SparkContext, SparkConf} | |
import org.apache.spark.sql.{Row, SQLContext} | |
/**
 * Spark SQL: Txt, Parquet, JSON Support with the Spark Cassandra Connector.
 *
 * Recreates the `githubstats` keyspace and its `monthly_commits` table, parses two sample
 * JSON commit records with Spark SQL, converts each resulting row with `MonthlyCommits(_)`,
 * saves them to Cassandra, then reads the table back and prints every row.
 *
 * Defines an explicit `main` instead of extending `scala.App`: Spark's documentation states
 * that applications should define a `main()` method because subclasses of `scala.App` may
 * not work correctly.
 */
object SampleJson {
  import com.datastax.spark.connector._
  import GitHubEvents._

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "127.0.0.1")
      .setMaster("local[*]")
      .setAppName("app2")

    // Destructive: drops any existing `githubstats` keyspace so every run starts from scratch.
    CassandraConnector(conf).withSessionDo { session =>
      session.execute("DROP KEYSPACE IF EXISTS githubstats")
      session.execute("CREATE KEYSPACE githubstats WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
      // NOTE(review): `user` alone is the primary key, so a second month for the same user
      // overwrites the first. The JSON carries `month` and `year`, but the table has a single
      // `date INT` column; how they are combined depends on `MonthlyCommits` (defined in
      // GitHubEvents, not visible here) -- confirm.
      session.execute("CREATE TABLE githubstats.monthly_commits (user VARCHAR PRIMARY KEY, commits INT, date INT)")
    }

    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      val json = sc.parallelize(Seq(
        """{"user":"helena","commits":98, "month":12, "year":2014}""",
        """{"user":"pkolaczk", "commits":42, "month":12, "year":2014}"""))

      // NOTE(review): `MonthlyCommits(_)` presumably is a factory in GitHubEvents that builds a
      // MonthlyCommits from a Spark SQL Row -- verify against GitHubEvents.
      // NOTE(review): `jsonRDD` is deprecated in later Spark 1.x releases in favour of
      // `sqlContext.read.json`; kept here to match the Spark version this file targets.
      sqlContext.jsonRDD(json).map(MonthlyCommits(_)).saveToCassandra("githubstats", "monthly_commits")

      sc.cassandraTable[MonthlyCommits]("githubstats", "monthly_commits").collect().foreach(println)
    } finally {
      // Release the SparkContext even when the write or the read-back fails.
      sc.stop()
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
What is `MonthlyCommits(_)` here?
I just want to insert JSON data that comes from a Kafka queue.