@omri374
Created June 3, 2018 07:16
An example of a data pipeline using Sparklyr, mostly based on this blog post: https://beta.rstudioconnect.com/content/1518/notebook-classification.html
# Databricks notebook source
# install sparklyr (we need this every time we start our cluster as it has to install packages on all workers)
install.packages("sparklyr")
# Titanic data
install.packages('titanic')
library(titanic)
# Load sparklyr package.
library(sparklyr)
library(tidyr)
library(purrr)
library(dplyr)
library(ggplot2)
## This notebook is adapted from:
## https://beta.rstudioconnect.com/content/1518/notebook-classification.html
# COMMAND ----------
## Since we're in Databricks, we use method = "databricks". For a different Spark cluster (in the cloud or on-premises) we could connect with these commands instead:
#cluster_url <- paste0("spark://", system("hostname -i", intern = TRUE), ":7077")
#sc <- spark_connect(master = cluster_url)
sc <- spark_connect(method = "databricks")
# COMMAND ----------
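# Optional sanity check on the connection (a minimal sketch; nothing here is required
# by the rest of the notebook): spark_version() reports the Spark version of the cluster
# behind `sc`, and src_tbls() lists the tables currently registered with it.
spark_version(sc)
src_tbls(sc)
# COMMAND ----------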
# we use the titanic dataset, but you can load any dataset you wish from a large variety of sources
titanic_tbl <- copy_to(sc, titanic::titanic_train,'titanic',overwrite = TRUE)
# COMMAND ----------
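# Optional sketch of loading data from other sources instead of copy_to(): sparklyr can
# read files directly into Spark, e.g. CSV or Parquet. The paths below are hypothetical
# placeholders (not files that ship with this notebook), so the calls are left commented out.
# flights_tbl <- spark_read_csv(sc, name = "flights", path = "/mnt/data/flights.csv", header = TRUE)
# events_tbl  <- spark_read_parquet(sc, name = "events", path = "/mnt/data/events.parquet")
# COMMAND ----------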
display(titanic_tbl %>% collect())
# COMMAND ----------
# MAGIC %sql
# MAGIC SELECT * FROM titanic
# COMMAND ----------
# We use ordinary dplyr syntax, but dplyr knows that titanic_tbl is a Spark DataFrame, so everything is computed in Spark in parallel
titanic_groups <- titanic_tbl %>%
  group_by(Pclass) %>%
  summarise(count = n(), avg_survived = mean(Survived), avg_age = mean(Age)) %>%
  filter(count > 20) %>%
  arrange(desc(avg_age)) %>%
  collect()
# COMMAND ----------
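# A minimal sketch showing that the dplyr pipeline above is evaluated lazily in Spark:
# show_query() (via dbplyr) prints the SQL that sparklyr generates, provided we leave
# off the final collect() so the result stays remote.
titanic_tbl %>%
  group_by(Pclass) %>%
  summarise(count = n(), avg_survived = mean(Survived)) %>%
  show_query()
# COMMAND ----------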
display(titanic_groups)
# COMMAND ----------
# Feature engineering: family size, passenger class as a categorical, and imputation of missing Age values
titanic2_tbl <- titanic_tbl %>%
  mutate(Family_Size = SibSp + Parch + 1L) %>%
  mutate(Pclass = as.character(Pclass)) %>%
  filter(!is.na(Embarked) & !is.na(Sex) & !is.na(Cabin)) %>%
  mutate(Age = if_else(is.na(Age), mean(Age), Age)) %>%
  sdf_register("titanic2")
# Bucket Family_Size into discrete groups using Spark's ft_bucketizer
titanic_final_tbl <- titanic2_tbl %>%
  mutate(Family_Size = as.numeric(Family_Size)) %>%
  sdf_mutate(
    Family_Sizes = ft_bucketizer(Family_Size, splits = c(1, 2, 5, 12))
  ) %>%
  mutate(Family_Sizes = as.character(as.integer(Family_Sizes))) %>%
  sdf_register("titanic_final")
# COMMAND ----------
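# Optional check on the bucketized feature: with splits = c(1, 2, 5, 12), ft_bucketizer
# produces three buckets — [1,2), [2,5) and [5,12] — i.e. passengers travelling alone,
# in small families and in large families. count() runs in Spark; only the small
# summary is collected.
titanic_final_tbl %>%
  count(Family_Sizes) %>%
  collect()
# COMMAND ----------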
display(titanic_final_tbl %>% collect())
# COMMAND ----------
partition <- titanic_final_tbl %>%
  mutate(Survived = as.numeric(Survived), SibSp = as.numeric(SibSp), Parch = as.numeric(Parch)) %>%
  select(Survived, Pclass, Sex, Age, SibSp, Parch, Fare, Embarked, Family_Sizes) %>%
  sdf_partition(train = 0.75, test = 0.25, seed = 8585)
# Create table references
train_tbl <- partition$train
test_tbl <- partition$test
# COMMAND ----------
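# Optional check that the 75/25 split looks right: sdf_nrow() counts rows on the
# Spark side, so nothing large is pulled back to the driver.
sdf_nrow(train_tbl)
sdf_nrow(test_tbl)
# COMMAND ----------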
display(train_tbl %>% collect())
# COMMAND ----------
# Model survival as a function of several predictors
ml_formula <- formula(Survived ~ Pclass + Sex + Age + SibSp + Parch + Fare + Family_Sizes)
# Train a logistic regression model
(ml_log <- ml_logistic_regression(train_tbl, ml_formula))
## Decision Tree
(ml_dt <- ml_decision_tree(train_tbl, ml_formula))
# COMMAND ----------
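# Optional sketch: other sparklyr classifiers accept the same data/formula interface
# (assuming a sparklyr version with the formula API, as used above), so they could be
# trained here and added to ml_models below to be scored in exactly the same way.
# Left commented out so the notebook's results are unchanged.
# (ml_rf <- ml_random_forest(train_tbl, ml_formula))
# (ml_gbt <- ml_gradient_boosted_trees(train_tbl, ml_formula))
# COMMAND ----------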
# Bundle the models into a single list object
ml_models <- list(
  "Logistic" = ml_log,
  "Decision Tree" = ml_dt
)
# Create a function for scoring a model against the test set
score_test_data <- function(model, data = test_tbl){
  pred <- sdf_predict(model, data)
  select(pred, Survived, prediction)
}
# Score all the models
ml_score <- lapply(ml_models, score_test_data)
# COMMAND ----------
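# Optional: peek at a few scored rows for one of the models. The scored tables are
# still Spark DataFrames, so head() translates to a LIMIT and only 10 rows are collected.
ml_score$`Decision Tree` %>%
  head(10) %>%
  collect()
# COMMAND ----------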
# Function for calculating accuracy
calc_accuracy <- function(data, cutpoint = 0.5){
  data %>%
    mutate(prediction = if_else(prediction > cutpoint, 1.0, 0.0)) %>%
    ml_classification_eval("prediction", "Survived", "accuracy")
}
# Calculate AUC and accuracy
perf_metrics <- data.frame(
  model = names(ml_score),
  AUC = 100 * sapply(ml_score, ml_binary_classification_eval, "Survived", "prediction"),
  Accuracy = 100 * sapply(ml_score, calc_accuracy),
  row.names = NULL, stringsAsFactors = FALSE)
# Plot results
gather(perf_metrics, metric, value, AUC, Accuracy) %>%
  ggplot(aes(reorder(model, value), value, fill = metric)) +
  geom_bar(stat = "identity", position = "dodge") +
  coord_flip() +
  xlab("") +
  ylab("Percent") +
  ggtitle("Performance Metrics")