@oneryalcin
oneryalcin / tokenize.py
Created October 25, 2019 10:08
Tokenizing channel
import re
import nltk
nltk.download(['punkt', 'stopwords', 'wordnet'])
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

def tokenize(text):
    """Normalize, tokenize, remove stopwords, and lemmatize a text string.
    (The gist is truncated here; the body below is a standard reconstruction.)
    """
    text = re.sub(r'[^a-zA-Z0-9]', ' ', text.lower())
    tokens = [t for t in word_tokenize(text) if t not in stopwords.words('english')]
    return [WordNetLemmatizer().lemmatize(t) for t in tokens]
@oneryalcin
oneryalcin / sparkify_13_cross_validation.py
Last active September 24, 2019 00:08
13 Sparkify Cross Validation
# Imports added for context; both classes live in pyspark.ml
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Our Grid Search object allows us to define various hyperparameters to test our model
params = ParamGridBuilder()\
    .addGrid(classifier.maxDepth, [2, 5, 10])\
    .addGrid(classifier.featureSubsetStrategy, ['all', 'onethird', 'sqrt', 'log2'])\
    .build()

# Define the evaluator; this will measure the success of model(s)
evaluator = binary_evaluator = BinaryClassificationEvaluator(labelCol='churned')

# CrossValidator will build the pipeline, create models based on ParamGridBuilder,
# and keep the one that scores best on the evaluator
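The gist cuts off before the CrossValidator itself; a minimal sketch of how it would tie the grid and evaluator together (pipeline, train, and numFolds=3 are assumptions, not from the gist):

from pyspark.ml.tuning import CrossValidator

cv = CrossValidator(estimator=pipeline,        # assumed Pipeline from the stages gists
                    estimatorParamMaps=params,
                    evaluator=evaluator,
                    numFolds=3)                # fold count is an assumption
cv_model = cv.fit(train)                       # 'train' split is an assumption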
@oneryalcin
oneryalcin / sparkify_12_pipeline_stages.py
Last active September 24, 2019 00:09
12 Pipeline stages
# Keep a list for Pipeline stages
stages = []

# Convert categorical variables to indexes
# (the original had .fit(j); 'joined' is the dataframe used throughout these gists)
indexers = [StringIndexer(inputCol=column, outputCol=column + "_idx").fit(joined)
            for column in ['level', 'gender']]

# Convert indexes to one-hot encoded sparse vectors
onehotencoder = OneHotEncoderEstimator(inputCols=['gender_idx', 'level_idx'],
                                       outputCols=['gender_dummy', 'level_dummy'])
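The gist stops before the stages list is filled; a sketch of the likely next step, assuming the indexers and encoder above feed a pyspark.ml Pipeline:

from pyspark.ml import Pipeline

stages += indexers
stages.append(onehotencoder)
pipeline = Pipeline(stages=stages)   # the 'pipeline' name is an assumption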
@oneryalcin
oneryalcin / sparkify_11_vector_assembler.py
Created September 23, 2019 23:15
11 Sparkify Vector Assembler
joined_vector = VectorAssembler(inputCols=['gender_dummy', 'level_dummy', 'logSessionCount',
                                           'sqrtMeanSongCount', 'sqrtSessionsFreqDay'],
                                outputCol='nonScaledFeatures')\
    .transform(joined)

joined_vector = joined_vector.withColumn('label', joined_vector.churned.cast('integer'))

joined_vector.drop('userId', 'level', 'gender', 'sessionCount', 'meanSongCount',
                   'sessionsFreqDay', 'gender_idx', 'level_idx', 'churned').show(4)
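The output column is named nonScaledFeatures, which suggests a scaling step follows; a sketch of that step, assuming a StandardScaler (the scaler choice, its settings, and the 'features' column name are assumptions):

from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol='nonScaledFeatures', outputCol='features',
                        withMean=True, withStd=True)   # settings are assumptions
joined_vector = scaler.fit(joined_vector).transform(joined_vector)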
@oneryalcin
oneryalcin / sparkify_10_skew_fix.py
Created September 23, 2019 23:10
10 Sparkify Skew fix
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

f, axes = plt.subplots(2, 3, figsize=(14, 7), sharex=False)

# Raw distributions
sns.distplot(joined_pandas["sessionCount"], color="skyblue", ax=axes[0, 0])
sns.distplot(joined_pandas["meanSongCount"], color="olive", ax=axes[0, 1])
sns.distplot(joined_pandas["sessionsFreqDay"], color="gold", ax=axes[0, 2])

# Skew handling: log / square-root transforms pull in the long right tails
sns.distplot(np.log(joined_pandas["sessionCount"]), color="skyblue", ax=axes[1, 0])
sns.distplot(np.sqrt(joined_pandas["meanSongCount"]), color="olive", ax=axes[1, 1])
sns.distplot(np.sqrt(joined_pandas["sessionsFreqDay"]), color="gold", ax=axes[1, 2])
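The transformed columns that the VectorAssembler gist consumes (logSessionCount, sqrtMeanSongCount, sqrtSessionsFreqDay) would be created on the Spark side; a sketch of that step, applied to the joined dataframe:

from pyspark.sql import functions as F

joined = joined.withColumn('logSessionCount', F.log('sessionCount'))\
               .withColumn('sqrtMeanSongCount', F.sqrt('meanSongCount'))\
               .withColumn('sqrtSessionsFreqDay', F.sqrt('sessionsFreqDay'))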
@oneryalcin
oneryalcin / sparkify_9_inexer_onehotencoder.py
Created September 23, 2019 23:04
9 StringIndexer and OneHotEncoder
joined = StringIndexer(inputCol='gender', outputCol='gender_idx')\
    .fit(joined)\
    .transform(joined)

joined = StringIndexer(inputCol='level', outputCol='level_idx')\
    .fit(joined)\
    .transform(joined)

# The gist is truncated after the estimator below; fit/transform completes
# the same pattern as the two indexers above
joined = OneHotEncoderEstimator(inputCols=['gender_idx', 'level_idx'],
                                outputCols=['gender_dummy', 'level_dummy'])\
    .fit(joined)\
    .transform(joined)
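A quick sanity check on the new columns (an illustrative usage sketch, not from the gist):

joined.select('gender', 'gender_idx', 'gender_dummy',
              'level', 'level_idx', 'level_dummy').show(3)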
@oneryalcin
oneryalcin / sparkify_8_joined_features.py
Created September 23, 2019 22:48
8 Sparkify Joined Features
# The trailing '\' in the gist suggests the chain continues beyond this excerpt
joined = user_features\
    .join(churn_data_summary,
          on=['userId'],
          how='left')\
    .join(user_engagement,
          on=['userId'],
          how='left')\
    .join(listen_freq,
          on=['userId'],
          how='left')
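Left joins leave nulls for users missing from any of the aggregates; a hedged sketch of one way to handle that (whether and how the original does this is not shown):

joined = joined.na.fill(0)   # zero-fill is an assumption, not from the gist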
@oneryalcin
oneryalcin / sparkify_7_sql.py
Created September 23, 2019 22:19
7 Sparkify SQL
# Show that we can do the same calculation above using SQL
data.createOrReplaceTempView('sparkify')

# The gist is truncated mid-query; the GROUP BY and closing quotes below
# complete the per-session aggregation from the DataFrame version
sub_query = """
    SELECT
        userId,
        sessionId,
        max(itemInSession) AS itemCount
    FROM
        sparkify
    GROUP BY
        userId, sessionId
"""
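A sketch of the outer query that would mirror the user_engagement aggregation (the structure is an assumption built on the sub-query above; 'spark' is the assumed SparkSession):

query = f"""
    SELECT
        userId,
        count(sessionId) AS sessionCount,
        avg(itemCount)   AS meanSongCount
    FROM ({sub_query})
    GROUP BY userId
    ORDER BY userId
"""
spark.sql(query).show(10)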
@oneryalcin
oneryalcin / sparkify_6_avg_song_count_per_session.py
Created September 23, 2019 22:14
6 Sparkify Session Count and Avg Song Count/Session
from pyspark.sql import functions as F

user_engagement = data\
    .groupBy('userId', 'sessionId')\
    .agg(F.max('itemInSession').alias('itemCount'))\
    .groupBy('userId')\
    .agg({"itemCount": "mean", "sessionId": "count"})\
    .withColumnRenamed('count(sessionId)', 'sessionCount')\
    .withColumnRenamed('avg(itemCount)', 'meanSongCount')\
    .orderBy('userId')

user_engagement.show(10)
@oneryalcin
oneryalcin / sparkify_5_listen_freq.py
Created September 23, 2019 22:05
5 Sparkify listen_freq
# Create a new aggregated dataframe called listen_freq
# (stands for listening frequency) for each user
listen_freq = data.select('userId', 'sessionId', 'timeStamp')\
    .groupBy('userId', 'sessionId')\
    .agg(F.min('timeStamp').alias('sessionTime'))\
    .orderBy('userId', 'sessionId')\
    .groupBy('userId')\
    .agg(F.min('sessionTime').alias('minSessionTime'),
         F.max('sessionTime').alias('maxSessionTime'))  # gist truncated mid-agg; closed here
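Later gists consume a sessionsFreqDay column from listen_freq; a sketch of how it could be derived from the min/max session times above (the formula and the millisecond timestamp unit are assumptions, since the gist is cut off):

from pyspark.sql import functions as F

# Days between first and last session, then sessions per active day
listen_freq = listen_freq\
    .withColumn('activeDays',
                (F.col('maxSessionTime') - F.col('minSessionTime')) / (1000 * 60 * 60 * 24))\
    .join(user_engagement.select('userId', 'sessionCount'), on='userId')\
    .withColumn('sessionsFreqDay', F.col('sessionCount') / F.col('activeDays'))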