Last active
February 10, 2024 09:48
-
-
Save vvgsrk/20602e82a0cc2c2e7a81ae0396075284 to your computer and use it in GitHub Desktop.
Snowflake serverless task batch load using schema detection and evolution
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Run the statements below as the data_engineer role.
use role data_engineer;

-- External stage pointing at the inbound S3 bucket; access goes through the
-- DP_INGESTION_DEV storage integration.
create or replace stage DP_DEV.STAGE.DP_DEV_DATA_INBOUND_TEST
    url = 's3://dp-dev-data-inbound-test/'
    storage_integration = DP_INGESTION_DEV;

-- Show the files currently visible through the stage.
list @DP_DEV.STAGE.DP_DEV_DATA_INBOUND_TEST;
-- Named CSV file format shared by schema inference and the load task.
--   * fields are comma-separated, records end with a newline
--   * '\042' is the octal escape for the double-quote character
--   * a column-count mismatch between file and table is not treated as an error
--   * the first row of each file is parsed as the header (column names)
CREATE OR REPLACE FILE FORMAT DP_DEV.STAGE.test_csv_format
    TYPE                           = 'csv'
    COMPRESSION                    = 'auto'
    FIELD_DELIMITER                = ','
    RECORD_DELIMITER               = '\n'
    FIELD_OPTIONALLY_ENCLOSED_BY   = '\042'
    ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
    PARSE_HEADER                   = TRUE;
-- Preview the column definitions that INFER_SCHEMA detects for the staged
-- files under order_csv/, using the named CSV file format.
SELECT *
FROM TABLE(
    INFER_SCHEMA(
        LOCATION    => '@DP_DEV.STAGE.DP_DEV_DATA_INBOUND_TEST/order_csv/',
        FILE_FORMAT => 'DP_DEV.STAGE.test_csv_format'
    )
);
-- Create the stage (raw) schema that holds the load target table and the task.
-- IF NOT EXISTS keeps the script re-runnable: a plain CREATE SCHEMA fails once
-- the schema is already there.
create schema if not exists dp_dev.stage_orders;
-- Create the target table with its column list derived from the staged files
-- (schema detection), and allow later loads to add columns (schema evolution).
create or replace table dp_dev.stage_orders.order_tbl_hist
    using template (
        -- ARRAY_AGG packs the INFER_SCHEMA rows into the single array that
        -- USING TEMPLATE expects. Ordering by order_id keeps the table columns
        -- in the same order as they appear in the files.
        select array_agg(object_construct(*)) within group (order by order_id)
        from table(
            infer_schema(
                location    => '@DP_DEV.STAGE.DP_DEV_DATA_INBOUND_TEST/order_csv/',
                file_format => 'DP_DEV.STAGE.test_csv_format'
            )
        )
    )
    enable_schema_evolution = true;
-- Besides the table owner, every role that loads data into the table and is
-- expected to evolve its schema needs the EVOLVE SCHEMA privilege.
grant evolve schema
    on table dp_dev.stage_orders.order_tbl_hist
    to role data_engineer;
-- Use SHOW TABLES to check whether enable_schema_evolution is enabled on the table.
-- The lookup is scoped to the table's schema because this script never sets a
-- current database or schema.
SHOW TABLES LIKE 'order_tbl_hist' IN SCHEMA dp_dev.stage_orders;

-- Use DESCRIBE TABLE to see the column data types.
DESCRIBE TABLE dp_dev.stage_orders.order_tbl_hist;
-- Micro batch: serverless task that loads staged data into the stage table.
-- No WAREHOUSE is named; USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE sets the
-- initial size of the Snowflake-managed compute. Failures are reported through
-- the TASK_ERROR_NOTIFICATIONS integration.
CREATE OR REPLACE TASK dp_dev.stage_orders.order_task
    SCHEDULE = '1 MINUTE'
    ERROR_INTEGRATION = TASK_ERROR_NOTIFICATIONS
    USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'SMALL'
AS
    -- Standard data load without transformations.
    -- NOTE(review): the table schema was inferred from files under order_csv/,
    -- but this load reads order_data.csv at the stage root; confirm the path.
    COPY INTO dp_dev.stage_orders.order_tbl_hist
    FROM '@DP_DEV.STAGE.DP_DEV_DATA_INBOUND_TEST/order_data.csv'
    -- Reference the named file format using the documented FORMAT_NAME form.
    FILE_FORMAT = (FORMAT_NAME = 'dp_dev.stage.test_csv_format')
    -- Map file columns to table columns by header name, ignoring case.
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
    -- Keep loading the remaining rows when a row fails.
    ON_ERROR = CONTINUE;
-- Manually trigger a single run of the task.
EXECUTE TASK dp_dev.stage_orders.order_task;

-- Check the run status of the task (works for serverless and user-managed tasks).
-- The task name is passed as a function argument so it is applied before the
-- function's row limit; a WHERE filter is applied only afterwards and can miss
-- runs. INFORMATION_SCHEMA is database-qualified because this script never
-- sets a current database.
select *
from table(dp_dev.information_schema.task_history(task_name => 'ORDER_TASK'));

-- Check the credits consumed by serverless executions of the task.
select *
from table(dp_dev.information_schema.serverless_task_history())
where task_name = 'ORDER_TASK';
-- List tasks together with their current state.
show tasks;

-- Resume the task.
alter task dp_dev.stage_orders.order_task resume;

-- Suspend the task.
alter task dp_dev.stage_orders.order_task suspend;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment