Guest User

Untitled

a guest
Mar 22nd, 2018
92
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.33 KB | None | 0 0
  1. import java.util.concurrent.LinkedBlockingQueue
  2.  
  3. import org.scalatest.{FlatSpec, Matchers}
  4.  
  5. import scala.annotation.tailrec
  6. import scala.concurrent.ExecutionContext
  7. import scala.util.control.NonFatal
  8.  
  object BufferingIterator {

    /** Envelope for everything the producer hands to the consumer through the buffer.
      *
      * Wrapping elements keeps them distinguishable from the terminal markers, so `null`
      * elements and elements that are themselves `Throwable`s are passed through untouched.
      */
    private sealed trait Signal[+A]

    /** A regular element read from the source iterator. */
    private final case class Item[A](value: A) extends Signal[A]

    /** The source iterator failed; `error` is rethrown exactly once by `next()`. */
    private final case class Failure(error: Throwable) extends Signal[Nothing]

    /** The source iterator is exhausted. */
    private case object End extends Signal[Nothing]

    /** Wraps `iterator` so that its elements are read ahead into a bounded buffer.
      *
      * Up to `preload` elements are read synchronously on the calling thread; if the source
      * still has elements after that, the rest is read by a task submitted to `ec`, which
      * blocks whenever the buffer holds `capacity` entries.
      *
      * An error raised by the source (from `hasNext` or `next`) is delivered in order: all
      * elements read before it are returned first, then `next()` rethrows the error once,
      * after which the returned iterator behaves as exhausted. Note that `hasNext` returns
      * `true` while an error is pending.
      *
      * The returned iterator is meant to be consumed by a single thread.
      *
      * @param iterator source iterator; it must not be used by the caller afterwards
      * @param capacity maximum number of buffered entries, at least 1
      * @param preload  number of elements to read eagerly, between 0 and `capacity`
      * @param ec       execution context running the background reader (the task blocks)
      * @tparam T element type
      * @return an iterator yielding the same elements as `iterator`, in the same order
      * @throws NullPointerException     if `iterator` is null
      * @throws IllegalArgumentException if `capacity` or `preload` is out of range
      */
    def bufferingIterator[T](iterator: Iterator[T], capacity: Int = 1000, preload: Int = 1)(implicit ec: ExecutionContext): Iterator[T] = {
      if (iterator == null) throw new NullPointerException("null iterator")
      if (capacity < 1) throw new IllegalArgumentException("capacity must be greater than zero")
      if (preload < 0) throw new IllegalArgumentException("negative preload")
      if (preload > capacity) throw new IllegalArgumentException("preload greater than capacity")

      // Alias taken outside the anonymous class: on Scala 2.13+ every Iterator inherits a
      // member named `iterator` (returning itself) that would shadow the parameter inside it.
      val source = iterator

      new Iterator[T] {
        // Bounded, thread-safe hand-over queue; a finished stream always ends with End or Failure.
        private val buffer = new LinkedBlockingQueue[Signal[T]](capacity)

        // One-element lookahead owned by the consumer: the entry taken from the buffer
        // but not yet returned by next(). Stays at Some(End) once the stream is over.
        private var pending: Option[Signal[T]] = None

        try {
          // Fail fast: an empty or immediately failing source is detected on the calling thread.
          var remaining = preload
          while (remaining > 0 && source.hasNext) {
            remaining -= 1
            buffer.put(Item(source.next()))
          }
          if (source.hasNext) ec.execute(() => fillInBackground())
          else terminate(End)
        } catch {
          case NonFatal(error) => terminate(Failure(error))
        }

        /** Reads the rest of the source into the buffer, then appends the terminal marker. */
        private def fillInBackground(): Unit =
          try {
            while (source.hasNext) buffer.put(Item(source.next())) // blocks while the buffer is full
            buffer.put(End)
          } catch {
            case NonFatal(error) => buffer.put(Failure(error)) // rethrown later by next()
          }

        /** Appends a terminal marker from the constructing thread without ever blocking it.
          *
          * If the preloaded elements already fill the buffer (preload == capacity) a plain
          * `put` would wait forever, because no consumer exists yet; the marker is then
          * appended by a background task once the consumer has made room.
          */
        private def terminate(signal: Signal[T]): Unit =
          if (!buffer.offer(signal)) ec.execute(() => buffer.put(signal))

        /** Returns the next undelivered entry, blocking until the producer supplies one. */
        private def head(): Signal[T] = pending.getOrElse {
          val taken = buffer.take()
          pending = Some(taken)
          taken
        }

        override def hasNext: Boolean = head() != End

        override def next(): T = head() match {
          case Item(value) =>
            pending = None
            value

          case Failure(error) =>
            pending = Some(End) // report the error once, then behave as exhausted
            throw error

          case End =>
            throw new NoSuchElementException("no more items")
        }
      }
    }
  }
  79.  
  /** Behavioural tests for `BufferingIterator.bufferingIterator`.
    *
    * Covers plain pass-through, slow sources, empty and single-element sources, and the
    * ordering of errors raised by the source relative to the elements read before them.
    */
  class BufferingIteratorTest extends FlatSpec with Matchers {

    import java.util.concurrent.atomic.AtomicInteger

    import scala.concurrent.ExecutionContext.Implicits.global

    /** Marker exception thrown by the failing source iterators below. */
    case class TestException() extends Exception

    "A BufferingIterator" should "work correctly" in {
      val iterator = BufferingIterator.bufferingIterator(
        Iterator(1, 2, 3)
      )

      iterator.hasNext should be(true)
      iterator.next() should be(1)

      iterator.hasNext should be(true)
      iterator.next() should be(2)

      // hasNext must be idempotent
      iterator.hasNext should be(true)
      iterator.hasNext should be(true)

      iterator.next() should be(3)
      iterator.hasNext should be(false)
      iterator.hasNext should be(false)

      a[NoSuchElementException] should be thrownBy {
        iterator.next()
      }

      iterator.hasNext should be(false)
    }

    it should "work correctly with long iterator" in {
      // ten times more elements than the buffer can hold, so the producer has to block
      val iterator = BufferingIterator.bufferingIterator(
        Iterator.range(1, 10001),
        1000
      )

      var expected = 0
      while (iterator.hasNext) {
        expected += 1
        iterator.next() should be(expected)
      }

      expected should be(10000)
      iterator.hasNext should be(false)
    }

    it should "work correctly with delayed iterator" in {
      val iterator = BufferingIterator.bufferingIterator(
        new Iterator[Int] {
          // only touched by the thread currently reading the source
          private var counter = 0
          override def hasNext: Boolean = counter < 4
          override def next(): Int = {
            counter += 1
            Thread.sleep(100) // the consumer has to wait for every element
            counter
          }
        }
      )

      iterator.toList should be(List(1, 2, 3, 4))
      iterator.hasNext should be(false)
    }

    it should "work correctly with an empty iterator" in {
      val iterator = BufferingIterator.bufferingIterator(
        Iterator.empty
      )

      iterator.hasNext should be(false)
      a[NoSuchElementException] should be thrownBy {
        iterator.next()
      }

      iterator.hasNext should be(false)
      a[NoSuchElementException] should be thrownBy {
        iterator.next()
      }
    }

    it should "work correctly with an single item" in {
      val iterator = BufferingIterator.bufferingIterator(
        Iterator(1)
      )

      iterator.hasNext should be(true)
      iterator.next() should be(1)

      iterator.hasNext should be(false)
      a[NoSuchElementException] should be thrownBy {
        iterator.next()
      }
    }

    it should "throw error if iterator throws it" in {
      val iterator = BufferingIterator.bufferingIterator(
        new Iterator[Int] {
          override def hasNext: Boolean = throw TestException()
          override def next(): Int = 0
        }
      )

      // a pending error counts as "something to fetch"; it surfaces from next()
      iterator.hasNext should be(true)
      a[TestException] should be thrownBy {
        iterator.next()
      }
    }

    it should "throw error in the right order (next exception)" in {
      // atomic: written by whichever thread reads the source, polled by the test thread
      val counter = new AtomicInteger(0)
      val iterator = BufferingIterator.bufferingIterator(
        new Iterator[Int] {
          override def hasNext: Boolean = true
          override def next(): Int = {
            val value = counter.incrementAndGet()
            if (value == 3) throw TestException()
            value
          }
        }, 10, 10)

      //give it time to preload
      while (counter.get < 3) Thread.sleep(10)

      iterator.hasNext should be(true)
      iterator.next() should be(1)

      iterator.hasNext should be(true)
      iterator.next() should be(2)

      a[TestException] should be thrownBy {
        iterator.next()
      }
      iterator.hasNext should be(false)
      iterator.hasNext should be(false)

      a[NoSuchElementException] should be thrownBy {
        iterator.next()
      }

      a[NoSuchElementException] should be thrownBy {
        iterator.next()
      }
    }

    it should "throw error in the right order (hasNext exception)" in {
      // atomic: written by the background reader, polled by the test thread
      val counter = new AtomicInteger(0)
      val iterator = BufferingIterator.bufferingIterator(
        new Iterator[Int] {
          override def hasNext: Boolean = {
            if (counter.get == 3) throw TestException()
            true
          }

          override def next(): Int = counter.incrementAndGet()
        }
      )
      //give it time to preload
      while (counter.get < 3) Thread.sleep(10)

      iterator.hasNext should be(true)
      iterator.next() should be(1)

      iterator.hasNext should be(true)
      iterator.next() should be(2)

      iterator.hasNext should be(true)
      iterator.next() should be(3)

      a[TestException] should be thrownBy {
        iterator.next()
      }
      iterator.hasNext should be(false)
      iterator.hasNext should be(false)

      a[NoSuchElementException] should be thrownBy {
        iterator.next()
      }

      a[NoSuchElementException] should be thrownBy {
        iterator.next()
      }
    }

  }
Add Comment
Please, Sign In to add comment