Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /**
- Copy left Alexander Hasselbach
- Usage:
- val E = b.add(EitherFanOutShape[Throwable, Int]())
- val parsed = b.add(Flow[Either[Throwable,Int]])
- val valids = b.add(Flow[Int])
- val invalids = b.add(Flow[Throwable])
- parsed ~> E.in
- E.left ~> invalids
- E.right ~> valids
- */
- import akka.NotUsed
- import akka.stream.{Graph, FanOutShape, Inlet, Outlet}
- import akka.stream.scaladsl.{Flow, GraphDSL, Broadcast}
/** Factory for a fan-out graph that splits a stream of `Either` values by side. */
object EitherFanOutShape {

  /**
   * Builds a graph that sends the payload of every `Left` to the shape's `left` outlet and the
   * payload of every `Right` to its `right` outlet.
   *
   * Each incoming element is broadcast to two branches; each branch keeps only its own side via
   * `collect` and unwraps the payload.
   *
   * NOTE(review): the split is built on `Broadcast`, so both outlets presumably have to be
   * consumed for elements to keep flowing — confirm against the Akka `Broadcast` documentation.
   *
   * @tparam L payload type carried by `Left` elements
   * @tparam R payload type carried by `Right` elements
   * @return a graph whose shape exposes the `in`, `left` and `right` ports
   */
  def apply[L, R](): Graph[EitherFanOutShape[Either[L, R], L, R], NotUsed] =
    GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val bcast    = b.add(Broadcast[Either[L, R]](outputPorts = 2))
      val leftOut  = b.add(Flow[Either[L, R]].collect { case Left(l) => l })
      val rightOut = b.add(Flow[Either[L, R]].collect { case Right(r) => r })

      // format: off
      bcast ~> leftOut
      bcast ~> rightOut
      // format: on

      // The broadcast's own inlet is exposed as the shape's input; a separate pass-through
      // flow in front of it would only add a stage without changing what is emitted.
      new EitherFanOutShape[Either[L, R], L, R](bcast.in, leftOut.out, rightOut.out)
    }
}
/**
 * Fan-out shape with a single inlet and two differently typed outlets, `left` and `right`.
 *
 * @tparam In element type accepted by the inlet
 * @tparam L  element type emitted on the `left` outlet
 * @tparam R  element type emitted on the `right` outlet
 * @param _init initializer handed to the underlying `FanOutShape`
 */
class EitherFanOutShape[In, L, R](_init: FanOutShape.Init[In]) extends FanOutShape[In](_init) {

  // Declared in the same order as the port list built by the ports constructor below
  // (left first, then right). NOTE(review): FanOutShape presumably matches outlets by
  // declaration order — keep these two in this order unless confirmed otherwise.
  val left: Outlet[L] = newOutlet[L]("left")
  val right: Outlet[R] = newOutlet[R]("right")

  /** Creates the shape from a name only, via `FanOutShape.Name`. */
  def this(name: String) =
    this(FanOutShape.Name[In](name))

  /** Creates the shape around already existing ports, via `FanOutShape.Ports`. */
  def this(in: Inlet[In], left: Outlet[L], right: Outlet[R]) =
    this(FanOutShape.Ports(in, List(left, right)))

  /** Instantiates this concrete shape type from the given initializer. */
  override protected def construct(init: FanOutShape.Init[In]): FanOutShape[In] =
    new EitherFanOutShape[In, L, R](init)

  /** Copies the shape, narrowing the result of `super.deepCopy()` back to this type. */
  override def deepCopy(): EitherFanOutShape[In, L, R] = {
    val copied = super.deepCopy()
    copied.asInstanceOf[EitherFanOutShape[In, L, R]]
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement