-
-
Save odidere/6b8c26f1812b1c609c90a543183e04ba to your computer and use it in GitHub Desktop.
Snowflake Streams and Tasks
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
----------------------------------------------------------------
-- Streams - Change Data Capture (CDC) on Snowflake tables
-- Tasks   - Scheduled execution of a SQL statement
-- MERGE   - Insert/update/delete rows based on a second table or subquery
----------------------------------------------------------------
-- reset the example
-- "if exists" keeps the reset from failing on a first run, when none of
-- these objects have been created yet. The stream is dropped before the
-- table it is defined on.
drop stream if exists source_table_stream;
drop table if exists source_table;
drop table if exists target_table;
-- create the tables: source_table receives the DML in the scenarios below,
-- target_table is the table the merge writes to
create or replace table source_table (
    id integer,
    name varchar
);

create or replace table target_table (
    id integer,
    name varchar
);

-- create the stream that captures changes made to source_table
create or replace stream source_table_stream on table source_table;
-- INSERT scenario
-- The target columns are named explicitly so the load does not depend on
-- the column order of source_table.
insert into source_table (id, name)
values
    (1, 'Johnny'),
    (2, 'Kate'),
    (3, 'Marie'),
    (4, 'Peter'),
    (5, 'Edward'),
    (6, 'thomas');  -- lower-case here; the UPDATE scenario changes it to 'Thomas'

-- compare the source, the target (nothing has been merged into it yet)
-- and the changes captured by the stream
select * from source_table order by id;
select * from target_table order by id;
select * from source_table_stream order by id;
-- MERGE scenario
-- Applies the changes captured by source_table_stream to target_table.
-- To run it on a schedule, wrap the statement in a task:
-- create task <task_name> warehouse = <wh_name> schedule = '1 minute' as
merge into target_table as t
using (
    select
        id,
        name,
        metadata$action,
        metadata$isupdate
    -- the stream created above is named source_table_stream
    from source_table_stream
    -- skip rows flagged as both DELETE and update; only the INSERT rows
    -- flagged as update are used for the "update" branch below
    where not (metadata$action = 'DELETE' and metadata$isupdate = true)
) as s
    on t.id = s.id
-- id already in the target and the change is an update: refresh the name
when matched
    and s.metadata$action = 'INSERT'
    and s.metadata$isupdate then
    update set t.name = s.name
-- id already in the target and the source row was deleted: remove it
when matched
    and s.metadata$action = 'DELETE' then
    delete
-- id not yet in the target: add the new row
when not matched
    and s.metadata$action = 'INSERT' then
    insert (id, name)
    values (s.id, s.name);

-- check the result of the merge
select * from target_table order by id;
-- UPDATE scenario: change the name stored for id 6 to 'Thomas'
update source_table set name = 'Thomas' where id = 6;

-- inspect the stream and the target after the update
select * from source_table_stream;
select * from target_table order by id desc;
-- DELETE scenario: remove the row with id 5 from the source
delete from source_table
where id = 5;

-- inspect the stream and the target after the delete
select * from source_table_stream;
select * from target_table order by id;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment