Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Acknowledgement sent back for a request: either [[Ack.Accepted]] or
 * [[Ack.Rejected]] carrying the reason.
 *
 * @tparam R type of the rejection reason; covariant, so `Ack[Nothing]`
 *           (the type of `Accepted`) conforms to every `Ack[R]`
 */
sealed trait Ack[+R]

object Ack {

  /** Returns [[Accepted]] widened to `Ack[R]`, letting the caller pin the rejection type. */
  def accepted[R]: Ack[R] = Accepted

  /**
   * Wraps a rejection reason, widened to `Ack[R]`.
   *
   * @param rejection the reason to carry
   * @return a [[Rejected]] holding `rejection`
   */
  def rejected[R](rejection: R): Ack[R] = Rejected(rejection)

  /** The positive acknowledgement; carries no payload. */
  case object Accepted extends Ack[Nothing]

  /**
   * The negative acknowledgement.
   *
   * Declared `final` so the sealed hierarchy cannot be extended through this leaf.
   *
   * @param rejection the reason the request was rejected
   */
  final case class Rejected[+R](rejection: R) extends Ack[R]
}
/** Closed set of messages: a [[Command]] or an [[Event]]. */
sealed trait Message

/**
 * A command message.
 *
 * @param id identifier carried by the command
 */
final case class Command(id: String) extends Message

/**
 * An event message.
 *
 * @param id identifier carried by the event
 */
final case class Event(id: String) extends Message
/** Reason a command was refused; closed hierarchy. */
sealed trait Rejection

/** The only rejection value defined here; it doubles as the companion object of the trait. */
case object Rejection extends Rejection
/** State that is evolved by events and decides the outcome of commands. */
trait State {

  /**
   * Produces the state that results from applying `event` to this one.
   *
   * @param event the event to apply
   * @return the resulting state
   */
  def applyEvent(event: Event): State

  /**
   * Asynchronously decides what `command` leads to.
   *
   * @param command the command to evaluate
   * @return a future holding either a [[Rejection]] (left) or the [[Event]]
   *         produced by the command (right)
   */
  def processCommand(command: Command): Future[Xor[Rejection, Event]]
}

object State {

  /** Initial state; left unimplemented here (`???` throws `NotImplementedError` when called). */
  def zero: State = ???
}
/**
 * Stream of [[Event]]s that the pipeline below replays to rebuild the state.
 * Left unimplemented here (`???`); presumably backed by an event journal — TODO confirm.
 */
def persistedEvents: Source[Event, Any] = ???
/**
 * Stream of incoming commands, each paired with the promise that receives
 * that command's [[Ack]]. Left unimplemented here (`???`).
 */
def commands: Source[(Command, Promise[Ack[Rejection]]), Any] = ???
/**
 * Asynchronously handles an event and yields the event that is then applied to the state.
 * Left unimplemented here (`???`); presumably writes to the event journal — TODO confirm.
 *
 * @param e the event to persist
 * @return a future of the resulting event
 */
def persistEvent(e: Event): Future[Event] = ???
// Command-processing pipeline.
//
// Phase 1 replays every persisted event on top of State.zero to rebuild the
// current state. Phase 2 folds the incoming commands over that state: each
// command is processed only after the previous one has finished (the futures
// are chained through the fold accumulator), and its promise is completed with
// the resulting Ack.
//
// The expression yields a Source that emits the final State once `commands`
// completes.
//
// NOTE(review): needs an implicit ExecutionContext in scope for the Future
// combinators — it is not visible in this snippet.
// NOTE(review): `fold` does not wait for the accumulated Future, so commands
// are pulled from upstream without backpressure; consider `foldAsync` if the
// Akka Streams version in use provides it.
persistedEvents
  .fold(State.zero)((state, event) => state.applyEvent(event))
  .flatMapConcat { recoveredState =>
    commands
      .fold(Future.successful(recoveredState)) { case (pendingState, (command, ack)) =>
        val nextState: Future[State] = pendingState.flatMap { current =>
          current
            .processCommand(command)
            .flatMap {
              case Xor.Right(event) =>
                // Accepted: persist first, then evolve the state with the persisted event.
                persistEvent(event).map(current.applyEvent).map(_ -> Ack.accepted[Rejection])
              case Xor.Left(rejection) =>
                // Rejected: the state is left untouched.
                Future.successful(current -> Ack.rejected(rejection))
            }
            .map { case (updated, outcome) =>
              ack.success(outcome)
              updated
            }
        }

        // If processing failed — or an earlier command already failed the chain —
        // fail this command's promise too, so its caller is not left waiting
        // forever. `andThen` keeps the future's own result, so the stream still
        // fails exactly as before; `tryFailure` is a no-op when the promise was
        // already completed.
        nextState.andThen { case scala.util.Failure(cause) => ack.tryFailure(cause) }
      }
      // Unwrap the single accumulated Future[State] into the stream.
      .mapAsync(1)(identity)
  }
Advertisement
Add Comment
Please sign in to add a comment.
Advertisement