Last active
June 20, 2020 19:11
-
-
Save sanealytics/39884f087e2046e88c8ffce19a2ef1ce to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def get_elt_queries():
    """Gets parameterised queries for ELT.

    Returns:
        dict: maps a query name to its SQL text. The SQL uses named
        ``@param`` placeholders that the caller must bind at execution time:

        * ``'input_data'``   -- expects ``@service``
        * ``'output_merge'`` -- expects ``@tid``
    """
    return {
        # Parameters: service
        # Selects data/added_ts for one service, newest rows first.
        'input_data': """
            select
                data,
                added_ts
            from
                XXX.TABLE
            where
                service = @service
            order by
                added_ts desc
            """,
        # Parameters: tid
        # Upserts input_table into target, matching rows on some_id.
        # NOTE(review): `tid as last_tid` reads a `tid` column from
        # input_table, not the `@tid as tid` alias declared just above it;
        # if input_table already has a `tid` column, `select *` would also
        # yield a duplicate column name -- confirm the intended source of
        # last_tid.
        'output_merge': """
            merge into target t
            using (
                select *,
                    @tid as tid,
                    timestamp_utc as last_updated_ts,
                    tid as last_tid
                from input_table
            ) s
            on t.some_id = s.some_id
            when matched then
                update set
                    t.last_updated_ts = s.last_updated_ts,
                    t.last_tid = s.last_tid
            when not matched then
                insert row;
            """,
    }
# Deploy a wrapper around this function to wake up on file drop of some file
def sample_transform_table():
    """Sketch of a transform step that obtains a new trace id for its inputs.

    Loads the input dataframe, passes the distinct ``tid`` values found in it
    to ``service_trace_id`` and parses the ``tid`` field out of the JSON
    response. Writing the data back is left as an outline in the trailing
    comments; the function itself returns nothing.
    """
    # Local import: no module-level import of json is visible in this file.
    import json

    input_data = get_input_df('input_data', 'some_service')  # Returns pandas dataframe
    # `...` stands in for the arguments the original sketch left elided.
    r = service_trace_id(
        'sample_transform_table',
        ...,
        input_data['tid'].unique().tolist(),
    )
    # .get() yields None when the response has no 'tid' key.
    tid = json.loads(r).get('tid')
    # When writing data back,
    # For new rows, insert this new tid into the tid column.
    # For updating rows, insert this new tid into prev_tid column.
    # NOTE(review): the 'output_merge' query writes to last_tid, not
    # prev_tid -- confirm which column name is intended.
    # Check out merge example to do all in one step (when possible)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment