SHARE
TWEET

Untitled

a guest Jun 20th, 2019 57 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. from __future__ import absolute_import
  2.  
  3. import argparse
  4. import logging
  5.  
  6. from past.builtins import unicode
  7.  
  8. import apache_beam as beam
  9. import apache_beam.transforms.window as window
  10.  
  11. from apache_beam.examples.wordcount import WordExtractingDoFn
  12. from apache_beam.options.pipeline_options import PipelineOptions
  13. from apache_beam.options.pipeline_options import SetupOptions
  14. from apache_beam.options.pipeline_options import StandardOptions
  15.  
  16.  
  17.  
  def print_row(row):
      """Print a pipeline element and its type, then hand the element back.

      Intended as a debugging pass-through step, e.g. ``beam.Map(print_row)``.
      ``beam.Map`` emits whatever the callable returns, so the row must be
      returned; otherwise every step placed after this one receives ``None``
      (which surfaces as "'NoneType' object has no attribute 'encode'" when
      the next step calls ``.encode``).

      Args:
          row: Any element; it is printed as-is.

      Returns:
          The same ``row`` object, unchanged.
      """
      # Function-call form of print parses on both Python 2 and Python 3.
      print(row)
      print(type(row))
      return row
  21.  
  def filter_out_nones(row):
      """Yield ``row`` unless it is ``None``.

      Written as a generator so it can be used as the callable of
      ``beam.ParDo``: a ``None`` input produces no output element, any other
      input (including falsy values such as ``''`` or ``0``) produces exactly
      one.

      Args:
          row: Any element, possibly ``None``.

      Yields:
          ``row`` itself when it is not ``None``.
      """
      if row is None:
          # Function-call form of print parses on both Python 2 and Python 3.
          print('we found a none! get it out')
          return
      yield row
  27.  
  28.  
  def run(argv=None):
      """Build and run a streaming Beam pipeline over a small in-memory sample.

      The pipeline creates four test elements (one of them ``None``), drops
      the ``None`` with ``filter_out_nones`` and prints the survivors with
      ``print_row``. It then blocks until the pipeline finishes.

      Args:
          argv: Optional list of command-line flags forwarded to
              ``PipelineOptions``. With ``None`` the options object uses its
              own default flag source (per the Beam docs, ``sys.argv``).
      """
      # Forward argv so flags supplied by the caller are actually honoured.
      pipeline_options = PipelineOptions(argv)
      # save_main_session makes module-level names (the helper functions
      # above) available to the workers that execute the DoFns.
      pipeline_options.view_as(SetupOptions).save_main_session = True
      pipeline_options.view_as(StandardOptions).streaming = True

      p = beam.Pipeline(options=pipeline_options)

      data = ['test1 message', 'test2 message', None, 'test3 please work']

      # Console-only branch: judging by the console log this emits only the
      # non-None values.
      _ = (p
           | "makeData" >> beam.Create(data)
           | "filter" >> beam.ParDo(filter_out_nones)
           | "printtesting" >> beam.Map(print_row))
      #    | 'encoding' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)
      #    | "writing" >> beam.io.WriteToPubSub("projects/??/topics/??"))

      # Disabled branch that also writes to Pub/Sub. When enabled it failed with
      #   AttributeError: 'NoneType' object has no attribute 'encode'
      #   [while running 'encodeHere']
      # NOTE: beam.Map forwards the *return value* of its callable, so the
      # 'encodeHere' step only receives the row if the callable used in
      # 'printHere' returns it; a print-only callable feeds None downstream
      # even though the Nones in `data` were already filtered out.
      #
      # _ = (p
      #      | "MakeWriteData" >> beam.Create(data)
      #      | "filterHere" >> beam.ParDo(filter_out_nones)
      #      | "printHere" >> beam.Map(print_row)
      #      | 'encodeHere' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)
      #      | "writeTest" >> beam.io.WriteToPubSub("projects/??/topics/??"))

      result = p.run()
      result.wait_until_finish()
  60.  
  61.  
  if __name__ == '__main__':
      # Script entry point: raise the root logger to INFO, then start the
      # pipeline.
      root_logger = logging.getLogger()
      root_logger.setLevel(logging.INFO)
      run()
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
Not a member of Pastebin yet?
Sign Up, it unlocks many cool features!
 
Top