Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Applies a sequence of producer instructions, in order, until one send succeeds.
 *
 * The instructions are folded left-to-right over `initialState`:
 *  - once the state's `succeeded` flag is `true`, every remaining instruction is
 *    passed over without sending or rebuilding anything;
 *  - `RebuildProducer` swaps the current producer for the result of `buildProducer()`
 *    and leaves the flag as it was;
 *  - `Send(record, context)` calls `tryToSend` with the current producer and stores
 *    the `isSuccess` of its result as the new flag.
 *
 * With an empty `input` the outcome depends solely on `initialState.succeeded`.
 *
 * @param input        instructions to apply, in order
 * @param initialState producer and success flag to start from
 * @return `DeserializationHandlerResponse.CONTINUE` if the final state is marked as
 *         succeeded, `DeserializationHandlerResponse.FAIL` otherwise
 */
private def execute(
    input: Seq[ProducerInput],
    initialState: ProducerState
): DeserializationHandlerResponse = {
  val outcome = input.foldLeft(initialState) {
    // A send already went through: keep the state untouched for the rest of the input.
    case (done @ ProducerState(_, true), _) =>
      done

    // Not sent yet (the flag is necessarily false here): replace the producer only.
    case (ProducerState(_, sent), RebuildProducer) =>
      ProducerState(buildProducer(), sent)

    // Not sent yet: attempt the send with the current producer and record the result.
    case (ProducerState(activeProducer, _), Send(record, context)) =>
      ProducerState(activeProducer, tryToSend(activeProducer, record, context).isSuccess)
  }

  if (outcome.succeeded) DeserializationHandlerResponse.CONTINUE
  else DeserializationHandlerResponse.FAIL
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement