Skip to content

Instantly share code, notes, and snippets.

@odidere
Forked from mikeharding/streams_tasks.sql
Created June 23, 2021 22:00
Show Gist options
  • Save odidere/6b8c26f1812b1c609c90a543183e04ba to your computer and use it in GitHub Desktop.
Save odidere/6b8c26f1812b1c609c90a543183e04ba to your computer and use it in GitHub Desktop.
Snowflake Streams and Tasks
----------------------------------------------------------------
-- Streams - Change Data Capture (CDC) on Snowflake tables
-- Tasks - Schedule execution of a statement
-- MERGE - Insert/update/delete rows in a target table based on a second table or subquery
----------------------------------------------------------------
-- reset the example
-- "if exists" lets the reset succeed on a first run, when none of these
-- objects has been created yet; a plain drop would raise an error instead.
drop table if exists source_table;
drop table if exists target_table;
drop stream if exists source_table_stream;
-- create the source and target tables; both have the same (id, name) columns
create or replace table source_table (
    id   integer,
    name varchar
);
create or replace table target_table (
    id   integer,
    name varchar
);
-- create the stream that tracks changes made to source_table
create or replace stream source_table_stream on table source_table;
-- INSERT scenario
-- Load six rows into source_table in a single statement. The column list is
-- explicit so the insert does not depend on the table's column order.
-- 'thomas' is deliberately lower case; the UPDATE scenario changes it to 'Thomas'.
insert into source_table (id, name)
values
    (1, 'Johnny'),
    (2, 'Kate'),
    (3, 'Marie'),
    (4, 'Peter'),
    (5, 'Edward'),
    (6, 'thomas');
-- inspect the source rows, the target (nothing has been written to it yet),
-- and the change rows the stream is holding
select * from source_table order by id;
select * from target_table order by id;
select * from source_table_stream order by id;
-- MERGE scenario
-- Apply the changes captured in source_table_stream to target_table.
-- To run the merge on a schedule, prefix it with a task definition:
-- create task <task_name> warehouse = <wh_name> schedule = '1 minute' as
merge into target_table as t
using (
    -- Per the Snowflake streams documentation, an update to the source table
    -- is recorded as a DELETE row plus an INSERT row, both flagged with
    -- metadata$isupdate = TRUE. The DELETE half is filtered out here so an
    -- updated row is represented only by its INSERT row.
    select
        id,
        name,
        metadata$action,
        metadata$isupdate
    from source_table_stream
    where not (
        metadata$action = 'DELETE'
        and metadata$isupdate = TRUE
    )
) as s
    on t.id = s.id
-- source row was updated: copy the new name to the target
when matched
    and s.metadata$action = 'INSERT'
    and s.metadata$isupdate then
    update set t.name = s.name
-- source row was deleted: remove it from the target
when matched
    and s.metadata$action = 'DELETE' then
    delete
-- source row is new: add it to the target
when not matched
    and s.metadata$action = 'INSERT' then
    insert (id, name)
    values (s.id, s.name);
-- check the result of the merge
select * from target_table order by id;
-- UPDATE scenario
-- change the name stored for id 6 from 'thomas' to 'Thomas'
update source_table set name = 'Thomas' where id = 6;
-- look at the change rows in the stream, then at the target table
-- (highest id first); no statement in this scenario writes to target_table
select * from source_table_stream;
select * from target_table order by id desc;
-- DELETE scenario
-- remove the row with id 5 from the source table
delete from source_table
where id = 5;
-- look at the change rows in the stream, then at the target table;
-- no statement in this scenario writes to target_table
select * from source_table_stream;
select * from target_table order by id;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment