Advertisement
Guest User

Untitled

a guest
Aug 12th, 2016
115
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.02 KB | None | 0 0
  1. package io.treev.eventsourcing
  2.  
  3. import cats.data.Xor
  4. import enumeratum.{CirceEnum, Enum, EnumEntry}
  5. import io.circe.Encoder
  6. import io.circe.generic.auto._
  7. import io.treev.eventsourcing.model.Context
  8. import io.treev.model._
  9. import monix.eval.Task
  10.  
  11. object eventsourcing {
  12.  
  13. /** Command execution result. */
  14. type CommandResult[F, S] = Task[(F Xor S, List[EmittedEvent[_]])]
  15.  
  16. object CommandResult {
  17.  
  18. def successful[F, S](s: S, events: EmittedEvent[_]*): CommandResult[F, S] =
  19. Task.now((Xor.right(s), events.toList))
  20.  
  21. def failed[F, S](f: F, events: EmittedEvent[_]*): CommandResult[F, S] =
  22. Task.now((Xor.left(f), events.toList))
  23.  
  24. }
  25.  
  26. /** Encodes evidence that a command `C` can emit event `E`.
  27. * @tparam C command
  28. * @tparam E event */
  29. trait CommandEmitsEvent[C, E] {
  30. implicit val ev: CommandEmitsEvent[C, E] = null
  31. }
  32. object CommandEmitsEvent {
  33. def apply[C, E]: CommandEmitsEvent[C, E] = null
  34. }
  35.  
  36. /** Encodes evidence that a listener for event `E` can emit event `E2`.
  37. * @tparam E listening event
  38. * @tparam E2 emitting event */
  39. trait EventListenerEmitsEvent[E, E2] {
  40. implicit val ev: EventListenerEmitsEvent[E, E2] = null
  41. }
  42. object EventListenerEmitsEvent {
  43. def apply[E, E2]: EventListenerEmitsEvent[E, E2] = null
  44. }
  45.  
  46. /** Wrapper for event emitted from command handler or event listener.
  47. * Event can become "emitted" only by means of internal API.
  48. * @tparam E underlying event
  49. * @see [[io.treev.eventsourcing.model.CommandHandler#emit]]
  50. * @see [[io.treev.eventsourcing.model.EventListener#emit]] */
  51. class EmittedEvent[E] private[eventsourcing](_event: E,
  52. _handleCallback: Task[Unit] = Task.unit) {
  53.  
  54. /** Execute callback after executing all handlers for this event.
  55. * Chaining this method results in multiple callbacks executed potentially concurrently.
  56. * @param cb callback
  57. * @return emitted event with updated handle callback */
  58. def whenHandled(cb: => Task[Unit]): EmittedEvent[E] =
  59. new EmittedEvent[E](_event, Task.zipList(List(handleCallback, cb)).map(_ => ()))
  60.  
  61. private[eventsourcing] def event: E = _event
  62. private[eventsourcing] def handleCallback: Task[Unit] = _handleCallback
  63.  
  64. }
  65.  
  66. /** Command handler interface.
  67. * @tparam C command
  68. * @tparam F failure type
  69. * @tparam S success type */
  70. trait CommandHandler[C, F, S] {
  71.  
  72. /** Handle command.
  73. * @param command command
  74. * @param ctx context
  75. * @return command result - success `S` or failure `F` with a list of emitted events */
  76. def handle(command: C)
  77. (implicit ctx: Context): CommandResult[F, S]
  78.  
  79. /** Create emitted event.
  80. * @param event event to emit */
  81. def emit[E](event: E)(implicit ev: CommandEmitsEvent[C, E]): EmittedEvent[E] =
  82. new EmittedEvent(event)
  83.  
  84. /** Create single emitted event.
  85. * @param event event to emit */
  86. def emitOne[E](event: E)(implicit ev: CommandEmitsEvent[C, E]): List[EmittedEvent[E]] =
  87. List(emit(event))
  88.  
  89. /** Create zero emitted events. */
  90. def emitNone: Task[List[EmittedEvent[_]]] = CommandHandler.noop
  91.  
  92. }
  93. private object CommandHandler {
  94. val noop: Task[List[EmittedEvent[_]]] = Task.now(Nil)
  95. }
  96.  
  97. /** Event handler interface.
  98. * @tparam E event */
  99. trait EventHandler[E] {
  100.  
  101. /** Handler event
  102. * @param event event
  103. * @return event handler description */
  104. def handle(event: E): Task[Unit]
  105.  
  106. }
  107.  
  108. /** Event listener interface.
  109. * @tparam E event */
  110. trait EventListener[E] {
  111.  
  112. /** Event handled callback.
  113. * Is executed after an event is handled by an `EventHandler`.
  114. * @param event event
  115. * @return event handler description holding a list of additional events to emit */
  116. def onEvent(event: E): Task[List[EmittedEvent[_]]]
  117.  
  118. /** Create emitted event.
  119. * @param event event to emit */
  120. def emit[E2](event: E2)(implicit ev: EventListenerEmitsEvent[E, E2]): EmittedEvent[E2] =
  121. new EmittedEvent(event)
  122.  
  123. /** Create single emitted event.
  124. * @param event event to emit */
  125. def emitOne[E2](event: E2)(implicit ev: EventListenerEmitsEvent[E, E2]): List[EmittedEvent[E2]] =
  126. List(emit(event))
  127.  
  128. /** Create zero emitted events. */
  129. def emitNone: Task[List[EmittedEvent[_]]] = EventListener.noop
  130. }
  131. private object EventListener {
  132. val noop: Task[List[EmittedEvent[_]]] = Task.now(Nil)
  133. }
  134.  
  135. // commands
  136.  
  137. case class CreateSession(username: Username, password: String)
  138. object CreateSession {
  139. implicit val emitsSessionCreated = CommandEmitsEvent[CreateSession, SessionCreated]
  140. implicit val emitsSessionNotCreated = CommandEmitsEvent[CreateSession, SessionNotCreated]
  141. }
  142.  
  143. // events
  144.  
  145. case class SessionCreated(userId: UserId, sessionId: SessionId)
  146.  
  147. case class SessionNotCreated(username: Username,
  148. password: String,
  149. reason: AuthFailureReason,
  150. userId: Option[UserId])
  151. object SessionNotCreated {
  152. implicit val emitsAuthFailureNotificationSent =
  153. EventListenerEmitsEvent[SessionNotCreated, AuthFailureNotificationSent]
  154. }
  155.  
  156. case class AuthFailureNotificationSent(userId: UserId)
  157.  
  158. sealed abstract class AuthFailureReason extends EnumEntry {
  159. def code: String = entryName
  160. }
  161.  
  162. object AuthFailureReason extends CirceEnum[AuthFailureReason] with Enum[AuthFailureReason] {
  163. override val values: Seq[AuthFailureReason] = findValues
  164.  
  165. case object WrongUsername extends AuthFailureReason
  166. case object WrongPassword extends AuthFailureReason
  167. }
  168.  
  169. // responses
  170.  
  171. case class CreateSessionSuccess(userId: UserId, sessionId: SessionId)
  172. case class CreateSessionFailure(reason: AuthFailureReason)
  173.  
  174. // command handlers
  175.  
  176. class CreateSessionHandler extends CommandHandler[CreateSession, AuthFailureReason, (UserId, SessionId)] {
  177. override def handle(command: CreateSession)
  178. (implicit ctx: Context): CommandResult[AuthFailureReason, (UserId, SessionId)] = {
  179. import command._
  180. for {
  181. result <- AuthApi.authenticate(username, password)
  182. } yield (
  183. result.leftMap(_._1),
  184. List {
  185. result.fold(
  186. { case (reason, userId) =>
  187. emit(SessionNotCreated(username, password, reason, userId)).whenHandled {
  188. ConnectionApi.reply(CreateSessionFailure(reason))
  189. }
  190. },
  191. { case (userId, sessionId) =>
  192. emit(SessionCreated(userId, sessionId)).whenHandled {
  193. ConnectionApi.reply(CreateSessionSuccess(userId, sessionId))
  194. }
  195. }
  196. )
  197. }
  198. )
  199. }
  200. }
  201.  
  202. // event handlers
  203.  
  204. class SessionCreatedHandler extends EventHandler[SessionCreated] {
  205. override def handle(event: SessionCreated): Task[Unit] =
  206. AuthApi.saveSession(event.userId, event.sessionId)
  207. }
  208.  
  209. class AuthFailureNotificationSentHandler extends EventHandler[AuthFailureNotificationSent] {
  210. override def handle(event: AuthFailureNotificationSent): Task[Unit] =
  211. AuthApi.updateLastFailureNotificationTimestamp(event.userId)
  212. }
  213.  
  214. // event listeners
  215.  
  216. class SessionNotCreatedListener extends EventListener[SessionNotCreated] {
  217. override def onEvent(event: SessionNotCreated): Task[List[EmittedEvent[_]]] = {
  218. event.userId
  219. .map { userId =>
  220. AuthApi.sendFailedAuthenticationEmail(userId)
  221. .map(_ => emitOne(AuthFailureNotificationSent(userId)))
  222. }
  223. .getOrElse(emitNone)
  224. }
  225. }
  226.  
  227. // apis
  228.  
  229. trait Marshaler[T]
  230. object Marshaler {
  231. implicit def jsonMarshaler[T: Encoder]: Marshaler[T] = ???
  232. }
  233.  
  234. object ConnectionApi {
  235. def reply[R: Marshaler](response: R)(implicit ctx: Context): Task[Unit] = ???
  236. }
  237.  
  238. object AuthApi {
  239. def authenticate(username: Username,
  240. password: String): Task[(AuthFailureReason, Option[UserId]) Xor (UserId, SessionId)] = ???
  241. def saveSession(userId: UserId, sessionId: SessionId): Task[Unit] = ???
  242. def sendFailedAuthenticationEmail(userId: UserId): Task[Unit] = ???
  243. def updateLastFailureNotificationTimestamp(userId: UserId): Task[Unit] = ???
  244. }
  245.  
  246. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement