Apache Spark in data science presentation

This gist contains the Scala code examples used in the presentation.
// Example 1: register a Scala function as a UDF and apply it to a DataFrame column.

// Initial data
val colors = sc.parallelize(Array(
  "FFFFFF",
  "000000",
  "123456"
)).toDF("color")

// Plain Scala function: hex colour string -> (r, g, b) triple
def hex2rgb(s: String): (Int, Int, Int) = {
  val hex = Integer.parseInt(s, 16)
  val r = (hex & 0xFF0000) >> 16
  val g = (hex & 0xFF00) >> 8
  val b = hex & 0xFF
  (r, g, b)
}

// Register the function as a UDF (the name also makes it callable from SQL)
val hex2rgbUDF = sqlContext.udf
  .register("hex2rgb", (s: String) => hex2rgb(s))

colors
  .withColumn("rgb", hex2rgbUDF($"color"))
  .show()
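
Because the UDF is registered under the name "hex2rgb", it can also be called from Spark SQL. A minimal sketch, assuming the same sqlContext and the colors DataFrame above:

// Sketch: call the registered UDF by name through Spark SQL.
colors.registerTempTable("colors")
sqlContext.sql("SELECT color, hex2rgb(color) AS rgb FROM colors").show()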
// Example 2: window functions (uses the `products` DataFrame defined in the next snippet).
import org.apache.spark.sql.expressions.Window

// Window ordered by endDate within each product name
val win1 = Window.partitionBy("name").orderBy("endDate")
// Same window with a frame from the start of the partition up to the current row
val win2 = Window.partitionBy("name").orderBy("endDate").rowsBetween(Long.MinValue, 0)

products
  .withColumn("monthsFromLastUpdate", months_between($"endDate", lag("endDate", 1).over(win1)))
  .withColumn("origPriceUplift", $"price" - first($"price").over(win2))
  .show()
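
The win2 frame (partition start up to the current row) works for any running aggregate, not only first. A minimal sketch with the same products DataFrame and the standard max function from org.apache.spark.sql.functions:

// Sketch: running maximum price per product, reusing the win2 frame.
products
  .withColumn("maxPriceSoFar", max($"price").over(win2))
  .show()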
// Example 3: join with range conditions, matching each order to the price valid on its date.

// Price history per product
val products = sc.parallelize(Array(
  ("steak", "1990-01-01", "2000-01-01", 150),
  ("steak", "2000-01-02", "2010-01-01", 180),
  ("steak", "2010-01-02", "2020-01-01", 200),
  ("fish", "1990-01-01", "2020-01-01", 100)
)).toDF("name", "startDate", "endDate", "price")

// Orders to be priced
val orders = sc.parallelize(Array(
  ("1995-01-01", "steak"),
  ("2000-01-01", "fish"),
  ("2005-01-01", "steak"),
  ("2010-01-01", "fish"),
  ("2015-01-01", "steak")
)).toDF("date", "product")

// Join on product name and on the order date falling inside the price validity interval
orders
  .join(products, $"product" === $"name" && $"date" >= $"startDate" && $"date" <= $"endDate")
  .show()
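
The joined result carries columns from both sides. A small follow-up sketch that keeps only the order date, the product and the matched price:

// Sketch: project only the columns of interest after the range join.
orders
  .join(products, $"product" === $"name" && $"date" >= $"startDate" && $"date" <= $"endDate")
  .select("date", "product", "price")
  .show()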
// Example 4: ML pipeline that indexes and one-hot encodes categorical features.
val data = sc.parallelize(Array(
  ("M", "EN", 1.0),
  ("M", "ES", 0.0),
  ("F", "EN", 1.0),
  ("F", "ZH", 0.1)
)).toDF("gender", "language", "label")

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

// Define indexers and encoders
val fieldsToIndex = Array("gender", "language")
val indexers = fieldsToIndex.map(f => new StringIndexer()
  .setInputCol(f).setOutputCol(f + "_index"))
val fieldsToEncode = Array("gender", "language")
val oneHotEncoders = fieldsToEncode.map(f => new OneHotEncoder()
  .setInputCol(f + "_index").setOutputCol(f + "_flags"))
val featureAssembler = new VectorAssembler()
  .setInputCols(Array("gender_flags", "language_flags"))
  .setOutputCol("features")

// Combine stages into a single pipeline
val pipeline = new Pipeline().setStages(indexers ++ oneHotEncoders :+ featureAssembler)
pipeline
  .fit(data)
  .transform(data)
  .drop("gender_flags")
  .drop("language_flags")
  .show()
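
Pipeline.fit returns a PipelineModel that can be kept and re-applied to any DataFrame with the same input columns. A minimal sketch (featureModel is just a name introduced here):

// Sketch: keep the fitted PipelineModel and reuse it on data with the same schema.
val featureModel = pipeline.fit(data)
featureModel.transform(data).select("gender", "language", "label", "features").show()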
// Example 5: logistic regression on the Iris data with cross-validated hyper-parameters.
import sqlContext.implicits._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

// Iris data from the UCI repository
val url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
val data = sc.parallelize(
  scala.io.Source
    .fromURL(url)
    .mkString
    .split("\n")
    .map(_.split(","))
    .map({ case Array(f1, f2, f3, f4, label) => (f1.toDouble, f2.toDouble, f3.toDouble, f4.toDouble, label) })
)
  .toDF("sepal_length", "sepal_width", "petal_length", "petal_width", "label")
  // Keep two of the three classes and encode them as a binary numeric label
  .filter($"label".isin("Iris-versicolor", "Iris-virginica"))
  .withColumn("label", when($"label" === "Iris-versicolor", 0.0).otherwise(1.0))

// Hold out 10% of the rows as a final test set
val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed = 12345)

// Assemble the four measurements into a single feature vector
val featureAssembler = new VectorAssembler()
  .setInputCols(Array("sepal_length", "sepal_width", "petal_length", "petal_width"))
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
val fullPipeline = new Pipeline().setStages(Array(featureAssembler, lr))

// Grid of regularization strengths, tuned with 5-fold cross-validation
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .build()
val cv = new CrossValidator()
  .setEstimator(fullPipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(5)

val cvModel = cv.fit(training)
cvModel.avgMetrics // mean metric (areaUnderROC by default) per parameter combination
cvModel.transform(test).show()
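
A short follow-up sketch: score the held-out test set with the same evaluator (areaUnderROC by default), using the best model selected by cross-validation:

// Sketch: evaluate the selected model on the held-out test rows.
val evaluator = new BinaryClassificationEvaluator()
println("Test areaUnderROC = " + evaluator.evaluate(cvModel.transform(test)))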