Advertisement
Guest User

Untitled

a guest
Feb 10th, 2016
58
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.36 KB | None | 0 0
  /**
    * Acknowledgement with two possible outcomes: [[Ack.Accepted]] or
    * [[Ack.Rejected]], the latter carrying a reason.
    *
    * @tparam R type of the reason carried by a rejection; covariant, so an
    *           `Ack[Nothing]` can be used wherever any `Ack[R]` is expected
    */
  sealed trait Ack[+R]

  object Ack {

    /** Positive outcome. It carries no payload, hence `Ack[Nothing]`. */
    case object Accepted extends Ack[Nothing]

    /**
      * Negative outcome.
      *
      * Declared `final` so that the sealed hierarchy cannot be extended through
      * this class and pattern matches on [[Ack]] stay exhaustive.
      *
      * @param rejection the reason reported to the caller
      */
    final case class Rejected[+R](rejection: R) extends Ack[R]

    /** Returns [[Accepted]] typed as `Ack[R]`, so callers need no type ascription. */
    def accepted[R]: Ack[R] = Accepted

    /** Wraps `rejection` in a [[Rejected]], typed as the base `Ack[R]`. */
    def rejected[R](rejection: R): Ack[R] = Rejected(rejection)
  }
  9.  
  /** Closed set of message kinds: a [[Command]] or an [[Event]]. */
  sealed trait Message

  /**
    * A command, identified by `id`.
    *
    * `final` keeps the sealed [[Message]] hierarchy closed.
    */
  final case class Command(id: String) extends Message

  /**
    * An event, identified by `id`.
    *
    * `final` keeps the sealed [[Message]] hierarchy closed.
    */
  final case class Event(id: String) extends Message
  13.  
  /** Reason type for a refused command; currently it has a single inhabitant. */
  sealed trait Rejection

  /**
    * The only [[Rejection]] value. It shares its name with the trait, so it is
    * also the trait's companion object.
    */
  case object Rejection extends Rejection
  16.  
  /** State that decides on commands and evolves by applying events. */
  trait State {

    /**
      * Decides what to do with `command`.
      *
      * @return a future holding either the [[Rejection]] (left) or the [[Event]]
      *         produced by the command (right)
      */
    def processCommand(command: Command): Future[Xor[Rejection, Event]]

    /** Returns the state that results from applying `event` to this one. */
    def applyEvent(event: Event): State
  }

  object State {

    /** Initial state used as the seed when folding over events; not implemented yet. */
    def zero: State = ???
  }
  25.  
  /** Stream of the events to fold into the starting state; not implemented yet. */
  def persistedEvents: Source[Event, Any] = ???
  27.  
  /**
    * Stream of incoming commands, each paired with the promise through which
    * the outcome of that command is reported back; not implemented yet.
    */
  def commands: Source[(Command, Promise[Ack[Rejection]]), Any] = ???
  29.  
  /**
    * Persists `e` asynchronously; not implemented yet.
    *
    * @return a future holding the event to apply to the state once persistence
    *         has finished
    */
  def persistEvent(e: Event): Future[Event] = ???
  31.  
  // Step 1: fold every persisted event into State.zero to obtain the starting state.
  // Step 2: fold the command stream over that state. The accumulator is a
  //         Future[State], so each command is processed only after the
  //         previous one has produced its resulting state.
  //
  // Every command's promise is completed exactly once:
  //   - accepted  -> the event was persisted and applied to the state
  //   - rejected  -> the state is left unchanged
  //   - failure   -> processing or persisting failed; the promise is failed
  //                  with the same cause instead of being left pending
  // After a failure the accumulated future stays failed, so the promises of all
  // later commands are failed with that cause too and the stream fails when
  // mapAsync unwraps the future.
  //
  // NOTE(review): fold hands back a Future without waiting for it, so commands
  // are presumably pulled faster than they are processed and the chain of
  // pending futures can grow with the command rate — confirm this is acceptable.
  persistedEvents
    .fold(State.zero) { (replayed, event) => replayed.applyEvent(event) }
    .flatMapConcat { recovered =>
      commands
        .fold(Future.successful(recovered)) { case (pendingState, (command, ack)) =>
          pendingState
            .flatMap { current =>
              current.processCommand(command).flatMap {
                case Xor.Right(event) =>
                  persistEvent(event).map { persisted =>
                    current.applyEvent(persisted) -> Ack.accepted[Rejection]
                  }
                case Xor.Left(rejection) =>
                  Future.successful(current -> Ack.rejected(rejection))
              }
            }
            // andThen runs before the resulting future completes, so the caller
            // is notified before the next command starts processing. The try*
            // variants tolerate a promise the caller has already completed
            // (e.g. on timeout) instead of throwing and failing the pipeline.
            .andThen {
              case scala.util.Success((_, outcome)) => ack.trySuccess(outcome)
              case scala.util.Failure(cause)        => ack.tryFailure(cause)
            }
            .map { case (next, _) => next }
        }
        .mapAsync(1)(identity)
    }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement