Advertisement
Guest User

Untitled

a guest
Mar 25th, 2019
67
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.99 KB | None | 0 0
  1. import cats.implicits._
  2. import cats.effect.ExitCase.{Completed, Error}
  3. import cats.effect.{ExitCode, IO, IOApp}
  4. import cats.effect.concurrent.Ref
  5. import eu.timepit.refined.api.{Refined, RefinedTypeOps}
  6. import eu.timepit.refined.numeric.Interval
  7. import eu.timepit.refined.W
  8. import eu.timepit.refined.types.numeric.PosInt
  9. import fs2.concurrent.Queue
  10.  
  /**
   * A gate that effects are routed through.
   *
   * The instance returned by `Tap.make` either runs the supplied effect and
   * records its outcome, or fails it up front with the configured rejection
   * error once the recently observed failure rate exceeds the error bound.
   */
  trait Tap {

    /**
     * Shuts the tap down. In the `Tap.make` implementation this cancels the
     * background fiber that keeps the failure statistics up to date.
     */
    def stop: IO[Unit]

    /**
     * Routes `effect` through the tap.
     *
     * @param effect the effect to guard
     * @tparam A     the result type of the effect
     * @return an effect of the same result type; in the `Tap.make`
     *         implementation it fails with the rejection error instead of
     *         running `effect` while the tap is over its error bound
     */
    def apply[A](effect: IO[A]): IO[A]
  }
  15.  
  /**
   * Companion of [[Tap]]: declares the `Percentage` refined type and the
   * `make` constructor.
   */
  object Tap {

    // Explicitly typed so implicit resolution never depends on inference.
    // Supplies the ContextShift that the concurrent operations in `make`
    // (queue creation, `start`) resolve implicitly.
    implicit val contextShift: cats.effect.ContextShift[IO] =
      IO.contextShift(scala.concurrent.ExecutionContext.global)

    /** An `Int` constrained to the closed interval [0, 100]. */
    type Percentage = Int Refined Interval.Closed[W.`0`.T, W.`100`.T]
    object Percentage extends RefinedTypeOps[Percentage, Int]

    /**
     * Failure tally over the most recent window of recorded outcomes.
     *
     * @param failures number of `false` (failed) entries in the window
     * @param samples  total number of entries in the window
     */
    private final case class WindowStats(failures: Int, samples: Int) {

      /**
       * True when the failure percentage is at most `bound`.
       *
       * Compared by integer cross-multiplication (failures / samples <= bound / 100)
       * rather than in floating point: `1.0 * 7 / 100 * 100` evaluates to
       * 7.000000000000001, which would wrongly exceed an inclusive bound of 7.
       * Widening to Long keeps the products from overflowing.
       */
      def within(bound: Percentage): Boolean =
        failures.toLong * 100L <= bound.value.toLong * samples.toLong
    }

    /**
     * Builds a [[Tap]] that tracks the outcomes of the last `maxSamples`
     * effects and rejects new effects while the failure rate is above
     * `errBound`.
     *
     * The window is pre-filled with `maxSamples` successes, so a fresh tap
     * starts with a failure rate of zero. A background fiber recomputes the
     * statistics each time an outcome is recorded; `stop` cancels that fiber,
     * after which the statistics are no longer refreshed.
     *
     * @param errBound   highest failure percentage (inclusive) at which effects
     *                   are still let through
     * @param maxSamples size of the sliding window of outcomes
     * @param qualified  decides whether an error raised by an effect counts as
     *                   a failure (`true`) or is recorded as a success (`false`)
     * @param rejected   error raised, without running the effect, while the tap
     *                   is above its error bound
     * @return an effect that starts the background fiber and yields the tap
     */
    def make(errBound: Percentage,
             maxSamples: PosInt = PosInt(100),
             qualified: Throwable => Boolean,
             rejected: => Throwable): IO[Tap] =
      for {
        queue <- Queue.unbounded[IO, Boolean]
        state <- Ref.of[IO, WindowStats](WindowStats(failures = 0, samples = 0))
        fiber <- (fs2.Stream.emits(List.fill(maxSamples.value)(true)) ++ queue.dequeue)
          .sliding(maxSamples.value)
          .evalMap { window =>
            // `true` marks a success, so failures are the `false` entries.
            state.set(WindowStats(window.count(ok => !ok), window.size))
          }
          .compile
          .drain
          .start
      } yield new Tap {

        override def apply[A](effect: IO[A]): IO[A] =
          state.get.flatMap { stats =>
            if (stats.within(errBound)) {
              // Within the bound: run the effect and record how it ended.
              effect.guaranteeCase {
                case Completed                     => queue.enqueue1(true)
                case Error(e)                      => queue.enqueue1(!qualified(e))
                // A cancelled effect has no outcome to record.
                case cats.effect.ExitCase.Canceled => IO.unit
              }
            } else {
              // Over the bound: fail fast, but record a success so the window
              // drains and the tap can open again.
              queue.enqueue1(true) >> IO.raiseError[A](rejected)
            }
          }

        override def stop: IO[Unit] = fiber.cancel
      }

  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement