Advertisement
Guest User

Untitled

a guest
Jan 28th, 2020
144
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.78 KB | None | 0 0
  1. import java.util.concurrent.atomic.AtomicInteger
  2. import java.util.concurrent.atomic.AtomicReference
  3.  
  4. import kotlin.coroutines.Continuation
  5. import kotlin.coroutines.resume
  6. import kotlin.coroutines.suspendCoroutine
  7.  
  8.  
  9.  
  10. class FAAQueue<T> {
  11.  
  12. private val head: AtomicReference<Node<T>>
  13. private val tail: AtomicReference<Node<T>>
  14.  
  15. private val BROKEN: Any = Any()
  16.  
  17. init {
  18. val firstNode = Node<T>()
  19. head = AtomicReference(firstNode)
  20. tail = AtomicReference(firstNode)
  21. }
  22.  
  23. fun enqueue(x: T) {
  24. val newTail = Node<T>(x)
  25. while (true) {
  26. val tail: Node<T> = tail.get()
  27. val enqIdx: Int = tail.enqIdx.getAndAdd(1)
  28.  
  29. if (enqIdx >= Node.NODE_SIZE) {
  30. if (tail.next.compareAndSet(null, newTail)) {
  31. this.tail.compareAndSet(tail, newTail)
  32. return
  33. } else {
  34. this.tail.compareAndSet(tail, tail.next.get())
  35. }
  36. } else {
  37. if (tail.data[enqIdx].compareAndSet(null, x)) {
  38. return
  39. }
  40. }
  41. }
  42. }
  43.  
  44. fun dequeue(): T? {
  45. while (true) {
  46. val head: Node<T> = head.get()
  47. if (head.isEmpty()) {
  48. val headNext: Node<T> = head.next.get() ?: return null
  49. this.head.compareAndSet(head, headNext)
  50. } else {
  51. val deq: Int = head.deqIdx.getAndAdd(1)
  52. if (deq >= Node.NODE_SIZE) {
  53. continue
  54. }
  55. val res: Any = head.data[deq].getAndSet(BROKEN) ?: continue
  56. return res as T
  57. }
  58. }
  59. }
  60.  
  61. internal class Node<T>() {
  62. val next: AtomicReference<Node<T>?> = AtomicReference(null)
  63. var enqIdx = AtomicInteger(0)
  64. val deqIdx = AtomicInteger(0)
  65. val data = Array(NODE_SIZE) {
  66. AtomicReference<Any?>(null)
  67. }
  68.  
  69. constructor(x: Any?) : this() {
  70. this.enqIdx = AtomicInteger(1);
  71. data[0] = AtomicReference(x)
  72. }
  73.  
  74. fun isEmpty(): Boolean {
  75. val deqIdx: Int = deqIdx.get()
  76. val enqIdx: Int = enqIdx.get()
  77. return deqIdx >= enqIdx || deqIdx >= NODE_SIZE
  78. }
  79.  
  80. companion object {
  81. const val NODE_SIZE = 10
  82. }
  83. }
  84. }
  85.  
  86. class BlockingStackImpl<E> : BlockingStack<E> {
  87.  
  88. // ==========================
  89. // Segment Queue Synchronizer
  90. // ==========================
  91.  
  92. private val myQueue = FAAQueue<Continuation<E>>()
  93.  
  94. private suspend fun suspend(): E {
  95. return suspendCoroutine {
  96. myQueue.enqueue(it)
  97. }
  98. }
  99.  
  100. private fun resume(element: E) {
  101. while (true) {
  102. val res = myQueue.dequeue()
  103. if (res != null) {
  104. res.resume(element)
  105. break
  106. }
  107. }
  108. }
  109.  
  110. // ==============
  111. // Blocking Stack
  112. // ==============
  113.  
  114.  
  115. private val head = AtomicReference<Node<E>?>()
  116. private val elements = AtomicInteger()
  117.  
  118. override fun push(element: E) {
  119. val elements = this.elements.getAndIncrement()
  120. if (elements >= 0) {
  121. while (true) {
  122. val currentHead = head.get()
  123. if (currentHead != null && currentHead.element == SUSPENDED) {
  124. val headNext = currentHead.next
  125. if (head.compareAndSet(currentHead, headNext)) {
  126. return resume(element)
  127. }
  128. } else {
  129. val newHead = Node(element, currentHead)
  130. if (head.compareAndSet(currentHead, newHead)) {
  131. return
  132. }
  133. }
  134. }
  135. } else {
  136. resume(element)
  137. }
  138. }
  139.  
  140. override suspend fun pop(): E {
  141. val elements = this.elements.getAndDecrement()
  142. if (elements > 0) {
  143. while (true) {
  144. val currentHead = head.get()
  145. if (currentHead == null || currentHead.element == SUSPENDED) {
  146. val newHead = Node(SUSPENDED, currentHead)
  147. if (head.compareAndSet(currentHead, newHead)) {
  148. return suspend()
  149. }
  150. } else {
  151. val headNext = currentHead.next
  152. if (head.compareAndSet(currentHead, headNext)) {
  153. return currentHead.element as E
  154. }
  155. }
  156. }
  157. } else {
  158. return suspend()
  159. }
  160. }
  161. }
  162.  
  163. private class Node<E>(val element: Any?, val next: Node<E>?)
  164.  
  165. private val SUSPENDED = Any()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement