Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import cats.implicits._
- import cats.effect.ExitCase.{Completed, Error}
- import cats.effect.{ExitCode, IO, IOApp}
- import cats.effect.concurrent.Ref
- import eu.timepit.refined.api.{Refined, RefinedTypeOps}
- import eu.timepit.refined.numeric.Interval
- import eu.timepit.refined.W
- import eu.timepit.refined.types.numeric.PosInt
- import fs2.concurrent.Queue
- trait Tap {
- def apply[A](effect: IO[A]): IO[A]
- def stop: IO[Unit]
- }
- object Tap {
- implicit val contextShift = IO.contextShift(scala.concurrent.ExecutionContext.global)
- type Percentage = Int Refined Interval.Closed[W.`0`.T, W.`100`.T]
- object Percentage extends RefinedTypeOps[Percentage, Int]
- def make(errBound: Percentage,
- maxSamples: PosInt = PosInt(100),
- qualified: Throwable => Boolean,
- rejected: => Throwable): IO[Tap] =
- for {
- queue <- Queue.unbounded[IO, Boolean]
- state <- Ref.of[IO, Double](0.0)
- fiber <- (fs2.Stream.emits(List.fill(maxSamples.value)(true)) ++ queue.dequeue)
- .sliding(maxSamples.value)
- .evalMap { window =>
- val (failures, count) = window.foldLeft[(Int, Int)]((0, 0)) { case ((s, c), r) => (s + (if (r) 0 else 1), c + 1) }
- val rate = if (count == 0) 0.0 else 1.0*failures/count
- state.set(rate)
- }.compile.drain.start
- } yield new Tap {
- override def apply[A](effect: IO[A]): IO[A] =
- for {
- errorRate <- state.get
- result <-
- if (errorRate * 100 <= errBound.value) {
- // below error rate, proceed and mark success/failure
- effect.guaranteeCase {
- case Completed => queue.enqueue1(true)
- case Error(e) => queue.enqueue1(!qualified(e))
- case _ => IO.unit
- }
- } else {
- // above error rate, mark success but fail
- queue.enqueue1(true) >> IO.raiseError[A](rejected)
- }
- } yield result
- override def stop: IO[Unit] = fiber.cancel
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement