Advertisement
Guest User

Untitled

a guest
Feb 12th, 2016
94
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.72 KB | None | 0 0
  1. package com.theseventhsense.utils.akka
  2.  
  3. import akka.stream.stage._
  4. import com.codahale.metrics.Histogram
  5.  
  object ObservableBuffer {

    /**
     * Fixed-capacity FIFO ring buffer.
     *
     * Originally noted as a verbatim copy from akka.streams.impl; it now
     * differs in that `dropHead` rebases the read/write indices so that they
     * stay bounded instead of growing until they overflow `Int`.
     */
    object FixedSizeBuffer {

      /**
       * Creates a buffer able to hold `size` elements.
       *
       * Power-of-two sizes get an implementation that maps indices to slots
       * with a bit mask; every other size uses the remainder operator.
       *
       * @param size capacity of the buffer, must be at least 1
       * @tparam T element type
       * @throws IllegalArgumentException if `size` is less than 1
       */
      def apply[T](size: Int): FixedSizeBuffer[T] =
        if (size < 1) throw new IllegalArgumentException("size must be positive")
        else if (((size - 1) & size) == 0) new PowerOfTwoFixedSizeBuffer(size)
        else new ModuloFixedSizeBuffer(size)

      /**
       * Ring buffer over a fixed array. `readIdx` and `writeIdx` are logical
       * positions; `toOffset` maps a logical position to an array slot.
       *
       * None of the operations check for over- or underflow: callers are
       * expected to consult `isFull` / `isEmpty` first.
       *
       * @param size capacity of the buffer
       * @tparam T element type
       */
      sealed abstract class FixedSizeBuffer[T](val size: Int) {
        override def toString: String =
          s"Buffer($size, $readIdx, $writeIdx)(${(readIdx until writeIdx).map(get).mkString(", ")})"

        // Elements are stored boxed so a single array type serves every T.
        private val buffer = new Array[AnyRef](size)

        // Logical position of the oldest element.
        protected var readIdx = 0
        // Logical position one past the newest element.
        protected var writeIdx = 0

        /** Number of elements currently stored. */
        def used: Int = writeIdx - readIdx

        /** True when `used` equals the capacity. */
        def isFull: Boolean = used == size

        /** True when no element is stored. */
        def isEmpty: Boolean = used == 0

        /** True when at least one element is stored. */
        def nonEmpty: Boolean = used != 0

        /** Appends `elem` at the tail. */
        def enqueue(elem: T): Unit = {
          put(writeIdx, elem)
          writeIdx += 1
        }

        /** Maps a logical index to a slot of the backing array. */
        protected def toOffset(idx: Int): Int

        /** Stores `elem` in the slot that the logical index `idx` maps to. */
        def put(idx: Int, elem: T): Unit =
          buffer(toOffset(idx)) = elem.asInstanceOf[AnyRef]

        /** Reads the slot that the logical index `idx` maps to. */
        def get(idx: Int): T = buffer(toOffset(idx)).asInstanceOf[T]

        /** Returns the oldest element without removing it. */
        def peek(): T = get(readIdx)

        /** Removes and returns the oldest element. */
        def dequeue(): T = {
          val result = get(readIdx)
          dropHead()
          result
        }

        /** Drops every element and resets both indices. */
        def clear(): Unit = {
          java.util.Arrays.fill(buffer, null)
          readIdx = 0
          writeIdx = 0
        }

        /** Discards the oldest element, nulling its slot so it can be collected. */
        def dropHead(): Unit = {
          put(readIdx, null.asInstanceOf[T])
          readIdx += 1
          // Rebase both indices by one full lap once they have both passed the
          // capacity. Subtracting `size` changes neither the slot an index maps
          // to nor `used`, but it keeps the indices from growing without bound.
          // Without this, roughly 2^31 dequeues make them negative, and
          // `idx % size` then yields a negative offset for sizes that are not a
          // power of two. The `writeIdx` check keeps a misused (under-flowed)
          // buffer from being pushed to negative indices by the rebase itself.
          if (readIdx >= size && writeIdx >= size) {
            readIdx -= size
            writeIdx -= size
          }
        }

        /** Discards the newest element, nulling its slot so it can be collected. */
        def dropTail(): Unit = {
          writeIdx -= 1
          put(writeIdx, null.asInstanceOf[T])
        }
      }

      /** Implementation for arbitrary sizes: slot = index modulo size. */
      private final class ModuloFixedSizeBuffer[T](_size: Int)
        extends FixedSizeBuffer[T](_size) {
        override protected def toOffset(idx: Int): Int = idx % size
      }

      /** Implementation for power-of-two sizes: slot = index masked with size - 1. */
      private final class PowerOfTwoFixedSizeBuffer[T](_size: Int)
        extends FixedSizeBuffer[T](_size) {
        private val Mask = size - 1

        override protected def toOffset(idx: Int): Int = idx & Mask
      }
    }
  }
  87.  
  /**
   * A backpressuring buffer stage whose fill level can be observed. Every
   * callback first records the number of buffered elements in the supplied
   * dropwizard metrics Histogram and then acts. Based directly off the default
   * akka buffer implementation.
   *
   * Example:
   *
   * {{{
   * val histogram = metricRegistry.histogram("my-buffer")
   * val stream = (() => Source.fromPublisher(...))
   *   .transform(() => ObservableBuffer(BufferSize, histogram))
   *   .runFold(...)
   * }}}
   *
   * @param size      capacity handed to `FixedSizeBuffer`
   * @param histogram updated with the buffer's `used` count on every push,
   *                  pull and upstream finish
   * @tparam T type of the elements flowing through the stage
   */
  case class ObservableBuffer[T](size: Int, histogram: Histogram) extends DetachedStage[T, T] {

    import ObservableBuffer._

    // Elements received from upstream that downstream has not yet taken.
    private val queue = FixedSizeBuffer[T](size)

    /** Hands the element straight to a waiting downstream, otherwise stores it. */
    override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = {
      histogram.update(queue.used)
      if (!ctx.isHoldingDownstream) store(ctx, elem)
      else ctx.pushAndPull(elem)
    }

    /** Serves downstream demand from the buffer, or holds it while the buffer is empty. */
    override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
      histogram.update(queue.used)
      if (ctx.isFinishing) drain(ctx)
      else if (ctx.isHoldingUpstream) ctx.pushAndPull(queue.dequeue())
      else if (queue.nonEmpty) ctx.push(queue.dequeue())
      else ctx.holdDownstream()
    }

    /** Finishes immediately when nothing is buffered; otherwise keeps the stage alive to drain. */
    override def onUpstreamFinish(ctx: DetachedContext[T]): TerminationDirective = {
      histogram.update(queue.used)
      if (queue.nonEmpty) ctx.absorbTermination()
      else ctx.finish()
    }

    // Upstream has completed: emit the next buffered element and finish once
    // that element was the last one.
    private def drain(ctx: DetachedContext[T]): DownstreamDirective = {
      val head = queue.dequeue()
      if (queue.nonEmpty) ctx.push(head)
      else ctx.pushAndFinish(head)
    }

    // Buffers the element; upstream is held once the buffer is full and asked
    // for more otherwise.
    private def store(ctx: DetachedContext[T], elem: T): UpstreamDirective = {
      queue.enqueue(elem)
      if (queue.isFull) ctx.holdUpstream()
      else ctx.pull()
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement