"""
# [+] PROJECT INFO
# - Cox Proportional Hazard Regression model (Survival Analysis)
# - Main purpose is to predict post-session conversion likelihood at the customer level (cookie_id)
#
# Owner: Marnix Koops / marnixkoops@gmail.com
"""
# ==================================================================================================
# [+] SETUP
# ==================================================================================================
import os
import argparse
import logging
import datetime
import pytz
import time
import re
import tempfile
import pickle
import warnings
from pathlib import Path
import numpy as np
import pandas as pd
import lifelines as life
from google.cloud import storage
from google.cloud.storage.bucket import Bucket
from google.cloud import bigquery, bigquery_storage
warnings.simplefilter(action="ignore", category=UserWarning) # avoid Google Cloud credential warnings
# ==================================================================================================
# [+] RUN SETTINGS
# ==================================================================================================
# production settings
PROJECT = "coolblue-bi-data-science-exp"
BUCKET = "conversion-likelihood-ratio"
DATASET = "conversion_likelihood"
# cli argument parsing
parser = argparse.ArgumentParser()
parser.add_argument(
    "--project", default=PROJECT, help="GCP project to use",
)
parser.add_argument(
    "--dry_run", default=False, action="store_true", help="if True will run process on subset of data",
)
parser.add_argument(
    "--train",
    default=False,
    action="store_true",
    help="if True will train model and predict. Else will try to download latest model from bucket and predict",
)
parser.add_argument(
    "--evaluate", default=False, action="store_true", help="if True will compute prediction metrics",
)
# parser.add_argument(
#     "--log", default=True, action="store_true", help="if True will log results to the bucket",
# )
args = parser.parse_args()
# run settings
PROJECT = args.project
DRY_RUN = args.dry_run # runs flow on small subset of data for speed
TRAIN_MODEL = args.train # if False, will load most recent model object instead of training
EVALUATE_MODEL = args.evaluate # if False, will create predictions without further evaluation
# LOGGING = args.log # mlflow experiment logging
# data settings
# MAX_TRAINING_CASES = int(5e6) # max number of cases to train on
# model settings
PENALTY_STRENGTH = 0.001 # strength of penalizer for model fitting
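# note: lifelines' CoxPHFitter penalizer shrinks the fitted coefficients toward 0 (L2/ridge-style by
# default); larger values mean stronger regularization (library behaviour, noted here as a reminder)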
# input/output settings
LOGGING = True # log info to console and local logfile
SAVE_MODEL_TO_DISK = True # if true saves trained model to local disk
SAVE_MODEL_TO_GCS = True # if true saves trained model to Google Cloud Storage bucket
SAVE_LOGS_TO_GCS = True # if true uploads log file to Google Cloud Storage bucket
SAVE_RESULTS_TO_BQ = True # if true appends results to Google Cloud BigQuery table
# ==================================================================================================
# [+] LOGGING
# ==================================================================================================
# timestamp of run
DATETIME = datetime.datetime.now(pytz.timezone("Europe/Amsterdam")).replace(microsecond=0)
DATETIME = re.sub(r"([\s+:])", r"-", "{}".format(DATETIME)) # replace spaces and : with -
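# e.g. "2020-07-01 13:20:00+02:00" becomes "2020-07-01-13-20-00-02-00" (illustrative timestamp, filename-safe)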
# initialize logger
p = Path("./log") # create directory for logging
p.mkdir(exist_ok=True)
logging.basicConfig(  # logging to terminal & disk file
    level=logging.INFO,
    format="%(asctime)s [%(threadName)s] [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("{}/{}_{}.log".format("./log", "logfile", DATETIME)),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger()
logger.info(f"[+] Starting process in PROJECT {PROJECT}, BUCKET {BUCKET}, WORK_DIR {os.getcwd()}")
logger.info(
    f" Run settings: DRY_RUN {DRY_RUN}, TRAIN_MODEL {TRAIN_MODEL}, EVALUATE_MODEL {EVALUATE_MODEL}, LOGGING: {LOGGING}"
)
logger.info(f" Model settings: PENALTY_STRENGTH {PENALTY_STRENGTH}")
# ==================================================================================================
# [+] INPUT DATA QUERIES
# ==================================================================================================
def query_customer_feature_input_data(project=PROJECT, dataset=DATASET):
"""
Queries customer features for model input
"""
logger.info(" Downloading customer features for model input from BigQuery")
query = """
WITH
avoid_duplicated_sessions AS (
SELECT DISTINCT *
FROM
`{}.{}.input_session_data`
),
summarize_raw_session_data_step_1 AS (
SELECT
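-- note: visit_start_time is assumed to be in epoch seconds (consistent with the timedelta_hours
-- computation later on), so the RANGE frames of 432000 and 259200 seconds below correspond to
-- rolling 5-day and 3-day look-back windows per cookie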
coolblue_cookie_id,
visit_date,
visit_start_time,
hit_number,
transactions,
pageviews,
traffic_source,
traffic_medium,
mobile_device,
country,
region,
LEAD(visit_date, 1) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time ) AS visit_date_lead,
LAG(visit_start_time, 1) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time ) AS visit_start_time_lag,
LEAD(visit_start_time, 1) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time ) AS visit_start_time_lead,
COUNT(coolblue_cookie_id) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time ROWS UNBOUNDED PRECEDING) AS visits_sum,
COUNT(coolblue_cookie_id) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time RANGE BETWEEN 432000 PRECEDING AND CURRENT ROW) AS visits_sum_5,
SUM(hit_number) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time RANGE BETWEEN 432000 PRECEDING AND CURRENT ROW) AS hit_number_sum_5,
SUM(transactions) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time ROWS UNBOUNDED PRECEDING) AS transactions_sum,
SUM(transactions) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time RANGE BETWEEN 259200 PRECEDING AND CURRENT ROW) AS transactions_sum_3,
LEAD(transactions, 1) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time ) AS transactions_lead,
session_quality,
LAG(session_quality, 1) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time) AS session_quality_lag,
SUM(session_quality) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time RANGE BETWEEN 432000 PRECEDING AND CURRENT ROW) AS session_quality_sum_5
FROM
avoid_duplicated_sessions),
summarize_raw_session_data_step_2 AS (
SELECT
coolblue_cookie_id,
visit_date,
visit_start_time,
hit_number,
transactions,
pageviews,
traffic_source,
traffic_medium,
mobile_device,
country,
region,
visit_date_lead,
visit_start_time_lag,
visit_start_time_lead,
(visit_start_time - visit_start_time_lag) AS visit_time_delta,
visits_sum,
visits_sum_5,
hit_number_sum_5,
transactions_sum,
transactions_sum_3,
transactions_lead,
session_quality_lag,
session_quality_sum_5
FROM
summarize_raw_session_data_step_1)
SELECT
coolblue_cookie_id,
visit_date,
visit_start_time,
transactions,
pageviews,
traffic_source,
traffic_medium,
mobile_device,
country,
region,
visit_date_lead,
visit_start_time_lag,
visit_start_time_lead,
visit_time_delta,
AVG(visit_time_delta) OVER (PARTITION BY coolblue_cookie_id ORDER BY visit_start_time RANGE BETWEEN 432000 PRECEDING AND CURRENT ROW) AS visit_time_delta_avg_5,
visits_sum,
visits_sum_5,
hit_number_sum_5,
transactions_sum,
transactions_sum_3,
transactions_lead,
session_quality_lag,
session_quality_sum_5
FROM
summarize_raw_session_data_step_2
""".format(
project, dataset
)
print(query)
client = bigquery.Client(project=PROJECT)
bqstorage_client = bigquery_storage.BigQueryStorageClient()
query_result = client.query(query).result()
df = query_result.to_dataframe(bqstorage_client=bqstorage_client, progress_bar_type="tqdm")
return df
# ==================================================================================================
# [+] FUNCTION DEFS
# ==================================================================================================
def downcast_datatypes(df: pd.DataFrame) -> pd.DataFrame:
    """Downcasts float and integer columns to their smallest viable dtype to save memory"""
    float_cols = df.select_dtypes(include=["float"])
    int_cols = df.select_dtypes(include=["int"])
    for col in float_cols.columns:
        df[col] = pd.to_numeric(df[col], downcast="float")
    for col in int_cols.columns:
        df[col] = pd.to_numeric(df[col], downcast="integer")
    return df
def drop_duplicate_rows(df: pd.DataFrame) -> pd.DataFrame:
"""Drops exact duplicates (checks all included columns)
Args:
df (pd.DataFrame): Dataframe to check for duplicate rows.
Returns:
pd.DataFrame: Dataframe without repeating duplicates.
"""
df_len = len(df)
df = df.drop_duplicates(keep="last") # data is sorted by date asc, keep newest observations
df = df.reset_index(drop=True)
logger.info(" Removed {} duplicate rows".format(df_len - len(df)))
return df
def upload_file_to_gcs(project, bucket, file_location, destination_file_location):
    if not isinstance(bucket, Bucket):
        client = storage.Client(project=project)
        bucket = client.get_bucket(bucket)
    blob = bucket.blob(destination_file_location)
    # blob.chunk_size = 1 << 29  # increase chunk size for faster uploading
    blob.upload_from_filename(file_location)
    return True
def download_file_from_gcs(project, bucket_name, file_location, destination_file_location=None):
    gs_location = "gs://{}/{}".format(bucket_name, file_location)
    client = storage.Client(project=project)
    bucket = client.get_bucket(bucket_name)
    blob = bucket.get_blob(file_location)
    if not blob:
        raise FileNotFoundError("{} does not exist!".format(gs_location))
    if not destination_file_location:
        destination_file_location = tempfile.NamedTemporaryFile(delete=False).name
    # blob.chunk_size = 1 << 29  # increase chunk size for faster downloading
    blob.download_to_filename(destination_file_location)
    return destination_file_location
def save_trained_model_to_disk(hazard_model):
with open("./model/hazard_model.pickle", "wb") as handle:
pickle.dump(hazard_model, handle, protocol=pickle.HIGHEST_PROTOCOL)
def upload_trained_model_to_gcs(project=PROJECT, bucket=BUCKET):
    save_trained_model_to_disk(hazard_model)  # first save model to disk
    model_file_size = os.path.getsize("./model/hazard_model.pickle")
    logger.info(
        " Writing trained model to disk, filesize: {} MB".format(round(model_file_size / (1024.0 ** 2), 0))
    )
    logger.info(" Uploading trained model to GCS: {}/{}/model/".format(PROJECT, BUCKET))
    upload_file_to_gcs(project, bucket, "./model/hazard_model.pickle", "model/hazard_model.pickle")
def download_trained_model_from_gcs(project=PROJECT, bucket=BUCKET):
Path("./model").mkdir(exist_ok=True)
model_file_location = download_file_from_gcs(
project, bucket, "model/hazard_model.pickle", "./model/hazard_model.pickle"
)
with open(model_file_location, "rb") as model:
hazard_model = pickle.load(model)
return hazard_model
def write_output_to_bigquery(
    df: pd.DataFrame, project: str = PROJECT, dataset: str = DATASET, table: str = "predictions"
):
    upload_location = "{}.{}.{}".format(project, dataset, table)
    logger.info("Appending data to BigQuery at {}".format(upload_location))
    client = bigquery.Client(project)
    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("coolblue_cookie_id", bigquery.enums.SqlTypeNames.STRING, mode="REQUIRED"),
            bigquery.SchemaField("visit_date", bigquery.enums.SqlTypeNames.INTEGER, mode="REQUIRED"),
            bigquery.SchemaField("predicted_hazard", bigquery.enums.SqlTypeNames.FLOAT, mode="REQUIRED"),
            bigquery.SchemaField(
                "predicted_hazard_quantile", bigquery.enums.SqlTypeNames.FLOAT, mode="REQUIRED"
            ),
            bigquery.SchemaField(
                "prediction_timestamp", bigquery.enums.SqlTypeNames.TIMESTAMP, mode="REQUIRED"
            ),
        ],
        write_disposition="WRITE_APPEND",
    )
    job = client.load_table_from_dataframe(df, upload_location, job_config=job_config)
    job.result()  # wait for the job to complete
    table = client.get_table(upload_location)  # make an API request
    logger.info("Added {} rows and {} columns to {}".format(len(df), len(table.schema), upload_location))
def upload_log_to_gcs(project=PROJECT, bucket=BUCKET):
logger.info(" Uploading log to GCS: {}/{}/log/".format(PROJECT, BUCKET))
upload_file_to_gcs(
project, bucket, "./log/logfile_{}.log".format(DATETIME), "log/logfile_{}.log".format(DATETIME),
)
# ==================================================================================================
# [+] DATA LOADING
# ==================================================================================================
logger.info("[+] Loading input data")
t0_input = time.time()
# columns_to_load = [
# "coolblue_cookie_id",
# "visit_date",
# "visit_start_time",
# "transactions",
# "pageviews",
# "traffic_source",
# "traffic_medium",
# "mobile_device",
# "country",
# "region",
# "visit_date_lead",
# "visit_start_time_lag",
# "visit_start_time_lead",
# "visit_time_delta",
# "visit_time_delta_avg_5",
# "visits_sum",
# "visits_sum_5",
# "hit_number_sum_5",
# "transactions_sum",
# "transactions_sum_3",
# "transactions_lead",
# "session_quality_lag",
# "session_quality_sum_5",
# "add_to_cart",
# ]
#
# column_dtypes = {
# "coolblue_cookie_id": "object",
# "visit_date": "int32",
# "visit_start_time": "int32",
# "transactions": "float32",
# "pageviews": "float32",
# "traffic_source": "object",
# "traffic_medium": "object",
# "mobile_device": "bool",
# "country": "object",
# "region": "object",
# "visit_date_lead": "float32",
# "visit_start_time_lag": "float32",
# "visit_start_time_lead": "float32",
# "visit_time_delta": "float32",
# "visit_time_delta_avg_5": "float32",
# "visits_sum": "int32",
# "visits_sum_5": "int16",
# "hit_number_sum_5": "int32",
# "transactions_sum": "float32",
# "transactions_sum_3": "float32",
# "transactions_lead": "float32",
# "session_quality_lag": "float32",
# "session_quality_sum_5": "int16",
# "add_to_cart": "float32",
# }
#
# input_df = pd.read_csv("./data/survival_df_cart_5m_2.csv", usecols=columns_to_load, dtype=column_dtypes)
input_df = query_customer_feature_input_data()
time_input = (time.time() - t0_input) / 60
logger.info("[v] Elapsed time for loading input data: {:.3} minutes".format(time_input))
if DRY_RUN:
logger.info(" Subsetting data to a small sample for DRY_RUN")
input_df = input_df.sample(50000)
# ==================================================================================================
# [+] DATA PROCESSING AND FEATURE ENGINEERING
# ==================================================================================================
logger.info("[+] Processing data and feature engineering for model training")
t0_prep = time.time()
# drop duplicate rows and fix datatypes
input_df = drop_duplicate_rows(input_df)
input_df["visit_date"] = input_df["visit_date"].astype(np.int) # change date from str to int type
input_df = downcast_datatypes(input_df)
# add event feature target column (conversion after a session)
logger.info(" Adding lifetime and target column to input data")
input_df["target"] = 0
input_df.loc[input_df["transactions_lead"].notnull(), "target"] = 1
# if there is no consecutive visit_start_time, customer is still alive at current time
current_timestamp = int(datetime.datetime.now(pytz.timezone("Europe/Amsterdam")).timestamp())
input_df.loc[input_df["visit_start_time_lead"].isnull(), "visit_start_time_lead"] = current_timestamp
# add lifetime duration per customer (in hours)
input_df["timedelta_hours"] = (input_df["visit_start_time_lead"] - input_df["visit_start_time"]) / 60 / 60
input_df.loc[input_df["timedelta_hours"] < 0, "timedelta_hours"] = 0 # drop unreliable negative timedeltas
input_df = input_df[input_df["timedelta_hours"].notnull()].copy() # drop rows without timedelta
# add flag for first session in time-window
input_df["first_session_in_window"] = 1
input_df.loc[input_df["visit_start_time_lag"].notnull(), "first_session_in_window"] = 0
# floats to ints to reduce memory footprint, note that any NA's have to be filled
logger.info(" Parsing floats to ints; reduce memory footprint and this precision is not needed")
input_df.loc[:, input_df.dtypes == np.float] = (
input_df.loc[:, input_df.dtypes == np.float].round(0).fillna(0).astype(np.int)
)
# fill NaNs
input_df = input_df.fillna(0)
# add averages and product features on customer level
input_df["transactions_per_visit"] = input_df["transactions_sum"] / input_df["visits_sum"]
input_df["hit_number_avg_5"] = input_df["hit_number_sum_5"] / input_df["visits_sum_5"]
input_df["session_quality_avg_5"] = input_df["session_quality_sum_5"] / input_df["visits_sum_5"]
# One-Hot-Encoding selected levels for categorical input features
logger.info(" One-Hot-Encoding categorical feature columns")
region_features = [
# "Brussels", # note: commented categories are not significantly different than baseline level
"Flanders",
"Gelderland",
# "Groningen",
# "Limburg",
"North Brabant",
"North Holland",
# "Overijssel",
"South Holland",
"Utrecht",
]
country_features = [
"(not set)",
"Belgium",
# "Finland",
"France",
"Germany",
"Netherlands",
# "Spain",
# "Switzerland",
# "United Kingdom",
# "United States",
]
traffic_source_features = [
"(direct)",
# "Newsletter_Themed",
"SmileMails_ReviewAcquisition",
# "bing",
"facebook",
"google",
"kieskeurig",
# "performancehorizon",
"tweakers.net",
# "vanessa",
]
traffic_medium_features = [
"(none)",
"affiliate",
"cpc",
"cpm",
"email",
"ios-app",
"mail",
"organic",
]
# add 'other' category for unincluded levels
input_df.loc[~input_df["region"].isin(region_features), "region"] = "other"
input_df.loc[~input_df["country"].isin(country_features), "country"] = "other"
input_df.loc[~input_df["traffic_source"].isin(traffic_source_features), "traffic_source"] = "other"
input_df.loc[~input_df["traffic_medium"].isin(traffic_medium_features), "traffic_medium"] = "other"
# add selected One-Hot-Encoded categorical feature levels to input dataframe
input_df = pd.concat([input_df, pd.get_dummies(input_df["region"], prefix="region")], axis=1)
input_df = pd.concat([input_df, pd.get_dummies(input_df["country"], prefix="country")], axis=1)
input_df = pd.concat([input_df, pd.get_dummies(input_df["traffic_source"], prefix="traffic_source")], axis=1)
input_df = pd.concat([input_df, pd.get_dummies(input_df["traffic_medium"], prefix="traffic_medium")], axis=1)
# drop all columns with NaNs
nan_features = input_df.isnull().any()
logger.info(" Dropping columns due to NaN values: {}".format(nan_features[nan_features == True]))
non_nan_features = nan_features[nan_features == False].index
input_df = input_df[non_nan_features]
# downcast datatypes
logger.info(" Downcastig datatypes to reduce memory footprint of dataframe")
input_df = downcast_datatypes(input_df)
t_prep = (time.time() - t0_prep) / 60
logger.info("[v] Elapsed time for preparing input data: {:.3} minutes".format(t_prep))
# ==================================================================================================
# [+] MODEL TRAINING
# ==================================================================================================
# cross-validated features to include for prediction purposes
features = [
# "visit_number",
# "visit_date",
# "visit_start_time",
"pageviews",
"mobile_device",
# "hit_number",
"transactions",
# "transactions_revenue",
# "time_on_site",
# "visit_date_lead", # future info
# "visit_start_time_lag",
# "visit_start_time_lead", # future info
"visit_time_delta",
# "visit_time_delta_min",
# "visit_time_delta_max",
# "visit_time_delta_avg",
# "visit_time_delta_min_5",
# "visit_time_delta_max_5",
"visit_time_delta_avg_5",
# "add_to_cart",
"first_session_in_window",
"visits_sum",
# "visits_sum_1",
# "visits_sum_3",
# "visits_sum_5",
# "hit_number_sum",
# "hit_number_lag",
# "hit_number_sum_1",
# "hit_number_sum_3",
# "hit_number_sum_5",
# "hit_number_min_5",
# "hit_number_max_5",
"hit_number_avg_5",
# "time_on_site_sum",
# "time_on_site_sum_1",
# "time_on_site_sum_3",
# "time_on_site_sum_5",
# "time_on_site_min_5",
# "time_on_site_max_5",
"transactions_sum",
# "transactions_sum_1",
"transactions_sum_3",
# "transactions_sum_5",
# "transactions_min_5",
# "transactions_max_5",
# "transactions_lead", # future info
# "transactions_revenue_lag",
# "transactions_revenue_lead", # future info
# "transactions_revenue_sum",
# "transactions_revenue_sum_1",
# "transactions_revenue_sum_3",
# "transactions_revenue_sum_5",
# "transactions_revenue_min_5",
# "transactions_revenue_max_5",
# "session_quality",
"session_quality_lag",
# "session_quality_avg",
# "session_quality_sum_1",
# "session_quality_sum_3",
# "session_quality_sum_5",
"session_quality_avg_5",
# "region_Brussels",
"region_Flanders",
"region_Gelderland",
# "region_Groningen",
# "region_Limburg",
"region_North Brabant",
"region_North Holland",
# "region_Overijssel",
"region_South Holland",
"region_Utrecht",
# "region_other", # baseline categorical level
"country_(not set)",
"country_Belgium",
# "country_Finland",
"country_France",
"country_Germany",
"country_Netherlands",
# "country_Spain",
# "country_Switzerland",
# "country_United Kingdom",
# "country_United States",
# "country_other", # baseline categorical level
"traffic_source_(direct)",
# "traffic_source_Newsletter_Themed",
"traffic_source_SmileMails_ReviewAcquisition",
# "traffic_source_bing",
"traffic_source_facebook",
"traffic_source_google",
"traffic_source_kieskeurig",
# "traffic_source_other", # baseline categorical level
# "traffic_source_performancehorizon",
"traffic_source_tweakers.net",
# "traffic_source_vanessa",
"traffic_medium_(none)",
"traffic_medium_affiliate",
"traffic_medium_cpc",
"traffic_medium_cpm",
"traffic_medium_email",
"traffic_medium_ios-app",
"traffic_medium_mail",
"traffic_medium_organic",
# "traffic_medium_other", # baseline categorical level
# "traffic_medium_pricecomparison",
# "traffic_medium_referral",
"transactions_per_visit",
# "transactions_revenue_per_visit",
# "hit_number_per_visit",
# "time_on_site_avg_5",
# "hit_number_squared",
# "transactions_per_visit_squared",
# "hit_number_sum_3_squared",
# "hit_number_x_time_on_site",
# "hit_number_x_transactions_per_visit",
# "time_delta_sec",
# "time_delta_hours",
# "traffic_source_tweakers.net_add_to_cart_prod",
# "traffic_source_tweakers.net_transactions_prod",
# "traffic_source_kieskeurig_transactions_prod",
# "first_session_in_window_traffic_source_facebook_prod",
# "add_to_cart_traffic_source_facebook_prod",
# "add_to_cart_traffic_medium_cpm_prod",
# "add_to_cart_transactions_per_visit_prod",
"timedelta_hours",
"target",
]
if TRAIN_MODEL:
    t0_train = time.time()
    logger.info("[+] Defining Cox Proportional Hazard Model")
    hazard_model = life.CoxPHFitter(penalizer=PENALTY_STRENGTH)
    logger.info(f" Applying model: {hazard_model} with penalty: {PENALTY_STRENGTH}")
    logger.info(f" Input dataframe shape: {input_df[features].shape}")
    logger.info(f" Including {len(features)} features (including target columns): {features} \n")
    logger.info("[+] Training hazard model")
    hazard_model.fit(
        input_df[features],
        duration_col="timedelta_hours",
        event_col="target",
        step_size=0.1,
        # show_progress=True,
    )
    t_train = (time.time() - t0_train) / 60
    logger.info("[v] Finished hazard model training in: {:.3} minutes".format(t_train))
    if SAVE_MODEL_TO_DISK and not DRY_RUN:
        save_trained_model_to_disk(hazard_model)
    if SAVE_MODEL_TO_GCS and not DRY_RUN:
        upload_trained_model_to_gcs(PROJECT, BUCKET)
# ==================================================================================================
# [+] MODEL PREDICTIONS
# ==================================================================================================
logger.info("[+] Predicting partial hazards for customers")
t0_pred = time.time()
# if no model was trained in this flow load most recent model object for predictions
if not TRAIN_MODEL:
logger.info(" Attempting to download model from GCS: {}/model/".format(BUCKET))
hazard_model = download_trained_model_from_gcs()
# create subset of df for predictions
logger.info(" Creatings subset of df for predictions")
min_prediction_date = np.sort(input_df["visit_date"].unique())[-2] # latest day of complete GA session data
pred_df = input_df[input_df["visit_date"] >= min_prediction_date]
# create output result dataframe
output_df = pred_df[["coolblue_cookie_id", "visit_date"]].copy()
output_df["predicted_hazard"] = hazard_model.predict_partial_hazard(pred_df[features])
# add predicted hazard quantile bucket (0.05 to 1 in steps of 0.05)
output_df["predicted_hazard_quantile"] = 1
output_df["prediction_timestamp"] = datetime.datetime.now()
quantiles = np.flip(np.arange(0, 1, 0.05))  # 0.95 down to 0 in steps of 5%
for quantile in quantiles:
    hazard_limit = np.quantile(output_df["predicted_hazard"], quantile)
    output_df.loc[output_df["predicted_hazard"] < hazard_limit, "predicted_hazard_quantile"] = quantile
# round floats before uploading results
output_df["predicted_hazard"] = np.round(output_df["predicted_hazard"], 4)
output_df["predicted_hazard_quantile"] = np.round(output_df["predicted_hazard_quantile"], 1)
# upload prediction output results to BigQuery table
if SAVE_RESULTS_TO_BQ and not DRY_RUN:
    write_output_to_bigquery(df=output_df, project=PROJECT, dataset=DATASET)
t_pred = time.time() - t0_pred
logger.info("[v] Finished predicting in: {:.3} seconds".format(t_pred))
# ==================================================================================================
# [+] MODEL EVALUATION
# ==================================================================================================
if EVALUATE_MODEL:
    t0_eval = time.time()
    logger.info("[+] Starting hazard model evaluation")
    eval_train_ratio = 0.7
    eval_rows = int(eval_train_ratio * len(input_df))
    eval_train_df = input_df[:eval_rows]
    eval_test_df = input_df[eval_rows:]
    logger.info(" Retraining model on subset to have hold-out data for evaluation")
    hazard_model.fit(
        eval_train_df[features],
        duration_col="timedelta_hours",
        event_col="target",
        step_size=0.1,
        # show_progress=True,
    )
    hazard_model_score = np.round(hazard_model.score(eval_test_df[features]), 4)
    log_likelihood_ratio = np.round(hazard_model.log_likelihood_ratio_test().test_statistic, 4)
    concordance = np.round(hazard_model.concordance_index_, 4)
    pred_hazards = hazard_model.predict_partial_hazard(eval_test_df[features])
    logger.info(f" [v] Partial Log-Likelihood: {hazard_model_score}")
    logger.info(f" [v] Log-Likelihood Ratio Test: {log_likelihood_ratio}")
    logger.info(f" [v] Model Concordance Index: {concordance} \n")
    # check predicted hazard ratios versus observed conversion ratios sliced on quantiles
    logger.info("[+] Observed conversion over predicted customer hazard ratio segments")
    # upper quantile segments
    upper_quantiles = [0.99, 0.95, 0.90, 0.75, 0.5]
    for quantile in upper_quantiles:
        segment = eval_test_df.loc[pred_hazards > np.quantile(pred_hazards, quantile)]["target"]
        segment_ratio = len(segment) / len(eval_test_df)
        avg_conversion = np.mean(segment)
        avg_conversion = np.round(avg_conversion, 4) * 100
        logger.info(f" [v] Mean over upper {quantile}th segment: {avg_conversion:.2f}%")
    # lower quantile segments
    lower_quantiles = [0.75, 0.50, 0.25, 0.10, 0.05, 0.01]
    for quantile in lower_quantiles:
        segment = eval_test_df.loc[pred_hazards < np.quantile(pred_hazards, quantile)]["target"]
        segment_ratio = len(segment) / len(eval_test_df)
        avg_conversion = np.mean(segment)
        avg_conversion = np.round(avg_conversion, 4) * 100
        logger.info(f" [v] Mean over lower {quantile}th segment: {avg_conversion:.2f}%")
    # check predicted hazard ratios versus observed conversion timedeltas sliced on quantiles
    logger.info("\n[+] Observed conversion timedeltas over predicted customer hazard segments")
    for quantile in upper_quantiles:
        avg_delta = np.mean(
            eval_test_df.loc[
                (pred_hazards > np.quantile(pred_hazards, quantile)) & (eval_test_df["target"] == 1)
            ]["timedelta_hours"]
        )
        logger.info(f" [v] Average timedelta over upper {quantile}th segment: {avg_delta:.1f} hours")
    for quantile in lower_quantiles:
        avg_delta = np.mean(
            eval_test_df.loc[
                (pred_hazards < np.quantile(pred_hazards, quantile)) & (eval_test_df["target"] == 1)
            ]["timedelta_hours"]
        )
        logger.info(f" [v] Average timedelta over lower {quantile}th segment: {avg_delta:.1f} hours")
    logger.info("\n[+] Observed median conversion timedeltas over predicted customer hazard segments")
    for quantile in upper_quantiles:
        med_delta = np.median(
            eval_test_df.loc[
                (pred_hazards > np.quantile(pred_hazards, quantile)) & (eval_test_df["target"] == 1)
            ]["timedelta_hours"]
        )
        logger.info(f" [v] Median timedelta over upper {quantile}th segment: {med_delta:.1f} hours")
    for quantile in lower_quantiles:
        med_delta = np.median(
            eval_test_df.loc[
                (pred_hazards < np.quantile(pred_hazards, quantile)) & (eval_test_df["target"] == 1)
            ]["timedelta_hours"]
        )
        logger.info(f" [v] Median timedelta over lower {quantile}th segment: {med_delta:.1f} hours")
    # check predicted hazard ratios versus observed non-conversion timedeltas sliced on quantiles
    logger.info("\n[+] Observed average non-conversion timedeltas over predicted customer hazard segments")
    for quantile in upper_quantiles:
        avg_delta = np.mean(
            eval_test_df.loc[
                (pred_hazards > np.quantile(pred_hazards, quantile)) & (eval_test_df["target"] == 0)
            ]["timedelta_hours"]
        )
        logger.info(f" [v] Average timedelta over upper {quantile}th segment: {avg_delta:.1f} hours")
    for quantile in lower_quantiles:
        avg_delta = np.mean(
            eval_test_df.loc[
                (pred_hazards < np.quantile(pred_hazards, quantile)) & (eval_test_df["target"] == 0)
            ]["timedelta_hours"]
        )
        logger.info(f" [v] Average timedelta over lower {quantile}th segment: {avg_delta:.1f} hours")
    logger.info("\n[+] Observed median non-conversion timedeltas over predicted customer hazard segments")
    for quantile in upper_quantiles:
        med_delta = np.median(
            eval_test_df.loc[
                (pred_hazards > np.quantile(pred_hazards, quantile)) & (eval_test_df["target"] == 0)
            ]["timedelta_hours"]
        )
        logger.info(f" [v] Median timedelta over upper {quantile}th segment: {med_delta:.1f} hours")
    for quantile in lower_quantiles:
        med_delta = np.median(
            eval_test_df.loc[
                (pred_hazards < np.quantile(pred_hazards, quantile)) & (eval_test_df["target"] == 0)
            ]["timedelta_hours"]
        )
        logger.info(f" [v] Median timedelta over lower {quantile}th segment: {med_delta:.1f} hours")
    t_eval = (time.time() - t0_eval) / 60
    logger.info("[v] Finished hazard model evaluation in: {:.3} minutes".format(t_eval))
# upload log to GCS bucket
if SAVE_LOGS_TO_GCS:
    upload_log_to_gcs()
t_finish = (time.time() - t0_input) / 60
logger.info("[v] Succesfully Finished process in: {:.3} minutes".format(t_finish))
# ==================================================================================================