Created
December 8, 2017 10:55
-
-
Save ras44/d42be2b6e1fd78d217d0190da3a53588 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Note: every placeholder written as <VARIABLE_NAME> must be replaced with your own value before running.
-- Pipelines to migrate from a non-partitioned table to daily partitioned tables.
-- This example partitions only the past three days.
-- Modify the invocation queries to partition any other date range.
-- Configuration block for the pipeline tool.
--   connection / mode / pipelineRenderer: connection placeholder, the bigquery-v2 mode
--     and Mustache as the template renderer.
--   JobConfigurationQuery: standard SQL, tables created if needed and truncated on write.
--   QueryRequest: standard SQL, at most 1000 results, timeout of 1000000 ms.
--   JobConfigurationExtract: uncompressed CSV, comma delimited, no header row.
-- NOTE(review): presumably these settings apply to every pipeline defined below - confirm
-- against the pipeline tool documentation.
DEFINE ({
  "connection": "<YOUR_CONNECTION>",
  "mode": "bigquery-v2",
  "pipelineRenderer": "MUSTACHE",
  "bigquery-v2": {
    "JobConfigurationQuery": {
      "allowLargeResults": false,
      "createDisposition": "CREATE_IF_NEEDED",
      "flattenResults": true,
      "maximumBillingTier": 1,
      "useLegacySql": false,
      "useQueryCache": true,
      "writeDisposition": "WRITE_TRUNCATE"
    },
    "QueryRequest": {
      "maxResults": 1000,
      "timeoutMs": 1000000,
      "useLegacySql": false,
      "useQueryCache": true
    },
    "JobConfigurationExtract": {
      "compression": "NONE",
      "destinationFormat": "CSV",
      "fieldDelimiter": ",",
      "printHeader": false
    }
  }
});
-- Step 001: rebuilds the pivot table named dataset_id.table_prefix_partitions.
-- Template parameters, supplied by the RUN query of this pipeline:
--   dataset_id    dataset that receives the pivot table
--   table_prefix  prefix of the pivot table name
--   date_list     list of objects with a yyyymmdd field - one array column is emitted per entry
-- How the query works:
--   1. The innermost query packs all source rows of one station and one day into an array r,
--      keyed by d, which is the text day followed by year, mo and da.
--   2. ROW_NUMBER gives every station a line number within its day. It is ordered by stn,
--      which is unique within a day after the GROUP BY, so the numbering is identical on every run.
--   3. The outer query groups by line and emits one column day_yyyymmdd per requested date.
--      Rows of other days produce NULL from the CASE and are ignored by ARRAY_CONCAT_AGG.
-- NOTE(review): DROP TABLE has no IF EXISTS guard - presumably the very first run needs the
-- table to exist already or the tool tolerates a missing table. Confirm before first use.
CREATE PIPELINE <YOUR_USERNAME>.migrating_to_partitioned_step_001_create_pivot AS (
  DROP TABLE {{{dataset_id}}}.{{{table_prefix}}}_partitions;
  CREATE TABLE {{{dataset_id}}}.{{{table_prefix}}}_partitions AS (
    SELECT
      {{#date_list}}
      ARRAY_CONCAT_AGG(CASE WHEN d = 'day{{{yyyymmdd}}}' THEN r END) AS day_{{{yyyymmdd}}},
      {{/date_list}}
      line
    FROM (
      SELECT
        d,
        r,
        ROW_NUMBER() OVER (PARTITION BY d ORDER BY stn) AS line
      FROM (
        SELECT
          stn,
          CONCAT('day', year, mo, da) AS d,
          ARRAY_AGG(t) AS r
        FROM `bigquery-public-data.noaa_gsod.gsod2017` AS t
        GROUP BY stn, d
      )
    )
    GROUP BY line
  );
);
-- Invokes step 001 with a single parameter row:
--   dataset_id    target dataset placeholder
--   table_prefix  tmp_mtp_001
--   date_list     JSON array text with one yyyymmdd object for each of the previous three days
-- Change the literal array of day offsets to cover a different range of dates.
RUN PIPELINE <YOUR_USERNAME>.migrating_to_partitioned_step_001_create_pivot (
  SELECT
    '<YOUR_DATASET_ID>' AS dataset_id,
    'tmp_mtp_001' AS table_prefix,
    CONCAT(
      '[',
      STRING_AGG(
        CONCAT('{"yyyymmdd":"', FORMAT_DATE('%Y%m%d', partition_date), '"}')
      ),
      ']'
    ) AS date_list
  FROM (
    SELECT
      DATE_SUB(CURRENT_DATE(), INTERVAL days_ago DAY) AS partition_date
    FROM UNNEST([1, 2, 3]) AS days_ago
  )
);
-- Step 002: expands one array column of the pivot table back into ordinary rows.
-- Template parameters, supplied by the RUN query of this pipeline:
--   dataset_id          dataset holding the pivot table and receiving the output table
--   table_prefix        prefix shared by the pivot table and the output table
--   day_partition_date  name of the pivot column to expand, also used as the output table suffix
-- Each element of the chosen array column becomes one row of the output table.
CREATE PIPELINE <YOUR_USERNAME>.migrating_to_partitioned_step_002_unnest AS (
  CREATE TABLE {{{dataset_id}}}.{{{table_prefix}}}_{{{day_partition_date}}} AS (
    SELECT
      day_row.*
    FROM {{{dataset_id}}}.{{{table_prefix}}}_partitions AS pivot
    CROSS JOIN UNNEST(pivot.{{{day_partition_date}}}) AS day_row
  );
);
-- Invokes step 002 once per parameter row - one row for each of the previous three days:
--   dataset_id          dataset placeholder. It must be the same dataset that step 001 wrote
--                       the pivot table into, so it uses the dataset placeholder rather than
--                       the username placeholder.
--   table_prefix        tmp_mtp_001, matching step 001
--   day_partition_date  the text day_ followed by the date formatted as yyyymmdd
-- Change the literal array of day offsets to cover a different range of dates.
RUN PIPELINE <YOUR_USERNAME>.migrating_to_partitioned_step_002_unnest (
  SELECT
    '<YOUR_DATASET_ID>' AS dataset_id,
    'tmp_mtp_001' AS table_prefix,
    CONCAT('day_', FORMAT_DATE('%Y%m%d', partition_date)) AS day_partition_date
  FROM (
    SELECT
      DATE_SUB(CURRENT_DATE(), INTERVAL days_ago DAY) AS partition_date
    FROM UNNEST([1, 2, 3]) AS days_ago
  )
);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment