Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import akka.stream._
- import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
- import scala.collection.immutable
- final case class GroupedWithList[T](n: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
- require(n > 0, "n must be greater than 0")
- val in = Inlet[T]("Grouped.in")
- val out = Outlet[immutable.Seq[T]]("Grouped.out")
- override val shape: FlowShape[T, immutable.Seq[T]] = FlowShape(in, out)
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
- private val buf = {
- val b = List.newBuilder[T]
- b.sizeHint(n)
- b
- }
- var left = n
- override def onPush(): Unit = {
- buf += grab(in)
- left -= 1
- if (left == 0) {
- val elements = buf.result()
- buf.clear()
- left = n
- push(out, elements)
- } else {
- pull(in)
- }
- }
- override def onPull(): Unit = {
- pull(in)
- }
- override def onUpstreamFinish(): Unit = {
- // This means the buf is filled with some elements but not enough (left < n) to group together.
- // Since the upstream has finished we have to push them to downstream though.
- if (left < n) {
- val elements = buf.result()
- buf.clear()
- left = n
- push(out, elements)
- }
- completeStage()
- }
- setHandlers(in, out, this)
- }
- }
Add Comment
Please, Sign In to add comment