Guest User

Untitled

a guest
Dec 16th, 2017
101
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.28 KB | None | 0 0
  1. @Bean
  2. KTable reportStream(StreamsBuilder builder, Engine engine) {
  3.  
  4. def stream = builder.stream(topic)
  5. .groupBy({ key, word -> word })
  6. .windowedBy(SessionWindows.with(TimeUnit.SECONDS.toMillis(1)))
  7. .aggregate(
  8. new Initializer<Long>() {
  9. @Override
  10. Long apply() {
  11. 0
  12. }
  13. },
  14. new Aggregator<String, String, Long>() {
  15. @Override
  16. Long apply(String key, String value, Long aggregate) {
  17. def l = 1 + aggregate
  18. return l
  19. }
  20. },
  21. new Merger() {
  22. @Override
  23. Long apply(Object aggKey, Object aggOne, Object aggTwo) {
  24. return aggOne + aggTwo
  25. }
  26. },
  27. Materialized.with(Serdes.String(), Serdes.Long()))
  28.  
  29. stream.toStream().to("classificationResult")
  30.  
  31. stream
  32. }
  33.  
  34. @Bean
  35. KStream classificationStream(StreamsBuilder builder, Engine engine) {
  36.  
  37. builder.stream("classificationResult").mapValues({
  38. println "classResult"
  39. println it
  40. })
  41.  
  42. }
Add Comment
Please, Sign In to add comment