Created
December 8, 2017 10:51
-
-
Save ras44/a66ea784e6fb4d2f18f084fb3d99ce53 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Note: every placeholder written as <VARIABLE_NAME> must be replaced with your own value.
-- Pipelines to migrate from non-partitioned to daily partitioned tables.
-- This example partitions only the past three days; try modifying it to partition any date range!
-- Connection and BigQuery job settings for this script.
-- Queries run as standard SQL ("useLegacySql": false), destination tables are
-- created when missing and overwritten ("CREATE_IF_NEEDED" / "WRITE_TRUNCATE"),
-- and pipeline bodies are rendered as Mustache templates.
DEFINE ({
    "connection": "<YOUR_CONNECTION>",
    "mode": "bigquery-v2",
    "pipelineRenderer": "MUSTACHE",
    "bigquery-v2": {
        "JobConfigurationQuery": {
            "allowLargeResults": false,
            "createDisposition": "CREATE_IF_NEEDED",
            "flattenResults": true,
            "maximumBillingTier": 1,
            "useLegacySql": false,
            "useQueryCache": true,
            "writeDisposition": "WRITE_TRUNCATE"
        },
        "QueryRequest": {
            "maxResults": 1000,
            "timeoutMs": 1000000,
            "useLegacySql": false,
            "useQueryCache": true
        },
        "JobConfigurationExtract": {
            "compression": "NONE",
            "destinationFormat": "CSV",
            "fieldDelimiter": ",",
            "printHeader": false
        }
    }
});
-- Step 1: build a pivot table that holds one ARRAY column per requested day.
--
-- Template parameters (supplied by the query of the matching RUN PIPELINE):
--   dataset_id   - dataset that receives the pivot table
--   table_prefix - prefix of the pivot table name; the table is <table_prefix>_partitions
--   date_list    - list of objects with a "yyyymmdd" key; one day_<yyyymmdd>
--                  column is rendered per entry
--
-- Result: one row per slot number ("line"); each day_<yyyymmdd> column carries
-- the source rows of the station that was assigned that slot on that day.
CREATE PIPELINE <YOUR_USERNAME>.migrating_to_partitioned_step_001_create_pivot AS (
    -- NOTE(review): a plain DROP TABLE may fail on the very first run, when the
    -- table does not exist yet. Confirm how the pipeline runner handles that,
    -- and switch to DROP TABLE IF EXISTS if it is supported.
    DROP TABLE {{{dataset_id}}}.{{{table_prefix}}}_partitions;
    CREATE TABLE {{{dataset_id}}}.{{{table_prefix}}}_partitions AS (
        SELECT
            {{#date_list}}
            ARRAY_CONCAT_AGG(CASE WHEN d = 'day{{{yyyymmdd}}}' THEN r END) AS day_{{{yyyymmdd}}},
            {{/date_list}}
            line
        FROM (
            SELECT
                d,
                r,
                -- Slot number of the station within its day. (stn, d) is unique
                -- after the GROUP BY below, so ordering by stn makes the
                -- numbering, and therefore the pivot layout, reproducible.
                ROW_NUMBER() OVER (PARTITION BY d ORDER BY stn) AS line
            FROM (
                -- One row per station and day; r collects the full source rows.
                SELECT
                    stn,
                    CONCAT('day', year, mo, da) AS d,
                    ARRAY_AGG(t) AS r
                FROM `bigquery-public-data.noaa_gsod.gsod2017` AS t
                GROUP BY stn, d
            )
        )
        GROUP BY line
    )
    ;
);
-- Runs step 1. The query returns a single row with the template parameters:
-- the target dataset, the table prefix, and date_list, a JSON array string of
-- {"yyyymmdd": ...} objects covering the three days before the current date.
RUN PIPELINE <YOUR_USERNAME>.migrating_to_partitioned_step_001_create_pivot (
    SELECT
        '<YOUR_DATASET_ID>' AS dataset_id,
        'tmp_mtp_001' AS table_prefix,
        CONCAT(
            '[',
            STRING_AGG(
                CONCAT('{"yyyymmdd":"', FORMAT_DATE('%Y%m%d', partition_date), '"}')
            ),
            ']'
        ) AS date_list
    FROM (
        -- Yesterday, two days ago and three days ago.
        SELECT
            DATE_SUB(CURRENT_DATE(), INTERVAL days_back DAY) AS partition_date
        FROM UNNEST([1, 2, 3]) AS days_back
    )
);
-- Step 2: expand one day column of the pivot table back into ordinary rows and
-- store them in their own table, <table_prefix>_<day_partition_date>.
--
-- Template parameters:
--   dataset_id         - dataset holding the pivot table and receiving the output
--   table_prefix       - prefix shared by the pivot table and the output table
--   day_partition_date - name of the pivot column to expand (also the table suffix)
CREATE PIPELINE <YOUR_USERNAME>.migrating_to_partitioned_step_002_unnest AS (
    CREATE TABLE {{{dataset_id}}}.{{{table_prefix}}}_{{{day_partition_date}}} AS (
        SELECT
            day_row.*
        FROM {{{dataset_id}}}.{{{table_prefix}}}_partitions AS pivot
        CROSS JOIN UNNEST(pivot.{{{day_partition_date}}}) AS day_row
    );
);
-- Runs step 2. The query returns one row per day (the three days before the
-- current date); each row names the pivot column / output table suffix as
-- day_<yyyymmdd>. Presumably the runner renders the pipeline once per row;
-- confirm against the runner's documentation.
RUN PIPELINE <YOUR_USERNAME>.migrating_to_partitioned_step_002_unnest (
    SELECT
        -- Must be the dataset that step 1 wrote the pivot table into, so this
        -- uses the same <YOUR_DATASET_ID> placeholder as the step 1 run.
        '<YOUR_DATASET_ID>' AS dataset_id,
        'tmp_mtp_001' AS table_prefix,
        CONCAT('day_', FORMAT_DATE('%Y%m%d', partition_date)) AS day_partition_date
    FROM (
        -- Yesterday, two days ago and three days ago.
        SELECT
            DATE_SUB(CURRENT_DATE(), INTERVAL days_back DAY) AS partition_date
        FROM UNNEST([1, 2, 3]) AS days_back
    )
);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment