Created
June 12, 2020 06:35
-
-
Save j450h1/d249054ccb738283c4019b281335a442 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def aggregate_to_user_level(df):
    """Aggregate the selected event-level features to one row per user.

    Adds a ``device_type`` column derived from ``userAgent``, groups the
    rows by ``userId``, computes the aggregates listed below, and finally
    drops users whose aggregated ``gender`` is null.

    Args:
        df: DataFrame of events. The code reads the columns ``userId``,
            ``churn``, ``Gender``, ``level``, ``userAgent``, ``page``,
            ``auth``, ``status``, ``artist``, ``song`` and ``sessionId``.

    Returns:
        A DataFrame with one row per ``userId`` and the columns, in order:
        ``churn``, ``gender``, ``subscription_level``, ``device_type``,
        ``page_upgraded``, ``page_downgraded``, the ``auth_*_cnt``,
        ``status_*_cnt`` and ``page_*_cnt`` counters, ``artist_cnt``,
        ``song_cnt`` and ``session_cnt``.
    """

    def ever_seen(column, value, alias):
        # 1 if the user has at least one row where column == value, else 0.
        return sparkMax(when(col(column) == value, 1).otherwise(0)).alias(alias)

    def occurrences(column, value, alias):
        # `when` without `otherwise` yields null for non-matching rows and
        # `count` skips nulls, so this counts the matching rows only.
        return count(when(col(column) == value, True)).alias(alias)

    # Additional feature engineering.
    # The first matching CASE branch wins, so any agent containing "Linux"
    # is labelled 'desktop' before the 'iP' check runs.
    # NOTE(review): confirm that is intended for Android-style agents.
    device_type_sql = (
        "CASE WHEN rlike(userAgent, '(Windows|Macintosh|Linux)') THEN 'desktop' "
        "WHEN rlike(userAgent, 'iP') THEN 'mobile' ELSE 'other' END AS device_type"
    )
    df = df.withColumn("device_type", expr(device_type_sql))

    # NOTE(review): sparkMax is presumably pyspark.sql.functions.max imported
    # under an alias; on the string columns it picks the greatest value per
    # user - verify against the file's imports.
    exprs = [
        sparkMax(col('churn')).alias('churn'),
        sparkMax(col('Gender')).alias('gender'),
        sparkMax(col('level')).alias('subscription_level'),
        sparkMax(col('device_type')).alias('device_type'),
        ever_seen("page", 'Upgrade', 'page_upgraded'),
        ever_seen("page", 'Downgrade', 'page_downgraded'),
        occurrences("auth", 'Logged In', 'auth_logged_in_cnt'),
        occurrences("auth", 'Logged Out', 'auth_logged_out_cnt'),
        occurrences("auth", 'Guest', 'auth_guest_cnt'),
        # NOTE(review): status is compared with string literals; if the
        # column is numeric this relies on implicit casting - confirm.
        occurrences("status", '404', 'status_404_cnt'),
        occurrences("status", '307', 'status_307_cnt'),
        # NOTE(review): confirm the data really uses 'Next Song' (with a
        # space); a mismatch would make this count 0 for every user.
        occurrences("page", 'Next Song', 'page_next_song_cnt'),
        occurrences("page", 'Thumbs Up', 'page_thumbs_up_cnt'),
        occurrences("page", 'Thumbs Down', 'page_thumbs_down_cnt'),
        occurrences("page", 'Add to Playlist', 'page_playlist_cnt'),
        occurrences("page", 'Add Friend', 'page_friend_cnt'),
        occurrences("page", 'Roll Advert', 'page_roll_ad_cnt'),
        occurrences("page", 'Logout', 'page_logout_cnt'),
        occurrences("page", 'Help', 'page_help_cnt'),
        countDistinct('artist').alias('artist_cnt'),
        countDistinct('song').alias('song_cnt'),
        countDistinct('sessionId').alias('session_cnt'),
    ]

    user_df = df.groupBy('userId').agg(*exprs)

    # Remove data with null values - needs to be added to pipeline.
    # Drops users for whom gender is not specified.
    user_df = user_df.where(col("gender").isNotNull())
    return user_df
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment