Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- object Example {
- import fs2._
- import fs2.async.mutable._
- import cats._
- import cats.data._
- import cats.implicits._
- /**
- * This exists only because cats-effect keeps the trait expressing the
- * Concurrent instance of Kleisli package private, otherwise we could just
- * directly extend that trait instead of needing to reimplement each of
- * the methods of our super classes
- */
- def readerTEffectInstance[G[_]: Effect, A](
- state: A
- ): Effect[({ type L[X] = Kleisli[G, A, X] })#L] =
- new Effect[({ type L[X] = Kleisli[G, A, X] })#L] {
- override def runAsync[A](fa: Kleisli[G, A, A])(
- cb: Either[Throwable, A] => IO[Unit]): IO[Unit] =
- Effect[G].runAsync(fa.run(state))(cb)
- override def async[A](k: (Either[Throwable, A] => Unit) => Unit): Kleisli[G, A, A] =
- Concurrent.catsKleisliConcurrent[G, A].async(k)
- override def suspend[A](thunk: => Kleisli[G, A, A]): Kleisli[G, A, A] =
- Concurrent.catsKleisliConcurrent[G, A].suspend(thunk)
- override def flatMap[A, B](fa: Kleisli[G, A, A])(f: A => Kleisli[G, A, B]): Kleisli[G, A, B] =
- Concurrent.catsKleisliConcurrent[G, A].flatMap(fa)(f)
- override def tailRecM[A, B](a: A)(f: A => Kleisli[G, A, Either[A, B]]): Kleisli[G, A, B] =
- Concurrent.catsKleisliConcurrent[G, A].tailRecM(a)(f)
- override def raiseError[A](e: Throwable): Kleisli[G, A, A] =
- Concurrent.catsKleisliConcurrent[G, A].raiseError(e)
- override def handleErrorWith[A](fa: Kleisli[G, A, A])(
- f: Throwable => Kleisli[G, A, A]): Kleisli[G, A, A] =
- Concurrent.catsKleisliConcurrent[G, A].handleErrorWith(fa)(f)
- override def pure[A](x: A): Kleisli[G, A, A] =
- Concurrent.catsKleisliConcurrent[G, A].pure(x)
- }
- type RefIO[State, A] = ReaderT[IO, Ref[IO, State], A]
- /**
- * This is unsafe because it only works if you're sure nothing else is
- * concurrently running against the same Ref. Otherwise between when we
- * retrieve apply the state of our ref to our ReaderT computation, and
- * then examine the state again to put it into our StateT, the state of
- * the ref might have been mutated by something else.
- *
- * Note as well that unsafeToStateT is NOT the inverse of fromStateT. The
- * latter is completely safe even in the face of concurrency. This creates
- * a new Ref which will override the ref in the reader generated by fromStateT.
- */
- def unsafeToStateT[State, A](value: RefIO[State, A]): StateT[IO, State, A] =
- StateT[IO, State, A] { state =>
- for {
- ref <- Ref[IO, State](state)
- result <- value.run(ref)
- currentState <- ref.get
- } yield (currentState, result)
- }
- def fromStateT[State: Monoid, A](stateT: StateT[IO, State, A]): RefIO[State, A] =
- Kleisli[IO, Ref[IO, State], A] { ref =>
- for {
- currentState <- ref.get
- stateAndValue <- stateT.run(currentState)
- (newState, value) = stateAndValue
- _ <- ref.setSync(newState)
- } yield value
- }
- def stream(label: String): Stream[StateIO, Unit] =
- Stream.eval(Effect[StateIO].pure(())).evalMap[Unit] { _ =>
- for {
- _ <- StateT.modify[IO, List[String]]("b" :: _)
- _ <- StateT
- .inspect[IO, List[String], String](_.toString)
- .map(list => println(s"$label: $list"))
- } yield ()
- }
- type StateIO[A] = StateT[IO, List[String], A]
- type RefIOStr[A] = ReaderT[IO, Ref[IO, List[String]], A]
- val stateIOToRefIOStr: StateIO ~> RefIOStr = new (StateIO ~> RefIOStr) {
- override def apply[A](fa: StateIO[A]): RefIOStr[A] = fromStateT(fa)
- }
- val exampleIO = {
- val initialStateAction = Ref[IO, List[String]](List.empty)
- initialStateAction.flatMap { initialState =>
- implicit val effectInstance = readerTEffectInstance[IO, Ref[IO, List[String]]](initialState)
- val io = for {
- currentState <- Kleisli.ask[IO, Ref[IO, List[String]]]
- _ <- Kleisli.liftF(currentState.modify("a" :: _))
- interrupt <- Signal[RefIOStr, Boolean](false)
- _ <- stream("with interrupt")
- .translate(stateIOToRefIOStr)
- .interruptWhen(interrupt)
- .compile
- .drain
- // Note that this stream will continue adding to the same state as the
- // with interrupt state stream
- _ <- stream("without interrupt")
- .translate(stateIOToRefIOStr)
- .compile
- .drain
- } yield ()
- io.run(initialState)
- }
- }
- }
Add Comment
Please, Sign In to add comment