SHARE
TWEET

Untitled

a guest Aug 12th, 2016 78 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package io.treev.eventsourcing
  2.  
  3. import cats.data.Xor
  4. import enumeratum.{CirceEnum, Enum, EnumEntry}
  5. import io.circe.Encoder
  6. import io.circe.generic.auto._
  7. import io.treev.eventsourcing.model.Context
  8. import io.treev.model._
  9. import monix.eval.Task
  10.  
  11. object eventsourcing {
  12.  
  13.   /** Command execution result. */
  14.   type CommandResult[F, S] = Task[(F Xor S, List[EmittedEvent[_]])]
  15.  
  16.   object CommandResult {
  17.  
  18.     def successful[F, S](s: S, events: EmittedEvent[_]*): CommandResult[F, S] =
  19.       Task.now((Xor.right(s), events.toList))
  20.  
  21.     def failed[F, S](f: F, events: EmittedEvent[_]*): CommandResult[F, S] =
  22.       Task.now((Xor.left(f), events.toList))
  23.  
  24.   }
  25.  
  26.   /** Encodes evidence that a command `C` can emit event `E`.
  27.     * @tparam C command
  28.     * @tparam E event */
  29.   trait CommandEmitsEvent[C, E] {
  30.     implicit val ev: CommandEmitsEvent[C, E] = null
  31.   }
  32.   object CommandEmitsEvent {
  33.     def apply[C, E]: CommandEmitsEvent[C, E] = null
  34.   }
  35.  
  36.   /** Encodes evidence that a listener for event `E` can emit event `E2`.
  37.     * @tparam E listening event
  38.     * @tparam E2 emitting event */
  39.   trait EventListenerEmitsEvent[E, E2] {
  40.     implicit val ev: EventListenerEmitsEvent[E, E2] = null
  41.   }
  42.   object EventListenerEmitsEvent {
  43.     def apply[E, E2]: EventListenerEmitsEvent[E, E2] = null
  44.   }
  45.  
  46.   /** Wrapper for event emitted from command handler or event listener.
  47.     * Event can become "emitted" only by means of internal API.
  48.     * @tparam E underlying event
  49.     * @see [[io.treev.eventsourcing.model.CommandHandler#emit]]
  50.     * @see [[io.treev.eventsourcing.model.EventListener#emit]] */
  51.   class EmittedEvent[E] private[eventsourcing](_event: E,
  52.                                               _handleCallback: Task[Unit] = Task.unit) {
  53.  
  54.     /** Execute callback after executing all handlers for this event.
  55.       * Chaining this method results in multiple callbacks executed potentially concurrently.
  56.       * @param cb callback
  57.       * @return emitted event with updated handle callback */
  58.     def whenHandled(cb: => Task[Unit]): EmittedEvent[E] =
  59.       new EmittedEvent[E](_event, Task.zipList(List(handleCallback, cb)).map(_ => ()))
  60.  
  61.     private[eventsourcing] def event: E = _event
  62.     private[eventsourcing] def handleCallback: Task[Unit] = _handleCallback
  63.  
  64.   }
  65.  
  66.   /** Command handler interface.
  67.     * @tparam C command
  68.     * @tparam F failure type
  69.     * @tparam S success type */
  70.   trait CommandHandler[C, F, S] {
  71.  
  72.     /** Handle command.
  73.       * @param command command
  74.       * @param ctx context
  75.       * @return command result - success `S` or failure `F` with a list of emitted events */
  76.     def handle(command: C)
  77.               (implicit ctx: Context): CommandResult[F, S]
  78.  
  79.     /** Create emitted event.
  80.       * @param event event to emit */
  81.     def emit[E](event: E)(implicit ev: CommandEmitsEvent[C, E]): EmittedEvent[E] =
  82.       new EmittedEvent(event)
  83.  
  84.     /** Create single emitted event.
  85.       * @param event event to emit */
  86.     def emitOne[E](event: E)(implicit ev: CommandEmitsEvent[C, E]): List[EmittedEvent[E]] =
  87.       List(emit(event))
  88.  
  89.     /** Create zero emitted events. */
  90.     def emitNone: Task[List[EmittedEvent[_]]] = CommandHandler.noop
  91.  
  92.   }
  93.   private object CommandHandler {
  94.     val noop: Task[List[EmittedEvent[_]]] = Task.now(Nil)
  95.   }
  96.  
  97.   /** Event handler interface.
  98.     * @tparam E event */
  99.   trait EventHandler[E] {
  100.  
  101.     /** Handler event
  102.       * @param event event
  103.       * @return event handler description */
  104.     def handle(event: E): Task[Unit]
  105.  
  106.   }
  107.  
  108.   /** Event listener interface.
  109.     * @tparam E event */
  110.   trait EventListener[E] {
  111.  
  112.     /** Event handled callback.
  113.       * Is executed after an event is handled by an `EventHandler`.
  114.       * @param event event
  115.       * @return event handler description holding a list of additional events to emit */
  116.     def onEvent(event: E): Task[List[EmittedEvent[_]]]
  117.  
  118.     /** Create emitted event.
  119.       * @param event event to emit */
  120.     def emit[E2](event: E2)(implicit ev: EventListenerEmitsEvent[E, E2]): EmittedEvent[E2] =
  121.       new EmittedEvent(event)
  122.  
  123.     /** Create single emitted event.
  124.       * @param event event to emit */
  125.     def emitOne[E2](event: E2)(implicit ev: EventListenerEmitsEvent[E, E2]): List[EmittedEvent[E2]] =
  126.       List(emit(event))
  127.  
  128.     /** Create zero emitted events. */
  129.     def emitNone: Task[List[EmittedEvent[_]]] = EventListener.noop
  130.   }
  131.   private object EventListener {
  132.     val noop: Task[List[EmittedEvent[_]]] = Task.now(Nil)
  133.   }
  134.  
  135.   // commands
  136.  
  137.   case class CreateSession(username: Username, password: String)
  138.   object CreateSession {
  139.     implicit val emitsSessionCreated = CommandEmitsEvent[CreateSession, SessionCreated]
  140.     implicit val emitsSessionNotCreated = CommandEmitsEvent[CreateSession, SessionNotCreated]
  141.   }
  142.  
  143.   // events
  144.  
  145.   case class SessionCreated(userId: UserId, sessionId: SessionId)
  146.  
  147.   case class SessionNotCreated(username: Username,
  148.                                password: String,
  149.                                reason: AuthFailureReason,
  150.                                userId: Option[UserId])
  151.   object SessionNotCreated {
  152.     implicit val emitsAuthFailureNotificationSent =
  153.       EventListenerEmitsEvent[SessionNotCreated, AuthFailureNotificationSent]
  154.   }
  155.  
  156.   case class AuthFailureNotificationSent(userId: UserId)
  157.  
  158.   sealed abstract class AuthFailureReason extends EnumEntry {
  159.     def code: String = entryName
  160.   }
  161.  
  162.   object AuthFailureReason extends CirceEnum[AuthFailureReason] with Enum[AuthFailureReason] {
  163.     override val values: Seq[AuthFailureReason] = findValues
  164.  
  165.     case object WrongUsername extends AuthFailureReason
  166.     case object WrongPassword extends AuthFailureReason
  167.   }
  168.  
  169.   // responses
  170.  
  171.   case class CreateSessionSuccess(userId: UserId, sessionId: SessionId)
  172.   case class CreateSessionFailure(reason: AuthFailureReason)
  173.  
  174.   // command handlers
  175.  
  176.   class CreateSessionHandler extends CommandHandler[CreateSession, AuthFailureReason, (UserId, SessionId)] {
  177.     override def handle(command: CreateSession)
  178.                        (implicit ctx: Context): CommandResult[AuthFailureReason, (UserId, SessionId)] = {
  179.       import command._
  180.       for {
  181.         result <- AuthApi.authenticate(username, password)
  182.       } yield (
  183.         result.leftMap(_._1),
  184.         List {
  185.           result.fold(
  186.             { case (reason, userId) =>
  187.               emit(SessionNotCreated(username, password, reason, userId)).whenHandled {
  188.                 ConnectionApi.reply(CreateSessionFailure(reason))
  189.               }
  190.             },
  191.             { case (userId, sessionId) =>
  192.               emit(SessionCreated(userId, sessionId)).whenHandled {
  193.                 ConnectionApi.reply(CreateSessionSuccess(userId, sessionId))
  194.               }
  195.             }
  196.           )
  197.         }
  198.       )
  199.     }
  200.   }
  201.  
  202.   // event handlers
  203.  
  204.   class SessionCreatedHandler extends EventHandler[SessionCreated] {
  205.     override def handle(event: SessionCreated): Task[Unit] =
  206.       AuthApi.saveSession(event.userId, event.sessionId)
  207.   }
  208.  
  209.   class AuthFailureNotificationSentHandler extends EventHandler[AuthFailureNotificationSent] {
  210.     override def handle(event: AuthFailureNotificationSent): Task[Unit] =
  211.       AuthApi.updateLastFailureNotificationTimestamp(event.userId)
  212.   }
  213.  
  214.   // event listeners
  215.  
  216.   class SessionNotCreatedListener extends EventListener[SessionNotCreated] {
  217.     override def onEvent(event: SessionNotCreated): Task[List[EmittedEvent[_]]] = {
  218.       event.userId
  219.         .map { userId =>
  220.           AuthApi.sendFailedAuthenticationEmail(userId)
  221.             .map(_ => emitOne(AuthFailureNotificationSent(userId)))
  222.         }
  223.         .getOrElse(emitNone)
  224.     }
  225.   }
  226.  
  227.   // apis
  228.  
  229.   trait Marshaler[T]
  230.   object Marshaler {
  231.     implicit def jsonMarshaler[T: Encoder]: Marshaler[T] = ???
  232.   }
  233.  
  234.   object ConnectionApi {
  235.     def reply[R: Marshaler](response: R)(implicit ctx: Context): Task[Unit] = ???
  236.   }
  237.  
  238.   object AuthApi {
  239.     def authenticate(username: Username,
  240.                      password: String): Task[(AuthFailureReason, Option[UserId]) Xor (UserId, SessionId)] = ???
  241.     def saveSession(userId: UserId, sessionId: SessionId): Task[Unit] = ???
  242.     def sendFailedAuthenticationEmail(userId: UserId): Task[Unit] = ???
  243.     def updateLastFailureNotificationTimestamp(userId: UserId): Task[Unit] = ???
  244.   }
  245.  
  246. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top