Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from __future__ import absolute_import
- from __future__ import print_function
- import logging
- import apache_beam as beam
- from apache_beam.options.pipeline_options import PipelineOptions
- """
- 0.
- Add line 122 to java/org/apache/beam/sdk/expansion/service/ExpansionService.java
- builder.put("beam:transforms:xlang:count", spec -> Count.perElement());
- 1.
- First, install Flink 1.10 with Homebrew.
- In /usr/local/Cellar/apache-flink/1.10.0/libexec/conf/flink-conf.yaml, set the jobservice memory to 4096.
- Then start the cluster (on macOS) with /usr/local/Cellar/apache-flink/1.10.0/libexec/bin/start-cluster.sh
- 2.
- run ./gradlew -p runners/flink/1.10/job-server run
- 3.
- run this script
- """
- OUTPUT = '/Users/piotr/out/out.txt'  # Passed to beam.io.WriteToText in main(); hard-coded to the author's home directory.
- EXPANSION_SERVICE = 'localhost:8097'  # Passed as the third argument to beam.ExternalTransform in main().
def main():
    """Build and run the cross-language "count" example pipeline.

    Sets the root logger to INFO, configures the pipeline for the
    FlinkRunner (Flink 1.10, master at localhost:8081, LOOPBACK
    environment), and then runs these steps in order:

    1. Create a small in-memory collection of strings.
    2. Apply the external transform registered under the URN
       ``beam:transforms:xlang:count`` via the expansion service at
       ``EXPANSION_SERVICE`` (no payload is sent).
    3. Format each result by concatenating its first two fields.
    4. Print each formatted element and pass it through unchanged.
    5. Write the elements to ``OUTPUT`` with ``WriteToText``.

    The pipeline is executed when the ``with`` block exits.
    """
    logging.getLogger().setLevel(logging.INFO)

    runner_flags = [
        "--runner=FlinkRunner",
        "--flink_version=1.10",
        "--flink_master=localhost:8081",
        "--environment_type=LOOPBACK"
    ]
    options = PipelineOptions(runner_flags)

    def format_pair(pair):
        # NOTE(review): presumably `pair` is a (element, count) pair produced
        # by the external transform -- only indices 0 and 1 are read here.
        first = str(pair[0]).encode('utf-8')
        second = str(pair[1]).encode('utf-8')
        return first + second

    def echo(element):
        # Side-effecting pass-through: print, then forward the same element.
        print(element)
        return element

    with beam.Pipeline(options=options) as pipeline:
        words = pipeline | 'Create input' >> beam.Create(
            ['abc', 'cde', 'abc', 'xlang', 'xlang', 'xlang2'])
        counted = words | 'Count' >> beam.ExternalTransform(
            'beam:transforms:xlang:count', None, EXPANSION_SERVICE)
        formatted = counted | 'Map to string' >> beam.Map(format_pair)
        echoed = formatted | 'Print' >> beam.Map(echo)
        _ = echoed | 'Write to output' >> beam.io.WriteToText(OUTPUT)
- if __name__ == '__main__':  # Run the pipeline only when executed as a script, not on import.
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement