Created
June 3, 2018 07:16
-
-
Save omri374/90c51349294298bda1161eab8220495d to your computer and use it in GitHub Desktop.
An example of a data pipeline using Sparklyr, mostly based on this blog post: https://beta.rstudioconnect.com/content/1518/notebook-classification.html
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Databricks notebook source | |
# install sparklyr (we need this every time we start our cluster as it has to install packages on all workers) | |
install.packages("sparklyr") | |
#titanic data | |
install.packages('titanic') | |
library(titanic) | |
# Load sparklyr package. | |
library(sparklyr) | |
library(tidyr) | |
library(purrr) | |
library(dplyr) | |
library(ggplot2) | |
## This notebook is adapted from:
# https://beta.rstudioconnect.com/content/1518/notebook-classification.html
# COMMAND ---------- | |
## Running inside Databricks, so the connection uses method = "databricks".
## For any other Spark cluster (cloud or on-premises), connect through the
## master URL instead:
# cluster_url <- paste0("spark://", system("hostname -i", intern = TRUE), ":7077")
# sc <- spark_connect(master = cluster_url)
sc <- spark_connect(method = "databricks")

# COMMAND ----------

# Copy the Titanic training set into Spark under the table name "titanic".
# Any other dataset or source could be loaded here instead.
titanic_tbl <- copy_to(
  dest = sc,
  df = titanic::titanic_train,
  name = "titanic",
  overwrite = TRUE
)

# COMMAND ----------

# Pull the Spark table back into R and render it in the notebook.
titanic_tbl %>%
  collect() %>%
  display()
# COMMAND ---------- | |
# MAGIC %sql | |
# MAGIC SELECT * FROM titanic | |
# COMMAND ---------- | |
# Ordinary dplyr syntax: because titanic_tbl is a Spark data frame, the
# pipeline is translated to Spark SQL and executed on the cluster; only the
# final per-class summary is brought back into R by collect().
titanic_groups <- titanic_tbl %>%
  group_by(Pclass) %>%
  summarise(
    count = n(),
    # na.rm = TRUE states the NULL handling explicitly; SQL aggregates always
    # drop missing values, and dbplyr warns when the argument is omitted.
    avg_survived = mean(Survived, na.rm = TRUE),
    avg_age = mean(Age, na.rm = TRUE)
  ) %>%
  # Keep only passenger classes with more than 20 rows.
  filter(count > 20) %>%
  arrange(desc(avg_age)) %>%
  collect()

# COMMAND ----------

display(titanic_groups)

# COMMAND ----------

# NOTE(review): same result displayed a second time; presumably so one cell can
# show a table and the other a chart in the Databricks UI -- confirm.
display(titanic_groups)
# COMMAND ---------- | |
# Feature engineering, step 1: derive Family_Size, treat Pclass as a
# categorical (character) column, drop rows with missing Embarked/Sex/Cabin,
# and replace missing ages with the mean age. The result is registered in
# Spark as the table "titanic2".
titanic2_tbl <- titanic_tbl %>%
  # SibSp + Parch + 1L; presumably the extra 1 counts the passenger themself.
  mutate(Family_Size = SibSp + Parch + 1L) %>%
  mutate(Pclass = as.character(Pclass)) %>%
  filter(!is.na(Embarked) & !is.na(Sex) & !is.na(Cabin)) %>%
  # na.rm = TRUE makes the SQL aggregate's NULL handling explicit.
  mutate(Age = if_else(is.na(Age), mean(Age, na.rm = TRUE), Age)) %>%
  sdf_register("titanic2")

# Feature engineering, step 2: bucket Family_Size into the ranges
# [1, 2), [2, 5) and [5, 12], then store the bucket index as a character
# column so it is treated as categorical. Registered in Spark as
# "titanic_final".
titanic_final_tbl <- titanic2_tbl %>%
  # The bucketizer needs a numeric input column. The column is named
  # Family_Size (capital S), matching the mutate() above.
  mutate(Family_Size = as.numeric(Family_Size)) %>%
  # Current sparklyr feature-transformer interface: the transformer is applied
  # directly to the Spark table with explicit input/output column names
  # (sdf_mutate() with an expression-style ft_bucketizer() call is retired).
  ft_bucketizer(
    input_col = "Family_Size",
    output_col = "Family_Sizes",
    splits = c(1, 2, 5, 12)
  ) %>%
  mutate(Family_Sizes = as.character(as.integer(Family_Sizes))) %>%
  sdf_register("titanic_final")

# COMMAND ----------

display(titanic_final_tbl %>% collect())
# COMMAND ---------- | |
# Prepare the modelling table: cast Survived, SibSp and Parch to numeric,
# keep only the columns of interest, and split the rows at random into
# 75% train / 25% test with a fixed seed.
partition <- sdf_partition(
  titanic_final_tbl %>%
    mutate(
      Survived = as.numeric(Survived),
      SibSp = as.numeric(SibSp),
      Parch = as.numeric(Parch)
    ) %>%
    select(
      Survived, Pclass, Sex, Age, SibSp, Parch, Fare, Embarked, Family_Sizes
    ),
  train = 0.75,
  test = 0.25,
  seed = 8585
)

# References to the two Spark tables produced by the split
train_tbl <- partition[["train"]]
test_tbl <- partition[["test"]]

# COMMAND ----------

train_tbl %>%
  collect() %>%
  display()
# COMMAND ---------- | |
# Survival is modelled as a function of several predictors.
ml_formula <- Survived ~
  Pclass + Sex + Age + SibSp + Parch + Fare + Family_Sizes

# Fit a logistic regression on the training table and print the fitted model.
ml_log <- ml_logistic_regression(train_tbl, ml_formula)
print(ml_log)

## Decision tree: same formula, same training table; print the fitted model.
ml_dt <- ml_decision_tree(train_tbl, ml_formula)
print(ml_dt)
# COMMAND ---------- | |
# Collect the fitted models in one named list; the names are reused later as
# the model labels.
ml_models <- setNames(
  list(ml_log, ml_dt),
  c("Logistic", "Decision Tree")
)
# Score a fitted model against a Spark table.
#
# model: a fitted sparklyr ML model (e.g. the result of
#        ml_logistic_regression() or ml_decision_tree()).
# data:  Spark table to score; defaults to the held-out test table test_tbl.
#
# Returns a Spark table holding only the true label (Survived) and the model's
# `prediction` column.
score_test_data <- function(model, data = test_tbl) {
  # ml_predict(model, dataset) is the supported way to score with a fitted
  # model; the sdf_predict(model, dataset) signature is deprecated.
  pred <- ml_predict(model, data)
  select(pred, Survived, prediction)
}
# Apply the scoring function to every model; the result is a list of scored
# tables carrying the same names as ml_models.
ml_score <- purrr::map(ml_models, score_test_data)
# COMMAND ---------- | |
# Compute classification accuracy for a scored table.
#
# data:     Spark table with a `Survived` label column and a numeric
#           `prediction` column (as returned by score_test_data()).
# cutpoint: threshold applied to `prediction`; values above it become 1.0,
#           everything else 0.0. Defaults to 0.5.
#
# Returns the accuracy metric computed by ml_classification_eval().
calc_accuracy <- function(data, cutpoint = 0.5) {
  data %>%
    mutate(prediction = if_else(prediction > cutpoint, 1.0, 0.0)) %>%
    # Label column first, prediction column second -- the same order used in
    # the ml_binary_classification_eval() call for AUC. Accuracy is symmetric,
    # so the value is unchanged, but the roles are now correct.
    ml_classification_eval("Survived", "prediction", "accuracy")
}
# Calculate AUC and accuracy (both as percentages) for every scored model.
# vapply() with numeric(1) guarantees one numeric value per model and fails
# loudly if an evaluator ever returns something else, unlike sapply().
# NOTE(review): AUC is computed from the `prediction` column selected by
# score_test_data(), not from a probability column -- confirm this is intended.
perf_metrics <- data.frame(
  model = names(ml_score),
  AUC = 100 * vapply(
    ml_score, ml_binary_classification_eval, numeric(1),
    "Survived", "prediction"
  ),
  Accuracy = 100 * vapply(ml_score, calc_accuracy, numeric(1)),
  row.names = NULL,
  stringsAsFactors = FALSE
)
# Plot results: reshape to one row per (model, metric) pair and draw
# side-by-side horizontal bars, with models ordered by metric value.
perf_metrics %>%
  gather(metric, value, AUC, Accuracy) %>%
  ggplot(aes(x = reorder(model, value), y = value, fill = metric)) +
  geom_col(position = "dodge") +
  coord_flip() +
  labs(x = "", y = "Percent", title = "Performance Metrics")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment