Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package io.treev.eventsourcing
- import cats.data.Xor
- import enumeratum.{CirceEnum, Enum, EnumEntry}
- import io.circe.Encoder
- import io.circe.generic.auto._
- import io.treev.eventsourcing.model.Context
- import io.treev.model._
- import monix.eval.Task
- object eventsourcing {
/** Result of executing a command: a `Task` yielding either a failure `F` (left) or a
  * success `S` (right), paired with the list of events emitted alongside it.
  */
type CommandResult[F, S] = Task[(F Xor S, List[EmittedEvent[_]])]

/** Constructors for [[CommandResult]] values that are already computed. */
object CommandResult {

  /** Builds a result holding a success value.
    * @param s success value, stored on the right side of the `Xor`
    * @param events events emitted together with the result
    * @return a task created with `Task.now`
    */
  def successful[F, S](s: S, events: EmittedEvent[_]*): CommandResult[F, S] = {
    val outcome: F Xor S = Xor.right(s)
    Task.now((outcome, events.toList))
  }

  /** Builds a result holding a failure value.
    * @param f failure value, stored on the left side of the `Xor`
    * @param events events emitted together with the result
    * @return a task created with `Task.now`
    */
  def failed[F, S](f: F, events: EmittedEvent[_]*): CommandResult[F, S] = {
    val outcome: F Xor S = Xor.left(f)
    Task.now((outcome, events.toList))
  }
}
/** Compile-time evidence that a command `C` is allowed to emit event `E`.
  *
  * The value carries no data; only its presence in implicit scope matters
  * (see the `emit` methods that require it).
  *
  * @tparam C command
  * @tparam E event
  */
@scala.annotation.implicitNotFound("No evidence that command ${C} can emit event ${E}")
trait CommandEmitsEvent[C, E] {

  /** This evidence, exposed as an implicit member so that code written inside an
    * implementation of this trait has the evidence in scope.
    * Points at the instance itself instead of `null`.
    */
  implicit val ev: CommandEmitsEvent[C, E] = this
}

object CommandEmitsEvent {

  /** Creates evidence that command `C` can emit event `E`.
    * @return a real (non-null) evidence instance
    */
  def apply[C, E]: CommandEmitsEvent[C, E] = new CommandEmitsEvent[C, E] {}
}
/** Compile-time evidence that a listener for event `E` is allowed to emit event `E2`.
  *
  * The value carries no data; only its presence in implicit scope matters
  * (see the `emit` methods that require it).
  *
  * @tparam E listening event
  * @tparam E2 emitting event
  */
@scala.annotation.implicitNotFound("No evidence that a listener for event ${E} can emit event ${E2}")
trait EventListenerEmitsEvent[E, E2] {

  /** This evidence, exposed as an implicit member so that code written inside an
    * implementation of this trait has the evidence in scope.
    * Points at the instance itself instead of `null`.
    */
  implicit val ev: EventListenerEmitsEvent[E, E2] = this
}

object EventListenerEmitsEvent {

  /** Creates evidence that a listener for `E` can emit `E2`.
    * @return a real (non-null) evidence instance
    */
  def apply[E, E2]: EventListenerEmitsEvent[E, E2] = new EventListenerEmitsEvent[E, E2] {}
}
/** An event emitted from a command handler or an event listener.
  *
  * The constructor is `private[eventsourcing]`, so an event can only become "emitted"
  * through the internal API.
  *
  * @param _event the underlying event
  * @param _handleCallback task kept as the "handled" callback; defaults to `Task.unit`
  * @tparam E underlying event
  * @see [[io.treev.eventsourcing.model.CommandHandler#emit]]
  * @see [[io.treev.eventsourcing.model.EventListener#emit]]
  */
class EmittedEvent[E] private[eventsourcing](_event: E,
                                             _handleCallback: Task[Unit] = Task.unit) {

  /** Adds a callback meant to run once all handlers for this event have executed.
    *
    * The existing callback and `cb` are combined with `Task.zipList`, so chained
    * callbacks may execute concurrently.
    *
    * @param cb callback
    * @return a new emitted event wrapping the same event with the combined callback
    */
  def whenHandled(cb: => Task[Unit]): EmittedEvent[E] = {
    // `cb` is by-name, but it is forced here while the list is built.
    val callbacks = List(handleCallback, cb)
    val combined = Task.zipList(callbacks).map(_ => ())
    new EmittedEvent[E](_event, combined)
  }

  /** The wrapped event. */
  private[eventsourcing] def event: E = _event

  /** The callback accumulated so far. */
  private[eventsourcing] def handleCallback: Task[Unit] = _handleCallback
}
/** Interface of a command handler.
  * @tparam C command
  * @tparam F failure type
  * @tparam S success type
  */
trait CommandHandler[C, F, S] {

  /** Handles a command.
    * @param command command to handle
    * @param ctx context
    * @return command result: success `S` or failure `F`, with a list of emitted events
    */
  def handle(command: C)(implicit ctx: Context): CommandResult[F, S]

  /** Wraps an event as emitted; requires evidence that `C` may emit `E`.
    * @param event event to emit
    */
  def emit[E](event: E)(implicit ev: CommandEmitsEvent[C, E]): EmittedEvent[E] =
    new EmittedEvent[E](event)

  /** Wraps an event as emitted and returns it as a one-element list.
    * @param event event to emit
    */
  def emitOne[E](event: E)(implicit ev: CommandEmitsEvent[C, E]): List[EmittedEvent[E]] =
    emit(event) :: Nil

  /** A task yielding an empty list of emitted events (shared instance). */
  def emitNone: Task[List[EmittedEvent[_]]] = CommandHandler.noop
}

private object CommandHandler {

  /** Pre-built task yielding no events; returned by `emitNone`. */
  val noop: Task[List[EmittedEvent[_]]] = Task.now(List.empty[EmittedEvent[_]])
}
/** Interface of an event handler.
  * @tparam E type of event accepted by the handler
  */
trait EventHandler[E] {

  /** Handles a single event.
    * @param event event to handle
    * @return a task describing the handling; it yields no value
    */
  def handle(event: E): Task[Unit]
}
/** Interface of an event listener.
  * @tparam E event being listened to
  */
trait EventListener[E] {

  /** Callback for a handled event.
    *
    * By this interface's contract it runs after the event has been handled by an
    * `EventHandler`; the dispatching code is not part of this trait.
    *
    * @param event event
    * @return a task yielding the additional events to emit
    */
  def onEvent(event: E): Task[List[EmittedEvent[_]]]

  /** Wraps an event as emitted; requires evidence that a listener for `E` may emit `E2`.
    * @param event event to emit
    */
  def emit[E2](event: E2)(implicit ev: EventListenerEmitsEvent[E, E2]): EmittedEvent[E2] =
    new EmittedEvent[E2](event)

  /** Wraps an event as emitted and returns it as a one-element list.
    * @param event event to emit
    */
  def emitOne[E2](event: E2)(implicit ev: EventListenerEmitsEvent[E, E2]): List[EmittedEvent[E2]] =
    emit(event) :: Nil

  /** A task yielding an empty list of emitted events (shared instance). */
  def emitNone: Task[List[EmittedEvent[_]]] = EventListener.noop
}

private object EventListener {

  /** Pre-built task yielding no events; returned by `emitNone`. */
  val noop: Task[List[EmittedEvent[_]]] = Task.now(List.empty[EmittedEvent[_]])
}
- // commands
/** Command asking to create a session for the given credentials.
  *
  * NOTE(review): being a case class, the generated `toString` includes `password`;
  * avoid logging instances of this command.
  *
  * @param username name of the user to authenticate
  * @param password password supplied with the command
  */
case class CreateSession(username: Username, password: String)

/** Declares which events a [[CreateSession]] command may emit. */
object CreateSession {
  // Explicit types keep these implicits resolvable regardless of definition order.

  /** Evidence that `CreateSession` can emit `SessionCreated`. */
  implicit val emitsSessionCreated: CommandEmitsEvent[CreateSession, SessionCreated] =
    CommandEmitsEvent[CreateSession, SessionCreated]

  /** Evidence that `CreateSession` can emit `SessionNotCreated`. */
  implicit val emitsSessionNotCreated: CommandEmitsEvent[CreateSession, SessionNotCreated] =
    CommandEmitsEvent[CreateSession, SessionNotCreated]
}
- // events
/** Event emitted by `CreateSessionHandler` when authentication succeeds.
  * @param userId id of the authenticated user
  * @param sessionId id of the session returned by authentication
  */
case class SessionCreated(
  userId: UserId,
  sessionId: SessionId
)
/** Event emitted by `CreateSessionHandler` when authentication fails.
  *
  * NOTE(review): the event carries the submitted `password`, and the generated
  * `toString` includes it; confirm this is acceptable wherever events are stored or logged.
  *
  * @param username user name from the command
  * @param password password from the command
  * @param reason why authentication failed
  * @param userId id of the user, when one was returned with the failure
  */
case class SessionNotCreated(username: Username,
                             password: String,
                             reason: AuthFailureReason,
                             userId: Option[UserId])

/** Declares which events a listener for [[SessionNotCreated]] may emit. */
object SessionNotCreated {

  /** Evidence that a `SessionNotCreated` listener can emit `AuthFailureNotificationSent`.
    * The explicit type keeps this implicit resolvable regardless of definition order.
    */
  implicit val emitsAuthFailureNotificationSent
      : EventListenerEmitsEvent[SessionNotCreated, AuthFailureNotificationSent] =
    EventListenerEmitsEvent[SessionNotCreated, AuthFailureNotificationSent]
}
/** Event emitted by `SessionNotCreatedListener` once the failed-authentication email
  * task for a user has completed.
  * @param userId id of the user the notification was sent for
  */
case class AuthFailureNotificationSent(
  userId: UserId
)
/** Reason for an authentication failure, modelled as an enumeratum entry. */
sealed abstract class AuthFailureReason extends EnumEntry {

  /** Code of the reason; same as the entry's `entryName`. */
  def code: String = entryName
}

/** Enumeration of all [[AuthFailureReason]] values; `CirceEnum` is mixed in for circe support. */
object AuthFailureReason extends CirceEnum[AuthFailureReason] with Enum[AuthFailureReason] {

  case object WrongUsername extends AuthFailureReason

  case object WrongPassword extends AuthFailureReason

  /** All entries, collected by enumeratum's `findValues`. */
  override val values: Seq[AuthFailureReason] = findValues
}
- // responses
/** Response sent through `ConnectionApi.reply` after a `SessionCreated` event is handled.
  * @param userId id of the authenticated user
  * @param sessionId id of the created session
  */
case class CreateSessionSuccess(
  userId: UserId,
  sessionId: SessionId
)
/** Response sent through `ConnectionApi.reply` after a `SessionNotCreated` event is handled.
  * @param reason why authentication failed
  */
case class CreateSessionFailure(
  reason: AuthFailureReason
)
- // command handlers
/** Handles [[CreateSession]]: authenticates the credentials and emits exactly one event,
  * `SessionCreated` on success or `SessionNotCreated` on failure.
  */
class CreateSessionHandler extends CommandHandler[CreateSession, AuthFailureReason, (UserId, SessionId)] {

  /** Authenticates via `AuthApi.authenticate` and maps the outcome to a command result.
    *
    * @param command credentials to authenticate
    * @param ctx context, passed on implicitly to `ConnectionApi.reply`
    * @return the failure reason or the `(userId, sessionId)` pair, plus a single emitted
    *         event whose "handled" callback replies to the connection
    */
  override def handle(command: CreateSession)
                     (implicit ctx: Context): CommandResult[AuthFailureReason, (UserId, SessionId)] =
    AuthApi.authenticate(command.username, command.password).map { outcome =>
      // The failure side also carries an optional user id; only the reason is returned.
      val reply = outcome.leftMap(_._1)

      val emitted: EmittedEvent[_] = outcome match {
        case Xor.Left((reason, userId)) =>
          emit(SessionNotCreated(command.username, command.password, reason, userId))
            .whenHandled(ConnectionApi.reply(CreateSessionFailure(reason)))
        case Xor.Right((userId, sessionId)) =>
          emit(SessionCreated(userId, sessionId))
            .whenHandled(ConnectionApi.reply(CreateSessionSuccess(userId, sessionId)))
      }

      (reply, List(emitted))
    }
}
- // event handlers
/** Handles `SessionCreated` by saving the session through `AuthApi.saveSession`. */
class SessionCreatedHandler extends EventHandler[SessionCreated] {

  /** @param event event carrying the user id and session id to save
    * @return the task returned by `AuthApi.saveSession`
    */
  override def handle(event: SessionCreated): Task[Unit] = {
    val userId = event.userId
    val sessionId = event.sessionId
    AuthApi.saveSession(userId, sessionId)
  }
}
/** Handles `AuthFailureNotificationSent` by delegating to
  * `AuthApi.updateLastFailureNotificationTimestamp`.
  */
class AuthFailureNotificationSentHandler extends EventHandler[AuthFailureNotificationSent] {

  /** @param event event carrying the id of the notified user
    * @return the task returned by `AuthApi.updateLastFailureNotificationTimestamp`
    */
  override def handle(event: AuthFailureNotificationSent): Task[Unit] = {
    val notifiedUser = event.userId
    AuthApi.updateLastFailureNotificationTimestamp(notifiedUser)
  }
}
- // event listeners
/** Listens for `SessionNotCreated` and, when the event carries a user id, sends a
  * failed-authentication email and emits `AuthFailureNotificationSent`.
  */
class SessionNotCreatedListener extends EventListener[SessionNotCreated] {

  /** @param event the failed session creation
    * @return a task yielding one `AuthFailureNotificationSent` event after the email task
    *         completes, or no events when `event.userId` is empty
    */
  override def onEvent(event: SessionNotCreated): Task[List[EmittedEvent[_]]] =
    event.userId.fold[Task[List[EmittedEvent[_]]]](emitNone) { userId =>
      AuthApi
        .sendFailedAuthenticationEmail(userId)
        .map(_ => emitOne(AuthFailureNotificationSent(userId)))
    }
}
- // apis
/** Type class required by `ConnectionApi.reply` for the response type `T`.
  * It declares no members here.
  */
trait Marshaler[T]

object Marshaler {

  /** Provides a `Marshaler` for any `T` that has a circe `Encoder`.
    * Not implemented yet: evaluating it throws `NotImplementedError` (`???`).
    */
  implicit def jsonMarshaler[T: Encoder]: Marshaler[T] = ???
}
/** Connection-facing API (stub). */
object ConnectionApi {

  /** Replies with the given response.
    * Not implemented yet: calling it throws `NotImplementedError` (`???`).
    *
    * @param response response value; a `Marshaler[R]` must be available
    * @param ctx context
    */
  def reply[R: Marshaler](response: R)(implicit ctx: Context): Task[Unit] = ???
}
/** Authentication API (stub). Every method is unimplemented and throws
  * `NotImplementedError` (`???`) when called.
  */
object AuthApi {

  /** Authenticates the given credentials.
    * @return a task yielding either a failure reason with an optional user id (left),
    *         or a user id with a session id (right)
    */
  def authenticate(
    username: Username,
    password: String
  ): Task[Xor[(AuthFailureReason, Option[UserId]), (UserId, SessionId)]] = ???

  /** Saves a session for a user. */
  def saveSession(userId: UserId, sessionId: SessionId): Task[Unit] = ???

  /** Sends the failed-authentication email for a user. */
  def sendFailedAuthenticationEmail(userId: UserId): Task[Unit] = ???

  /** Updates the last failure-notification timestamp for a user. */
  def updateLastFailureNotificationTimestamp(userId: UserId): Task[Unit] = ???
}
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement