import base64
import json
import logging
from google.cloud import bigquery

# Insert name of Google Cloud Storage bucket
BUCKET_NAME = '<your_bucket_name>'
# Insert Google Cloud project ID
PROJECT = '<your_project_id>'
# Insert required file format, either 'NEWLINE_DELIMITED_JSON', 'CSV' or 'AVRO'
FILE_FORMAT = 'NEWLINE_DELIMITED_JSON'
# Insert True or False depending on whether you want the export compressed
ZIPPED = True
# For JSON/CSV use 'GZIP', for AVRO use 'SNAPPY' or 'DEFLATE'. If uncompressed, use None
COMPRESSION_FORMAT = 'GZIP'


def extract_bigquery_table(event, context):
    """Background Cloud Function triggered by Pub/Sub.

    The Pub/Sub message is expected to carry a BigQuery audit log entry for a
    completed load job; the loaded table is then exported to Cloud Storage.
    """
    # Decode the Pub/Sub message and parse the audit log entry it contains
    pubsub_message = base64.b64decode(event['data']).decode('utf-8')
    obj = json.loads(pubsub_message)

    bq_client = bigquery.Client()

    # Pull the destination table of the completed load job out of the log entry
    destination_table = (obj['protoPayload']['serviceData']['jobCompletedEvent']
                         ['job']['jobConfiguration']['load']['destinationTable'])
    dataset_id = destination_table['datasetId']
    table_id = destination_table['tableId']
    file_name = dataset_id + '/' + dataset_id + '_' + table_id

    # Append the file extension matching the chosen export format
    if FILE_FORMAT == 'NEWLINE_DELIMITED_JSON':
        file_name = file_name + '.json'
    elif FILE_FORMAT == 'CSV':
        file_name = file_name + '.csv'
    elif FILE_FORMAT == 'AVRO':
        file_name = file_name + '.avro'
    else:
        return False

    # GZIP compression only applies to the JSON and CSV export formats
    if ZIPPED and FILE_FORMAT in ['NEWLINE_DELIMITED_JSON', 'CSV']:
        file_name = file_name + '.gz'

    destination_uri = 'gs://{}/{}'.format(BUCKET_NAME, file_name)
    dataset_ref = bq_client.dataset(dataset_id, project=PROJECT)
    table_ref = dataset_ref.table(table_id)

    job_config = bigquery.ExtractJobConfig()
    job_config.compression = COMPRESSION_FORMAT
    job_config.destination_format = FILE_FORMAT

    # Run the extract job and wait for it to finish
    extract_job = bq_client.extract_table(
        table_ref,
        destination_uri,
        job_config=job_config)
    extract_job.result()

    print('Exported {}:{}.{} to {}'.format(
        PROJECT, dataset_id, table_id, destination_uri))

    return True
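
A minimal sketch of how the function can be exercised locally, assuming hypothetical dataset and table names. The fake payload mirrors only the audit-log keys the function actually reads; invoking it end to end still requires valid Google Cloud credentials and an existing table, since it runs a real extract job.

# Hypothetical local test harness: build a fake Pub/Sub event carrying a
# minimal audit log entry, then call the function directly.
import base64
import json

sample_log_entry = {
    'protoPayload': {
        'serviceData': {
            'jobCompletedEvent': {
                'job': {
                    'jobConfiguration': {
                        'load': {
                            'destinationTable': {
                                'datasetId': 'my_dataset',  # hypothetical dataset
                                'tableId': 'my_table',      # hypothetical table
                            }
                        }
                    }
                }
            }
        }
    }
}

fake_event = {
    'data': base64.b64encode(json.dumps(sample_log_entry).encode('utf-8')).decode('utf-8')
}
extract_bigquery_table(fake_event, context=None)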