Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.util.concurrent.atomic.AtomicInteger
- import java.util.concurrent.atomic.AtomicReference
- import kotlin.coroutines.Continuation
- import kotlin.coroutines.resume
- import kotlin.coroutines.suspendCoroutine
/**
 * Concurrent FIFO queue stored as a linked list of fixed-size segments ([Node]).
 *
 * Producers and consumers claim a slot index in the current segment with an atomic
 * fetch-and-add and then publish or take the value with an atomic operation on that
 * slot. No locks are used; all shared state lives in `java.util.concurrent.atomic` cells.
 *
 * NOTE(review): `null` is used internally as the "slot not written yet" marker, so the
 * queue presumably should not be used with `null` elements.
 */
class FAAQueue<T> {
    private val head: AtomicReference<Node<T>>
    private val tail: AtomicReference<Node<T>>

    /** Marker a dequeuer swaps into a slot it claimed, so a late enqueuer's CAS on that slot fails. */
    private val BROKEN: Any = Any()

    init {
        // Head and tail start on the same empty segment.
        val firstNode = Node<T>()
        head = AtomicReference(firstNode)
        tail = AtomicReference(firstNode)
    }

    /**
     * Appends [x] to the queue.
     *
     * Retries until either a slot in the current tail segment accepts the value or a new
     * segment carrying the value is linked after the tail.
     */
    fun enqueue(x: T) {
        // A replacement segment is only needed when the tail segment overflows, so it is
        // created on first demand and then reused across retries of this call.
        var spare: Node<T>? = null
        while (true) {
            val curTail: Node<T> = tail.get()
            val enqIdx: Int = curTail.enqIdx.getAndAdd(1)

            if (enqIdx < Node.NODE_SIZE) {
                // The slot may already have been marked BROKEN by a dequeuer; then retry.
                if (curTail.data[enqIdx].compareAndSet(null, x)) {
                    return
                }
                continue
            }

            // Segment is full: try to link a new segment that already holds x in slot 0.
            val newTail: Node<T> = spare ?: Node<T>(x)
            spare = newTail
            if (curTail.next.compareAndSet(null, newTail)) {
                tail.compareAndSet(curTail, newTail)
                return
            }
            // Someone else linked a segment first; help advance the tail and retry.
            tail.compareAndSet(curTail, curTail.next.get())
        }
    }

    /**
     * Removes and returns the oldest element.
     *
     * @return the element, or `null` when the head segment has no unread slot and no
     *   successor segment is linked.
     */
    @Suppress("UNCHECKED_CAST")
    fun dequeue(): T? {
        while (true) {
            val curHead: Node<T> = head.get()
            if (curHead.isEmpty()) {
                val headNext: Node<T> = curHead.next.get() ?: return null
                head.compareAndSet(curHead, headNext)
                continue
            }

            val deq: Int = curHead.deqIdx.getAndAdd(1)
            if (deq >= Node.NODE_SIZE) {
                continue
            }
            // A null here means the enqueuer has not written yet; the slot is now BROKEN
            // and the enqueuer will retry elsewhere.
            val res: Any = curHead.data[deq].getAndSet(BROKEN) ?: continue
            return res as T
        }
    }

    /**
     * One segment of the queue: [NODE_SIZE] slots plus the next-free enqueue and dequeue
     * indices and a link to the following segment.
     */
    internal class Node<T>() {
        val next: AtomicReference<Node<T>?> = AtomicReference(null)
        val enqIdx = AtomicInteger(0)
        val deqIdx = AtomicInteger(0)
        val data = Array(NODE_SIZE) {
            AtomicReference<Any?>(null)
        }

        /** Creates a segment whose slot 0 already holds [x] (enqueue index starts at 1). */
        constructor(x: Any?) : this() {
            enqIdx.set(1)
            data[0].set(x)
        }

        /** True when every claimed enqueue slot has been claimed by a dequeuer, or the segment is exhausted. */
        fun isEmpty(): Boolean {
            val deq: Int = deqIdx.get()
            val enq: Int = enqIdx.get()
            return deq >= enq || deq >= NODE_SIZE
        }

        companion object {
            const val NODE_SIZE = 10
        }
    }
}
/**
 * [BlockingStack] whose [pop] suspends the calling coroutine when no element is
 * available and whose [push] hands its element directly to a waiting popper.
 *
 * State consists of a CAS-updated linked stack ([head]), a signed counter ([elements])
 * that [push] increments and [pop] decrements, and a FIFO queue of suspended poppers.
 */
class BlockingStackImpl<E> : BlockingStack<E> {
    // ==========================
    // Segment Queue Synchronizer
    // ==========================

    /** Continuations of poppers that are currently suspended, in arrival order. */
    private val myQueue = FAAQueue<Continuation<E>>()

    /** Suspends the caller and registers its continuation in [myQueue]. */
    private suspend fun suspend(): E = suspendCoroutine { waiter ->
        myQueue.enqueue(waiter)
    }

    /**
     * Resumes the oldest registered popper with [element].
     *
     * Spins until a continuation shows up: the popper that is being served may not have
     * enqueued itself yet.
     */
    private fun resume(element: E) {
        while (true) {
            val waiter = myQueue.dequeue() ?: continue
            waiter.resume(element)
            return
        }
    }

    // ==============
    // Blocking Stack
    // ==============

    private val head = AtomicReference<Node<E>?>()
    private val elements = AtomicInteger()

    /**
     * Adds [element] to the stack, or passes it to a suspended popper.
     *
     * If the counter was negative before the increment, the element goes straight to a
     * waiting popper. Otherwise the element is pushed onto the linked stack, unless the
     * top node is a `SUSPENDED` marker, in which case that marker is removed and the
     * element is handed to a waiter instead.
     */
    override fun push(element: E) {
        val before = elements.getAndIncrement()
        if (before < 0) {
            resume(element)
            return
        }
        while (true) {
            val top = head.get()
            // Identity check: SUSPENDED is a sentinel, so the element's own equals()
            // must not take part in the comparison.
            if (top != null && top.element === SUSPENDED) {
                if (head.compareAndSet(top, top.next)) {
                    resume(element)
                    return
                }
            } else {
                if (head.compareAndSet(top, Node<E>(element, top))) {
                    return
                }
            }
        }
    }

    /**
     * Removes and returns the top element, suspending until one is provided if needed.
     *
     * If the counter was not positive before the decrement, the caller suspends at once.
     * Otherwise it pops the top node; when the stack is empty or topped by a `SUSPENDED`
     * marker, it pushes its own marker and suspends until a pusher serves it.
     */
    @Suppress("UNCHECKED_CAST")
    override suspend fun pop(): E {
        val before = elements.getAndDecrement()
        if (before <= 0) {
            return suspend()
        }
        while (true) {
            val top = head.get()
            if (top == null || top.element === SUSPENDED) {
                if (head.compareAndSet(top, Node<E>(SUSPENDED, top))) {
                    return suspend()
                }
            } else {
                if (head.compareAndSet(top, top.next)) {
                    return top.element as E
                }
            }
        }
    }
}
/**
 * Immutable singly linked stack node.
 *
 * @property element the stored payload; typed `Any?` so that a marker object can be
 *   stored in place of a real element.
 * @property next the node below this one, or `null` at the bottom of the stack.
 */
private class Node<E>(
    val element: Any?,
    val next: Node<E>?
)
/** Unique marker object stored as a stack node's element to denote a popper waiting for a value. */
private val SUSPENDED: Any = Any()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement