Guest User

Untitled

a guest
Jun 24th, 2018
113
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.41 KB | None | 0 0
  /**
   * Example of running fs2 streams written against `StateT[IO, S, ?]` on top of a
   * shared, mutable `Ref` by translating them into `ReaderT[IO, Ref[IO, S], ?]`.
   */
  object Example {
    import cats._
    import cats.data._
    import cats.effect.{Async, Effect, IO}
    import cats.implicits._
    import fs2._
    import fs2.async.Ref
    import fs2.async.mutable._

    /** A state-threading computation over `IO` whose state is a list of strings. */
    type StateIO[A] = StateT[IO, List[String], A]

    /** A computation over `IO` that reads a shared `Ref` holding a list of strings. */
    type RefIOStr[A] = ReaderT[IO, Ref[IO, List[String]], A]

    /** A computation over `IO` that reads a shared `Ref` holding a `State`. */
    type RefIO[State, A] = ReaderT[IO, Ref[IO, State], A]

    /**
     * Builds an `Effect` instance for `Kleisli[G, A, ?]` by fixing the environment
     * to `state`.
     *
     * Per the original author's note, this hand-written instance exists only because
     * cats-effect keeps the traits behind its Kleisli instances package private, so
     * they cannot simply be extended; every required member is therefore forwarded
     * to the library-provided Kleisli instance instead.
     *
     * Only `Async` operations are forwarded, so the `Async` instance for Kleisli is
     * used: it needs nothing beyond the `Effect[G]` (which is an `Async[G]`) that is
     * available here.
     *
     * @param state the environment every computation is run against in `runAsync`
     * @tparam G the underlying effect type
     * @tparam A the environment type of the Kleisli
     * @return an `Effect` for `Kleisli[G, A, ?]` whose `runAsync` supplies `state`
     */
    def readerTEffectInstance[G[_]: Effect, A](
        state: A
    ): Effect[({ type L[X] = Kleisli[G, A, X] })#L] = {
      // Summoned once and shared by every forwarded method below.
      val underlying = Async.catsKleisliAsync[G, A]

      // NOTE: the method-level type parameters are deliberately NOT called `A`, so
      // that they do not shadow the environment type `A` of the enclosing method.
      new Effect[({ type L[X] = Kleisli[G, A, X] })#L] {
        override def runAsync[B](fa: Kleisli[G, A, B])(
            cb: Either[Throwable, B] => IO[Unit]): IO[Unit] =
          Effect[G].runAsync(fa.run(state))(cb)

        override def async[B](k: (Either[Throwable, B] => Unit) => Unit): Kleisli[G, A, B] =
          underlying.async(k)

        override def suspend[B](thunk: => Kleisli[G, A, B]): Kleisli[G, A, B] =
          underlying.suspend(thunk)

        override def flatMap[B, C](fa: Kleisli[G, A, B])(
            f: B => Kleisli[G, A, C]): Kleisli[G, A, C] =
          underlying.flatMap(fa)(f)

        override def tailRecM[B, C](a: B)(
            f: B => Kleisli[G, A, Either[B, C]]): Kleisli[G, A, C] =
          underlying.tailRecM(a)(f)

        override def raiseError[B](e: Throwable): Kleisli[G, A, B] =
          underlying.raiseError(e)

        override def handleErrorWith[B](fa: Kleisli[G, A, B])(
            f: Throwable => Kleisli[G, A, B]): Kleisli[G, A, B] =
          underlying.handleErrorWith(fa)(f)

        override def pure[B](x: B): Kleisli[G, A, B] =
          underlying.pure(x)
      }
    }

    /**
     * Converts a `Ref`-reading computation into a `StateT`.
     *
     * A fresh `Ref` is created from the incoming state, `value` is run against it,
     * and whatever the `Ref` holds afterwards becomes the outgoing state.
     *
     * This is "unsafe" in the sense described by the original author: it is only
     * meaningful if nothing else concurrently mutates the same `Ref` between running
     * `value` and reading the final state back out.
     *
     * Note as well that this is NOT the inverse of [[fromStateT]]: because a new
     * `Ref` is created here, it replaces whichever `Ref` a reader produced by
     * [[fromStateT]] would otherwise have been given.
     *
     * @param value the computation to run against a freshly created `Ref`
     * @return a `StateT` yielding the final content of that `Ref` and `value`'s result
     */
    def unsafeToStateT[State, A](value: RefIO[State, A]): StateT[IO, State, A] =
      StateT[IO, State, A] { initial =>
        for {
          ref        <- Ref[IO, State](initial)
          result     <- value.run(ref)
          finalState <- ref.get
        } yield (finalState, result)
      }

    /**
     * Converts a `StateT` into a computation that reads and updates a shared `Ref`.
     *
     * The current content of the `Ref` is used as the initial state, `stateT` is run,
     * and the resulting state is written back before the value is returned.
     *
     * NOTE(review): reading, running and writing back are three separate steps, so
     * an update made to the `Ref` by something else in between would be overwritten
     * by the `setSync` below - confirm callers do not rely on atomicity here.
     *
     * The `Monoid` bound is not used by the body; it is retained so the signature
     * stays unchanged for existing callers.
     *
     * @param stateT the state-threading computation to adapt
     * @return a reader over `Ref[IO, State]` producing `stateT`'s value
     */
    def fromStateT[State: Monoid, A](stateT: StateT[IO, State, A]): RefIO[State, A] =
      Kleisli[IO, Ref[IO, State], A] { ref =>
        ref.get
          .flatMap(current => stateT.run(current))
          // Destructure in a `case` lambda rather than inside a `for`, which avoids
          // any dependence on `withFilter` (not defined for `IO`).
          .flatMap { case (newState, value) => ref.setSync(newState).map(_ => value) }
      }

    /**
     * A single-step stream that prepends `"b"` to the state and then prints the
     * state, prefixed with `label`.
     *
     * @param label text printed in front of the state
     */
    def stream(label: String): Stream[StateIO, Unit] =
      Stream
        // `StateT.pure` only needs an `Applicative[IO]`; no `Effect` instance is
        // available for `StateT` to summon instead.
        .eval[StateIO, Unit](StateT.pure[IO, List[String], Unit](()))
        .evalMap[Unit] { _ =>
          for {
            _ <- StateT.modify[IO, List[String]]("b" :: _)
            _ <- StateT
              .inspect[IO, List[String], String](_.toString)
              .map(list => println(s"$label: $list"))
          } yield ()
        }

    /** Natural transformation applying [[fromStateT]] to every `StateIO` value. */
    val stateIOToRefIOStr: StateIO ~> RefIOStr = new (StateIO ~> RefIOStr) {
      override def apply[A](fa: StateIO[A]): RefIOStr[A] = fromStateT(fa)
    }

    /**
     * Creates an empty shared `Ref`, prepends `"a"` to it, then drains two streams
     * (one interruptible, one not) that both run against that same `Ref`.
     */
    val exampleIO: IO[Unit] =
      Ref[IO, List[String]](List.empty).flatMap { initialState =>
        // NOTE(review): `Signal` and `interruptWhen` appear to require an implicit
        // ExecutionContext in the fs2 version this code targets - confirm, and swap
        // in an application-provided context if one exists.
        implicit val ec: scala.concurrent.ExecutionContext =
          scala.concurrent.ExecutionContext.global

        implicit val effectInstance: Effect[RefIOStr] =
          readerTEffectInstance[IO, Ref[IO, List[String]]](initialState)

        val io: RefIOStr[Unit] = for {
          currentState <- Kleisli.ask[IO, Ref[IO, List[String]]]
          // Type arguments are pinned so the environment type is not inferred as
          // `Nothing`; the result of `modify` is not needed, hence `void`.
          _ <- Kleisli.liftF[IO, Ref[IO, List[String]], Unit](
            currentState.modify("a" :: _).void)
          interrupt <- Signal[RefIOStr, Boolean](false)
          _ <- stream("with interrupt")
            .translate(stateIOToRefIOStr)
            .interruptWhen(interrupt)
            .compile
            .drain
          // Note that this stream will continue adding to the same state as the
          // "with interrupt" stream, because both read the same Ref.
          _ <- stream("without interrupt")
            .translate(stateIOToRefIOStr)
            .compile
            .drain
        } yield ()

        io.run(initialState)
      }
  }
Add Comment
Please, Sign In to add comment