Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # coding: utf-8
- # Python 2.7
- import logging
- import datetime
- import apache_beam as beam
- from apache_beam.options.pipeline_options import PipelineOptions
- from apache_beam.io import ReadFromText
- # ###############
def run(argv=None):
    """Read the bread-basket CSV, parse each transaction row, and append
    the rows to a BigQuery table.

    Args:
        argv: Unused; kept for the conventional Beam entry-point signature.
    """
    p = beam.Pipeline(options=PipelineOptions())

    class Printer(beam.DoFn):
        """Debug helper that prints each element (not wired into the pipeline)."""

        def process(self, element):
            # Parenthesized print is valid in both Python 2 and Python 3
            # (the original `print element` statement was 2-only).
            print(element)

    class Transaction(beam.DoFn):
        """Parse one CSV line into a dict matching the BigQuery schema."""

        def process(self, element):
            # Row layout: date,time,id,item  (renamed to avoid shadowing
            # the `id` builtin).
            date, time, transaction_id, item = element.split(',')
            # Skip the CSV header row; emitting nothing drops the element.
            if date != 'date':
                return [{"date": date + ' ' + time,
                         "id": transaction_id,
                         "item": item}]

    data_from_source = (p
        | 'Read the source file' >> ReadFromText('./input/BreadBasket_DMS.csv')
        | 'Clean the items' >> beam.ParDo(Transaction())
    )

    project_id = "project-id"   # replace with your project ID
    dataset_id = 'dataset_001'  # replace with your dataset ID
    table_id = 'bread_data'     # replace with your table ID
    table_schema = 'date:TIMESTAMP, id:STRING, item:STRING'

    # Persist to BigQuery.
    # WriteToBigQuery accepts the data as a list of JSON-like dicts whose
    # keys match the schema fields above.
    data_from_source | 'Write' >> beam.io.WriteToBigQuery(
        table=table_id,
        dataset=dataset_id,
        project=project_id,
        schema=table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        batch_size=100)  # was int(100); the int() call was redundant

    result = p.run()
    result.wait_until_finish()
if __name__ == '__main__':
    # setLevel() returns None, so the original `logger = ...setLevel(...)`
    # bound None to an unused name; keep only the side effect of raising
    # the root logger's level to INFO.
    logging.getLogger().setLevel(logging.INFO)
    run()
Add Comment
Please, Sign In to add comment