Advertisement
Guest User

Untitled

a guest
Jun 19th, 2019
67
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.05 KB | None | 0 0
  1. import argparse, logging
  2.  
  3. import apache_beam as beam
  4. from apache_beam.options.pipeline_options import PipelineOptions
  5. from apache_beam.options.pipeline_options import SetupOptions
  6.  
  7.  
  8. def compute_interest_map(data_item):
  9. return data_item + 1
  10.  
  11. class compute_interest_pardo(beam.DoFn):
  12. def process(self, element):
  13. yield element + 2
  14.  
  15. class log_results(beam.DoFn):
  16. def process(self, element):
  17. logging.info(">> Interest: %s", element)
  18.  
  19.  
  20. def run(argv=None):
  21. parser = argparse.ArgumentParser()
  22. known_args, pipeline_args = parser.parse_known_args(argv)
  23.  
  24. pipeline_options = PipelineOptions(pipeline_args)
  25. pipeline_options.view_as(SetupOptions).save_main_session = True
  26. p = beam.Pipeline(options=pipeline_options)
  27.  
  28. events = (p
  29. | 'Create' >> beam.Create([1, 2, 3]) \
  30. | 'Add 1' >> beam.Map(lambda x: compute_interest_map(x)) \
  31. | 'Add 2' >> beam.ParDo(compute_interest_pardo()) \
  32. | 'Print' >> beam.ParDo(log_results()))
  33.  
  34. result = p.run()
  35. result.wait_until_finish()
  36.  
  37. if __name__ == '__main__':
  38. logging.getLogger().setLevel(logging.INFO)
  39. run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement