Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.util.concurrent.LinkedBlockingQueue
- import org.scalatest.{FlatSpec, Matchers}
- import scala.annotation.tailrec
- import scala.concurrent.ExecutionContext
- import scala.util.control.NonFatal
/**
 * Factory for iterators that read ahead of their consumer.
 *
 * The source iterator is drained into a bounded queue: the first `preload` elements are read
 * synchronously while the iterator is being created, the rest on the supplied
 * `ExecutionContext`. The consumer side then only has to take entries from the queue.
 */
object BufferingIterator {

  /** Queue entry announcing that the source iterator finished normally. */
  private case object EndOfStream

  /**
   * Queue entry carrying a failure raised by the source iterator.
   *
   * Failures are wrapped so that they cannot be confused with regular elements that happen
   * to be `Throwable`s themselves (e.g. when buffering an `Iterator[Throwable]`).
   */
  private final class Failed(val error: Throwable)

  /**
   * Wraps `iterator` in an iterator that buffers up to `capacity` elements ahead of the consumer.
   *
   * Elements are delivered in source order. A non-fatal exception thrown by the source's
   * `hasNext` or `next` is delivered in order as well: all elements produced before it are
   * returned first, then `next()` rethrows the exception exactly once, and afterwards the
   * returned iterator behaves as exhausted. Note that `hasNext` reports `true` while a failure
   * is pending, because the failure is surfaced by `next()`.
   *
   * The returned iterator keeps unsynchronized look-ahead state and is meant to be consumed
   * from a single thread. Its `hasNext` and `next()` block until the producer has made an
   * element, the end marker or a failure available. If the consumer abandons the iterator
   * while the source still has elements, the background task stays blocked on the full queue.
   * `null` elements are not supported: the queue rejects them and the resulting
   * `NullPointerException` is reported like any other source failure.
   *
   * @param iterator the source to read from; must not be `null`
   * @param capacity maximum number of entries held in the queue; must be at least 1
   * @param preload  number of elements to read synchronously before returning;
   *                 must be between 0 and `capacity`
   * @param ec       execution context that runs the background reader
   * @tparam T element type
   * @return an iterator yielding the same elements as `iterator`
   * @throws NullPointerException     if `iterator` is `null`
   * @throws IllegalArgumentException if `capacity` or `preload` is out of range
   */
  def bufferingIterator[T](iterator : Iterator[T], capacity : Int = 1000, preload : Int = 1)(implicit ec : ExecutionContext): Iterator[T] = {
    if (iterator == null) throw new NullPointerException("null iterator")
    if (capacity < 1) throw new IllegalArgumentException("capacity must be greater than zero")
    if (preload < 0) throw new IllegalArgumentException("negative preload")
    if (preload > capacity) throw new IllegalArgumentException("preload greater than capacity")

    // Holds elements plus, as the final entry, either EndOfStream or a Failed wrapper.
    val buffer = new LinkedBlockingQueue[Any](capacity)

    // Publishes a terminal entry from the constructing thread. No consumer exists yet, so a
    // blocking put on a full queue (preload == capacity) would never return; in that case the
    // put is handed over to the execution context instead.
    def publishFromSetup(signal: Any): Unit =
      if (!buffer.offer(signal)) ec.execute(() => buffer.put(signal))

    // Background reader: moves the remaining elements into the queue, then the terminal entry.
    // Marked as blocking because `put` waits whenever the queue is full.
    def drainSource(): Unit = scala.concurrent.blocking {
      try {
        while (iterator.hasNext) buffer.put(iterator.next())
        buffer.put(EndOfStream)
      } catch {
        case NonFatal(error) => buffer.put(new Failed(error))
      }
    }

    try {
      // Fail fast: an empty source or an immediately failing source is detected right here.
      // At most `preload <= capacity` elements are inserted, so these puts never block.
      var remaining = preload
      while (remaining > 0 && iterator.hasNext) {
        remaining -= 1
        buffer.put(iterator.next())
      }
      if (iterator.hasNext) ec.execute(() => drainSource())
      else publishFromSetup(EndOfStream)
    } catch {
      case NonFatal(error) => publishFromSetup(new Failed(error))
    }

    new Iterator[T] {
      // Entry already taken from the queue but not yet handed to the caller.
      private var lookahead: Option[Any] = None

      // Returns the next queue entry without consuming it, blocking until one is available.
      private def peekSignal(): Any = lookahead.getOrElse {
        val signal = buffer.take()
        lookahead = Some(signal)
        signal
      }

      override def hasNext: Boolean = peekSignal() match {
        case EndOfStream => false
        case _           => true // a pending failure also counts: next() will rethrow it
      }

      override def next(): T = peekSignal() match {
        case EndOfStream =>
          // The marker stays in the look-ahead slot, so every further call ends up here again.
          throw new NoSuchElementException("no more items")
        case failed: Failed =>
          // Rethrow once, then behave like an exhausted iterator.
          lookahead = Some(EndOfStream)
          throw failed.error
        case item =>
          lookahead = None
          item.asInstanceOf[T]
      }
    }
  }
}
/**
 * Behavioural tests for `BufferingIterator.bufferingIterator`: element order, termination,
 * and the position at which failures of the source iterator are reported.
 */
class BufferingIteratorTest extends FlatSpec with Matchers {
  import java.util.concurrent.atomic.AtomicInteger
  import scala.concurrent.ExecutionContext.Implicits.global

  /** Failure raised on purpose by the source iterators below. */
  case class TestException() extends Exception

  "A BufferingIterator" should "work correctly" in {
    val iterator = BufferingIterator.bufferingIterator((1 to 3).iterator)
    iterator.hasNext should be(true)
    iterator.next() should be(1)
    iterator.hasNext should be(true)
    iterator.next() should be(2)
    // hasNext must be repeatable without consuming anything
    iterator.hasNext should be(true)
    iterator.hasNext should be(true)
    iterator.next() should be(3)
    iterator.hasNext should be(false)
    iterator.hasNext should be(false)
    a[NoSuchElementException] should be thrownBy {
      iterator.next()
    }
    iterator.hasNext should be(false)
  }

  it should "work correctly with long iterator" in {
    // ten times more elements than the buffer can hold
    val iterator = BufferingIterator.bufferingIterator((1 to 10000).iterator, 1000)
    var expected = 0
    while (iterator.hasNext) {
      expected += 1
      iterator.next() should be(expected)
    }
    expected should be(10000)
    iterator.hasNext should be(false)
  }

  it should "work correctly with delayed iterator" in {
    val iterator = BufferingIterator.bufferingIterator(
      new Iterator[Int] {
        private var produced = 0
        override def hasNext: Boolean = produced < 4
        override def next(): Int = {
          produced += 1
          Thread.sleep(100) // simulate a slow source
          produced
        }
      }
    )
    // the consumer is faster than the source, so it has to wait for every element
    val received = List.newBuilder[Int]
    while (iterator.hasNext) {
      received += iterator.next()
    }
    received.result() should be(List(1, 2, 3, 4))
    iterator.hasNext should be(false)
  }

  it should "work correctly with an empty iterator" in {
    val iterator = BufferingIterator.bufferingIterator(Iterator.empty[Int])
    iterator.hasNext should be(false)
    a[NoSuchElementException] should be thrownBy {
      iterator.next()
    }
    iterator.hasNext should be(false)
    a[NoSuchElementException] should be thrownBy {
      iterator.next()
    }
  }

  it should "work correctly with an single item" in {
    val iterator = BufferingIterator.bufferingIterator(Iterator.single(1))
    iterator.hasNext should be(true)
    iterator.next() should be(1)
    iterator.hasNext should be(false)
    a[NoSuchElementException] should be thrownBy {
      iterator.next()
    }
  }

  it should "throw error if iterator throws it" in {
    val iterator = BufferingIterator.bufferingIterator(
      new Iterator[Int] {
        override def hasNext: Boolean = throw new TestException
        override def next(): Int = 0
      }
    )
    // a pending failure is reported as "has next"; next() then rethrows it
    iterator.hasNext should be(true)
    a[TestException] should be thrownBy {
      iterator.next()
    }
  }

  it should "throw error in the right order (next exception)" in {
    // Atomic because the source may run on another thread than the test.
    val produced = new AtomicInteger(0)
    val iterator = BufferingIterator.bufferingIterator(
      new Iterator[Int] {
        override def hasNext: Boolean = true
        override def next(): Int = {
          val value = produced.incrementAndGet()
          if (value == 3) throw new TestException
          value
        }
      }, 10, 10)
    // give it time to preload
    while (produced.get < 3) Thread.sleep(10)
    iterator.hasNext should be(true)
    iterator.next() should be(1)
    iterator.hasNext should be(true)
    iterator.next() should be(2)
    a[TestException] should be thrownBy {
      iterator.next()
    }
    iterator.hasNext should be(false)
    iterator.hasNext should be(false)
    a[NoSuchElementException] should be thrownBy {
      iterator.next()
    }
    a[NoSuchElementException] should be thrownBy {
      iterator.next()
    }
  }

  it should "throw error in the right order (hasNext exception)" in {
    // Atomic because the source is read by the background producer while the test polls it.
    val produced = new AtomicInteger(0)
    val iterator = BufferingIterator.bufferingIterator(
      new Iterator[Int] {
        override def hasNext: Boolean = {
          if (produced.get == 3) throw new TestException
          true
        }
        override def next(): Int = produced.incrementAndGet()
      }
    )
    // give it time to preload
    while (produced.get < 3) Thread.sleep(10)
    iterator.hasNext should be(true)
    iterator.next() should be(1)
    iterator.hasNext should be(true)
    iterator.next() should be(2)
    iterator.hasNext should be(true)
    iterator.next() should be(3)
    a[TestException] should be thrownBy {
      iterator.next()
    }
    iterator.hasNext should be(false)
    iterator.hasNext should be(false)
    a[NoSuchElementException] should be thrownBy {
      iterator.next()
    }
    a[NoSuchElementException] should be thrownBy {
      iterator.next()
    }
  }
}
Add Comment
Please, Sign In to add comment