Last active
October 15, 2024 13:32
-
-
Save ak--47/630c716d0233d6cd0f8236a030cbb5db to your computer and use it in GitHub Desktop.
MIXPANEL → AZURE → SNOWFLAKE pipeline
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- MIXPANEL → AZURE → SNOWFLAKE pipeline | |
-- the second half... | |
-- by [email protected] | |
-- docs: https://docs.snowflake.com/en/user-guide/data-load-azure-create-stage | |
-- Create the storage integration Snowflake uses to reach the Azure container.
CREATE OR REPLACE STORAGE INTEGRATION azure_intergration
    TYPE = EXTERNAL_STAGE
    STORAGE_PROVIDER = AZURE
    ENABLED = TRUE
    -- tenant id comes from the output of: az ad sp create-for-rbac --sdk-auth
    AZURE_TENANT_ID = '7ed77f17-bc43-4003-ac64-eb6363490aab'
    -- location format: azure://{storage_account}.blob.core.windows.net/{container}/
    STORAGE_ALLOWED_LOCATIONS = (
        'azure://mixpaneldemo.blob.core.windows.net/mixpanelpipelines/'
    );
-- Get the Snowflake-created service principal: follow the consent URL shown in
-- the output, log in with Azure and press "allow". You will also need the
-- AZURE_MULTI_TENANT_APP_NAME value from the same output.
DESC STORAGE INTEGRATION azure_intergration;

-- See which role you are currently using.
SELECT CURRENT_ROLE();

-- Once Azure has a role for the service principal, grant usage rights to yourself.
-- GRANT needs a role identifier rather than a function call, so the current role
-- is captured in a session variable and resolved through IDENTIFIER().
SET integration_grantee = CURRENT_ROLE();
GRANT USAGE ON INTEGRATION azure_intergration
    TO ROLE IDENTIFIER($integration_grantee);
-- Create the database and the schema that hold every pipeline object.
-- NOTE: OR REPLACE drops an existing object of the same name together with
-- everything inside it, so re-running this starts from an empty database.
CREATE OR REPLACE DATABASE mixpanel_data;

CREATE OR REPLACE SCHEMA mixpanel_data.schema;
-- Create an external stage over the exported event files; access goes through
-- the storage integration rather than embedded credentials.
CREATE OR REPLACE STAGE mixpanel_data.schema.mp_azure_snow_pipeline
    STORAGE_INTEGRATION = azure_intergration
    URL = 'azure://mixpaneldemo.blob.core.windows.net/mixpanelpipelines/mp_parquet/2685993/mp_master_event';

-- List the files visible through the stage.
LIST @mixpanel_data.schema.mp_azure_snow_pipeline;
-- Landing table: a single VARIANT column that receives each staged record whole.
CREATE OR REPLACE TABLE mixpanel_data.schema.events_raw (parquet_data VARIANT);

-- Populate the unschematized table from the stage.
-- (Unsure whether this step is required in order to infer the schema later.)
COPY INTO mixpanel_data.schema.events_raw
FROM (
    SELECT $1
    FROM @mixpanel_data.schema.mp_azure_snow_pipeline
)
FILE_FORMAT = (TYPE = 'PARQUET');
-- FOR JSON LOADING... but you can't infer schema... so usually not a good approach
-- COPY INTO mixpanel_data.schema.events_raw | |
-- FROM (SELECT metadata$filename, CURRENT_TIMESTAMP(), $1 | |
-- FROM @mixpanel_data.schema.mp_azure_snow_pipeline) | |
-- FILE_FORMAT = (TYPE = 'JSON', COMPRESSION = 'GZIP'); | |
-- CELEBRATE
-- You now have a Snowflake table that holds your Mixpanel data; you can query it!
-- The terminating semicolon matters: without it this statement runs into the
-- CREATE FILE FORMAT that follows and the script fails to parse.
SELECT *
FROM mixpanel_data.schema.events_raw;
-- Define a named Parquet file format for the Mixpanel files
-- (referenced by name when inferring the schema).
CREATE OR REPLACE FILE FORMAT mixpanel_pq TYPE = 'PARQUET';
-- Have Snowflake infer the column names and types from the staged files.
-- IF SUSPENDED keeps the statement from failing when the warehouse is already running.
ALTER WAREHOUSE MP_PIPELINE RESUME IF SUSPENDED;

SELECT *
FROM TABLE(
    INFER_SCHEMA(
        LOCATION => '@mixpanel_data.schema.mp_azure_snow_pipeline',
        FILE_FORMAT => 'mixpanel_pq'
    )
);
-- Schematized table creation. The column list is the output of the INFER_SCHEMA
-- query; there are better ways to automate the creation of this table.
-- Column order is kept exactly as inferred.
CREATE OR REPLACE TABLE mixpanel_data.schema.events_schematized (
    mp_current_url TEXT,
    shots_taken REAL,
    dt TEXT,
    mp_initial_referrer TEXT,
    status_error_code TEXT,
    device_info_os_version TEXT,
    status TEXT,
    target_page_name_id REAL,
    text TEXT,
    location_page_name_name TEXT,
    upload_timestamp REAL,
    mp_name TEXT,
    video_author TEXT,
    mp_user_id TEXT,
    foo TEXT,
    distinct_id TEXT,
    device_info_referrer_info_advertising_id TEXT,
    mp_duration REAL,
    res_timestamp REAL,
    mp_event_name TEXT,
    mp_initial_referring_domain TEXT,
    device_info_project_name TEXT,
    mp_import BOOLEAN,
    collect_timestamp REAL,
    mp_lib TEXT,
    mp_identified_id TEXT,
    mp_browser TEXT,
    mp_processing_time_ms REAL,
    device_info_referrer_info_user_id TEXT,
    extra_lastpage TEXT,
    mpfingerprint TEXT,
    device_info_referrer_info_type TEXT,
    mp_lib_version TEXT,
    mp_country_code TEXT,
    device_info_referrer_info_device_id TEXT,
    device_info TEXT,
    res_tag TEXT,
    device_info_referrer_info_extra TEXT,
    device_info_referrer_info_extra_info_original_url TEXT,
    status_status_id REAL,
    video_title TEXT,
    device_info_os_type_name TEXT,
    mp_screen_width REAL,
    current_url_path TEXT,
    mp_device_id TEXT,
    device_info_user_id TEXT,
    mp_os TEXT,
    device_info_referrer_info_extra_info_utm_campaign TEXT,
    device_info_referrer_info_extra_info_utm_source TEXT,
    device_info_advertising_id TEXT,
    mp_insert_id TEXT,
    location_page_name_id REAL,
    mp_city TEXT,
    extra TEXT,
    action TEXT,
    mp_mp_api_endpoint TEXT,
    target_params_pageshowid TEXT,
    location TEXT,
    video_length REAL,
    device_info_referrer_info_appsflyer_id TEXT,
    mp_region TEXT,
    video_id TEXT,
    video_url TEXT,
    device_info_model TEXT,
    device_info_device_id TEXT,
    target_page_name_name TEXT,
    mp_anon_id TEXT,
    action_name TEXT,
    video_quality TEXT,
    device_info_os_type_id REAL,
    status_http_code REAL,
    status_status_name TEXT,
    target TEXT,
    hour REAL,
    mp_browser_version REAL,
    device_info_referrer_info_install_time TEXT,
    target_params_pagesessionid TEXT,
    mp_screen_height REAL,
    mp_sent_by_lib_version TEXT,
    status_error_message TEXT,
    device_info_app_version TEXT,
    current_url_protocol TEXT,
    device_info_referrer_info_extra_info_utm_medium TEXT,
    action_id REAL,
    device_info_referrer_info_channel TEXT,
    current_page_title TEXT,
    device_info_referrer_info_target_id TEXT,
    time NUMBER(38, 0),
    sampling_factor REAL,
    current_domain TEXT,
    cart TEXT,
    mp_mp_api_timestamp_ms REAL
);
-- IF SUSPENDED keeps the statement from failing when the warehouse is already running.
ALTER WAREHOUSE MP_PIPELINE RESUME IF SUSPENDED;

-- Load the staged data into the schematized table.
COPY INTO mixpanel_data.schema.events_schematized
FROM @mixpanel_data.schema.mp_azure_snow_pipeline
FILE_FORMAT = (TYPE = 'PARQUET')
-- Columns are matched to the Parquet fields by name (ignoring case) instead of
-- by position; this is what makes it all work.
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
-- Query the schematized data.
-- The terminating semicolon matters: without it this statement runs into the
-- CREATE TASK that follows and the script fails to parse.
SELECT *
FROM mixpanel_data.schema.events_schematized;
-- Automate the load: a task that re-runs the COPY twice a day,
-- at 01:00 and 13:00 UTC.
-- NOTE(review): no WAREHOUSE parameter is set here; presumably the task is meant
-- to run on Snowflake-managed (serverless) compute - confirm.
CREATE OR REPLACE TASK mixpanel_data_loader
    SCHEDULE = 'USING CRON 0 1,13 * * * UTC'
AS
    COPY INTO mixpanel_data.schema.events_schematized
    FROM @mixpanel_data.schema.mp_azure_snow_pipeline
    FILE_FORMAT = (TYPE = 'PARQUET')
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
-- Turn the automation on ...
ALTER TASK mixpanel_data_loader RESUME;

-- ... or pause it.
ALTER TASK mixpanel_data_loader SUSPEND;

-- Kill the task entirely if you want to automate a different way.
DROP TASK IF EXISTS mixpanel_data_loader;
-- View historical load runs, errors, etc. for the last 48 hours.
-- IF SUSPENDED keeps the statement from failing when the warehouse is already running.
ALTER WAREHOUSE MP_PIPELINE RESUME IF SUSPENDED;

-- The function and the table name are fully qualified so the query does not
-- depend on the session's current database and schema.
SELECT *
FROM TABLE(
    mixpanel_data.INFORMATION_SCHEMA.COPY_HISTORY(
        TABLE_NAME => 'mixpanel_data.schema.events_schematized',
        START_TIME => dateadd('hours', -48, current_timestamp()),
        END_TIME => current_timestamp()
    )
);
-- Optional: wrap the load in a stored procedure.
-- Takes no arguments; runs the same COPY used for the schematized load and
-- returns a fixed confirmation string once the COPY statement has finished.
CREATE OR REPLACE PROCEDURE load_data_into_events_schematized()
    RETURNS STRING
    LANGUAGE SQL
AS
$$
BEGIN
    COPY INTO mixpanel_data.schema.events_schematized
    FROM @mixpanel_data.schema.mp_azure_snow_pipeline
    FILE_FORMAT = (TYPE = 'PARQUET')
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

    RETURN 'Data Load Completed Successfully';
END;
$$;
-- Delete that procedure again... because stored procedures are garbage... <3 jeff.
DROP PROCEDURE IF EXISTS load_data_into_events_schematized();

-- Before you quit: suspend the warehouse.
ALTER WAREHOUSE MP_PIPELINE SUSPEND;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
what you end up with:
schematized table: