Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.theseventhsense.utils.akka
- import akka.stream.stage._
- import com.codahale.metrics.Histogram
object ObservableBuffer {

  /**
   * Factory for fixed-capacity FIFO ring buffers.
   *
   * Originally a verbatim copy from akka.streams.impl. The only behavioural
   * difference is in `dropHead`, which now rebases the read/write counters so
   * that they stay bounded instead of growing until they overflow.
   */
  object FixedSizeBuffer {

    /**
     * Creates a buffer that can hold at most `size` elements.
     *
     * Sizes that are a power of two get the mask-based implementation, all
     * other sizes get the modulo-based one.
     *
     * @param size capacity of the buffer, must be at least 1
     * @tparam T element type
     * @throws IllegalArgumentException if `size` is smaller than 1
     */
    def apply[T](size: Int): FixedSizeBuffer[T] =
      if (size < 1) throw new IllegalArgumentException("size must be positive")
      else if (((size - 1) & size) == 0) new PowerOfTwoFixedSizeBuffer(size)
      else new ModuloFixedSizeBuffer(size)

    /**
     * Ring buffer backed by a plain array. `readIdx` and `writeIdx` are logical
     * positions; subclasses map them to array slots through `toOffset`.
     *
     * None of the operations check bounds: callers are expected to consult
     * `isFull` before `enqueue` and `isEmpty` before `dequeue`/`peek`/`dropHead`/`dropTail`.
     *
     * @param size capacity of the buffer
     * @tparam T element type
     */
    sealed abstract class FixedSizeBuffer[T](val size: Int) {
      private val buffer = new Array[AnyRef](size)
      protected var readIdx = 0
      protected var writeIdx = 0

      override def toString: String =
        s"Buffer($size, $readIdx, $writeIdx)(${(readIdx until writeIdx).map(get).mkString(", ")})"

      /** Number of elements currently stored. */
      def used: Int = writeIdx - readIdx

      def isFull: Boolean = used == size

      def isEmpty: Boolean = used == 0

      def nonEmpty: Boolean = used != 0

      /** Appends `elem` at the tail. */
      def enqueue(elem: T): Unit = {
        put(writeIdx, elem)
        writeIdx += 1
      }

      /** Maps a logical index to a slot of the backing array. */
      protected def toOffset(idx: Int): Int

      /** Stores `elem` in the slot that the logical index `idx` maps to. */
      def put(idx: Int, elem: T): Unit =
        buffer(toOffset(idx)) = elem.asInstanceOf[AnyRef]

      /** Reads the slot that the logical index `idx` maps to. */
      def get(idx: Int): T = buffer(toOffset(idx)).asInstanceOf[T]

      /** Returns the head element without removing it. */
      def peek(): T = get(readIdx)

      /** Removes and returns the head element. */
      def dequeue(): T = {
        val result = get(readIdx)
        dropHead()
        result
      }

      /** Drops all elements and resets both indices. */
      def clear(): Unit = {
        java.util.Arrays.fill(buffer, null)
        readIdx = 0
        writeIdx = 0
      }

      /** Removes the head element, nulling its slot so it can be collected. */
      def dropHead(): Unit = {
        put(readIdx, null.asInstanceOf[T])
        readIdx += 1
        // Rebase both counters by one full lap. Subtracting `size` changes neither
        // `idx % size` nor `idx & (size - 1)`, so every element keeps its slot, but the
        // counters stay small. Without this they would keep growing, overflow to a
        // negative value after 2^31 enqueues, and the modulo variant would then
        // compute a negative array offset.
        // As long as callers respect isFull/isEmpty, readIdx stays below `size` and
        // writeIdx below `2 * size`, which fits an Int for capacities up to 2^30.
        // The writeIdx check only matters after a misuse (dropHead on an empty
        // buffer); it keeps writeIdx from being pushed below zero in that case.
        if (readIdx >= size && writeIdx >= size) {
          readIdx -= size
          writeIdx -= size
        }
      }

      /** Removes the most recently enqueued element, nulling its slot. */
      def dropTail(): Unit = {
        writeIdx -= 1
        put(writeIdx, null.asInstanceOf[T])
      }
    }

    /** Implementation for arbitrary capacities: slot = index modulo capacity. */
    private final class ModuloFixedSizeBuffer[T](_size: Int)
        extends FixedSizeBuffer[T](_size) {
      override protected def toOffset(idx: Int): Int = idx % size
    }

    /** Implementation for power-of-two capacities: slot = index masked with capacity - 1. */
    private final class PowerOfTwoFixedSizeBuffer[T](_size: Int)
        extends FixedSizeBuffer[T](_size) {
      private val Mask = size - 1
      override protected def toOffset(idx: Int): Int = idx & Mask
    }
  }
}
/**
 * An observable, backpressuring buffer stage. Based directly off the default
 * akka buffer implementation; in addition, every push, pull and upstream-finish
 * event records the number of currently buffered elements in a dropwizard
 * metrics Histogram.
 *
 * Use it like this:
 *
 * {{{
 * val histogram = metricRegistry.histogram("my-buffer")
 * val stream = Source.fromPublisher(...)
 *   .transform(() => ObservableBuffer(BufferSize, histogram))
 *   .runFold(...)
 * }}}
 *
 * @param size      capacity of the internal buffer; upstream is held once it is full
 * @param histogram histogram that receives the buffer occupancy on each event
 * @tparam T        type of the elements flowing through the stage
 */
case class ObservableBuffer[T](size: Int, histogram: Histogram) extends DetachedStage[T, T] {
  import ObservableBuffer._

  private val buffer = FixedSizeBuffer[T](size)

  /** Records the current occupancy (measured before the event is handled). */
  private def recordOccupancy(): Unit = histogram.update(buffer.used)

  /** Stores `elem`; holds upstream if that filled the buffer, otherwise asks for more. */
  private def enqueueAndSignal(ctx: DetachedContext[T], elem: T): UpstreamDirective = {
    buffer.enqueue(elem)
    if (buffer.isFull) ctx.holdUpstream()
    else ctx.pull()
  }

  /** Emits the head element while finishing; completes the stage with the last one. */
  private def drainOne(ctx: DetachedContext[T]): DownstreamDirective = {
    val head = buffer.dequeue()
    if (buffer.nonEmpty) ctx.push(head)
    else ctx.pushAndFinish(head)
  }

  override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = {
    recordOccupancy()
    // A waiting downstream gets the element directly, bypassing the buffer.
    if (!ctx.isHoldingDownstream) enqueueAndSignal(ctx, elem)
    else ctx.pushAndPull(elem)
  }

  override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
    recordOccupancy()
    if (ctx.isFinishing) drainOne(ctx)
    else if (ctx.isHoldingUpstream) ctx.pushAndPull(buffer.dequeue())
    else if (buffer.nonEmpty) ctx.push(buffer.dequeue())
    else ctx.holdDownstream()
  }

  override def onUpstreamFinish(ctx: DetachedContext[T]): TerminationDirective = {
    recordOccupancy()
    // Keep the stage alive until the remaining buffered elements have been pulled.
    if (buffer.nonEmpty) ctx.absorbTermination()
    else ctx.finish()
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement