Guest User

scala-cli-simple-io

a guest
Dec 5th, 2024
52
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 6.17 KB | None | 0 0
  1. //> using scala 3.5.2
  2. //> using options -Wall
  3.  
  4. import java.util.concurrent.atomic.AtomicInteger
  5. import java.util.concurrent.Executors
  6. import java.util.{Timer, TimerTask}
  7. import scala.concurrent.duration._
  8. import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Promise}
  9. import scala.util.chaining._
  10. import scala.util.{Failure, Success, Try}
  11.  
  12. class Context(private val timer: Timer, private val threadPool: ExecutionContext):
  13.   def schedule(task: => Unit, fd: FiniteDuration): Unit =
  14.     val timerTask = new TimerTask:
  15.       def run(): Unit = sendToPool(task)
  16.     timer.schedule(timerTask, fd.toMillis)
  17.  
  18.   def sendToPool[A](task: => A): Unit = threadPool.execute { () =>
  19.     val _ = task
  20.   }
  21.  
  22. sealed trait IO[A] extends Product with Serializable:
  23.  
  24.   /** Implemented in terms of [[runAsync]] */
  25.   def runSync(
  26.   )(
  27.     implicit context: Context
  28.   ): Try[A] =
  29.     val p = Promise[A]()
  30.     this.runAsync(p.complete)
  31.     Await.ready(p.future, Duration.Inf).value.get
  32.  
  33.   /** @param callback to handle a result of a computation, guaranteed to be invoked only once. */
  34.   def runAsync(
  35.     callback: Try[A] => Unit
  36.   )(
  37.     implicit context: Context
  38.   ): Unit =
  39.     IO.runLoop(this)(callback.asInstanceOf[Try[?] => Unit])
  40.  
  41.   def map[B](f: A => B): IO[B] = IO.Map(this, f)
  42.   def as[B](b: B): IO[B] = IO.Map(this, (_: A) => b)
  43.   def flatMap[B](f: A => IO[B]): IO[B] = IO.FlatMap(this, f)
  44.   def >>[B](f: IO[B]): IO[B] = IO.FlatMap(this, (_: A) => f)
  45.   def recover(f: PartialFunction[Throwable, A]): IO[A] = IO.Recover(this, f)
  46.   def recoverWith(f: PartialFunction[Throwable, IO[A]]): IO[A] = IO.RecoverWith(this, f)
  47.   def fork(): IO[IO.Fiber[A]] = IO.Fork(this)
  48.  
  49. object IO:
  50.   def apply[A](thunk: => A): IO[A] = IO.Delay(() => thunk)
  51.   def pure[A](a: A): IO[A] = IO.Pure(a)
  52.   def async[A](cb: (Try[A] => Unit) => Unit): IO[A] = IO.Async(cb)
  53.   def sleep(duration: FiniteDuration): IO[Unit] = IO.Sleep(duration)
  54.   def raiseError(e: Throwable): IO[Unit] = IO.Error(e)
  55.   def putStrLn(s: String): IO[Unit] = IO(println(s"${Thread.currentThread().getName}: $s"))
  56.   def shift: IO[Unit] = IO.Shift
  57.  
  58.   final private case class Pure[A](value: A) extends IO[A]
  59.   final private case class Delay[A](thunk: () => A) extends IO[A]
  60.   final private case class Async[A](callback: (Try[A] => Unit) => Unit) extends IO[A]
  61.   final private case class FlatMap[A, B](prev: IO[A], f: A => IO[B]) extends IO[B]
  62.   final private case class Map[A, B](prev: IO[A], f: A => B) extends IO[B]
  63.   final private case class Recover[A](prev: IO[A], f: PartialFunction[Throwable, A]) extends IO[A]
  64.   final private case class RecoverWith[A](prev: IO[A], f: PartialFunction[Throwable, IO[A]]) extends IO[A]
  65.   final private case class Error(e: Throwable) extends IO[Unit]
  66.   final private case class Sleep[A](duration: FiniteDuration) extends IO[A]
  67.   final private case class Fork[A](io: IO[A]) extends IO[Fiber[A]]
  68.   final private case class Join[A](fiber: Fiber[A]) extends IO[A]
  69.   private case object Shift extends IO[Unit]
  70.  
  71.   class Fiber[A]:
  72.     private var callbacks = Set.empty[Try[A] => Unit]
  73.     private var result = Option.empty[Try[A]]
  74.  
  75.     def join(): IO[A] = IO.Join(this)
  76.  
  77.     private[IO] def register(cb: Try[A] => Unit): Unit =
  78.       synchronized:
  79.         result match
  80.           // To ensure the callback invoked only once.
  81.           case Some(value) => cb(value)
  82.           case None => callbacks = callbacks + cb
  83.  
  84.     private[IO] def finish(res: Try[A]): Unit =
  85.       synchronized:
  86.         // To ensure the callback invoked only once.
  87.         result = Some(res)
  88.         callbacks.foreach(_(res))
  89.         callbacks = Set.empty
  90.  
  91.   /** Optimised for maximum throughput, fairness must be ensured by the end developer by using [[IO.shift]]. */
  92.   private def runLoop(
  93.     io: IO[?]
  94.   )(
  95.     done: Try[?] => Unit
  96.   )(
  97.     implicit context: Context
  98.   ): Unit =
  99.     // Evaluation should run in an intended thread pool since the beginning.
  100.     // Otherwise, the first computations would run in a default 'main' JVM thread.
  101.     context.sendToPool(eval(io)(done))
  102.  
  103.   private def eval(
  104.     io: IO[?]
  105.   )(
  106.     done: Try[?] => Unit
  107.   )(
  108.     implicit context: Context
  109.   ): Unit =
  110.     io match
  111.       case IO.Pure(value) => done(Success(value))
  112.       case IO.Delay(thunk) => done(Success(thunk()))
  113.       case IO.Async(asyncTaskDefinition) => asyncTaskDefinition(done)
  114.       case IO.FlatMap(prev, f) =>
  115.         eval(prev):
  116.           case Success(value) => eval(f.asInstanceOf[Any => IO[?]](value))(done)
  117.           case x => done(x)
  118.       case IO.Map(prev, f) => eval(prev)(res => done(res.map(f.asInstanceOf[Any => Any])))
  119.       case IO.Recover(prev, f) => eval(prev)(res => done(res.recover(f)))
  120.       case IO.RecoverWith(prev, f) =>
  121.         eval(prev):
  122.           case Failure(e) if f.isDefinedAt(e) => eval(f(e))(done)
  123.           case x => done(x)
  124.       case IO.Error(e) => done(Failure(e))
  125.       case IO.Sleep(duration) => context.schedule(done(Success(())), duration)
  126.       case _: IO.Fork[_] =>
  127.         val fiber = new Fiber[Any] {}
  128.         context.sendToPool(eval(io.asInstanceOf[IO.Fork[?]].io)(fiber.finish))
  129.         done(Success(fiber))
  130.       case IO.Join(fiber) => fiber.register(done)
  131.       case IO.Shift => context.sendToPool(eval(io)(done))
  132.  
  133. object Main:
  134.   def main(args: Array[String]): Unit =
  135.     val program: IO[Unit] = (for {
  136.       fiber <- (IO.sleep(1.second) >> IO.putStrLn("1") >> IO.sleep(3.second) >> IO.putStrLn("2") >> IO.pure(
  137.         42
  138.       )).fork()
  139.       _ <- IO.putStrLn("3")
  140.       value <- fiber.join()
  141.       value2 <- fiber.join()
  142.       _ <- IO.raiseError(new RuntimeException(s"Boom! $value $value2"))
  143.     } yield ()).recoverWith { case e => IO.putStrLn(e.getMessage) }
  144.  
  145.     implicit val context: Context =
  146.       val timer: Timer = new Timer("Pet IO Timer", true)
  147.       val counter = new AtomicInteger(0)
  148.       val nThreads = 1
  149.       val pool: ExecutionContextExecutor = ExecutionContext.fromExecutor(
  150.         Executors.newFixedThreadPool(
  151.           nThreads,
  152.           (r: Runnable) =>
  153.             new Thread(r).tap(_.setDaemon(true)).tap(_.setName(s"Pet IO ${counter.getAndIncrement()}"))
  154.         )
  155.       )
  156.       new Context(timer, pool)
  157.     println(program.runSync())
Advertisement
Add Comment
Please, Sign In to add comment