Advertisement
pjotrekk

xlang_test

Apr 10th, 2020
257
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.51 KB | None | 0 0
  1. from __future__ import absolute_import
  2. from __future__ import print_function
  3.  
  4. import logging
  5.  
  6. import apache_beam as beam
  7. from apache_beam.options.pipeline_options import PipelineOptions
  8.  
  9. """
  10. 0.
  11. Add the following as line 122 of java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
  12. builder.put("beam:transforms:xlang:count", spec -> Count.perElement());
  13.  
  14. 1.
  15. First, install Flink 1.10 with Homebrew.
  16. In /usr/local/Cellar/apache-flink/1.10.0/libexec/conf/flink-conf.yaml, set the jobservice memory to 4096.
  17. Then start the cluster (on macOS) by running /usr/local/Cellar/apache-flink/1.10.0/libexec/bin/start-cluster.sh
  18.  
  19. 2.
  20. run ./gradlew -p runners/flink/1.10/job-server run
  21.  
  22. 3.
  23. run this script
  24. """
  25.  
  26. OUTPUT = '/Users/piotr/out/out.txt'  # Path handed to beam.io.WriteToText in main(); machine-specific.
  27. EXPANSION_SERVICE = 'localhost:8097'  # Passed as the third argument to beam.ExternalTransform in main().
  28.  
  29. def main():
  30.   logging.getLogger().setLevel(logging.INFO)
  31.  
  32.   options = PipelineOptions([
  33.     "--runner=FlinkRunner",
  34.     "--flink_version=1.10",
  35.     "--flink_master=localhost:8081",
  36.     "--environment_type=LOOPBACK"
  37.   ])
  38.  
  39.   def print_fn(msg):
  40.     print(msg)
  41.     return msg
  42.  
  43.   with beam.Pipeline(options=options) as pipeline:
  44.     (
  45.       pipeline
  46.       | 'Create input' >> beam.Create(['abc', 'cde', 'abc', 'xlang', 'xlang', 'xlang2'])
  47.       | 'Count' >> beam.ExternalTransform('beam:transforms:xlang:count', None, EXPANSION_SERVICE)
  48.       | 'Map to string' >> beam.Map(lambda x: str(x[0]).encode('utf-8') + str(x[1]).encode('utf-8'))
  49.       | 'Print' >> beam.Map(print_fn)
  50.       | 'Write to output' >> beam.io.WriteToText(OUTPUT)
  51.     )
  52.  
  53. if __name__ == '__main__':  # Run the pipeline only when executed as a script, not on import.
  54.   main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement