Guest User

Untitled

a guest
Sep 14th, 2018
87
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.38 KB | None | 0 0
  1. import akka.stream._
  2. import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
  3.  
  4. import scala.collection.immutable
  5.  
  /**
   * Stream stage that batches incoming elements into `List`s holding `n` elements each.
   *
   * A group is pushed downstream every time `n` elements have been collected. If upstream
   * completes while fewer than `n` elements are buffered, the buffered elements are pushed as
   * one final, shorter group before the stage completes. Nothing extra is emitted when the
   * buffer is empty at completion.
   *
   * @param n number of elements per emitted group; must be greater than 0
   * @tparam T type of the elements being grouped
   * @throws IllegalArgumentException if `n` is not greater than 0
   */
  final case class GroupedWithList[T](n: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
    require(n > 0, "n must be greater than 0")

    val in: Inlet[T] = Inlet[T]("Grouped.in")
    val out: Outlet[immutable.Seq[T]] = Outlet[immutable.Seq[T]]("Grouped.out")
    override val shape: FlowShape[T, immutable.Seq[T]] = FlowShape(in, out)

    /** Builds the per-materialization logic; it serves as the handler for both ports. */
    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) with InHandler with OutHandler {
        // Accumulates the elements of the group currently being assembled.
        private val pending = List.newBuilder[T]
        // Advisory only: builders are free to ignore the expected-size hint.
        pending.sizeHint(n)

        // Number of elements added to `pending` since the last emitted group.
        private var collected = 0

        /** Pushes the buffered elements downstream and resets the buffer for the next group. */
        private def emitGroup(): Unit = {
          val group = pending.result()
          pending.clear()
          collected = 0
          push(out, group)
        }

        /** Buffers the incoming element; emits once the group is full, otherwise asks for more. */
        override def onPush(): Unit = {
          pending += grab(in)
          collected += 1
          if (collected == n) emitGroup()
          else pull(in)
        }

        /** Downstream demand is forwarded upstream; a group is only pushed from `onPush`. */
        override def onPull(): Unit = pull(in)

        /** Flushes a partially filled group, if there is one, and then completes the stage. */
        override def onUpstreamFinish(): Unit = {
          // A non-empty buffer means the last onPush answered with pull(in) instead of a push,
          // so the demand that triggered onPull has not been served yet.
          if (collected > 0) emitGroup()
          completeStage()
        }

        setHandlers(in, out, this)
      }
  }
Add Comment
Please, Sign In to add comment