Advertisement
Guest User

Untitled

a guest
Jan 28th, 2020
139
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Kotlin 4.01 KB | None | 0 0
  1. import java.util.concurrent.atomic.AtomicReference
  2. import kotlin.coroutines.Continuation
  3. import kotlin.coroutines.resume
  4. import kotlin.coroutines.suspendCoroutine
  5.  
  6. @Suppress("UNCHECKED_CAST")
  7. class SynchronousQueueMS<E> : SynchronousQueue<E> {
  8.     private val dummy = Node<E>()
  9.     private val head: AtomicReference<Node<E>> = AtomicReference(dummy)
  10.     private val tail: AtomicReference<Node<E>> = AtomicReference(dummy)
  11.  
  12.     //_________________________________________________________________________
  13.     private open class Node<E> {
  14.         var next = AtomicReference<Node<E>>(null)
  15.     }
  16.  
  17.     private class Getter<E>(
  18.         var cont: Continuation<E>? = null
  19.     ) : Node<E>()
  20.  
  21.     private class Sender<E>(
  22.         elem: E?,
  23.         var cont: Continuation<Unit>? = null
  24.     ) : Node<E>() {
  25.         var element = AtomicReference(elem)
  26.     }
  27.  
  28.     //_________________________________________________________________________
  29.     override suspend fun send(element: E) {
  30.         val newTail: Node<E> = Sender(element)
  31.         while (true) {
  32.             val curTail = tail.get()
  33.             if (curTail == head.get() || curTail is Sender<*>) {
  34.                 val retSuspend = suspendCoroutine<Any> sc@{ continuation ->
  35.                     (newTail as Sender).cont = continuation
  36.                     val oldTail = tail.get()
  37.                     if ((oldTail == head.get() || oldTail is Sender<*>) &&
  38.                         oldTail.next.compareAndSet(null, newTail)
  39.                     ) {
  40.                         tail.compareAndSet(oldTail, newTail)
  41.                     } else {
  42.                         continuation.resume(RELOAD)
  43.                         return@sc
  44.                     }
  45.                 }
  46.                 if (retSuspend == RELOAD) continue
  47.                 return
  48.             } else {
  49.                 val oldHead = head.get()
  50.                 if (oldHead == tail.get() || oldHead.next.get() == null) continue
  51.                 val newHead = oldHead.next.get()
  52.                 if (newHead is Getter<*> && head.compareAndSet(oldHead, newHead)) {
  53.                     (newHead.cont as Continuation<E>).resume(element)
  54.                     return
  55.                 }
  56.             }
  57.         }
  58.     }
  59.  
  60.     //__________________________________________________________________________________________________________________
  61.     override suspend fun receive(): E {
  62.         val newTail: Node<E> = Getter()
  63.         while (true) {
  64.             val curTail = tail.get()
  65.             if (curTail == head.get() || curTail is Getter<*>) {
  66.                 val retSuspend = suspendCoroutine<E?> sc@{ continuation ->
  67.                     (newTail as Getter).cont = continuation
  68.                     val oldTail = tail.get()
  69.                     if (((oldTail == head.get()) || oldTail is Getter<*>) && oldTail.next.compareAndSet(
  70.                             null,
  71.                             newTail
  72.                         )
  73.                     ) {
  74.                         tail.compareAndSet(oldTail, newTail)
  75.                     } else {
  76.                         continuation.resume(null)
  77.                         return@sc
  78.                     }
  79.                 }
  80.                 if (retSuspend == null) {
  81.                     continue
  82.                 } else {
  83.                     return retSuspend
  84.                 }
  85.             } else {
  86.                 val oldHead = head.get()
  87.                 if (oldHead == tail.get() || oldHead.next.get() == null) {
  88.                     continue
  89.                 }
  90.                 val newHead = oldHead.next.get()
  91.                 if (oldHead != tail.get() && newHead is Sender<*> && head.compareAndSet(oldHead, newHead)) {
  92.                     newHead.cont!!.resume(Unit)
  93.                     return (newHead.element.get() as E)
  94.                 } else {
  95.                     continue
  96.                 }
  97.             }
  98.         }
  99.     }
  100.  
  101.     private val RELOAD = "RELOAD"
  102.     //__________________________________________________________________________________________________________________
  103. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement