Skip to content

Instantly share code, notes, and snippets.

@ak--47
Last active October 15, 2024 13:32
Show Gist options
  • Save ak--47/630c716d0233d6cd0f8236a030cbb5db to your computer and use it in GitHub Desktop.
Save ak--47/630c716d0233d6cd0f8236a030cbb5db to your computer and use it in GitHub Desktop.
MIXPANEL → AZURE → SNOWFLAKE pipeline
-- MIXPANEL → AZURE → SNOWFLAKE pipeline
-- the second half...
-- by [email protected]
-- docs: https://docs.snowflake.com/en/user-guide/data-load-azure-create-stage
-- Create the storage integration that lets Snowflake read the Azure container.
-- NOTE(review): the identifier "azure_intergration" is misspelled ("integration"),
-- but every later statement references it by this name, so it is kept as-is.
CREATE OR REPLACE STORAGE INTEGRATION azure_intergration
    TYPE = EXTERNAL_STAGE
    STORAGE_PROVIDER = AZURE
    ENABLED = TRUE
    -- tenant id comes from the output of: az ad sp create-for-rbac --sdk-auth
    AZURE_TENANT_ID = '7ed77f17-bc43-4003-ac64-eb6363490aab'
    -- location format: azure://{storage_account}.blob.core.windows.net/{container}/
    STORAGE_ALLOWED_LOCATIONS = ('azure://mixpaneldemo.blob.core.windows.net/mixpanelpipelines/');
-- Get the snowflake-created service principal: follow the AZURE_CONSENT_URL in the
-- output, log in with Azure, and press "allow"; you will need AZURE_MULTI_TENANT_APP_NAME.
DESC STORAGE INTEGRATION azure_intergration;
-- see what role you are
SELECT CURRENT_ROLE();
-- Once Azure has a role for the service principal, grant usage rights to yourself.
-- FIX(review): GRANT ... TO ROLE does not accept a function call like CURRENT_ROLE();
-- capture the current role name in a session variable and pass it via IDENTIFIER().
SET my_role = (SELECT CURRENT_ROLE());
GRANT USAGE ON INTEGRATION azure_intergration TO ROLE IDENTIFIER($my_role);
-- Create the database and schema that will hold the mixpanel data.
-- WARNING: OR REPLACE drops any existing database/schema of the same name, data included.
-- NOTE(review): "schema" is an unusual choice of schema name (it is a keyword in many
-- dialects) — confirm it resolves unquoted in your account before relying on it.
CREATE OR REPLACE DATABASE mixpanel_data;
CREATE OR REPLACE SCHEMA mixpanel_data.schema;
-- Create an external stage pointing at the mixpanel export path in the Azure container,
-- authenticated through the storage integration created above.
CREATE OR REPLACE STAGE mixpanel_data.schema.mp_azure_snow_pipeline
    STORAGE_INTEGRATION = azure_intergration
    URL = 'azure://mixpaneldemo.blob.core.windows.net/mixpanelpipelines/mp_parquet/2685993/mp_master_event';
-- sanity check: list the files visible in the stage
LIST @mixpanel_data.schema.mp_azure_snow_pipeline;
-- Create a single-column landing table; each parquet row lands as one VARIANT value.
CREATE OR REPLACE TABLE mixpanel_data.schema.events_raw (
    parquet_data VARIANT
);
-- Populate the unschematized table from the stage.
-- (Unsure whether this is required before inferring the schema later.)
COPY INTO mixpanel_data.schema.events_raw
FROM (
    SELECT $1
    FROM @mixpanel_data.schema.mp_azure_snow_pipeline
)
FILE_FORMAT = (TYPE = 'PARQUET');
-- FOR JSON LOADING... but you can't infer a schema from JSON, so usually not a good approach:
-- COPY INTO mixpanel_data.schema.events_raw
-- FROM (SELECT metadata$filename, CURRENT_TIMESTAMP(), $1
-- FROM @mixpanel_data.schema.mp_azure_snow_pipeline)
-- FILE_FORMAT = (TYPE = 'JSON', COMPRESSION = 'GZIP');
-- CELEBRATE
-- you now have a snowflake table that has your mixpanel data; you can query it!
-- FIX(review): added the missing statement terminator — without it, running the file as a
-- script fuses this SELECT with the CREATE FILE FORMAT that follows.
SELECT * FROM mixpanel_data.schema.events_raw;
-- Declare a named file format for the mixpanel parquet files.
CREATE OR REPLACE FILE FORMAT mixpanel_pq TYPE = 'PARQUET';
-- Have snowflake infer the column schema for the staged files
-- (the warehouse must be running for the table function to execute).
ALTER WAREHOUSE MP_PIPELINE RESUME;
SELECT *
FROM TABLE(
    INFER_SCHEMA(
        LOCATION => '@mixpanel_data.schema.mp_azure_snow_pipeline',
        FILE_FORMAT => 'mixpanel_pq'
    )
);
-- Schematized table creation (hand-copied from the INFER_SCHEMA output above).
-- There are better ways to automate the creation of this table.
-- NOTE(review): column set is specific to this mixpanel project's event properties;
-- regenerate via INFER_SCHEMA if the export's properties change.
CREATE OR REPLACE TABLE mixpanel_data.schema.events_schematized (
mp_current_url TEXT,
shots_taken REAL,
dt TEXT,
mp_initial_referrer TEXT,
status_error_code TEXT,
device_info_os_version TEXT,
status TEXT,
target_page_name_id REAL,
text TEXT,
location_page_name_name TEXT,
upload_timestamp REAL, -- presumably an epoch timestamp stored as REAL — TODO confirm
mp_name TEXT,
video_author TEXT,
mp_user_id TEXT,
foo TEXT,
distinct_id TEXT,
device_info_referrer_info_advertising_id TEXT,
mp_duration REAL,
res_timestamp REAL,
mp_event_name TEXT,
mp_initial_referring_domain TEXT,
device_info_project_name TEXT,
mp_import BOOLEAN,
collect_timestamp REAL,
mp_lib TEXT,
mp_identified_id TEXT,
mp_browser TEXT,
mp_processing_time_ms REAL,
device_info_referrer_info_user_id TEXT,
extra_lastpage TEXT,
mpfingerprint TEXT,
device_info_referrer_info_type TEXT,
mp_lib_version TEXT,
mp_country_code TEXT,
device_info_referrer_info_device_id TEXT,
device_info TEXT,
res_tag TEXT,
device_info_referrer_info_extra TEXT,
device_info_referrer_info_extra_info_original_url TEXT,
status_status_id REAL,
video_title TEXT,
device_info_os_type_name TEXT,
mp_screen_width REAL,
current_url_path TEXT,
mp_device_id TEXT,
device_info_user_id TEXT,
mp_os TEXT,
device_info_referrer_info_extra_info_utm_campaign TEXT,
device_info_referrer_info_extra_info_utm_source TEXT,
device_info_advertising_id TEXT,
mp_insert_id TEXT,
location_page_name_id REAL,
mp_city TEXT,
extra TEXT,
action TEXT,
mp_mp_api_endpoint TEXT,
target_params_pageshowid TEXT,
location TEXT,
video_length REAL,
device_info_referrer_info_appsflyer_id TEXT,
mp_region TEXT,
video_id TEXT,
video_url TEXT,
device_info_model TEXT,
device_info_device_id TEXT,
target_page_name_name TEXT,
mp_anon_id TEXT,
action_name TEXT,
video_quality TEXT,
device_info_os_type_id REAL,
status_http_code REAL,
status_status_name TEXT,
target TEXT,
hour REAL,
mp_browser_version REAL,
device_info_referrer_info_install_time TEXT,
target_params_pagesessionid TEXT,
mp_screen_height REAL,
mp_sent_by_lib_version TEXT,
status_error_message TEXT,
device_info_app_version TEXT,
current_url_protocol TEXT,
device_info_referrer_info_extra_info_utm_medium TEXT,
action_id REAL,
device_info_referrer_info_channel TEXT,
current_page_title TEXT,
device_info_referrer_info_target_id TEXT,
time NUMBER(38, 0), -- presumably the event's epoch time in ms or sec — TODO confirm
sampling_factor REAL,
current_domain TEXT,
cart TEXT,
mp_mp_api_timestamp_ms REAL
);
ALTER WAREHOUSE MP_PIPELINE RESUME;
-- Load the data schematized: MATCH_BY_COLUMN_NAME maps parquet columns onto the
-- table columns by (case-insensitive) name — this makes it all work!
COPY INTO mixpanel_data.schema.events_schematized
FROM @mixpanel_data.schema.mp_azure_snow_pipeline
FILE_FORMAT = (TYPE = 'PARQUET')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
-- FIX(review): added the missing terminator on the query below — without it, running the
-- file as a script fuses this SELECT with the CREATE TASK that follows.
SELECT * FROM mixpanel_data.schema.events_schematized;
-- Automate the load to run twice a day (1 AM and 1 PM UTC).
-- NOTE(review): no WAREHOUSE is specified, so this creates a serverless task —
-- confirm your role has the privileges serverless tasks require.
CREATE OR REPLACE TASK mixpanel_data_loader
    SCHEDULE = 'USING CRON 0 1,13 * * * UTC'
AS
    COPY INTO mixpanel_data.schema.events_schematized
    FROM @mixpanel_data.schema.mp_azure_snow_pipeline
    FILE_FORMAT = (TYPE = 'PARQUET')
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
-- Tasks are created suspended: RESUME turns the automation on, SUSPEND pauses it.
ALTER TASK mixpanel_data_loader RESUME;
ALTER TASK mixpanel_data_loader SUSPEND;
-- kill the task if you want to automate a different way
DROP TASK IF EXISTS mixpanel_data_loader;
-- View historical runs, errors, etc. for the last 48 hours of loads into the table.
ALTER WAREHOUSE MP_PIPELINE RESUME;
SELECT *
FROM TABLE(
    INFORMATION_SCHEMA.COPY_HISTORY(
        TABLE_NAME => 'events_schematized',
        START_TIME => DATEADD('hours', -48, CURRENT_TIMESTAMP()),
        END_TIME => CURRENT_TIMESTAMP()
    )
);
-- Optional: wrap the load in a stored procedure so it can be invoked by name.
CREATE OR REPLACE PROCEDURE load_data_into_events_schematized()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
    -- same COPY as the scheduled task above
    COPY INTO mixpanel_data.schema.events_schematized
    FROM @mixpanel_data.schema.mp_azure_snow_pipeline
    FILE_FORMAT = (TYPE = 'PARQUET')
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
    RETURN 'Data Load Completed Successfully';
END;
$$;
-- Delete that procedure... because stored procedures are garbage... <3 jeff.
DROP PROCEDURE IF EXISTS load_data_into_events_schematized();
-- Before you quit: suspend the warehouse so it stops accruing credits.
ALTER WAREHOUSE MP_PIPELINE SUSPEND;
@ak--47
Copy link
Author

ak--47 commented Jan 28, 2024

what you end up with:
overview

schematized table:
schematized

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment