Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import akka.NotUsed
- import akka.stream._
- import akka.stream.scaladsl._
- import cats.data.Ior
/**
 * Factory for a fan-out graph that splits a stream of `Ior[L, R]` into its
 * left-hand and right-hand values.
 */
object IorFanOutShape {

  /**
   * Builds a graph with one inlet accepting `Ior[L, R]` and two outlets.
   *
   * Routing of incoming elements:
   *  - `Ior.Left(l)`    emits `l` on the `left` outlet
   *  - `Ior.Right(r)`   emits `r` on the `right` outlet
   *  - `Ior.Both(l, r)` emits `l` on the `left` outlet and `r` on the `right` outlet
   *
   * Both outlets are fed from a single two-port `Broadcast`, so both must be
   * connected; per the Akka documentation a `Broadcast` only emits once all of
   * its outputs have demand, so a slow consumer on one side slows the other.
   *
   * @tparam L type carried by the left side of the `Ior`
   * @tparam R type carried by the right side of the `Ior`
   * @return a graph whose shape exposes `in`, `left` and `right`
   */
  def apply[L, R](): Graph[IorFanOutShape[Ior[L, R], L, R], NotUsed] =
    GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val bcast = b.add(Broadcast[Ior[L, R]](outputPorts = 2))

      // `Both` carries a left value as well, so it must not be filtered out here.
      val leftOut = b.add(Flow[Ior[L, R]].collect {
        case Ior.Left(l)    => l
        case Ior.Both(l, _) => l
      })

      // Likewise, `Both` contributes its right value to the right outlet.
      val rightOut = b.add(Flow[Ior[L, R]].collect {
        case Ior.Right(r)   => r
        case Ior.Both(_, r) => r
      })

      // format: off
      bcast ~> leftOut
      bcast ~> rightOut
      // format: on

      // The broadcast inlet is exposed directly; no extra identity stage is needed.
      new IorFanOutShape(bcast.in, leftOut.out, rightOut.out)
    }
}
/**
 * Fan-out shape with a single inlet and two individually typed outlets,
 * `left` and `right`.
 *
 * @param _init initialiser handed through to `FanOutShape`
 * @tparam In element type accepted by the inlet
 * @tparam L  element type emitted by the `left` outlet
 * @tparam R  element type emitted by the `right` outlet
 */
class IorFanOutShape[In, L, R](_init: FanOutShape.Init[In]) extends FanOutShape[In](_init) {

  // Declared in the same order (left, then right) as the port list that the
  // port-based constructor below passes to `FanOutShape.Ports`; keep them in sync.
  val left: Outlet[L] = newOutlet[L]("left")
  val right: Outlet[R] = newOutlet[R]("right")

  /** Creates the shape from already existing ports. */
  def this(in: Inlet[In], left: Outlet[L], right: Outlet[R]) =
    this(FanOutShape.Ports(in, List[Outlet[_]](left, right)))

  /** Re-creates a shape of this concrete class from the given initialiser. */
  override protected def construct(init: FanOutShape.Init[In]): FanOutShape[In] =
    new IorFanOutShape[In, L, R](init)

  /** Copies the shape via the parent implementation, narrowing the result back to this type. */
  override def deepCopy(): IorFanOutShape[In, L, R] =
    super.deepCopy().asInstanceOf[IorFanOutShape[In, L, R]]
}
- // Spec:
- import akka.actor.ActorSystem
- import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
- import akka.stream.{ActorMaterializer, ClosedShape, FlowShape, Graph}
- import cats.data.Ior
- import org.scalatest.concurrent.ScalaFutures
- import org.scalatest.{FlatSpec, Matchers}
/**
 * Spec for `IorFanOutShape`: feeds integers tagged as `Ior.left` / `Ior.right`
 * through the fan-out and checks what arrives at each outlet.
 *
 * The actor system created for the suite is terminated in `afterAll`, so its
 * threads do not outlive the test run.
 */
class IorFanOutShapeSpec
    extends FlatSpec
    with ScalaFutures
    with Matchers
    with org.scalatest.BeforeAndAfterAll {

  import org.scalatest.time.{Millis, Seconds, Span}
  import scala.concurrent.Await
  import scala.concurrent.duration.DurationInt

  implicit val sys: ActorSystem = ActorSystem()
  implicit val mat: ActorMaterializer = ActorMaterializer()

  // The default `futureValue` patience (150 ms) is easily exceeded by the first
  // stream materialisation on a cold JVM, so use a more forgiving timeout.
  implicit override val patienceConfig: PatienceConfig =
    PatienceConfig(timeout = scaled(Span(5, Seconds)), interval = scaled(Span(50, Millis)))

  /** Shuts the actor system down (and with it the materializer) once all tests ran. */
  override protected def afterAll(): Unit =
    try {
      Await.result(sys.terminate(), 10.seconds)
      ()
    } finally super.afterAll()

  "IorFanOutShape" should "split elements into 2 outlets" in new Fixture {
    val source = Source(List(1, 2, 3, 4, 5, 6, 7, 8, 9))

    // Even numbers are tagged `Ior.right`, odd numbers `Ior.left`.
    val graphL = leftGraph(source, Sink.seq[Int], Sink.seq[Int])(_ % 2 == 0)
    val graphR = rightGraph(source, Sink.seq[Int], Sink.seq[Int])(_ % 2 == 0)

    val odds: List[Int] = RunnableGraph.fromGraph(graphL).run().futureValue.toList
    val evens: List[Int] = RunnableGraph.fromGraph(graphR).run().futureValue.toList

    odds shouldBe List(1, 3, 5, 7, 9)
    evens shouldBe List(2, 4, 6, 8)
  }

  /** Helpers that wire a source through `IorFanOutShape` into two sinks. */
  trait Fixture {

    /**
     * Closed graph that materialises the sink attached to the `left` outlet.
     *
     * @param source elements to split
     * @param sink1  sink connected to `left`; its materialised value is the graph's
     * @param sink2  sink connected to `right`; its materialised value is discarded
     * @param f      elements satisfying `f` become `Ior.right`, all others `Ior.left`
     */
    def leftGraph[T, Mat](source: Source[T, _], sink1: Sink[T, Mat], sink2: Sink[T, _])(f: T => Boolean): Graph[ClosedShape, Mat] = {
      GraphDSL.create(sink1) { implicit builder => sink =>
        import GraphDSL.Implicits._

        val flow: FlowShape[T, Ior[T, T]] =
          builder.add(Flow[T].map(i => if (f(i)) Ior.right(i) else Ior.left(i)))
        val bcastT: IorFanOutShape[Ior[T, T], T, T] = builder.add(IorFanOutShape[T, T]())

        // format: off
                           bcastT.left  ~> sink
        source ~> flow ~>  bcastT.in
                           bcastT.right ~> sink2
        // format: on

        ClosedShape
      }
    }

    /**
     * Closed graph that materialises the sink attached to the `right` outlet.
     *
     * @param source elements to split
     * @param sink1  sink connected to `left`; its materialised value is discarded
     * @param sink2  sink connected to `right`; its materialised value is the graph's
     * @param f      elements satisfying `f` become `Ior.right`, all others `Ior.left`
     */
    def rightGraph[T, Mat](source: Source[T, _], sink1: Sink[T, _], sink2: Sink[T, Mat])(f: T => Boolean): Graph[ClosedShape, Mat] = {
      GraphDSL.create(sink2) { implicit builder => sink =>
        import GraphDSL.Implicits._

        val flow: FlowShape[T, Ior[T, T]] =
          builder.add(Flow[T].map(i => if (f(i)) Ior.right(i) else Ior.left(i)))
        val bcastT: IorFanOutShape[Ior[T, T], T, T] = builder.add(IorFanOutShape[T, T]())

        // format: off
                           bcastT.left  ~> sink1
        source ~> flow ~>  bcastT.in
                           bcastT.right ~> sink
        // format: on

        ClosedShape
      }
    }
  }
}
Add Comment
Please, Sign In to add comment