Get actionable insights on your channel performance by building custom attribution models using Google Analytics 4 data in BigQuery. Full article on stacktonic.com
####################################################
# Author  Krisjan Oldekamp / Stacktonic.com
# Email   [email protected]
# Article https://stacktonic.com/article/build-a-data-driven-attribution-model-using-google-analytics-4-big-query-and-python
####################################################

# pip install marketing_attribution_models
# pip install --upgrade 'google-cloud-bigquery[bqstorage,pandas]'
# pip install --upgrade pyarrow  # make sure you run a recent version
import pandas as pd
import numpy as np
import pyarrow
import pyarrow.parquet as pq
from google.cloud import bigquery
from google.oauth2 import service_account
from marketing_attribution_models import MAM
####################################################
# Settings
####################################################
GBQ_PROJECT = "<your-project>" # Google Cloud Project-ID
GBQ_KEYPATH = "./<your-keyfile>.json" # Path to Service Account JSON keyfile
GBQ_SQL_PATHS = "./<your-sql-file>.sql" # Path to SQL file -> querying the conversions paths (see tutorial)
GBQ_TABLE_CHANNELS = "<your-project>.<your-dataset>.conversions_attr_paths" # Table name for storing attributed conversions on journey level
GBQ_TABLE_CHANNELS_GROUPED = "<your-project>.<your-dataset>.conversions_attr_channels" # Table name for storing conversions grouped on channel and date level
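# Illustrative values only (hypothetical project / dataset / file names):
#
#   GBQ_PROJECT = "my-ga4-project"
#   GBQ_KEYPATH = "./service-account.json"
#   GBQ_SQL_PATHS = "./conversion_paths.sql"
#   GBQ_TABLE_CHANNELS = "my-ga4-project.attribution.conversions_attr_paths"
#   GBQ_TABLE_CHANNELS_GROUPED = "my-ga4-project.attribution.conversions_attr_channels"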
####################################################
# Write dataframe to BigQuery
def write_to_bigquery(df, table):
    # Serialize the dataframe to an in-memory Parquet buffer
    writer = pyarrow.BufferOutputStream()
    pq.write_table(
        pyarrow.Table.from_pandas(df),
        writer,
        use_compliant_nested_type=True
    )
    reader = pyarrow.BufferReader(writer.getvalue())

    # Enable list inference so list columns are loaded as REPEATED fields
    parquet_options = bigquery.format_options.ParquetOptions()
    parquet_options.enable_list_inference = True

    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.PARQUET
    job_config.write_disposition = "WRITE_TRUNCATE"
    job_config.parquet_options = parquet_options

    job = client.load_table_from_file(
        reader, table, job_config=job_config
    )
    job.result()  # wait for the load job to finish so errors surface here
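# Why Parquet with list inference: after the attribution step below, the
# dataframe holds Python-list columns (path_channels, path_timestamps and the
# per-model credit lists), which list inference loads as REPEATED fields in
# BigQuery. Illustrative usage (df_example is a hypothetical stand-in):
#
#   df_example = pd.DataFrame({"channels": [["direct", "email"]], "conversions": [1]})
#   write_to_bigquery(df_example, "<your-project>.<your-dataset>.some_table")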
# Group journeys by conversion date and channel
def group_by_channel(df, conversion_date, path, conversion_value, models):
    channels_grouped = None
    for model in models:
        # Repeat the conversion date for every touchpoint in the journey
        dates = df.apply(lambda row: [row[conversion_date] for credit in row[model]], axis=1)
        channels = df[path]
        channels_conversions = df[model]
        # Distribute the conversion value over the touchpoints using the model credits
        channels_values = df.apply(lambda row: [credit * row[conversion_value] for credit in row[model]], axis=1)

        # Flatten the per-journey lists into single columns
        dates_list = []
        channels_list = []
        conversions_list = []
        values_list = []
        dates.apply(dates_list.extend)
        channels.apply(channels_list.extend)
        channels_conversions.apply(conversions_list.extend)
        channels_values.apply(values_list.extend)

        frame = pd.DataFrame({
            "conversion_date": dates_list,
            "channels": channels_list,
            "conversions": conversions_list,
            "conversions_value": values_list
        })
        frame = frame.groupby(["conversion_date", "channels"]).agg("sum")
        frame[model] = frame.apply(lambda row: [row["conversions"], row["conversions_value"]], axis=1)
        frame = frame.drop(columns=["conversions", "conversions_value"])

        # Merge the results for this model with the models processed so far
        if isinstance(channels_grouped, pd.DataFrame):
            frame = frame.reset_index()
            frame.columns = ["conversion_date", "channels", model]
            channels_grouped = pd.merge(
                channels_grouped, frame, how="outer", on=["conversion_date", "channels"]
            ).fillna(0)
        else:
            channels_grouped = frame.reset_index()
            channels_grouped.columns = ["conversion_date", "channels", model]

    return channels_grouped
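# The returned dataframe has one row per (conversion_date, channels) pair and,
# per model, a two-element list [conversions, conversions_value]. Illustrative
# row layout (values are made up):
#
#   conversion_date | channels | attr_first_click | attr_last_click | ...
#   2023-01-01      | direct   | [2.0, 119.80]    | [1.0, 59.90]    | ...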
# Create BigQuery client
credentials = service_account.Credentials.from_service_account_file(
    GBQ_KEYPATH, scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(credentials=credentials, project=credentials.project_id)
# Open the .sql file
with open(GBQ_SQL_PATHS, "r") as f:
    sql = f.read()
# Query BigQuery and convert to dataframe
df_conversion_paths = client.query(sql).to_dataframe()
# (Optional) - Save dataframe to CSV and read from CSV (speeds up testing instead of fetching data from BigQuery)
#df_conversion_paths.to_csv("conversion_paths.csv", index=False)
#df_conversion_paths = pd.read_csv("conversion_paths.csv")
# View information
df_conversion_paths.info()
# Create and configure the attribution lib instance
attributions = MAM(
    df_conversion_paths,
    group_channels=False,
    channels_colname="path_channels",
    time_till_conv_colname="path_timestamps",
    group_channels_by_id_list=["journey_id"]
)
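# Expected input (produced by the SQL query at the bottom of this gist): one
# row per converting journey, with "path_channels" as a string such as
# "direct > organic_search > email" and "path_timestamps" holding the hours
# until conversion per touchpoint, e.g. "72 > 24 > 0".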
# Models and model settings + running the models
models = {
    "attr_first_click": attributions.attribution_first_click(),
    "attr_last_click": attributions.attribution_last_click(),
    "attr_last_non_direct_click": attributions.attribution_last_click_non(but_not_this_channel="direct"),
    "attr_position": attributions.attribution_position_based(list_positions_first_middle_last=[0.3, 0.3, 0.4]),
    "attr_time_decay": attributions.attribution_time_decay(decay_over_time=0.6, frequency=7),
    "attr_linear": attributions.attribution_linear(),
    "attr_markov": attributions.attribution_markov(transition_to_same_state=False),
    #"attr_shapley": attributions.attribution_shapley(size=4, order=True, values_col="conversions"), # -> not available on path level
}
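# As used below, each attribution method returns its results as a tuple:
# index [0] holds the journey-level output, whose last column contains the
# list of per-touchpoint credits that gets joined back onto the paths.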
# View results grouped by channel for complete conversion period
print(attributions.group_by_channels_models)
# Join models into one dataframe
models_list = [df_conversion_paths]
for model in models:
    attr = pd.DataFrame(models[model][0]).iloc[:, -1:]
    attr = attr.rename(columns={attr.columns[0]: model})
    models_list.append(attr)
df_attributions = models_list[0].join(models_list[1:])

# Split the path strings into lists so they load as REPEATED fields in BigQuery
df_attributions["path_channels"] = df_attributions["path_channels"].apply(lambda x: x.split(" > "))
df_attributions["path_timestamps"] = df_attributions["path_timestamps"].apply(lambda x: x.split(" > "))
# View resulting dataframe
df_attributions.info()
print(df_attributions.head())
# Group conversions and value for every model by date and channel
channels_grouped = group_by_channel(
    df_attributions,
    "conversion_date",  # conversion date column
    "path_channels",    # conversion path column
    "conversion_value", # conversion value column
    list(models)        # models to group
)
# View grouped results
print(channels_grouped.head())
# Save to Google BigQuery
write_to_bigquery(df_attributions, GBQ_TABLE_CHANNELS)
write_to_bigquery(channels_grouped, GBQ_TABLE_CHANNELS_GROUPED)
-- Author: Krisjan Oldekamp
-- https://stacktonic.com/article/build-a-data-driven-attribution-model-using-google-analytics-4-big-query-and-python
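-- note: the query below calls a persistent channel-grouping UDF
-- (`<your-project>.<your-dataset>.channel_grouping`) that maps source/medium
-- to a channel name. Its full definition is in the article; a minimal sketch
-- with an assumed signature and simplified rules (run separately, before this query):
--
--   create or replace function `<your-project>.<your-dataset>.channel_grouping`(
--     tsource string, medium string, campaign string
--   ) returns string as (
--     case
--       when tsource = '(direct)' and medium in ('(none)', '(not set)') then 'direct'
--       when medium = 'organic' then 'organic_search'
--       when regexp_contains(medium, r'^(cpc|ppc|paid)$') then 'paid_search'
--       when medium = 'email' then 'email'
--       else 'other'
--     end
--   );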
declare conversion_period int64 default 90; -- select conversions in last x days
declare lookback_window int64 default 30; -- how many days to look back from the moment the conversion occurred
with
-- group event-level google analytics 4 data into sessions (visits)
sessions as (
    select
        user_pseudo_id as ga_client_id,
        --user_id as custom_user_id, -- use a custom user-id instead, like a customer-id
        concat(user_pseudo_id, '.', (select cast(value.int_value as string) from unnest(event_params) where key = 'ga_session_id')) as session_id, -- combine user_pseudo_id and ga_session_id for a unique session-id
        timestamp_micros(min(event_timestamp)) as session_start,
        array_agg(
            if(event_name in ('page_view', 'user_engagement', 'scroll'), struct(
                event_timestamp,
                lower((select value.string_value from unnest(event_params) where key = 'source')) as source,
                lower((select value.string_value from unnest(event_params) where key = 'medium')) as medium,
                lower((select value.string_value from unnest(event_params) where key = 'campaign')) as campaign,
                (select value.int_value from unnest(event_params) where key = 'entrances') as is_entrance,
                (select value.int_value from unnest(event_params) where key = 'ignore_referrer') as ignore_referrer
            ), null)
        ignore nulls) as channels_in_session,
        --countif(event_name = '<name-of-some-other-conversion-event>') as conversions_in_session,
        countif(event_name = 'purchase') as conversions,
        sum(ecommerce.purchase_revenue) as conversion_value
    from `<your-project>.analytics_<your-dataset>.events_*`
    where
        -- select conversions based on <conversion_period>, plus an additional date range to construct the path of a conversion (based on <lookback_window>)
        _table_suffix between
            format_date('%Y%m%d', date_sub(current_date(), interval (conversion_period + lookback_window) day))
            and format_date('%Y%m%d', date_sub(current_date(), interval 1 day))
    group by 1, 2
),
-- build conversion paths for all sessions with at least 1 conversion within the last <conversion_period> days
sessions_converted as (
    select
        s.session_start,
        s.session_id,
        string_agg(
            -- select the first channel / campaign within the session
            `<your-project>.<your-dataset>.channel_grouping`(
                (select t.source from unnest(s_lb.channels_in_session) as t where t.ignore_referrer is null order by t.event_timestamp asc limit 1),
                (select t.medium from unnest(s_lb.channels_in_session) as t where t.ignore_referrer is null order by t.event_timestamp asc limit 1),
                null
            ),
            ' > '
            order by s_lb.session_start asc
        ) as path_channels,
        string_agg(cast(timestamp_diff(timestamp(s.session_start), timestamp(s_lb.session_start), hour) as string), ' > ' order by s_lb.session_start asc) as path_timestamps, -- hours till conversion
        string_agg(cast(s_lb.session_start as string), ' > ' order by s_lb.session_start asc) as path_timestamps_check, -- raw timestamps, useful for checking
        max(s.conversions) as conversions_in_session,
        max(s.conversion_value) as conversion_value
    from sessions as s
    left join
        -- join historical sessions to construct the conversion path (with a max path length of <lookback_window> days)
        sessions as s_lb
        on s.ga_client_id = s_lb.ga_client_id
        and s.session_start >= s_lb.session_start -- only join the current session and sessions before it
        and datetime(s_lb.session_start) >= date_sub(datetime(s.session_start), interval lookback_window day) -- only join sessions no older than <lookback_window> days, counted from the conversion
    where
        s.conversions > 0
        and date(s.session_start) >= date_sub(current_date(), interval conversion_period day)
    group by 1, 2
    order by
        s.session_start asc
)
-- query data on user (journey) level
select
    date(session_start) as conversion_date,
    session_start as conversion_timestamp,
    session_id as journey_id,
    path_channels,
    path_timestamps,
    true as conversion,
    conversions_in_session,
    conversion_value
from sessions_converted