Advertisement
Guest User

Untitled

a guest
Jul 21st, 2019
112
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.77 KB | None | 0 0
  1. private def execute(input: Seq[ProducerInput], initialState: ProducerState): DeserializationHandlerResponse = {
  2. val finalState = input.foldLeft(initialState) {
  3. case (state @ ProducerState(producer, succeeded), item) =>
  4. if (succeeded)
  5. state
  6. else
  7. item match {
  8. case RebuildProducer =>
  9. val newProducer = buildProducer()
  10. val newState = ProducerState(newProducer, succeeded)
  11. newState
  12. case Send(record, context) =>
  13. val newState = ProducerState(producer, tryToSend(producer, record, context).isSuccess)
  14. newState
  15. }
  16. }
  17. if (finalState.succeeded)
  18. DeserializationHandlerResponse.CONTINUE
  19. else
  20. DeserializationHandlerResponse.FAIL
  21. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement