Advertisement
Guest User

Untitled

a guest
Jan 12th, 2017
81
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.71 KB | None | 0 0
  1. import akka.stream._
  2. import akka.stream.stage._
  3.  
  4. /**
  5. * Fan-out the stream of `scala.util.Either[L, R]` elements to `L` and `R` streams.
  6. *
  7. * '''Emits when''' emits when an element is available from the input and the chosen output has demand
  8. *
  9. * '''Backpressures when''' the currently chosen output back-pressures
  10. *
  11. * '''Completes when''' upstream completes and no output is pending
  12. *
  13. * '''Cancels when''' when both downstreams cancel
  14. */
  15. class Splitter[L, R] extends GraphStage[FanOutShape2[Either[L, R], L, R]] {
  16.  
  17. val in: Inlet[Either[L, R]] = Inlet("in")
  18.  
  19. val left: Outlet[L] = Outlet("left")
  20.  
  21. val right: Outlet[R] = Outlet("right")
  22.  
  23. override val shape = new FanOutShape2(in, left, right)
  24.  
  25. def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
  26.  
  27. var pendingElem: Any = null
  28.  
  29. var pendingOutlet: Outlet[_] = null
  30.  
  31. var downstreamRunning = 2
  32.  
  33. class SplitterInHandler extends InHandler {
  34.  
  35. def maybePull(): Unit = {
  36. if (isAvailable(left) || isAvailable(right)) {
  37. pull(in)
  38. }
  39. }
  40.  
  41. def handlePush[T](elem: T, outlet: Outlet[T]): Unit = {
  42. if (!isClosed(outlet)) {
  43. if (isAvailable(outlet)) {
  44. push(outlet, elem)
  45. maybePull()
  46. } else {
  47. pendingElem = elem
  48. pendingOutlet = outlet
  49. }
  50. } else {
  51. maybePull()
  52. }
  53. }
  54.  
  55. override def onPush: Unit = {
  56. val elem = grab(in)
  57. elem match {
  58. case Left(lElem) => handlePush(lElem, left)
  59. case Right(rElem) => handlePush(rElem, right)
  60. }
  61. }
  62.  
  63. override def onUpstreamFinish(): Unit = {
  64. if (pendingElem == null) {
  65. completeStage()
  66. }
  67. }
  68. }
  69.  
  70. class SplitterOutHandler[T](outlet: Outlet[T]) extends OutHandler {
  71.  
  72. override def onPull: Unit = {
  73. if (pendingElem == null && pendingOutlet == outlet) {
  74. push(outlet, pendingElem.asInstanceOf[T])
  75. pendingElem = null
  76. if (!isClosed(in)) {
  77. if (!hasBeenPulled(in)) {
  78. pull(in)
  79. }
  80. } else {
  81. completeStage()
  82. }
  83. } else {
  84. if (!hasBeenPulled(in)) {
  85. pull(in)
  86. }
  87. }
  88. }
  89.  
  90. override def onDownstreamFinish(): Unit = {
  91. downstreamRunning -= 1
  92. if (downstreamRunning == 0) {
  93. completeStage()
  94. } else if (pendingElem != null && pendingOutlet == outlet) {
  95. pendingElem = null
  96. if (!hasBeenPulled(in)) {
  97. pull(in)
  98. }
  99. }
  100. }
  101. }
  102.  
  103. setHandler(in, new SplitterInHandler)
  104. setHandler(left, new SplitterOutHandler(left))
  105. setHandler(right, new SplitterOutHandler(right))
  106. }
  107. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement