Guest User

Untitled

a guest
Jan 21st, 2019
76
0
Never
Not a member of Pastebin yet? Sign up — it unlocks many cool features!
text 1.77 KB | None | 0 0
  1. # coding: utf-8
  2. # Python 2.7
  3.  
  4. import logging
  5. import datetime
  6.  
  7. import apache_beam as beam
  8. from apache_beam.options.pipeline_options import PipelineOptions
  9. from apache_beam.io import ReadFromText
  10.  
  11. # ###############
  12. def run(argv=None):
  13.  
  14. p = beam.Pipeline(options=PipelineOptions())
  15.  
  16.  
  17. class Printer(beam.DoFn):
  18. def process(self, element):
  19. print element
  20.  
  21. class Transaction(beam.DoFn):
  22. def process(self, element):
  23. date, time, id, item = element.split(',')
  24. if date!='date': # just to avoid the problems caused by the csv table header
  25. return [{"date": date + ' ' + time, "id": id, "item": item}]
  26.  
  27.  
  28. data_from_source = (p
  29. | 'Read the source file' >> ReadFromText('./input/BreadBasket_DMS.csv')
  30. | 'Clean the items' >> beam.ParDo(Transaction())
  31. )
  32.  
  33. project_id = "project-id" # replace with your project ID
  34. dataset_id = 'dataset_001' # replace with your dataset ID
  35. table_id = 'bread_data' # replace with your table ID
  36. table_schema = ('date:TIMESTAMP, id:STRING, item:STRING')
  37.  
  38. # Persist to BigQuery
  39. # WriteToBigQuery accepts the data as list of JSON objects
  40. data_from_source | 'Write' >> beam.io.WriteToBigQuery(
  41. table=table_id,
  42. dataset=dataset_id,
  43. project=project_id,
  44. schema=table_schema,
  45. create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
  46. write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
  47. batch_size=int(100)
  48. )
  49.  
  50. result = p.run()
  51. result.wait_until_finish()
  52.  
  53. if __name__ == '__main__':
  54. logger = logging.getLogger().setLevel(logging.INFO)
  55. run()
Add Comment
Please sign in to add a comment.