Advertisement
Guest User

Untitled

a guest
Apr 24th, 2019
107
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.21 KB | None | 0 0
  1. with beam.Pipeline(options=pipeline_options) as p:
  2.  
  3. def count_votes(counterId_count):
  4. (counterId, count) = counterId_count
  5. return (counterId, sum(count))
  6.  
  7. def format_result(counterId_count):
  8. (counterId, count) = counterId_count
  9. return '{ "counterId": %s, "count": %d }' % (counterId, count)
  10.  
  11. transformed = (p
  12. | 'Receive PubSub' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
  13. .with_output_types(bytes)
  14. | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
  15. | 'Pair with one' >> beam.Map(lambda x: (x, 1))
  16. | 'Apply window of time' >> beam.WindowInto(window.FixedWindows(30, 0))
  17. | 'Group by counterId' >> beam.GroupByKey()
  18. | 'Count triggers' >> beam.Map(counterId_count)
  19. | 'Format' >> beam.Map(format_result)
  20. | 'Transform to PubSub base64 string' >> beam.Map(lambda x: x.encode('utf-8'))
  21. .with_output_types(bytes))
  22.  
  23. transformed | beam.io.WriteToPubSub(known_args.output_topic)
  24.  
  25. result = p.run()
  26. result.wait_until_finish()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement