Skip to content

Instantly share code, notes, and snippets.

@ras44
Created December 8, 2017 10:55
Show Gist options
  • Save ras44/d42be2b6e1fd78d217d0190da3a53588 to your computer and use it in GitHub Desktop.
Save ras44/d42be2b6e1fd78d217d0190da3a53588 to your computer and use it in GitHub Desktop.
-- Note: All variables that look like <VARIABLE_NAME> need to be changed to user's preferences
-- Pipelines to migrate from non-partitioned to daily partitioned tables
-- This example illustrates partitioning only the past three days
-- Modify the invocation queries to partition dates between any date range
DEFINE ({
"connection": "<YOUR_CONNECTION>",
"mode": "bigquery-v2",
"pipelineRenderer": "MUSTACHE",
"bigquery-v2": {
"JobConfigurationQuery": {
"allowLargeResults": false,
"createDisposition": "CREATE_IF_NEEDED",
"flattenResults": true,
"maximumBillingTier": 1,
"useLegacySql": false,
"useQueryCache": true,
"writeDisposition": "WRITE_TRUNCATE"
},
"QueryRequest": {
"maxResults": 1000,
"timeoutMs": 1000000,
"useLegacySql": false,
"useQueryCache": true
},
"JobConfigurationExtract": {
"compression": "NONE",
"destinationFormat": "CSV",
"fieldDelimiter": ",",
"printHeader": false
}
}
});
CREATE PIPELINE <YOUR_USERNAME>.migrating_to_partitioned_step_001_create_pivot AS (
DROP TABLE {{{dataset_id}}}.{{{table_prefix}}}_partitions;
CREATE TABLE {{{dataset_id}}}.{{{table_prefix}}}_partitions AS (
SELECT
{{#date_list}}
ARRAY_CONCAT_AGG(CASE WHEN d = 'day{{{yyyymmdd}}}' THEN r END) AS day_{{{yyyymmdd}}},
{{/date_list}}
line
FROM (
SELECT d, r, ROW_NUMBER() OVER(PARTITION BY d) AS line
FROM (
SELECT
stn, CONCAT('day', year, mo, da) AS d, ARRAY_AGG(t) AS r
FROM `bigquery-public-data.noaa_gsod.gsod2017` AS t
GROUP BY stn, d
)
)
GROUP BY line
)
;
);
RUN PIPELINE <YOUR_USERNAME>.migrating_to_partitioned_step_001_create_pivot (
SELECT
'<YOUR_DATASET_ID>' as dataset_id,
'tmp_mtp_001' as table_prefix,
CONCAT(
'[',
STRING_AGG(
CONCAT('{"yyyymmdd":"',FORMAT_DATE('%Y%m%d',partition_date),'"}')
),
']'
) as date_list
FROM (
SELECT
DATE_ADD(DATE(CURRENT_DATETIME()), INTERVAL -n DAY) as partition_date
FROM (
SELECT [1,2,3] as n
),
UNNEST(n) AS n
)
);
CREATE PIPELINE <YOUR_USERNAME>.migrating_to_partitioned_step_002_unnest AS (
CREATE TABLE {{{dataset_id}}}.{{{table_prefix}}}_{{{day_partition_date}}} AS (
SELECT r.*
FROM {{{dataset_id}}}.{{{table_prefix}}}_partitions, UNNEST({{{day_partition_date}}}) as r
);
);
RUN PIPELINE <YOUR_USERNAME>.migrating_to_partitioned_step_002_unnest (
SELECT
'<YOUR_USERNAME>' as dataset_id,
'tmp_mtp_001' as table_prefix,
CONCAT('day_',FORMAT_DATE('%Y%m%d',partition_date)) as day_partition_date
FROM (
SELECT
DATE_ADD(DATE(CURRENT_DATETIME()), INTERVAL -n DAY) as partition_date
FROM (
SELECT [1,2,3] as n
),
UNNEST(n) AS n
)
);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment