Skip to content

Instantly share code, notes, and snippets.

@vvgsrk
Last active February 10, 2024 09:48
Show Gist options
  • Save vvgsrk/20602e82a0cc2c2e7a81ae0396075284 to your computer and use it in GitHub Desktop.
Save vvgsrk/20602e82a0cc2c2e7a81ae0396075284 to your computer and use it in GitHub Desktop.
-- Snowflake serverless task batch load using schema detection and evolution
-- Run the statements below as the data_engineer role.
USE ROLE data_engineer;

-- External stage over the inbound S3 bucket; access goes through the
-- DP_INGESTION_DEV storage integration.
CREATE OR REPLACE STAGE DP_DEV.STAGE.DP_DEV_DATA_INBOUND_TEST
    URL = 's3://dp-dev-data-inbound-test/'
    STORAGE_INTEGRATION = DP_INGESTION_DEV;

-- Sanity check: list the files visible through the stage.
LIST @DP_DEV.STAGE.DP_DEV_DATA_INBOUND_TEST;
-- Named CSV file format describing the staged files; it is referenced by both
-- the schema-inference queries and the load.
--   * PARSE_HEADER = TRUE: the first row of each file supplies the column names.
--   * ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE: a file whose column count differs
--     from the target table is not rejected for that reason alone.
--   * '\042' is the octal escape for the double-quote character.
CREATE OR REPLACE FILE FORMAT DP_DEV.STAGE.test_csv_format
    TYPE = 'csv'
    COMPRESSION = 'auto'
    RECORD_DELIMITER = '\n'
    FIELD_DELIMITER = ','
    FIELD_OPTIONALLY_ENCLOSED_BY = '\042'
    PARSE_HEADER = TRUE
    ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE;
-- Preview the column definitions that INFER_SCHEMA derives from the staged
-- files under order_csv/, parsed with the CSV file format defined above.
SELECT *
FROM TABLE(
    INFER_SCHEMA(
        LOCATION => '@DP_DEV.STAGE.DP_DEV_DATA_INBOUND_TEST/order_csv/',
        FILE_FORMAT => 'DP_DEV.STAGE.test_csv_format'
    )
);
-- Create the stage (raw) schema that holds the target table and the load task.
-- IF NOT EXISTS keeps the script re-runnable: a plain CREATE SCHEMA fails on
-- the second run, while OR REPLACE would drop every object already in the schema.
CREATE SCHEMA IF NOT EXISTS dp_dev.stage_orders;
-- Create the target table with its columns derived from the staged files.
-- INFER_SCHEMA returns one row per detected column; ARRAY_AGG(OBJECT_CONSTRUCT(*))
-- packs those rows into the array of column definitions that USING TEMPLATE expects.
-- WITHIN GROUP (ORDER BY order_id) keeps the table's columns in the order in
-- which they appear in the files instead of an arbitrary aggregation order.
-- ENABLE_SCHEMA_EVOLUTION lets later loads add columns that show up in new files.
-- NOTE: CREATE OR REPLACE drops an existing table (and its data) on re-run.
CREATE OR REPLACE TABLE dp_dev.stage_orders.order_tbl_hist
    USING TEMPLATE (
        SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*)) WITHIN GROUP (ORDER BY order_id)
        FROM TABLE(
            INFER_SCHEMA(
                LOCATION => '@DP_DEV.STAGE.DP_DEV_DATA_INBOUND_TEST/order_csv/',
                FILE_FORMAT => 'DP_DEV.STAGE.test_csv_format'
            )
        )
    )
    ENABLE_SCHEMA_EVOLUTION = TRUE;
-- Grant the EVOLVE SCHEMA privilege to any role, other than the table owner,
-- that inserts data and should be allowed to evolve the table's schema.
GRANT EVOLVE SCHEMA ON TABLE dp_dev.stage_orders.order_tbl_hist TO ROLE data_engineer;

-- Check whether schema evolution is enabled (see the enable_schema_evolution
-- column of the output). The lookup is scoped to the schema so the result does
-- not depend on the session's current database/schema or pick up same-named
-- tables elsewhere.
SHOW TABLES LIKE 'order_tbl_hist' IN SCHEMA dp_dev.stage_orders;

-- Inspect the column names and data types produced by schema inference.
DESCRIBE TABLE dp_dev.stage_orders.order_tbl_hist;
-- Micro-batch load: a serverless task that copies staged data into the stage table.
--   * No WAREHOUSE is named; USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE sets the
--     initial size of the Snowflake-managed compute instead.
--   * SCHEDULE runs the task every minute once it has been resumed.
--   * Failures are reported through the TASK_ERROR_NOTIFICATIONS integration.
--   * MATCH_BY_COLUMN_NAME maps file columns to table columns by header name
--     (case-insensitively) rather than by position.
--   * ON_ERROR = CONTINUE skips rows that fail to load instead of aborting the
--     file, so rejected rows do not fail the task run.
--   * FILE_FORMAT uses the documented (FORMAT_NAME = '...') form.
-- NOTE(review): this loads order_data.csv from the stage root, while the schema
-- was inferred from the order_csv/ prefix -- confirm the path is intended.
CREATE OR REPLACE TASK dp_dev.stage_orders.order_task
    SCHEDULE = '1 MINUTE'
    ERROR_INTEGRATION = TASK_ERROR_NOTIFICATIONS
    USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'SMALL'
AS
    -- Standard data load without transformations
    COPY INTO dp_dev.stage_orders.order_tbl_hist
    FROM '@DP_DEV.STAGE.DP_DEV_DATA_INBOUND_TEST/order_data.csv'
    FILE_FORMAT = (FORMAT_NAME = 'dp_dev.stage.test_csv_format')
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
    ON_ERROR = CONTINUE;
-- Trigger a single run of the task manually, independent of its schedule.
EXECUTE TASK dp_dev.stage_orders.order_task;

-- Check the run status of the task (works for serverless and user-managed tasks).
-- The task name is passed as the TASK_NAME argument rather than filtered in an
-- outer WHERE: TASK_HISTORY returns a capped number of rows (RESULT_LIMIT,
-- default 100), so filtering afterwards can miss this task's runs when other
-- tasks fill the window. Most recent runs are listed first.
SELECT *
FROM TABLE(information_schema.task_history(task_name => 'ORDER_TASK'))
ORDER BY scheduled_time DESC;

-- Credits consumed by serverless executions of the task.
SELECT *
FROM TABLE(information_schema.serverless_task_history())
WHERE task_name = 'ORDER_TASK';
-- List tasks together with their current state.
SHOW TASKS;

-- Start scheduled execution of the load task.
ALTER TASK dp_dev.stage_orders.order_task
    RESUME;

-- Stop scheduled execution of the load task.
ALTER TASK dp_dev.stage_orders.order_task
    SUSPEND;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment