daily pastebin goal
48%
SHARE
TWEET

Untitled

a guest Sep 14th, 2018 50 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import akka.stream._
  2. import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
  3.  
  4. import scala.collection.immutable
  5.  
  6. final case class GroupedWithList[T](n: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
  7.   require(n > 0, "n must be greater than 0")
  8.  
  9.   val in = Inlet[T]("Grouped.in")
  10.   val out = Outlet[immutable.Seq[T]]("Grouped.out")
  11.   override val shape: FlowShape[T, immutable.Seq[T]] = FlowShape(in, out)
  12.  
  13.   override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
  14.     private val buf = {
  15.       val b = List.newBuilder[T]
  16.       b.sizeHint(n)
  17.       b
  18.     }
  19.     var left = n
  20.  
  21.     override def onPush(): Unit = {
  22.       buf += grab(in)
  23.       left -= 1
  24.       if (left == 0) {
  25.         val elements = buf.result()
  26.         buf.clear()
  27.         left = n
  28.         push(out, elements)
  29.       } else {
  30.         pull(in)
  31.       }
  32.     }
  33.  
  34.     override def onPull(): Unit = {
  35.       pull(in)
  36.     }
  37.  
  38.     override def onUpstreamFinish(): Unit = {
  39.       // This means the buf is filled with some elements but not enough (left < n) to group together.
  40.       // Since the upstream has finished we have to push them to downstream though.
  41.       if (left < n) {
  42.         val elements = buf.result()
  43.         buf.clear()
  44.         left = n
  45.         push(out, elements)
  46.       }
  47.       completeStage()
  48.     }
  49.  
  50.     setHandlers(in, out, this)
  51.   }
  52. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top