Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- -- Note: All variables that look like <VARIABLE_NAME> need to be changed to user's preferences
- -- Pipelines to migrate from non-partitioned to daily partitioned tables
- -- This example illustrates partitioning only the past three days; try to modify it to partition dates between any date range!
- DEFINE ({
- "connection": "<YOUR_CONNECTION>",
- "mode": "bigquery-v2",
- "pipelineRenderer": "MUSTACHE",
- "bigquery-v2": {
- "JobConfigurationQuery": {
- "allowLargeResults": false,
- "createDisposition": "CREATE_IF_NEEDED",
- "flattenResults": true,
- "maximumBillingTier": 1,
- "useLegacySql": false,
- "useQueryCache": true,
- "writeDisposition": "WRITE_TRUNCATE"
- },
- "QueryRequest": {
- "maxResults": 1000,
- "timeoutMs": 1000000,
- "useLegacySql": false,
- "useQueryCache": true
- },
- "JobConfigurationExtract": {
- "compression": "NONE",
- "destinationFormat": "CSV",
- "fieldDelimiter": ",",
- "printHeader": false
- }
- }
- });
- CREATE PIPELINE <YOUR_USERNAME>.migrating_to_partitioned_step_001_create_pivot AS (
- DROP TABLE {{{dataset_id}}}.{{{table_prefix}}}_partitions;
- CREATE TABLE {{{dataset_id}}}.{{{table_prefix}}}_partitions AS (
- SELECT
- {{#date_list}}
- ARRAY_CONCAT_AGG(CASE WHEN d = 'day{{{yyyymmdd}}}' THEN r END) AS day_{{{yyyymmdd}}},
- {{/date_list}}
- line
- FROM (
- SELECT d, r, ROW_NUMBER() OVER(PARTITION BY d) AS line
- FROM (
- SELECT
- stn, CONCAT('day', year, mo, da) AS d, ARRAY_AGG(t) AS r
- FROM `bigquery-public-data.noaa_gsod.gsod2017` AS t
- GROUP BY stn, d
- )
- )
- GROUP BY line
- )
- ;
- );
- RUN PIPELINE <YOUR_USERNAME>.migrating_to_partitioned_step_001_create_pivot (
- SELECT
- '<YOUR_DATASET_ID>' as dataset_id,
- 'tmp_mtp_001' as table_prefix,
- CONCAT(
- '[',
- STRING_AGG(
- CONCAT('{"yyyymmdd":"',FORMAT_DATE('%Y%m%d',partition_date),'"}')
- ),
- ']'
- ) as date_list
- FROM (
- SELECT
- DATE_ADD(DATE(CURRENT_DATETIME()), INTERVAL -n DAY) as partition_date
- FROM (
- SELECT [1,2,3] as n
- ),
- UNNEST(n) AS n
- )
- );
- CREATE PIPELINE <YOUR_USERNAME>.migrating_to_partitioned_step_002_unnest AS (
- CREATE TABLE {{{dataset_id}}}.{{{table_prefix}}}_{{{day_partition_date}}} AS (
- SELECT r.*
- FROM {{{dataset_id}}}.{{{table_prefix}}}_partitions, UNNEST({{{day_partition_date}}}) as r
- );
- );
- RUN PIPELINE <YOUR_USERNAME>.migrating_to_partitioned_step_002_unnest (
- SELECT
- '<YOUR_USERNAME>' as dataset_id,
- 'tmp_mtp_001' as table_prefix,
- CONCAT('day_',FORMAT_DATE('%Y%m%d',partition_date)) as day_partition_date
- FROM (
- SELECT
- DATE_ADD(DATE(CURRENT_DATETIME()), INTERVAL -n DAY) as partition_date
- FROM (
- SELECT [1,2,3] as n
- ),
- UNNEST(n) AS n
- )
- );
Add Comment
Please, Sign In to add comment