mitrakov

Streams: IorFanOutShape

Feb 20th, 2020
305
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 3.61 KB | None | 0 0
  1. import akka.NotUsed
  2. import akka.stream._
  3. import akka.stream.scaladsl._
  4. import cats.data.Ior
  5.  
  6. object IorFanOutShape {
  7.   def apply[L, R](): Graph[IorFanOutShape[Ior[L, R], L, R], NotUsed] =
  8.     GraphDSL.create() { implicit b =>
  9.       import GraphDSL.Implicits._
  10.  
  11.       val input = b.add(Flow[Ior[L, R]])
  12.       val bcast = b.add(Broadcast[Ior[L, R]](outputPorts = 2))
  13.       val leftOut = b.add(Flow[Ior[L, R]].collect { case Ior.Left(l) => l })
  14.       val rightOut = b.add(Flow[Ior[L, R]].collect { case Ior.Right(r) => r })
  15.  
  16.       // format: off
  17.       input ~> bcast ~> leftOut
  18.                bcast ~> rightOut
  19.       // format: on
  20.  
  21.       new IorFanOutShape(input.in, leftOut.out, rightOut.out)
  22.     }
  23. }
  24.  
  25. class IorFanOutShape[In, L, R](_init: FanOutShape.Init[In]) extends FanOutShape[In](_init) {
  26.   def this(in: Inlet[In], left: Outlet[L], right: Outlet[R]) = this(FanOutShape.Ports(in, left :: right :: Nil))
  27.  
  28.   override protected def construct(init: FanOutShape.Init[In]): FanOutShape[In] = new IorFanOutShape(init)
  29.   override def deepCopy(): IorFanOutShape[In, L, R] = super.deepCopy().asInstanceOf[IorFanOutShape[In, L, R]]
  30.  
  31.   val left: Outlet[L] = newOutlet[L]("left")
  32.   val right: Outlet[R] = newOutlet[R]("right")
  33. }
  34.  
  35.  
  36.  
  37. // Spec:
  38. import akka.actor.ActorSystem
  39. import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
  40. import akka.stream.{ActorMaterializer, ClosedShape, FlowShape, Graph}
  41. import cats.data.Ior
  42. import org.scalatest.concurrent.ScalaFutures
  43. import org.scalatest.{FlatSpec, Matchers}
  44.  
  45. class IorFanOutShapeSpec extends FlatSpec with ScalaFutures with Matchers {
  46.   implicit val sys: ActorSystem = ActorSystem()
  47.   implicit val mat: ActorMaterializer = ActorMaterializer()
  48.  
  49.   "IorFanOutShape" should "split elements into 2 outlets" in new Fixture {
  50.     val source = Source(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
  51.     val graphL = leftGraph(source, Sink.seq[Int], Sink.seq[Int]) (_ % 2 == 0)
  52.     val graphR = rightGraph(source, Sink.seq[Int], Sink.seq[Int]) (_ % 2 == 0)
  53.  
  54.     val odds: List[Int] = RunnableGraph.fromGraph(graphL).run().futureValue.toList
  55.     val evens: List[Int] = RunnableGraph.fromGraph(graphR).run().futureValue.toList
  56.  
  57.     odds shouldBe List(1, 3, 5, 7, 9)
  58.     evens shouldBe List(2, 4, 6, 8)
  59.   }
  60.  
  61.   trait Fixture {
  62.     def leftGraph[T, Mat](source: Source[T, _], sink1: Sink[T, Mat], sink2: Sink[T, _])(f: T => Boolean): Graph[ClosedShape, Mat] = {
  63.       GraphDSL.create(sink1) {
  64.         implicit builder => sink =>
  65.           import GraphDSL.Implicits._
  66.           val flow: FlowShape[T, Ior[T, T]] = builder.add(Flow[T].map(i => if (f(i)) Ior.right(i) else Ior.left(i)))
  67.           val bcastT: IorFanOutShape[Ior[T, T], T, T] = builder.add(IorFanOutShape[T, T]())
  68.  
  69.           // format: off
  70.                             bcastT.left ~> sink
  71.           source ~> flow ~> bcastT.in
  72.                             bcastT.right ~> sink2
  73.           // format: on
  74.           ClosedShape
  75.       }
  76.     }
  77.  
  78.     def rightGraph[T, Mat](source: Source[T, _], sink1: Sink[T, _], sink2: Sink[T, Mat])(f: T => Boolean): Graph[ClosedShape, Mat] = {
  79.       GraphDSL.create(sink2) {
  80.         implicit builder => sink =>
  81.           import GraphDSL.Implicits._
  82.           val flow: FlowShape[T, Ior[T, T]] = builder.add(Flow[T].map(i => if (f(i)) Ior.right(i) else Ior.left(i)))
  83.           val bcastT: IorFanOutShape[Ior[T, T], T, T] = builder.add(IorFanOutShape[T, T]())
  84.  
  85.           // format: off
  86.                             bcastT.left ~> sink1
  87.           source ~> flow ~> bcastT.in
  88.                             bcastT.right ~> sink
  89.           // format: on
  90.           ClosedShape
  91.       }
  92.     }
  93.   }
  94. }
Add Comment
Please, Sign In to add comment