daily pastebin goal
22%
SHARE
TWEET

Untitled

a guest Jan 12th, 2017 65 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import akka.stream._
  2. import akka.stream.stage._
  3.  
  /**
   * Fan-out the stream of `scala.util.Either[L, R]` elements to `L` and `R` streams.
   *
   * `Left` values are routed to the `left` outlet and `Right` values to the `right` outlet.
   * At most one element is parked inside the stage while its target outlet has no demand;
   * no further element is requested from upstream until that element has been delivered
   * or its outlet has cancelled. Elements routed to an outlet that has already cancelled
   * are dropped.
   *
   * '''Emits when''' an element is available from the input and the chosen output has demand
   *
   * '''Backpressures when''' the currently chosen output back-pressures
   *
   * '''Completes when''' upstream completes and no output is pending
   *
   * '''Cancels when''' both downstreams cancel
   *
   * @tparam L type of the elements emitted on the `left` outlet
   * @tparam R type of the elements emitted on the `right` outlet
   */
  class Splitter[L, R] extends GraphStage[FanOutShape2[Either[L, R], L, R]] {

    val in: Inlet[Either[L, R]] = Inlet("in")

    val left: Outlet[L] = Outlet("left")

    val right: Outlet[R] = Outlet("right")

    override val shape = new FanOutShape2(in, left, right)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {

      // Element grabbed from `in` whose target outlet had no demand yet; null means "nothing parked".
      var pendingElem: Any = null

      // Outlet the parked element is waiting for; only meaningful while pendingElem != null.
      var pendingOutlet: Outlet[_] = null

      // Number of outlets that have not cancelled yet.
      var downstreamRunning = 2

      private def hasPending: Boolean = pendingElem != null

      private def clearPending(): Unit = {
        pendingElem = null
        pendingOutlet = null
      }

      // Requests the next element only when it is legal and safe to do so:
      // nothing is parked (otherwise the parked element could be overwritten),
      // the inlet is still open and it has not been pulled already.
      private def pullIfPossible(): Unit = {
        if (!hasPending && !isClosed(in) && !hasBeenPulled(in)) {
          pull(in)
        }
      }

      class SplitterInHandler extends InHandler {

        // Pulls upstream only if at least one outlet currently has demand.
        def maybePull(): Unit = {
          if (isAvailable(left) || isAvailable(right)) {
            pullIfPossible()
          }
        }

        // Routes `elem` to `outlet`: emits it right away when there is demand,
        // parks it when there is none, and drops it when the outlet has cancelled.
        def handlePush[T](elem: T, outlet: Outlet[T]): Unit = {
          if (isClosed(outlet)) {
            maybePull()
          } else if (isAvailable(outlet)) {
            push(outlet, elem)
            maybePull()
          } else {
            pendingElem = elem
            pendingOutlet = outlet
          }
        }

        override def onPush(): Unit = {
          grab(in) match {
            case Left(lElem) => handlePush(lElem, left)
            case Right(rElem) => handlePush(rElem, right)
          }
        }

        // Completion is postponed while an element is parked so that it can still be delivered.
        override def onUpstreamFinish(): Unit = {
          if (!hasPending) {
            completeStage()
          }
        }
      }

      class SplitterOutHandler[T](outlet: Outlet[T]) extends OutHandler {

        override def onPull(): Unit = {
          if (hasPending && pendingOutlet == outlet) {
            // The parked element was waiting for exactly this outlet: deliver it now.
            push(outlet, pendingElem.asInstanceOf[T])
            clearPending()
            if (isClosed(in)) {
              // Upstream finished earlier; the parked element was the last thing to emit.
              completeStage()
            } else {
              pullIfPossible()
            }
          } else {
            // Either nothing is parked, or the parked element belongs to the other outlet;
            // in the latter case pullIfPossible() deliberately does nothing.
            pullIfPossible()
          }
        }

        override def onDownstreamFinish(): Unit = {
          downstreamRunning -= 1
          if (downstreamRunning == 0) {
            completeStage()
          } else if (hasPending && pendingOutlet == outlet) {
            // The parked element can never be delivered any more: drop it.
            clearPending()
            if (isClosed(in)) {
              // Upstream already finished and nothing is left to emit.
              completeStage()
            } else {
              pullIfPossible()
            }
          }
        }
      }

      setHandler(in, new SplitterInHandler)
      setHandler(left, new SplitterOutHandler(left))
      setHandler(right, new SplitterOutHandler(right))
    }
  }
RAW Paste Data
Top