Guest User

Untitled

a guest
Dec 10th, 2017
83
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.82 KB | None | 0 0
  1. -- Note: Replace every placeholder of the form <VARIABLE_NAME> with your own value before running
  2.  
  3. -- Pipelines to migrate from non-partitioned to daily partitioned tables
  4. -- This example partitions only the past three days; try modifying it to cover any date range!
  5.  
  -- Shared settings block for this script.
  -- Selects the bigquery-v2 mode, the MUSTACHE pipeline renderer and the connection placeholder,
  -- then lists option groups for query jobs, query requests and extract jobs.
  -- Query settings use standard SQL, CREATE_IF_NEEDED and WRITE_TRUNCATE.
  -- NOTE review: presumably these act as defaults for the pipelines defined further down - confirm against the tool documentation.
  DEFINE ({
    "connection": "<YOUR_CONNECTION>",
    "mode": "bigquery-v2",
    "pipelineRenderer": "MUSTACHE",
    "bigquery-v2": {
      "JobConfigurationQuery": {
        "allowLargeResults": false,
        "createDisposition": "CREATE_IF_NEEDED",
        "flattenResults": true,
        "maximumBillingTier": 1,
        "useLegacySql": false,
        "useQueryCache": true,
        "writeDisposition": "WRITE_TRUNCATE"
      },
      "QueryRequest": {
        "maxResults": 1000,
        "timeoutMs": 1000000,
        "useLegacySql": false,
        "useQueryCache": true
      },
      "JobConfigurationExtract": {
        "compression": "NONE",
        "destinationFormat": "CSV",
        "fieldDelimiter": ",",
        "printHeader": false
      }
    }
  });
  34.  
  35.  
  -- Step 001: build the pivot table named dataset_id.table_prefix_partitions.
  -- Template parameters: dataset_id, table_prefix, and date_list, a list of entries each carrying a yyyymmdd value.
  -- station_days collects the gsod2017 rows of each station and day into one array, keyed by a day label of the form dayYYYYMMDD.
  -- numbered gives every station array a row number within its day. There is no ORDER BY, so the numbering is arbitrary.
  -- The final SELECT emits one array column per date_list entry, named day_YYYYMMDD, grouped by that row number.
  -- NOTE review: the DROP TABLE runs unconditionally - whether it fails when the table does not exist yet depends on the pipeline tool, confirm.
  CREATE PIPELINE <YOUR_USERNAME>.migrating_to_partitioned_step_001_create_pivot AS (

    DROP TABLE {{{dataset_id}}}.{{{table_prefix}}}_partitions;

    CREATE TABLE {{{dataset_id}}}.{{{table_prefix}}}_partitions AS (
      WITH station_days AS (
        SELECT
          stn,
          CONCAT('day', year, mo, da) AS d,
          ARRAY_AGG(obs) AS r
        FROM `bigquery-public-data.noaa_gsod.gsod2017` AS obs
        GROUP BY stn, d
      ),
      numbered AS (
        SELECT
          d,
          r,
          ROW_NUMBER() OVER (PARTITION BY d) AS line
        FROM station_days
      )
      SELECT
        {{#date_list}}
        ARRAY_CONCAT_AGG(CASE WHEN d = 'day{{{yyyymmdd}}}' THEN r END) AS day_{{{yyyymmdd}}},
        {{/date_list}}
        line
      FROM numbered
      GROUP BY line
    )
    ;

  );
  60.  
  61.  
  -- Runs step 001 with a single parameter row.
  -- dataset_id and table_prefix are literal values.
  -- date_list is a JSON array string with one yyyymmdd entry for each of the three days before the current date.
  RUN PIPELINE <YOUR_USERNAME>.migrating_to_partitioned_step_001_create_pivot (

    SELECT
      '<YOUR_DATASET_ID>' AS dataset_id,
      'tmp_mtp_001' AS table_prefix,
      CONCAT(
        '[',
        STRING_AGG(
          CONCAT('{"yyyymmdd":"', FORMAT_DATE('%Y%m%d', partition_date), '"}')
        ),
        ']'
      ) AS date_list
    FROM (
      SELECT
        DATE_SUB(CURRENT_DATE(), INTERVAL days_back DAY) AS partition_date
      FROM UNNEST([1, 2, 3]) AS days_back
    )

  );
  84.  
  85.  
  -- Step 002: expand one array column of the pivot table back into plain rows.
  -- Template parameters: dataset_id, table_prefix, and day_partition_date, the name of the array column to expand.
  -- The output table is named after the prefix followed by that same column name.
  CREATE PIPELINE <YOUR_USERNAME>.migrating_to_partitioned_step_002_unnest AS (

    CREATE TABLE {{{dataset_id}}}.{{{table_prefix}}}_{{{day_partition_date}}} AS (
      SELECT
        r.*
      FROM {{{dataset_id}}}.{{{table_prefix}}}_partitions AS pivoted
      CROSS JOIN UNNEST(pivoted.{{{day_partition_date}}}) AS r
    );

  );
  94.  
  95.  
  -- Runs step 002 once per parameter row: one row for each of the three days before the current date.
  -- day_partition_date is the pivot column name for that day, of the form day_YYYYMMDD.
  -- dataset_id must name the same dataset that the step 001 run uses, because step 002 reads the
  -- partitions table that step 001 creates there. It therefore uses the YOUR_DATASET_ID placeholder
  -- rather than the user name placeholder.
  RUN PIPELINE <YOUR_USERNAME>.migrating_to_partitioned_step_002_unnest (

    SELECT
      '<YOUR_DATASET_ID>' AS dataset_id,
      'tmp_mtp_001' AS table_prefix,
      CONCAT('day_', FORMAT_DATE('%Y%m%d', partition_date)) AS day_partition_date
    FROM (
      SELECT
        DATE_SUB(CURRENT_DATE(), INTERVAL days_back DAY) AS partition_date
      FROM UNNEST([1, 2, 3]) AS days_back
    )

  );
Add Comment
Please, Sign In to add comment