Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- @StreamListener
- @SendTo("processingComplete")
- public KStream<String, String> onCompletion(@Input("stageCompletion")
- KStream<String, String> stageCompletionStream) {
- return stageCompletionStream
- .filter(this::checkValidity)
- .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
- .reduce(this::aggregateStageCompletion,
- Materialized.as("stage_completion_store"))
- .toStream()
- .filter((ignored, message) -> checkCompletion(message))
- .map(this::publishCompletion);
- }
- @StreamListener
- @SendTo("processingComplete")
- public KStream<String, String> onCompletion(@Input("stageCompletion")
- KStream<String, String>
- stageCompletionStream,@Input("processingCompleteFeed") KStream<String,
- String> processingCompletionStream){
- return processingCompletionStream.merge(stageCompletionStream)
- .filter(this::checkValidity)
- .groupByKey(Serialized.with(Serdes.String(),Serdes.String()))
- .reduce(this::aggregateStageCompletion,
- Materialized.as("stage_completion_store"))
- .toStream()
- .filter((ignored,message)->checkCompletion(message))
- .map(this::publishCompletion);
- }
Add Comment
Please, Sign In to add comment