Created
July 1, 2020 11:20
-
-
Save marnixkoops/0b4c1e94422053bc2d4041e043709439 to your computer and use it in GitHub Desktop.
Cox Proportional Hazard Regression model (Survival Analysis)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| # [+] PROJECT INFO | |
| # - Cox Proportional Hazard Regression model (Survival Analysis) | |
| # - Main purpose is to predict post-session conversion likelihood on customer level (cookie_id) | |
| # | |
| # Owner: Marnix Koops / marnixkoops@gmail.com | |
| """ | |
| # ================================================================================================== | |
| # [+] SETUP | |
| # ================================================================================================== | |
| import os | |
| import argparse | |
| import logging | |
| import datetime | |
| import pytz | |
| import time | |
| import re | |
| import tempfile | |
| import pickle | |
| import warnings | |
| from pathlib import Path | |
| import numpy as np | |
| import pandas as pd | |
| import lifelines as life | |
| from google.cloud import storage | |
| from google.cloud.storage.bucket import Bucket | |
| from google.cloud import bigquery, bigquery_storage | |
| warnings.simplefilter(action="ignore", category=UserWarning) # avoid Google Cloud credential warnings | |
# ==================================================================================================
# [+] RUN SETTINGS
# ==================================================================================================
# production defaults (overridable via CLI arguments)
PROJECT = "coolblue-bi-data-science-exp"
BUCKET = "conversion-likelihood-ratio"
DATASET = "conversion_likelihood"

# cli argument parsing
parser = argparse.ArgumentParser()
parser.add_argument(
    # bug fix: without a default, omitting --project set PROJECT to None below,
    # clobbering the production default
    "--project", default=PROJECT, help="GCP project to use",
)
parser.add_argument(
    "--dry_run", default=False, action="store_true", help="if True will run process on subset of data",
)
parser.add_argument(
    "--train",
    default=False,
    action="store_true",
    help="if True will train model and predict. Else will try to download latest model from bucket and predict",
)
parser.add_argument(
    "--evaluate", default=False, action="store_true", help="if True will compute prediction metrics",
)
args = parser.parse_args()

# run settings
PROJECT = args.project  # production default unless overridden on the CLI
DRY_RUN = args.dry_run  # runs flow on small subset of data for speed
TRAIN_MODEL = args.train  # if False, will load most recent model object instead of training
EVALUATE_MODEL = args.evaluate  # if False, will create predictions without further evaluation

# model settings
PENALTY_STRENGTH = 0.001  # strength of penalizer for model fitting

# input/output settings
LOGGING = True  # log info to console and local logfile
SAVE_MODEL_TO_DISK = True  # if true saves trained model to local disk
SAVE_MODEL_TO_GCS = True  # if true saves trained model to Google Cloud Storage bucket
SAVE_LOGS_TO_GCS = True  # if true uploads log file to Google Cloud Storage bucket
SAVE_RESULTS_TO_BQ = True  # if true appends results to Google Cloud BigQuery table
# ==================================================================================================
# [+] LOGGING
# ==================================================================================================
# timestamp of this run, made filename-safe by swapping whitespace/colons for dashes
DATETIME = datetime.datetime.now(pytz.timezone("Europe/Amsterdam")).replace(microsecond=0)
DATETIME = re.sub(r"([\s+:])", r"-", "{}".format(DATETIME))

# make sure the local log directory exists before attaching the file handler
log_dir = Path("./log")
log_dir.mkdir(exist_ok=True)

# log simultaneously to the terminal and to a timestamped file on disk
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(threadName)s] [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("./log/logfile_{}.log".format(DATETIME)),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger()

logger.info(f"[+] Starting process in PROJECT {PROJECT}, BUCKET {BUCKET} WORK_DIR {os.getcwd()}")
logger.info(
    f" Run settings: DRY_RUN {DRY_RUN}, TRAIN_MODEL {TRAIN_MODEL}, EVALUATE_MODEL {EVALUATE_MODEL}, LOGGING: {LOGGING}"
)
logger.info(f" Model settings: PENALTY_STRENGTH {PENALTY_STRENGTH}")
# ==================================================================================================
# [+] INPUT DATA QUERIES
# ==================================================================================================
def query_customer_feature_input_data(project=PROJECT, dataset=DATASET):
    """Download the per-session customer features used as model input from BigQuery.

    The query deduplicates raw sessions and builds rolling-window aggregates per
    cookie (visit counts, transactions, session quality and timedeltas).

    Args:
        project (str): GCP project that holds the input dataset.
        dataset (str): BigQuery dataset containing the `input_session_data` table.

    Returns:
        pd.DataFrame: One row per session with the engineered feature columns.
    """
    logger.info(" Downloading customer features for model input from BigQuery")
    query = """
    WITH
    avoid_duplicated_sessions AS (
        SELECT DISTINCT *
        FROM
            `{}.{}.input_session_data`
    ),
    summarize_raw_session_data_step_1 AS (
        SELECT
            coolblue_cookie_id,
            visit_date,
            visit_start_time,
            hit_number,
            transactions,
            pageviews,
            traffic_source,
            traffic_medium,
            mobile_device,
            country,
            region,
            LEAD(visit_date, 1) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time ) AS visit_date_lead,
            LAG(visit_start_time, 1) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time ) AS visit_start_time_lag,
            LEAD(visit_start_time, 1) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time ) AS visit_start_time_lead,
            COUNT(coolblue_cookie_id) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time ROWS UNBOUNDED PRECEDING) AS visits_sum,
            COUNT(coolblue_cookie_id) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time RANGE BETWEEN 432000 PRECEDING AND CURRENT ROW) AS visits_sum_5,
            SUM(hit_number) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time RANGE BETWEEN 432000 PRECEDING AND CURRENT ROW) AS hit_number_sum_5,
            SUM(transactions) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time ROWS UNBOUNDED PRECEDING) AS transactions_sum,
            SUM(transactions) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time RANGE BETWEEN 259200 PRECEDING AND CURRENT ROW) AS transactions_sum_3,
            LEAD(transactions, 1) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time ) AS transactions_lead,
            session_quality,
            LAG(session_quality, 1) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time) AS session_quality_lag,
            SUM(session_quality) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time RANGE BETWEEN 432000 PRECEDING AND CURRENT ROW) AS session_quality_sum_5,
        FROM
            avoid_duplicated_sessions),
    summarize_raw_session_data_step_2 AS (
        SELECT
            coolblue_cookie_id,
            visit_date,
            visit_start_time,
            hit_number,
            transactions,
            pageviews,
            traffic_source,
            traffic_medium,
            mobile_device,
            country,
            region,
            visit_date_lead,
            visit_start_time_lag,
            visit_start_time_lead,
            (visit_start_time - visit_start_time_lag) AS visit_time_delta,
            visits_sum,
            visits_sum_5,
            hit_number_sum_5,
            transactions_sum,
            transactions_sum_3,
            transactions_lead,
            session_quality_lag,
            session_quality_sum_5,
        FROM
            summarize_raw_session_data_step_1)
    SELECT
        coolblue_cookie_id,
        visit_date,
        visit_start_time,
        transactions,
        pageviews,
        traffic_source,
        traffic_medium,
        mobile_device,
        country,
        region,
        visit_date_lead,
        visit_start_time_lag,
        visit_start_time_lead,
        visit_time_delta,
        AVG(visit_time_delta) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time RANGE BETWEEN 432000 PRECEDING AND CURRENT ROW) AS visit_time_delta_avg_5,
        visits_sum,
        visits_sum_5,
        hit_number_sum_5,
        transactions_sum,
        transactions_sum_3,
        transactions_lead,
        session_quality_lag,
        session_quality_sum_5,
    FROM
        summarize_raw_session_data_step_2
    """.format(
        project, dataset
    )
    logger.debug(query)  # bug fix: debug leftover print(query) dumped the SQL to stdout on every run
    client = bigquery.Client(project=project)  # bug fix: honor the `project` argument (was the global PROJECT)
    bqstorage_client = bigquery_storage.BigQueryStorageClient()
    query_result = client.query(query).result()
    df = query_result.to_dataframe(bqstorage_client=bqstorage_client, progress_bar_type="tqdm")
    return df
# ==================================================================================================
# [+] FUNCTION DEFS
# ==================================================================================================
def downcast_datatypes(df: pd.DataFrame) -> pd.DataFrame:
    """Downcast float and integer columns in-place to the smallest dtype that fits the values."""
    for name in df.select_dtypes(include=["float"]).columns:
        df[name] = pd.to_numeric(df[name], downcast="float")
    for name in df.select_dtypes(include=["int"]).columns:
        df[name] = pd.to_numeric(df[name], downcast="integer")
    return df
def drop_duplicate_rows(df: pd.DataFrame) -> pd.DataFrame:
    """Drops exact duplicates (checks all included columns)
    Args:
        df (pd.DataFrame): Dataframe to check for duplicate rows.
    Returns:
        pd.DataFrame: Dataframe without repeating duplicates.
    """
    rows_before = len(df)
    # data is sorted by date ascending, so keeping the last occurrence keeps the newest observation
    deduped = df.drop_duplicates(keep="last").reset_index(drop=True)
    logger.info(" Removed {} duplicate rows".format(rows_before - len(deduped)))
    return deduped
def upload_file_to_gcs(project, bucket, file_location, destination_file_location):
    """Upload a local file to a Google Cloud Storage bucket.

    Args:
        project (str): GCP project owning the bucket.
        bucket (str | Bucket): Bucket name, or an already-resolved Bucket object.
        file_location (str): Path of the local file to upload.
        destination_file_location (str): Target blob path inside the bucket.

    Returns:
        bool: True once the upload has finished.
    """
    # accept either a bucket name or a ready Bucket instance
    if not isinstance(bucket, Bucket):  # idiom fix: isinstance instead of `type(...) is not`
        client = storage.Client(project=project)
        bucket = client.get_bucket(bucket)
    blob = bucket.blob(destination_file_location)
    blob.upload_from_filename(file_location)
    return True
def download_file_from_gcs(project, bucket_name, file_location, destination_file_location=None):
    """Download a blob from GCS to a local file and return the local path."""
    gs_location = "gs://{}/{}".format(bucket_name, file_location)
    client = storage.Client(project=project)
    blob = client.get_bucket(bucket_name).get_blob(file_location)
    if not blob:
        raise FileNotFoundError("{} does not exist!".format(gs_location))
    if not destination_file_location:
        # no explicit target given: download into a throwaway temp file
        destination_file_location = tempfile.NamedTemporaryFile(delete=False).name
    blob.download_to_filename(destination_file_location)
    return destination_file_location
def save_trained_model_to_disk(hazard_model):
    """Pickle the trained model to ./model/hazard_model.pickle.

    Args:
        hazard_model: Fitted lifelines model (any picklable object works).
    """
    # robustness fix: create the target directory if missing — the original crashed with
    # FileNotFoundError on a fresh checkout (the download counterpart already does this mkdir)
    Path("./model").mkdir(exist_ok=True)
    with open("./model/hazard_model.pickle", "wb") as handle:
        pickle.dump(hazard_model, handle, protocol=pickle.HIGHEST_PROTOCOL)
def upload_trained_model_to_gcs(project=PROJECT, bucket=BUCKET, model=None):
    """Persist the trained model to local disk, then upload it to the GCS model folder.

    Args:
        project (str): GCP project owning the bucket.
        bucket (str): Target bucket name.
        model: Model object to upload. Defaults to the module-level ``hazard_model``
            for backward compatibility with existing callers.
    """
    if model is None:
        model = hazard_model  # fall back to the globally trained model (original implicit behavior)
    save_trained_model_to_disk(model)  # first save model to disk
    model_file_size = os.path.getsize("./model/hazard_model.pickle")
    logger.info(
        " Writing trained model to disk, filesize: {} MB".format(round(model_file_size / (1024.0 ** 2), 0))
    )
    # bug fix: log messages now use the project/bucket arguments instead of the globals
    logger.info(" Uploading trained model to GCS: {}/{}/model/".format(project, bucket))
    upload_file_to_gcs(project, bucket, "./model/hazard_model.pickle", "model/hazard_model.pickle")
def download_trained_model_from_gcs(project=PROJECT, bucket=BUCKET):
    """Fetch the latest pickled model from the GCS model folder and load it into memory."""
    Path("./model").mkdir(exist_ok=True)
    local_path = download_file_from_gcs(
        project, bucket, "model/hazard_model.pickle", "./model/hazard_model.pickle"
    )
    with open(local_path, "rb") as handle:
        hazard_model = pickle.load(handle)
    return hazard_model
def write_output_to_bigquery(
    df: pd.DataFrame, project: str = PROJECT, dataset: str = DATASET, table: str = "predictions"
):
    """Append the prediction dataframe to the BigQuery output table."""
    upload_location = "{}.{}.{}".format(project, dataset, table)
    logger.info("Appending data to BigQuery at {}".format(upload_location))
    client = bigquery.Client(project)
    # enforce the output schema explicitly; rows are appended, never overwritten
    required = "REQUIRED"
    schema = [
        bigquery.SchemaField("coolblue_cookie_id", bigquery.enums.SqlTypeNames.STRING, mode=required),
        bigquery.SchemaField("visit_date", bigquery.enums.SqlTypeNames.INTEGER, mode=required),
        bigquery.SchemaField("predicted_hazard", bigquery.enums.SqlTypeNames.FLOAT, mode=required),
        bigquery.SchemaField("predicted_hazard_quantile", bigquery.enums.SqlTypeNames.FLOAT, mode=required),
        bigquery.SchemaField("prediction_timestamp", bigquery.enums.SqlTypeNames.TIMESTAMP, mode=required),
    ]
    job_config = bigquery.LoadJobConfig(schema=schema, write_disposition="WRITE_APPEND")
    job = client.load_table_from_dataframe(df, upload_location, job_config=job_config)
    job.result()  # wait for the load job to complete
    result_table = client.get_table(upload_location)  # API request to read back the table metadata
    logger.info(
        "Added {} rows and {} columns to {}".format(len(df), len(result_table.schema), upload_location)
    )
def upload_log_to_gcs(project=PROJECT, bucket=BUCKET):
    """Upload this run's local logfile to the GCS log folder.

    Args:
        project (str): GCP project owning the bucket.
        bucket (str): Target bucket name.
    """
    # bug fix: the message now uses the project/bucket arguments instead of the globals
    logger.info(" Uploading log to GCS: {}/{}/log/".format(project, bucket))
    upload_file_to_gcs(
        project, bucket, "./log/logfile_{}.log".format(DATETIME), "log/logfile_{}.log".format(DATETIME),
    )
# ==================================================================================================
# [+] DATA LOADING
# ==================================================================================================
logger.info("[+] Loading input data")
t0_input = time.time()

# input features are queried live from BigQuery
# (a historical CSV-based loader with explicit column dtypes was removed as dead code)
input_df = query_customer_feature_input_data()

time_input = (time.time() - t0_input) / 60
logger.info("[v] Elapsed time for loading input data: {:.3} minutes".format(time_input))

if DRY_RUN:
    logger.info(" Subsetting data to a small sample for DRY_RUN")
    input_df = input_df.sample(50000)
# ==================================================================================================
# [+] DATA PROCESSING AND FEATURE ENGINEERING
# ==================================================================================================
logger.info("[+] Processing data and feature engineering for model training")
t0_prep = time.time()

# drop duplicate rows and fix datatypes
input_df = drop_duplicate_rows(input_df)
# bug fix: np.int / np.float were deprecated aliases of the builtins and removed in numpy >= 1.24;
# using the builtins is behavior-identical and works on modern numpy
input_df["visit_date"] = input_df["visit_date"].astype(int)  # change date from str to int type
input_df = downcast_datatypes(input_df)

# add event feature target column (conversion after a session)
logger.info(" Adding lifetime and target column to input data")
input_df["target"] = 0
input_df.loc[input_df["transactions_lead"].notnull(), "target"] = 1

# if there is no consecutive visit_start_time, customer is still alive at current time
current_timestamp = int(datetime.datetime.now(pytz.timezone("Europe/Amsterdam")).timestamp())
input_df.loc[input_df["visit_start_time_lead"].isnull(), "visit_start_time_lead"] = current_timestamp

# add lifetime duration per customer (in hours)
input_df["timedelta_hours"] = (input_df["visit_start_time_lead"] - input_df["visit_start_time"]) / 60 / 60
input_df.loc[input_df["timedelta_hours"] < 0, "timedelta_hours"] = 0  # drop unreliable negative timedeltas
input_df = input_df[input_df["timedelta_hours"].notnull()].copy()  # drop rows without timedelta

# add flag for first session in time-window
input_df["first_session_in_window"] = 1
input_df.loc[input_df["visit_start_time_lag"].notnull(), "first_session_in_window"] = 0

# floats to ints to reduce memory footprint, note that any NA's have to be filled
logger.info(" Parsing floats to ints; reduce memory footprint and this precision is not needed")
input_df.loc[:, input_df.dtypes == float] = (
    input_df.loc[:, input_df.dtypes == float].round(0).fillna(0).astype(int)
)

# fill NaNs
input_df = input_df.fillna(0)

# add averages and product features on customer level
input_df["transactions_per_visit"] = input_df["transactions_sum"] / input_df["visits_sum"]
input_df["hit_number_avg_5"] = input_df["hit_number_sum_5"] / input_df["visits_sum_5"]
input_df["session_quality_avg_5"] = input_df["session_quality_sum_5"] / input_df["visits_sum_5"]

# One-Hot-Encoding selected levels for categorical input features
logger.info(" One-Hot-Encoding categorical feature columns")
region_features = [
    # "Brussels",  # note: commented categories are not significantly different than baseline level
    "Flanders",
    "Gelderland",
    # "Groningen",
    # "Limburg",
    "North Brabant",
    "North Holland",
    # "Overijssel",
    "South Holland",
    "Utrecht",
]
country_features = [
    "(not set)",
    "Belgium",
    # "Finland",
    "France",
    "Germany",
    "Netherlands",
    # "Spain",
    # "Switzerland",
    # "United Kingdom",
    # "United States",
]
traffic_source_features = [
    "(direct)",
    # "Newsletter_Themed",
    "SmileMails_ReviewAcquisition",
    # "bing",
    "facebook",
    "google",
    "kieskeurig",
    # "performancehorizon",
    "tweakers.net",
    # "vanessa",
]
traffic_medium_features = [
    "(none)",
    "affiliate",
    "cpc",
    "cpm",
    "email",
    "ios-app",
    "mail",
    "organic",
]

# add 'other' category for unincluded levels
input_df.loc[~input_df["region"].isin(region_features), "region"] = "other"
input_df.loc[~input_df["country"].isin(country_features), "country"] = "other"
input_df.loc[~input_df["traffic_source"].isin(traffic_source_features), "traffic_source"] = "other"
input_df.loc[~input_df["traffic_medium"].isin(traffic_medium_features), "traffic_medium"] = "other"

# add selected One-Hot-Encoded categorical feature levels to input dataframe
for categorical in ["region", "country", "traffic_source", "traffic_medium"]:
    input_df = pd.concat([input_df, pd.get_dummies(input_df[categorical], prefix=categorical)], axis=1)

# drop all columns with NaNs
nan_features = input_df.isnull().any()
logger.info(" Dropping columns due to NaN values: {}".format(nan_features[nan_features]))
non_nan_features = nan_features[~nan_features].index
input_df = input_df[non_nan_features]

# downcast datatypes
logger.info(" Downcasting datatypes to reduce memory footprint of dataframe")  # typo fix: "Downcastig"
input_df = downcast_datatypes(input_df)
t_prep = (time.time() - t0_prep) / 60
logger.info("[v] Elapsed time for preparing input data: {:.3} minutes".format(t_prep))
# ==================================================================================================
# [+] MODEL TRAINING
# ==================================================================================================
# cross-validated features to include for prediction purposes
# NOTE: commented-out entries are candidate features that were evaluated and excluded;
# entries marked "future info" would leak post-session information and must stay excluded.
# The list also names the two survival-analysis columns consumed by lifelines:
# "timedelta_hours" (duration) and "target" (event indicator) — they are passed to
# CoxPHFitter.fit() below via duration_col / event_col, not used as covariates.
features = [
    # "visit_number",
    # "visit_date",
    # "visit_start_time",
    "pageviews",
    "mobile_device",
    # "hit_number",
    "transactions",
    # "transactions_revenue",
    # "time_on_site",
    # "visit_date_lead",  # future info
    # "visit_start_time_lag",
    # "visit_start_time_lead",  # future info
    "visit_time_delta",
    # "visit_time_delta_min",
    # "visit_time_delta_max",
    # "visit_time_delta_avg",
    # "visit_time_delta_min_5",
    # "visit_time_delta_max_5",
    "visit_time_delta_avg_5",
    # "add_to_cart",
    "first_session_in_window",
    "visits_sum",
    # "visits_sum_1",
    # "visits_sum_3",
    # "visits_sum_5",
    # "hit_number_sum",
    # "hit_number_lag",
    # "hit_number_sum_1",
    # "hit_number_sum_3",
    # "hit_number_sum_5",
    # "hit_number_min_5",
    # "hit_number_max_5",
    "hit_number_avg_5",
    # "time_on_site_sum",
    # "time_on_site_sum_1",
    # "time_on_site_sum_3",
    # "time_on_site_sum_5",
    # "time_on_site_min_5",
    # "time_on_site_max_5",
    "transactions_sum",
    # "transactions_sum_1",
    "transactions_sum_3",
    # "transactions_sum_5",
    # "transactions_min_5",
    # "transactions_max_5",
    # "transactions_lead",  # future info
    # "transactions_revenue_lag",
    # "transactions_revenue_lead",  # future info
    # "transactions_revenue_sum",
    # "transactions_revenue_sum_1",
    # "transactions_revenue_sum_3",
    # "transactions_revenue_sum_5",
    # "transactions_revenue_min_5",
    # "transactions_revenue_max_5",
    # "session_quality",
    "session_quality_lag",
    # "session_quality_avg",
    # "session_quality_sum_1",
    # "session_quality_sum_3",
    # "session_quality_sum_5",
    "session_quality_avg_5",
    # One-Hot-Encoded categorical levels; the "_other" level of each group is the baseline
    # "region_Brussels",
    "region_Flanders",
    "region_Gelderland",
    # "region_Groningen",
    # "region_Limburg",
    "region_North Brabant",
    "region_North Holland",
    # "region_Overijssel",
    "region_South Holland",
    "region_Utrecht",
    # "region_other",  # baseline categorical level
    "country_(not set)",
    "country_Belgium",
    # "country_Finland",
    "country_France",
    "country_Germany",
    "country_Netherlands",
    # "country_Spain",
    # "country_Switzerland",
    # "country_United Kingdom",
    # "country_United States",
    # "country_other",  # baseline categorical level
    "traffic_source_(direct)",
    # "traffic_source_Newsletter_Themed",
    "traffic_source_SmileMails_ReviewAcquisition",
    # "traffic_source_bing",
    "traffic_source_facebook",
    "traffic_source_google",
    "traffic_source_kieskeurig",
    # "traffic_source_other",  # baseline categorical level
    # "traffic_source_performancehorizon",
    "traffic_source_tweakers.net",
    # "traffic_source_vanessa",
    "traffic_medium_(none)",
    "traffic_medium_affiliate",
    "traffic_medium_cpc",
    "traffic_medium_cpm",
    "traffic_medium_email",
    "traffic_medium_ios-app",
    "traffic_medium_mail",
    "traffic_medium_organic",
    # "traffic_medium_other",  # baseline categorical level
    # "traffic_medium_pricecomparison",
    # "traffic_medium_referral",
    "transactions_per_visit",
    # "transactions_revenue_per_visit",
    # "hit_number_per_visit",
    # "time_on_site_avg_5",
    # "hit_number_squared",
    # "transactions_per_visit_squared",
    # "hit_number_sum_3_squared",
    # "hit_number_x_time_on_site",
    # "hit_number_x_transactions_per_visit",
    # "time_delta_sec",
    # "time_delta_hours",
    # "traffic_source_tweakers.net_add_to_cart_prod",
    # "traffic_source_tweakers.net_transactions_prod",
    # "traffic_source_kieskeurig_transactions_prod",
    # "first_session_in_window_traffic_source_facebook_prod",
    # "add_to_cart_traffic_source_facebook_prod",
    # "add_to_cart_traffic_medium_cpm_prod",
    # "add_to_cart_transactions_per_visit_prod",
    # survival columns (duration + event), required by the fit calls below
    "timedelta_hours",
    "target",
]
if TRAIN_MODEL:
    t0_train = time.time()
    logger.info("[+] Defining Cox Proportional Hazard Model")
    # Cox Proportional Hazard model; the penalizer shrinks coefficients for numerical stability
    hazard_model = life.CoxPHFitter(penalizer=PENALTY_STRENGTH)
    logger.info(f" Applying model: {hazard_model} with penalty: {PENALTY_STRENGTH}")
    logger.info(f" Input dataframe shape: {input_df[features].shape}")
    logger.info(f" Including {len(features)} features (including target columns): {features} \n")
    logger.info("[+] Training hazard model")
    # fit on the engineered features: "timedelta_hours" is the survival duration and
    # "target" flags whether a conversion event was observed for that session
    hazard_model.fit(
        input_df[features],
        duration_col="timedelta_hours",
        event_col="target",
        step_size=0.1,  # reduced step size — presumably for convergence stability; confirm against lifelines docs
        # show_progress=True,
    )
    t_train = (time.time() - t0_train) / 60
    logger.info("[v] Finished hazard model training in: {:.3} minutes".format(t_train))
    # persist the freshly trained model locally and to GCS (both skipped on dry runs)
    if SAVE_MODEL_TO_DISK and not DRY_RUN:
        save_trained_model_to_disk(hazard_model)
    if SAVE_MODEL_TO_GCS and not DRY_RUN:
        upload_trained_model_to_gcs(PROJECT, BUCKET)
# ==================================================================================================
# [+] MODEL PREDICTIONS
# ==================================================================================================
logger.info("[+] Predicting partial hazards for customers")
t0_pred = time.time()

# if no model was trained in this flow load most recent model object for predictions
if not TRAIN_MODEL:
    logger.info(" Attempting to download model from GCS: {}/model/".format(BUCKET))
    hazard_model = download_trained_model_from_gcs()

# create subset of df for predictions
logger.info(" Creatings subset of df for predictions")
# second-to-last distinct visit_date = latest day of complete GA session data
# NOTE(review): assumes at least two distinct visit dates exist in input_df — verify for dry runs
min_prediction_date = np.sort(input_df["visit_date"].unique())[-2]
pred_df = input_df[input_df["visit_date"] >= min_prediction_date]

# create output result dataframe keyed on cookie + date
output_df = pred_df[["coolblue_cookie_id", "visit_date"]].copy()
output_df["predicted_hazard"] = hazard_model.predict_partial_hazard(pred_df[features])

# add partial hazard quantile (1-10)
output_df["predicted_hazard_quantile"] = 1  # default label; overwritten below for lower-hazard rows
output_df["prediction_timestamp"] = datetime.datetime.now()
quantiles = np.flip(np.arange(0, 1, 0.05))  # cutoffs 0.95 down to 0.0 in steps of 5%
# walk the quantiles from high to low: each pass relabels every row below that quantile's hazard
# cutoff, so a row ends up labeled with the lowest quantile bucket it falls under
for quantile in quantiles:
    hazard_limit = np.quantile(output_df["predicted_hazard"], quantile)
    output_df.loc[output_df["predicted_hazard"] < hazard_limit, "predicted_hazard_quantile"] = quantile

# round floats before uploading results
output_df["predicted_hazard"] = np.round(output_df["predicted_hazard"], 4)
output_df["predicted_hazard_quantile"] = np.round(output_df["predicted_hazard_quantile"], 1)

# upload prediction output results to BigQuery table (skipped on dry runs)
if SAVE_RESULTS_TO_BQ and not DRY_RUN:
    write_output_to_bigquery(df=output_df, project=PROJECT, dataset=DATASET)

t_pred = time.time() - t0_pred
logger.info("[v] Finished predicting in: {:.3} seconds".format(t_pred))
# ==================================================================================================
# [+] MODEL EVALUATION
# ==================================================================================================
def _hazard_segment(pred_hazards, quantile, upper):
    """Boolean mask selecting rows above (upper) / below (lower) the given hazard quantile cutoff."""
    hazard_limit = np.quantile(pred_hazards, quantile)
    return pred_hazards > hazard_limit if upper else pred_hazards < hazard_limit


def _log_conversion_ratios(test_df, pred_hazards, quantiles, upper):
    """Log the observed mean conversion rate (%) per predicted-hazard segment."""
    side = "upper" if upper else "lower"
    for quantile in quantiles:
        segment = test_df.loc[_hazard_segment(pred_hazards, quantile, upper)]["target"]
        avg_conversion = np.round(np.mean(segment), 4) * 100
        logger.info(f" [v] Mean over {side} {quantile}th segment: {avg_conversion:.2f}%")


def _log_timedeltas(test_df, pred_hazards, quantiles, upper, target, statistic):
    """Log the mean/median observed timedelta (hours) per predicted-hazard segment.

    Args:
        target (int): 1 = converting sessions, 0 = non-converting sessions.
        statistic (str): "mean" or "median".
    """
    side = "upper" if upper else "lower"
    stat_fn, stat_name = (np.mean, "Average") if statistic == "mean" else (np.median, "Median")
    for quantile in quantiles:
        mask = _hazard_segment(pred_hazards, quantile, upper) & (test_df["target"] == target)
        delta = stat_fn(test_df.loc[mask]["timedelta_hours"])
        logger.info(f" [v] {stat_name} timedelta over {side} {quantile}th segment: {delta:.1f} hours")


if EVALUATE_MODEL:
    t0_eval = time.time()
    logger.info("[+] Starting hazard model evaluation")

    # consistency fix: eval_rows now actually uses eval_train_ratio (the 0.7 literal was duplicated)
    eval_train_ratio = 0.7
    eval_rows = int(eval_train_ratio * len(input_df))
    eval_train_df = input_df[:eval_rows]
    eval_test_df = input_df[eval_rows:]

    logger.info(" Retraining model on subset to have hold-out data for evaluation")
    hazard_model.fit(
        eval_train_df[features],
        duration_col="timedelta_hours",
        event_col="target",
        step_size=0.1,
    )

    # hold-out metrics: partial log-likelihood, likelihood-ratio test statistic, concordance
    hazard_model_score = np.round(hazard_model.score(eval_test_df[features]), 4)
    log_likelihood_ratio = np.round(hazard_model.log_likelihood_ratio_test().test_statistic, 4)
    concordance = np.round(hazard_model.concordance_index_, 4)
    pred_hazards = hazard_model.predict_partial_hazard(eval_test_df[features])
    logger.info(f" [v] Partial Log-Likelihood: {hazard_model_score}")
    logger.info(f" [v] Log-Likelihood Ratio Test: {log_likelihood_ratio}")
    logger.info(f" [v] Model Concordance Index: {concordance} \n")

    upper_quantiles = [0.99, 0.95, 0.90, 0.75, 0.5]
    lower_quantiles = [0.75, 0.50, 0.25, 0.10, 0.05, 0.01]

    # check predicted hazard ratios versus observed conversion ratio sliced on quantiles
    logger.info("[+] Observed conversion over predicted customer hazard ratios segments")
    _log_conversion_ratios(eval_test_df, pred_hazards, upper_quantiles, upper=True)
    _log_conversion_ratios(eval_test_df, pred_hazards, lower_quantiles, upper=False)

    # check predicted hazard ratios versus observed conversion timedeltas sliced on quantiles
    logger.info("\n[+] Observed conversion timedeltas over predicted customer hazard segments")
    _log_timedeltas(eval_test_df, pred_hazards, upper_quantiles, upper=True, target=1, statistic="mean")
    _log_timedeltas(eval_test_df, pred_hazards, lower_quantiles, upper=False, target=1, statistic="mean")

    logger.info("\n[+] Observed median conversion timedeltas over predicted customer hazard segments")
    _log_timedeltas(eval_test_df, pred_hazards, upper_quantiles, upper=True, target=1, statistic="median")
    _log_timedeltas(eval_test_df, pred_hazards, lower_quantiles, upper=False, target=1, statistic="median")

    # same slicing for sessions without a conversion
    logger.info("\n[+] Observed average non-conversion timedeltas over predicted customer hazard segments")
    _log_timedeltas(eval_test_df, pred_hazards, upper_quantiles, upper=True, target=0, statistic="mean")
    _log_timedeltas(eval_test_df, pred_hazards, lower_quantiles, upper=False, target=0, statistic="mean")

    logger.info("\n[+] Observed median non-conversion timedeltas over predicted customer hazard segments")
    _log_timedeltas(eval_test_df, pred_hazards, upper_quantiles, upper=True, target=0, statistic="median")
    _log_timedeltas(eval_test_df, pred_hazards, lower_quantiles, upper=False, target=0, statistic="median")

    t_eval = (time.time() - t0_eval) / 60
    logger.info("[v] Finished hazard model evaluation in: {:.3} minutes".format(t_eval))  # typo fix: "evalution"

# upload log to GCS bucket
if SAVE_LOGS_TO_GCS:
    upload_log_to_gcs()

t_finish = (time.time() - t0_input) / 60
logger.info("[v] Successfully Finished process in: {:.3} minutes".format(t_finish))  # typo fix: "Succesfully"
# ==================================================================================================
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment