Advertisement
Guest User

Untitled

a guest
Jan 28th, 2020
111
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Kotlin 3.29 KB | None | 0 0
  1. import java.util.concurrent.atomic.AtomicReference
  2. import kotlin.coroutines.Continuation
  3. import kotlin.coroutines.resume
  4. import kotlin.coroutines.suspendCoroutine
  5.  
  6. class SynchronousQueueMS<E> : SynchronousQueue<E> {
  7.  
  8.     private enum class NodeType { SENDER, GETTER }
  9.  
  10.     private class Node<T>(data: T?, val type: NodeType) {
  11.         var cont: Continuation<Boolean>? = null
  12.         var data = AtomicReference(data)
  13.         var next: AtomicReference<Node<T>?> = AtomicReference(null)
  14.     }
  15.  
  16.     private val head: AtomicReference<Node<E>>
  17.  
  18.     private val tail: AtomicReference<Node<E>>
  19.  
  20.     init {
  21.         val dummy: Node<E> = Node(null, NodeType.SENDER)
  22.         head = AtomicReference(dummy)
  23.         tail = AtomicReference(dummy)
  24.     }
  25.  
  26.     override suspend fun send(element: E) {
  27.         val node = Node(element, NodeType.SENDER)
  28.         while (true) {
  29.             val curTail = tail.get()
  30.             val curHead = head.get()
  31.             if (curHead == curTail || curTail.type == NodeType.SENDER) {
  32.                 val res = suspendCoroutine<Boolean> sc@{ cont ->
  33.                     node.cont = cont
  34.                     if (curTail.next.compareAndSet(null, node)) {
  35.                         tail.compareAndSet(curTail, node)
  36.                     } else {
  37.                         cont.resume(false)
  38.                         return@sc
  39.                     }
  40.                 }
  41.                 if (res) {
  42.                     return
  43.                 }
  44.             } else {
  45.                 val next = curHead.next.get()
  46.                 if (curTail != tail.get() || curHead != head.get() || curHead == tail.get() || next == null) {
  47.                     continue
  48.                 }
  49.                 if (next.cont !== null && next.type == NodeType.GETTER && head.compareAndSet(curHead, next)) {
  50.                     next.data.compareAndSet(null, element)
  51.                     next.cont!!.resume(true)
  52.                     return
  53.                 }
  54.             }
  55.         }
  56.     }
  57.  
  58.     override suspend fun receive(): E {
  59.         val node: Node<E> = Node(null, NodeType.GETTER)
  60.  
  61.         while (true) {
  62.             val curTail = tail.get()
  63.             val curHead = head.get()
  64.             if (curHead == curTail || curTail.type == NodeType.GETTER) {
  65.                 val res = suspendCoroutine<Boolean> sc@{ cont ->
  66.                     node.cont = cont
  67.                     if (curTail.next.compareAndSet(null, node)) {
  68.                         tail.compareAndSet(curTail, node)
  69.  
  70.                     } else {
  71.                         cont.resume(false)
  72.                         return@sc
  73.                     }
  74.                 }
  75.                 if (res) {
  76.                     return node.data.get()!!
  77.                 }
  78.             } else {
  79.                 val next = curHead.next.get()
  80.                 if (curHead == tail.get() || curTail != tail.get() || curHead != head.get() || next == null) {
  81.                     continue
  82.                 }
  83.                 val element = next.data.get() ?: continue
  84.                 if (next.cont !== null && next.type == NodeType.SENDER && head.compareAndSet(curHead, next)) {
  85.                     next.data.compareAndSet(element, null)
  86.                     next.cont!!.resume(true)
  87.                     return element
  88.                 }
  89.             }
  90.         }
  91.     }
  92. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement