Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.util.concurrent.atomic.AtomicReference
- import kotlin.coroutines.Continuation
- import kotlin.coroutines.resume
- import kotlin.coroutines.suspendCoroutine
/**
 * A rendezvous queue for coroutines: [send] completes only once a [receive] has taken the element,
 * and [receive] completes only once a [send] has supplied one.
 *
 * Waiting callers are kept in a singly linked list with a dummy head node and separate atomic
 * `head` / `tail` pointers (the `MS` suffix presumably refers to the Michael-Scott queue — confirm
 * with the author). At any moment the list holds either only waiting senders or only waiting
 * receivers. The node that `head` points at is always a placeholder: dequeuing advances `head`
 * onto the node that was just served, which then becomes the new placeholder.
 *
 * Callers are parked with [suspendCoroutine], which offers no cancellation hook, so a caller that
 * is already suspended stays in the list until a counterpart arrives.
 */
@Suppress("UNCHECKED_CAST")
class SynchronousQueueMS<E> : SynchronousQueue<E> {
    private val dummy = Node<E>()
    private val head: AtomicReference<Node<E>> = AtomicReference(dummy)
    private val tail: AtomicReference<Node<E>> = AtomicReference(dummy)

    /** Base list node; `next` goes from `null` to its successor exactly once, via CAS. */
    private open class Node<E> {
        val next = AtomicReference<Node<E>>(null)
    }

    /** A suspended receiver; [cont] is resumed with the element handed over by a sender. */
    private class Getter<E>(
        var cont: Continuation<E>? = null,
    ) : Node<E>()

    /** A suspended sender carrying [element]; [cont] is resumed with `Unit` once the element is taken. */
    private class Sender<E>(
        val element: E,
        var cont: Continuation<Unit>? = null,
    ) : Node<E>()

    /**
     * Hands [element] to a receiver, suspending until one has taken it.
     *
     * If the list is empty or already holds senders, the caller enqueues itself and suspends.
     * Otherwise it dequeues the oldest waiting receiver and resumes it with [element].
     */
    override suspend fun send(element: E) {
        val node = Sender(element)
        while (true) {
            val last = tail.get()
            if (last === head.get() || last is Sender<*>) {
                val outcome = suspendCoroutine<Any> { continuation ->
                    // Publish the continuation before the node becomes reachable by receivers.
                    node.cont = continuation
                    if (!tryEnqueue(node)) continuation.resume(RELOAD)
                }
                if (outcome === RELOAD) continue // lost a race; re-evaluate the queue state
                return
            }

            val first = head.get()
            val successor: Node<E>? = first.next.get()
            // Never let head overtake tail: wait until the enqueuer has advanced tail.
            if (first === tail.get() || successor == null) continue
            if (successor is Getter<E> && head.compareAndSet(first, successor)) {
                checkNotNull(successor.cont) { "receiver was enqueued without a continuation" }
                    .resume(element)
                return
            }
        }
    }

    /**
     * Takes an element from a sender, suspending until one is available.
     *
     * If the list is empty or already holds receivers, the caller enqueues itself and suspends.
     * Otherwise it dequeues the oldest waiting sender, resumes it, and returns its element.
     *
     * The "retry" signal is the private [RELOAD] sentinel compared by identity, so a `null`
     * element (when [E] is nullable) is delivered like any other value instead of being
     * mistaken for a failed enqueue.
     */
    override suspend fun receive(): E {
        val node = Getter<E>()
        while (true) {
            val last = tail.get()
            if (last === head.get() || last is Getter<*>) {
                val outcome = suspendCoroutine<Any?> { continuation ->
                    // Publish the continuation before the node becomes reachable by senders.
                    node.cont = continuation
                    if (!tryEnqueue(node)) continuation.resume(RELOAD)
                }
                if (outcome === RELOAD) continue // lost a race; re-evaluate the queue state
                return outcome as E
            }

            val first = head.get()
            val successor: Node<E>? = first.next.get()
            // Never let head overtake tail: wait until the enqueuer has advanced tail.
            if (first === tail.get() || successor == null) continue
            if (successor is Sender<E> && head.compareAndSet(first, successor)) {
                checkNotNull(successor.cont) { "sender was enqueued without a continuation" }
                    .resume(Unit)
                return successor.element
            }
        }
    }

    /**
     * Tries to append [node] behind the current tail.
     *
     * Succeeds only if the list is empty (tail equals head) or the tail node is of the same kind
     * as [node], and the CAS on the tail's `next` link wins. Returns `false` when the caller
     * must retry from scratch.
     */
    private fun tryEnqueue(node: Node<E>): Boolean {
        val last = tail.get()
        val sameKind = if (node is Sender<*>) last is Sender<*> else last is Getter<*>
        if (last !== head.get() && !sameKind) return false
        if (!last.next.compareAndSet(null, node)) return false
        // Failure here is harmless: it only means tail has already been moved forward.
        tail.compareAndSet(last, node)
        return true
    }

    private companion object {
        /** Unique marker a caller resumes itself with when its enqueue attempt failed. */
        private val RELOAD = Any()
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement