Untitled

a guest
Aug 20th, 2018
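import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;

// Version 1: consumes only the "stageCompletion" input.
// checkValidity, aggregateStageCompletion, checkCompletion and publishCompletion
// are members of the enclosing class (not shown in this paste).
@StreamListener
@SendTo("processingComplete")
public KStream<String, String> onCompletion(
        @Input("stageCompletion") KStream<String, String> stageCompletionStream) {
    return stageCompletionStream
            .filter(this::checkValidity)
            // Group by the existing record key, using String serdes for key and value.
            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
            // Fold the messages for each key into one value, backed by a named state store.
            .reduce(this::aggregateStageCompletion,
                    Materialized.as("stage_completion_store"))
            .toStream()
            // Forward only the aggregates that checkCompletion accepts.
            .filter((ignored, message) -> checkCompletion(message))
            .map(this::publishCompletion);
}

// Version 2: same pipeline, but the "processingCompleteFeed" input is merged
// with the "stageCompletion" input before filtering and aggregation.
@StreamListener
@SendTo("processingComplete")
public KStream<String, String> onCompletion(
        @Input("stageCompletion") KStream<String, String> stageCompletionStream,
        @Input("processingCompleteFeed") KStream<String, String> processingCompletionStream) {
    return processingCompletionStream.merge(stageCompletionStream)
            .filter(this::checkValidity)
            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
            .reduce(this::aggregateStageCompletion,
                    Materialized.as("stage_completion_store"))
            .toStream()
            .filter((ignored, message) -> checkCompletion(message))
            .map(this::publishCompletion);
}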