This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import nltk | |
nltk.download(['punkt', 'stopwords', 'wordnet']) | |
from nltk.stem import WordNetLemmatizer | |
from nltk.corpus import stopwords | |
from nltk.tokenize import word_tokenize | |
def tokenize(text): | |
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Hyperparameter search space: the grid-search object enumerates every
# combination of the values below for the random-forest classifier.
params = (
    ParamGridBuilder()
    .addGrid(classifier.maxDepth, [2, 5, 10])
    .addGrid(classifier.featureSubsetStrategy,
             ['all', 'onethird', 'sqrt', 'log2'])
    .build()
)

# Evaluator that measures the success of the candidate model(s)
# against the 'churned' label column.
binary_evaluator = BinaryClassificationEvaluator(labelCol='churned')
evaluator = binary_evaluator
# CrossValidator will build pipeline, create models based on ParamGridBuilder, |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Pipeline stages accumulate here in execution order.
stages = []

# Convert the categorical string columns to numeric category indexes.
# FIX: the original called .fit(j) with an undefined name `j`; the
# dataframe these indexers are fitted on everywhere else in this file
# is `joined`.
indexers = [StringIndexer(inputCol=column, outputCol=column + "_idx").fit(joined)
            for column in ['level', 'gender']]

# Expand the category indexes into one-hot encoded sparse vectors.
onehotencoder = OneHotEncoderEstimator(inputCols=['gender_idx', 'level_idx'],
                                       outputCols=['gender_dummy', 'level_dummy'])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Assemble the dummy and skew-corrected numeric columns into the single
# vector column ('nonScaledFeatures') that Spark ML estimators expect.
feature_columns = ['gender_dummy', 'level_dummy', 'logSessionCount',
                   'sqrtMeanSongCount', 'sqrtSessionsFreqDay']
assembler = VectorAssembler(inputCols=feature_columns,
                            outputCol='nonScaledFeatures')
joined_vector = assembler.transform(joined)

# Spark ML wants an integer 'label' column; cast 'churned' accordingly.
joined_vector = joined_vector.withColumn('label',
                                         joined_vector.churned.cast('integer'))

# Preview with the raw/intermediate columns dropped. drop() returns a new
# frame, so joined_vector itself keeps all of its columns.
joined_vector.drop('userId', 'level', 'gender', 'sessionCount',
                   'meanSongCount', 'sessionsFreqDay', 'gender_idx',
                   'level_idx', 'churned').show(4)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# 2x3 grid of distributions: top row shows the raw variables, bottom row
# shows the same variables after a skew-reducing transform (log / sqrt).
f, axes = plt.subplots(2, 3, figsize=(14, 7), sharex=False)

panels = [
    ("sessionCount",    lambda s: s, "skyblue", axes[0, 0]),
    ("meanSongCount",   lambda s: s, "olive",   axes[0, 1]),
    ("sessionsFreqDay", lambda s: s, "gold",    axes[0, 2]),
    # Skew handling
    ("sessionCount",    np.log,      "skyblue", axes[1, 0]),
    ("meanSongCount",   np.sqrt,     "olive",   axes[1, 1]),
    ("sessionsFreqDay", np.sqrt,     "gold",    axes[1, 2]),
]
for column, transform, color, ax in panels:
    sns.distplot(transform(joined_pandas[column]), color=color, ax=ax)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Replace each categorical string column with a numeric index column
# ('gender' -> 'gender_idx', then 'level' -> 'level_idx').
for cat_column in ['gender', 'level']:
    indexer = StringIndexer(inputCol=cat_column,
                            outputCol=cat_column + '_idx')
    joined = indexer.fit(joined).transform(joined)
joined = OneHotEncoderEstimator(inputCols=['gender_idx', 'level_idx'], | |
outputCols=['gender_dummy','level_dummy'])\ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
joined = user_features\ | |
.join(churn_data_summary, | |
on=['userId'], | |
how='left')\ | |
.join(user_engagement, | |
on=['userId'], | |
how='left')\ | |
.join(listen_freq, | |
on=['userId'], | |
how='left')\ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Register the dataframe as a temp view so the aggregation above can be
# reproduced with plain SQL.
data.createOrReplaceTempView('sparkify')
sub_query = """ | |
SELECT | |
userId, | |
sessionId, | |
max(itemInSession) as itemCount | |
FROM | |
sparkify |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Per-user engagement metrics:
#   meanSongCount - average over the user's sessions of the max
#                   itemInSession seen in each session
#   sessionCount  - number of sessions recorded for the user
user_engagement = (
    data
    .groupBy('userId', 'sessionId')
    .agg(F.max('itemInSession').alias('itemCount'))
    .groupBy('userId')
    .agg(F.avg('itemCount').alias('meanSongCount'),
         F.count('sessionId').alias('sessionCount'))
    .orderBy('userId')
)
user_engagement.show(10)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Create a new aggreated dataframe called listen_freq | |
# (stands for listening frequency) for each user | |
listen_freq = data.select('userId','sessionId', 'timeStamp')\ | |
.groupBy('userId','sessionId')\ | |
.agg(F.min('timeStamp').alias('sessionTime'))\ | |
.orderBy('userId', 'sessionId')\ | |
.groupBy('userId')\ | |
.agg(F.min('sessionTime').alias('minSessionTime'), | |
F.max('sessionTime').alias('maxSessionTime'), |