Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import json

# Streaming counter: reads counter IDs from Pub/Sub, counts occurrences of each
# ID per 30-second fixed window, and publishes one JSON record per
# (window, counterId) to the output topic.
#
# NOTE(review): relies on names defined elsewhere in the script -- `beam`
# (apache_beam), `window` (apache_beam.transforms.window), `pipeline_options`,
# and `known_args` (with `input_topic` / `output_topic`). Confirm against caller.
#
# Exiting the `with` block runs the pipeline and waits for it to finish, so no
# explicit p.run()/wait_until_finish() is needed. Calling them here as well
# would submit the pipeline a second time.
with beam.Pipeline(options=pipeline_options) as p:

    def count_votes(counterId_count):
        """Sum the per-message 1s that GroupByKey collected under one counter ID.

        Args:
            counterId_count: ``(counterId, counts)`` pair, where ``counts`` is
                the iterable of ints grouped for that key.

        Returns:
            ``(counterId, total)``.
        """
        counterId, counts = counterId_count
        return counterId, sum(counts)

    def format_result(counterId_count):
        """Render a ``(counterId, count)`` pair as a JSON object string.

        json.dumps quotes and escapes the ID. Plain ``%s`` interpolation left
        string IDs unquoted and produced invalid JSON.
        """
        counterId, count = counterId_count
        return json.dumps({"counterId": counterId, "count": count})

    transformed = (
        p
        | 'Receive PubSub' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
            .with_output_types(bytes)
        | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
        # Each message body is treated as a counter ID; pair it with 1 so the
        # group-and-sum below yields a per-ID count.
        | 'Pair with one' >> beam.Map(lambda x: (x, 1))
        # 30-second fixed windows with zero offset.
        | 'Apply window of time' >> beam.WindowInto(window.FixedWindows(30, 0))
        | 'Group by counterId' >> beam.GroupByKey()
        # Was beam.Map(counterId_count): an undefined name that raised NameError.
        | 'Count triggers' >> beam.Map(count_votes)
        | 'Format' >> beam.Map(format_result)
        # Encodes the JSON text as UTF-8 bytes, which WriteToPubSub expects
        # (no base64 happens here despite the step label).
        | 'Transform to PubSub base64 string' >> beam.Map(lambda x: x.encode('utf-8'))
            .with_output_types(bytes)
    )
    transformed | beam.io.WriteToPubSub(known_args.output_topic)
Advertisement
Add Comment
Please sign in to add a comment.
Advertisement