Last active
August 29, 2015 14:07
-
-
Save AtlasPilotPuppy/21497db830b337e8602d to your computer and use it in GitHub Desktop.
Quick Intro to Spark SQL
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Data files can be downloaded from https://s3.amazonaws.com/hw-sandbox/tutorial1/infochimps_dataset_4778_download_16677-csv.zip
import java.io.Serializable
import java.util

import org.apache.spark.SparkContext
import org.apache.spark.sql._
// Connect to the cluster master at spark://master:7077 under the application name "Spark SQL Intro".
val sc = new SparkContext("spark://master:7077", "Spark SQL Intro")
// SQLContext wraps the SparkContext and is the entry point for running SQL queries.
val sqlContext = new SQLContext(sc)
// Implicit conversion from an RDD of case-class instances to a SchemaRDD;
// it is what makes registerAsTable available on the plain RDDs built below.
import sqlContext.createSchemaRDD
/* Spark SQL requires case classes (or classes implementing the Product interface) to be able to use them as a table schema. */

/** One row of the dividends CSV; the fields follow the file's column order. */
case class DividendRecord(exchange: String, symbol: String, date: String, dividends: Double)

/**
 * Builds a [[DividendRecord]] from an already-split CSV row.
 *
 * @param row the row's fields; columns 0-2 are copied as strings, column 3 is parsed as a Double
 * @return the record for this row
 * @throws ArrayIndexOutOfBoundsException if the row has fewer than four fields
 * @throws NumberFormatException if column 3 is not a valid number
 */
def parseDividend(row: Array[String]): DividendRecord =
  DividendRecord(row(0), row(1), row(2), row(3).toDouble)
// Create an RDD holding the raw lines of the dividends file.
val dividends = sc.textFile("hdfs://master:9000/user/hdfs/NYSE_dividends_A.csv")
// Drop the header line (it starts with "exchange"), then split each remaining row on ','.
val dividend_rows = dividends.filter(!_.startsWith("exchange")).map(_.split(","))
// Turn every split row into a DividendRecord so Spark SQL can derive a schema from it.
val div_schema = dividend_rows.map(parseDividend)
// Register the RDD as a table named "div".
// NOTE(review): registerAsTable was superseded by registerTempTable in later Spark 1.x releases — confirm the target version.
div_schema.registerAsTable("div")
// Try a couple of queries. Each result has its own name: redefining a val only works
// when the script is evaluated line by line in the REPL.
val result_all = sqlContext.sql("SELECT * FROM div").collect()
val result = sqlContext.sql("SELECT * FROM div where exchange='NYSE'").collect()
// Read the second file: an RDD holding the raw lines of the daily prices CSV.
val daily_prices = sc.textFile("hdfs://master:9000/user/hdfs/NYSE_daily_prices_A.csv")
/** One row of the daily prices CSV; the fields follow the file's column order. */
case class DailyPricesRecord(
  exchange: String,
  symbol: String,
  date: String,
  price_open: Double,
  price_high: Double,
  price_low: Double,
  price_close: Double,
  stock_volume: Double,
  price_adj_close: Double
)

/**
 * Builds a [[DailyPricesRecord]] from an already-split CSV row.
 *
 * @param row the row's fields; columns 0-2 are copied as strings, columns 3-8 are parsed as Doubles
 * @return the record for this row
 * @throws ArrayIndexOutOfBoundsException if the row has fewer than nine fields
 * @throws NumberFormatException if any of columns 3-8 is not a valid number
 */
def parseDailyPrices(row: Array[String]): DailyPricesRecord =
  DailyPricesRecord(
    row(0),
    row(1),
    row(2),
    row(3).toDouble,
    row(4).toDouble,
    row(5).toDouble,
    row(6).toDouble,
    row(7).toDouble,
    row(8).toDouble
  )
// Drop the header line (it starts with "exchange"), split each row on ',' and map it to a DailyPricesRecord.
val daily_prices_schema = daily_prices.filter(!_.startsWith("exchange")).map(_.split(",")).map(parseDailyPrices)
// Register the RDD as a table named "daily_prices".
daily_prices_schema.registerAsTable("daily_prices")

// Simple filter on the new table.
val daily_prices_nyse = sqlContext.sql("select * from daily_prices where exchange = 'NYSE'").collect()

// Each result below has its own name: redefining a val only works when the script
// is evaluated line by line in the REPL.

// Join the two tables on symbol only.
val join_on_symbol = sqlContext.sql("select * from div join daily_prices on div.symbol=daily_prices.symbol LIMIT 10").collect()

// Aggregations over the dividends table.
val group_by_dividends = sqlContext.sql("select dividends, count(*) from div where symbol='AZZ' group by dividends").collect()
// The original query had a dangling "where" with no predicate, which is a SQL syntax error.
val group_by_exchange = sqlContext.sql("select exchange, count(*) from div group by exchange").collect()
val group_by = sqlContext.sql("select symbol, count(*) from div group by symbol").collect()

// Join on both symbol and date.
val join = sqlContext.sql("select * from div join daily_prices on div.symbol=daily_prices.symbol and div.date=daily_prices.date LIMIT 50").collect()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment