Last active
June 1, 2018 13:31
-
-
Save erichgess/a02aefddd6231c91babb to your computer and use it in GitHub Desktop.
Planet Cassandra Spark Blog
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Keyspace for the Spark demo. SimpleStrategy with RF=1 is suitable only for a
-- single-node development cluster; use NetworkTopologyStrategy in production.
create KEYSPACE spark_demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};

-- Raw MovieLens file lines, keyed by (filename, line) so every source line is
-- individually addressable and lines of one file live in one partition.
create table spark_demo.raw_files (filename text, line int, contents text, PRIMARY KEY(filename, line));

-- Parsed entities. FIX: the Spark ETL writes gender/age/occupation/zip_code to
-- users and title/genres to movies via saveToCassandra's field->column mapping;
-- those columns were missing from the original schema, so the saves would fail.
create table spark_demo.users (id int PRIMARY KEY, gender text, age int, occupation int, zip_code text);
create table spark_demo.movies (id int PRIMARY KEY, name text, year int, title text, genres text);
-- NOTE(review): the primary key is `id`, so every ratings insert must supply an
-- id value — verify the loader provides one (e.g. the raw file line number).
create table spark_demo.ratings (id int PRIMARY KEY, user_id int, movie_id int, rating float);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Re-create the spark-shell SparkContext with Cassandra connectivity configured.
// The shell's default `sc` must be stopped before a replacement is constructed.
sc.stop

import com.datastax.spark.connector._
import org.apache.spark.SparkContext, org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

// FIX: the original used typographic quotes (“ ”) around the string literals
// ("localhost", "spark_demo", "raw_files"), which do not compile in Scala;
// replaced with plain ASCII double quotes.
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
val sc = new SparkContext(conf)

// Smoke test: fetch a single row from raw_files to verify the connection works.
val test_spark_rdd = sc.cassandraTable("spark_demo", "raw_files")
test_spark_rdd.first
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This script takes the files from the MovieLens project
# (http://grouplens.org/datasets/movielens/), reads them in line by line and
# saves them to a raw-data table in Cassandra (spark_demo.raw_files).
from cassandra import cluster


def load_raw_file(session, path, filename, use_async=False, progress_every=10000):
    """Insert every line of the file at `path` into raw_files.

    session        -- an open Cassandra session bound to the spark_demo keyspace
    path           -- filesystem path of the source file
    filename       -- value stored in the raw_files.filename column
    use_async      -- fire-and-forget inserts for large files (ratings.dat)
    progress_every -- print a progress marker every N lines in async mode
    """
    insert_cql = 'INSERT INTO raw_files (line,filename,contents) VALUES (%s,%s,%s)'
    # `with` guarantees the file handle is closed; the original leaked all three.
    with open(path, 'rb') as raw:
        for idx, line in enumerate(raw):
            if use_async:
                if (idx % progress_every) == 0:
                    print("{}".format(idx))
                # NOTE(review): async futures are intentionally not awaited
                # (best-effort bulk load), matching the original behavior.
                session.execute_async(insert_cql, (idx, filename, line))
            else:
                session.execute(insert_cql, (idx, filename, line))


my_cluster = cluster.Cluster(['localhost'])
session = my_cluster.connect('spark_demo')

load_raw_file(session, './ml-1m/movies.dat', 'movies.dat')
# ratings.dat is by far the largest file; use async inserts with progress output.
load_raw_file(session, './ml-1m/ratings.dat', 'ratings.dat', use_async=True)
load_raw_file(session, './ml-1m/users.dat', 'users.dat')

# Release driver connections cleanly (the original never shut the cluster down).
my_cluster.shutdown()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Parse the raw MovieLens lines stored in spark_demo.raw_files into the typed
// users/movies/ratings tables. Source file layouts:
//   users.dat   UserID::Gender::Age::Occupation::Zip-code
//   movies.dat  MovieID::Title::Genres
//   ratings.dat UserID::MovieID::Rating::Timestamp
// Field names are chosen so the connector's column mapper resolves them to the
// Cassandra column names (e.g. ZipCode -> zip_code, UserId -> user_id).
case class RawFileData(Filename: String, Line: Int, Contents: String)
case class User(Id: Int, Gender: String, Age: Int, Occupation: Int, ZipCode: String)
case class Movie(Id: Int, Title: String, Genres: String)
// FIX: the ratings table's primary key column is `id`, but the original Rating
// carried no id, so saveToCassandra could not populate the partition key. The
// raw line number (unique within ratings.dat) is used as the id.
case class Rating(Id: Int, UserId: Int, MovieId: Int, Rating: Float)

val raw_files = sc.cassandraTable[RawFileData]("spark_demo", "raw_files")

// Select one source file's rows; yield (line number, "::"-split trimmed fields).
def parsed(filename: String) =
  raw_files.filter(_.Filename == filename).
    map(raw => (raw.Line, raw.Contents.trim.split("::")))

val users = parsed("users.dat").
  map { case (_, f) => User(f(0).toInt, f(1), f(2).toInt, f(3).toInt, f(4)) }
users.saveToCassandra("spark_demo", "users")

val movies = parsed("movies.dat").
  map { case (_, f) => Movie(f(0).toInt, f(1), f(2)) }
movies.saveToCassandra("spark_demo", "movies")

val ratings = parsed("ratings.dat").
  map { case (line, f) => Rating(line, f(0).toInt, f(1).toInt, f(2).toFloat) }
ratings.saveToCassandra("spark_demo", "ratings")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment