Advertisement
Guest User

Untitled

a guest
Jan 18th, 2019
81
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 6.50 KB | None | 0 0
  1. package fpinscala.parallelism
  2.  
  3. import java.util.concurrent._
  4. import language.implicitConversions
  5.  
  6.  
  7. object Par {
  8.   type Par[A] = ExecutorService => Future[A]
  9.  
  10.   def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a(s)
  11.  
  12.   def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a) // `unit` is represented as a function that returns a `UnitFuture`, which is a simple implementation of `Future` that just wraps a constant value. It doesn't use the `ExecutorService` at all. It's always done and can't be cancelled. Its `get` method simply returns the value that we gave it.
  13.  
  14.   private case class UnitFuture[A](get: A) extends Future[A] {
  15.     def isDone = true
  16.     def get(timeout: Long, units: TimeUnit) = get
  17.     def isCancelled = false
  18.     def cancel(evenIfRunning: Boolean): Boolean = false
  19.   }
  20.  
  21.   def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C] = // `map2` doesn't evaluate the call to `f` in a separate logical thread, in accord with our design choice of having `fork` be the sole function in the API for controlling parallelism. We can always do `fork(map2(a,b)(f))` if we want the evaluation of `f` to occur in a separate thread.
  22.     (es: ExecutorService) => {
  23.       val af = a(es)
  24.       val bf = b(es)
  25.       UnitFuture(f(af.get, bf.get)) // This implementation of `map2` does _not_ respect timeouts. It simply passes the `ExecutorService` on to both `Par` values, waits for the results of the Futures `af` and `bf`, applies `f` to them, and wraps them in a `UnitFuture`. In order to respect timeouts, we'd need a new `Future` implementation that records the amount of time spent evaluating `af`, then subtracts that time from the available time allocated for evaluating `bf`.
  26.     }
  27.  
  28.   def fork[A](a: => Par[A]): Par[A] = // This is the simplest and most natural implementation of `fork`, but there are some problems with it--for one, the outer `Callable` will block waiting for the "inner" task to complete. Since this blocking occupies a thread in our thread pool, or whatever resource backs the `ExecutorService`, this implies that we're losing out on some potential parallelism. Essentially, we're using two threads when one should suffice. This is a symptom of a more serious problem with the implementation, and we will discuss this later in the chapter.
  29.     es => es.submit(new Callable[A] {
  30.       def call = a(es).get
  31.     })
  32.  
  33.   def lazyUnit[A](a: => A): Par[A] = fork(unit(a))
  34.  
  35.   def asyncF[A,B](f: A => B): A => Par[B] =
  36.     a => lazyUnit(f(a))
  37.  
  38.   def map[A,B](pa: Par[A])(f: A => B): Par[B] =
  39.     map2(pa, unit(()))((a,_) => f(a))
  40.  
  41.   def sortPar(parList: Par[List[Int]]) = map(parList)(_.sorted)
  42.  
  43.   def sequence_simple[A](l: List[Par[A]]): Par[List[A]] =
  44.     l.foldRight[Par[List[A]]](unit(List()))((h,t) => map2(h,t)(_ :: _))
  45.  
  46.   // This implementation forks the recursive step off to a new logical thread,
  47.   // making it effectively tail-recursive. However, we are constructing
  48.   // a right-nested parallel program, and we can get better performance by
  49.   // dividing the list in half, and running both halves in parallel.
  50.   // See `sequenceBalanced` below.
  51.   def sequenceRight[A](as: List[Par[A]]): Par[List[A]] =
  52.     as match {
  53.       case Nil => unit(Nil)
  54.       case h :: t => map2(h, fork(sequenceRight(t)))(_ :: _)
  55.     }
  56.  
  57.   // We define `sequenceBalanced` using `IndexedSeq`, which provides an
  58.   // efficient function for splitting the sequence in half.
  59.   def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] = fork {
  60.     if (as.isEmpty) unit(Vector())
  61.     else if (as.length == 1) map(as.head)(a => Vector(a))
  62.     else {
  63.       val (l,r) = as.splitAt(as.length/2)
  64.       map2(sequenceBalanced(l), sequenceBalanced(r))(_ ++ _)
  65.     }
  66.   }
  67.  
  68.   def sequence[A](as: List[Par[A]]): Par[List[A]] =
  69.     map(sequenceBalanced(as.toIndexedSeq))(_.toList)
  70.  
  71.   def parFilter[A](l: List[A])(f: A => Boolean): Par[List[A]] = {
  72.     val pars: List[Par[List[A]]] =
  73.       l map (asyncF((a: A) => if (f(a)) List(a) else List()))
  74.     map(sequence(pars))(_.flatten) // convenience method on `List` for concatenating a list of lists
  75.   }
  76.  
  77.   def equal[A](e: ExecutorService)(p: Par[A], p2: Par[A]): Boolean =
  78.     p(e).get == p2(e).get
  79.  
  80.   def delay[A](fa: => Par[A]): Par[A] =
  81.     es => fa(es)
  82.  
  83.   def choice[A](cond: Par[Boolean])(t: Par[A], f: Par[A]): Par[A] =
  84.     es =>
  85.       if (run(es)(cond).get) t(es) // Notice we are blocking on the result of `cond`.
  86.       else f(es)
  87.  
  88.   def choiceN[A](n: Par[Int])(choices: List[Par[A]]): Par[A] =
  89.     es => {
  90.       val ind = run(es)(n).get // Full source files
  91.       run(es)(choices(ind))
  92.     }
  93.  
  94.   def choiceViaChoiceN[A](a: Par[Boolean])(ifTrue: Par[A], ifFalse: Par[A]): Par[A] =
  95.     choiceN(map(a)(b => if (b) 0 else 1))(List(ifTrue, ifFalse))
  96.  
  97.   def choiceMap[K,V](key: Par[K])(choices: Map[K,Par[V]]): Par[V] =
  98.     es => {
  99.       val k = run(es)(key).get
  100.       run(es)(choices(k))
  101.     }
  102.  
  103.   def chooser[A,B](p: Par[A])(choices: A => Par[B]): Par[B] =
  104.     es => {
  105.       val k = run(es)(p).get
  106.       run(es)(choices(k))
  107.     }
  108.  
  109.   /* `chooser` is usually called `flatMap` or `bind`. */
  110.   def flatMap[A,B](p: Par[A])(choices: A => Par[B]): Par[B] =
  111.     es => {
  112.       val k = run(es)(p).get
  113.       run(es)(choices(k))
  114.     }
  115.  
  116.   def choiceViaFlatMap[A](p: Par[Boolean])(f: Par[A], t: Par[A]): Par[A] =
  117.     flatMap(p)(b => if (b) t else f)
  118.  
  119.   def choiceNViaFlatMap[A](p: Par[Int])(choices: List[Par[A]]): Par[A] =
  120.     flatMap(p)(i => choices(i))
  121.  
  122.   // see nonblocking implementation in `Nonblocking.scala`
  123.   def join[A](a: Par[Par[A]]): Par[A] =
  124.     es => run(es)(run(es)(a).get())
  125.  
  126.   def joinViaFlatMap[A](a: Par[Par[A]]): Par[A] =
  127.     flatMap(a)(x => x)
  128.  
  129.   def flatMapViaJoin[A,B](p: Par[A])(f: A => Par[B]): Par[B] =
  130.     join(map(p)(f))
  131.   /* Gives us infix syntax for `Par`. */
  132.   implicit def toParOps[A](p: Par[A]): ParOps[A] = new ParOps(p)
  133.  
  134.   class ParOps[A](p: Par[A]) {
  135.  
  136.   }
  137. }
  138.  
  139. object Examples {
  140.   import Par._
  141.   def sum(ints: IndexedSeq[Int]): Int = // `IndexedSeq` is a superclass of random-access sequences like `Vector` in the standard library. Unlike lists, these sequences provide an efficient `splitAt` method for dividing them into two parts at a particular index.
  142.     if (ints.size <= 1)
  143.       ints.headOption getOrElse 0 // `headOption` is a method defined on all collections in Scala. We saw this function in chapter 3.
  144.     else {
  145.       val (l,r) = ints.splitAt(ints.length/2) // Divide the sequence in half using the `splitAt` function.
  146.       sum(l) + sum(r) // Recursively sum both halves and add the results together.
  147.     }
  148. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement