Advertisement
Guest User

Untitled

a guest
Jun 20th, 2019
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.96 KB | None | 0 0
  1. from __future__ import absolute_import
  2.  
  3. import argparse
  4. import logging
  5.  
  6. from past.builtins import unicode
  7.  
  8. import apache_beam as beam
  9. import apache_beam.transforms.window as window
  10.  
  11. from apache_beam.examples.wordcount import WordExtractingDoFn
  12. from apache_beam.options.pipeline_options import PipelineOptions
  13. from apache_beam.options.pipeline_options import SetupOptions
  14. from apache_beam.options.pipeline_options import StandardOptions
  15.  
  16.  
  17.  
  def print_row(row):
      """Print an element and its type, then hand the element back.

      Used as a debugging tap via ``beam.Map``. ``Map`` emits whatever the
      callable returns, so the element must be returned explicitly; without
      the ``return`` every downstream step receives ``None`` (which is what
      produces "'NoneType' object has no attribute 'encode'" in a later
      encode step).

      Args:
          row: The element to display.

      Returns:
          The same ``row`` object, unchanged.
      """
      # Single-argument print() produces the same output on Python 2 and 3.
      print(row)
      print(type(row))
      return row
  21.  
  def filter_out_nones(row):
      """Yield ``row`` unless it is ``None``.

      Written as a generator so it can be handed to ``beam.ParDo``: a ``None``
      input yields nothing (the element is dropped), anything else yields
      exactly one element. Only ``None`` is dropped; other falsy values such
      as ``0`` or ``''`` pass through.

      Args:
          row: The candidate element.

      Yields:
          ``row`` itself when it is not ``None``.
      """
      if row is None:
          # print() call form works on both Python 2 and Python 3.
          print('we found a none! get it out')
          return
      yield row
  27.  
  28.  
  def run(argv=None):
      """Build and run the streaming test pipeline, blocking until it finishes.

      Args:
          argv: Optional list of command-line flags forwarded to
              ``PipelineOptions``. With the default ``None`` the options are
              built exactly as a bare ``PipelineOptions()`` call would build
              them.
      """
      # Forward argv so callers can actually supply pipeline flags; it was
      # previously accepted but ignored.
      pipeline_options = PipelineOptions(argv)
      pipeline_options.view_as(SetupOptions).save_main_session = True
      pipeline_options.view_as(StandardOptions).streaming = True

      p = beam.Pipeline(options=pipeline_options)

      data = ['test1 message', 'test2 message', None, 'test3 please work']

      # Log-only branch: create the test elements, drop the None, and print
      # what is left. Judging by the console log this emits only the expected
      # values.
      (p
       | "makeData" >> beam.Create(data)
       | "filter" >> beam.ParDo(filter_out_nones)
       | "printtesting" >> beam.Map(print_row))
      # | 'encoding' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)
      # | "writing" >> beam.io.WriteToPubSub("projects/??/topics/??"))

      # Log-and-write branch, kept disabled. It failed with
      #   AttributeError: 'NoneType' object has no attribute 'encode'
      #   [while running 'encodeHere']
      # beam.Map emits the return value of its callable, so the print step
      # must return its input for 'encodeHere' to receive the real element.
      #
      # (p
      #  | "MakeWriteData" >> beam.Create(data)
      #  | "filterHere" >> beam.ParDo(filter_out_nones)
      #  | "printHere" >> beam.Map(print_row)
      #  | 'encodeHere' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)
      #  | "writeTest" >> beam.io.WriteToPubSub("projects/??/topics/??"))

      result = p.run()
      result.wait_until_finish()
  60.  
  61.  
  if __name__ == '__main__':
      # Script entry point: raise the root logger to INFO, then launch the
      # pipeline with default arguments.
      root_logger = logging.getLogger()
      root_logger.setLevel(logging.INFO)
      run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement