Last active
June 30, 2022 12:30
-
-
Save vvgsrk/1c8e76e9faf5b049eb983725207da31d to your computer and use it in GitHub Desktop.
Create database objects
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- A database, schema and table are required before you can load data | |
CREATE DATABASE amadeus_dynamic_pricing; | |
CREATE SCHEMA amadeus_dynamic_pricing.dynamic_pricing; | |
-- Create a table in newly created permanent database as same as the table from the share | |
CREATE TABLE amadeus_dynamic_pricing.dynamic_pricing.dp_logs | |
AS | |
SELECT * FROM dynamic_pricing_data_share_by_amadeus.dynamic_pricing.dp_logs WHERE 1=2; | |
-- Insert all existing rows from the share table to the target table | |
INSERT INTO amadeus_dynamic_pricing.dynamic_pricing.dp_logs | |
SELECT * FROM dynamic_pricing_data_share_by_amadeus.dynamic_pricing.dp_logs; | |
-- Create a append only stream to track newely added rows from the share table | |
CREATE OR REPLACE STREAM amadeus_dynamic_pricing.dynamic_pricing.dp_logs_stream | |
ON TABLE dynamic_pricing_data_share_by_amadeus.dynamic_pricing.dp_logs | |
APPEND_ONLY = TRUE | |
; | |
-- Create SQL stored procedure to insert data from stream to target table | |
CREATE OR REPLACE PROCEDURE amadeus_dynamic_pricing.dynamic_pricing.insert_dp_logs() | |
RETURNS INTEGER | |
LANGUAGE SQL | |
AS | |
$$ | |
BEGIN | |
INSERT INTO amadeus_dynamic_pricing.dynamic_pricing.dp_logs( | |
session_details, solution, product, created_time, created_date, version) | |
SELECT session_details, solution, product, created_time, created_date, version | |
FROM amadeus_dynamic_pricing.dynamic_pricing.dp_logs_stream | |
WHERE METADATA$ACTION = 'INSERT'; | |
COMMIT; | |
END; | |
$$; | |
-- Create a task to schedule the load and invoke the procedure to insert data | |
CREATE OR REPLACE TASK amadeus_dynamic_pricing.dynamic_pricing.dp_logs_task | |
WAREHOUSE = LOADER_L | |
SCHEDULE = 'USING CRON 0 1 * * * UTC' | |
WHEN | |
SYSTEM$STREAM_HAS_DATA('DP_LOGS_STREAM') | |
AS | |
call amadeus_dynamic_pricing.dynamic_pricing.insert_dp_logs() | |
; | |
-- When you create a task, it wouldn't start so you need to explicitly resume it using following statement | |
ALTER TASK IF EXISTS amadeus_dynamic_pricing.dynamic_pricing.dp_logs_task RESUME; | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment