Advertisement
mitrakov

Akka Streams: Either Fanout

Dec 30th, 2018 (edited)
306
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 1.66 KB | None | 0 0
  1. /**
  2.   Copy left Alexander Hasselbach
  3.  
  4.   Usage:
  5.  
  6.   val E = b.add(splitEither[Throwable,Int])
  7.   val parsed = b.add(Flow[Either[Throwable,Int]])
  8.   val valids = b.add(Flow[Int])
  9.   val invalids = b.add(Flow[Throwable])
  10.  
  11.   parsed ~> E.in
  12.             E.left  ~> invalids
  13.             E.right ~> valids
  14.  
  15. */
  16.  
  17. import akka.NotUsed
  18. import akka.stream.{Graph, FanOutShape, Inlet, Outlet}
  19. import akka.stream.scaladsl.{Flow, GraphDSL, Broadcast}
  20.  
  21. object EitherFanOutShape {
  22.   def apply[L, R](): Graph[EitherFanOutShape[Either[L, R], L, R], NotUsed] =
  23.     GraphDSL.create() { implicit b =>
  24.       import GraphDSL.Implicits._
  25.  
  26.       val input = b.add(Flow[Either[L, R]])
  27.       val bcast = b.add(Broadcast[Either[L, R]](outputPorts = 2))
  28.       val leftOut = b.add(Flow[Either[L, R]].collect { case Left(l) => l })
  29.       val rightOut = b.add(Flow[Either[L, R]].collect { case Right(r) => r })
  30.  
  31.       // format: off
  32.       input ~> bcast ~> leftOut
  33.                bcast ~> rightOut
  34.       // format: on
  35.  
  36.       new EitherFanOutShape(input.in, leftOut.out, rightOut.out)
  37.     }
  38. }
  39.  
  40. class EitherFanOutShape[In, L, R](_init: FanOutShape.Init[In]) extends FanOutShape[In](_init) {
  41.   def this(name: String) = this(FanOutShape.Name[In](name))
  42.   def this(in: Inlet[In], left: Outlet[L], right: Outlet[R]) = this(FanOutShape.Ports(in, left :: right :: Nil))
  43.  
  44.   override protected def construct(init: FanOutShape.Init[In]): FanOutShape[In] = new EitherFanOutShape(init)
  45.   override def deepCopy(): EitherFanOutShape[In, L, R] = super.deepCopy().asInstanceOf[EitherFanOutShape[In, L, R]]
  46.  
  47.   val left: Outlet[L] = newOutlet[L]("left")
  48.   val right: Outlet[R] = newOutlet[R]("right")
  49. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement