Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import akka.stream._
- import akka.stream.stage._
/**
 * Fan-out the stream of `scala.util.Either[L, R]` elements to `L` and `R` streams.
 *
 * `Left` values are routed to the `left` outlet and `Right` values to the `right` outlet.
 * At most one element is held back at a time: an element whose target outlet has no demand
 * is parked until that outlet pulls (or cancels), and upstream is not pulled in the meantime.
 * Elements addressed to an outlet that has already cancelled are dropped.
 *
 * '''Emits when''' an element is available from the input and the chosen output has demand
 *
 * '''Backpressures when''' the currently chosen output back-pressures
 *
 * '''Completes when''' upstream completes and no output is pending
 *
 * '''Cancels when''' both downstreams cancel
 *
 * @tparam L element type emitted on the `left` outlet
 * @tparam R element type emitted on the `right` outlet
 */
class Splitter[L, R] extends GraphStage[FanOutShape2[Either[L, R], L, R]] {
  val in: Inlet[Either[L, R]] = Inlet("in")
  val left: Outlet[L] = Outlet("left")
  val right: Outlet[R] = Outlet("right")
  override val shape = new FanOutShape2(in, left, right)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    // Element grabbed from `in` whose target outlet had no demand yet.
    // `null` means "nothing parked"; both fields are always set and cleared together.
    private var pendingElem: Any = null
    private var pendingOutlet: Outlet[_] = null
    // Number of downstreams that have not cancelled yet.
    private var downstreamRunning = 2

    private def hasPending: Boolean = pendingElem != null

    private def clearPending(): Unit = {
      pendingElem = null
      pendingOutlet = null
    }

    /** Pulls `in` only when that is legal: the port is still open and not already pulled. */
    private def pullIfPossible(): Unit =
      if (!isClosed(in) && !hasBeenPulled(in)) pull(in)

    /**
     * Continues after a parked element has been emitted or discarded: finish the stage if
     * upstream is already done (it was only kept alive for that element), otherwise ask
     * upstream for the next element.
     */
    private def resumeAfterPending(): Unit =
      if (isClosed(in)) completeStage() else pullIfPossible()

    class SplitterInHandler extends InHandler {
      /** Requests the next element if at least one outlet currently has demand. */
      def maybePull(): Unit =
        if (isAvailable(left) || isAvailable(right)) pullIfPossible()

      /**
       * Routes `elem` to `outlet`: emits it if there is demand, parks it if there is not,
       * and drops it if the outlet has been cancelled.
       */
      def handlePush[T](elem: T, outlet: Outlet[T]): Unit =
        if (isClosed(outlet)) {
          // Target cancelled: drop the element and keep the other side flowing.
          maybePull()
        } else if (isAvailable(outlet)) {
          push(outlet, elem)
          maybePull()
        } else {
          // No demand on the chosen outlet: park the element, which back-pressures upstream
          // because nothing pulls `in` again until the parked element is resolved.
          pendingElem = elem
          pendingOutlet = outlet
        }

      override def onPush(): Unit =
        grab(in) match {
          case Left(lElem)  => handlePush(lElem, left)
          case Right(rElem) => handlePush(rElem, right)
        }

      override def onUpstreamFinish(): Unit =
        // Keep the stage alive while an element is still parked; it is completed once that
        // element has been emitted or its outlet cancels.
        if (!hasPending) completeStage()
    }

    class SplitterOutHandler[T](outlet: Outlet[T]) extends OutHandler {
      override def onPull(): Unit =
        if (!hasPending) {
          pullIfPossible()
        } else if (pendingOutlet == outlet) {
          // The cast is safe: the element was parked by handlePush together with this
          // very outlet, so its type is the outlet's element type.
          push(outlet, pendingElem.asInstanceOf[T])
          clearPending()
          resumeAfterPending()
        }
        // Otherwise an element is parked for the other outlet. Pulling upstream now could
        // overwrite it, so do nothing; this outlet's demand is served once the parked
        // element has been resolved and upstream is pulled again.

      override def onDownstreamFinish(): Unit = {
        downstreamRunning -= 1
        if (downstreamRunning == 0) {
          completeStage()
        } else if (hasPending && pendingOutlet == outlet) {
          // The parked element can never be delivered any more: discard it and move on.
          clearPending()
          resumeAfterPending()
        }
      }
    }

    setHandler(in, new SplitterInHandler)
    setHandler(left, new SplitterOutHandler(left))
    setHandler(right, new SplitterOutHandler(right))
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement