Advertisement
Guest User

Untitled

a guest
Dec 11th, 2017
346
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Kotlin 8.97 KB | None | 0 0
  1. @PublishedApi
  2. internal class SelectBuilderImpl<in R>(
  3.     private val delegate: Continuation<R>
  4. ) : LockFreeLinkedListHead(), SelectBuilder<R>, SelectInstance<R>, Continuation<R> {
  5.     // selection state is "this" (list of nodes) initially and is replaced by idempotent marker (or null) when selected
  6.     private val _state = atomic<Any?>(this)
  7.  
  8.     // this is basically our own SafeContinuation
  9.     private val _result = atomic<Any?>(UNDECIDED)
  10.  
  11.     // cancellability support
  12.     @Volatile
  13.     private var parentHandle: DisposableHandle? = null
  14.  
  15.     /* Result state machine
  16.  
  17.         +-----------+   getResult   +---------------------+   resume   +---------+
  18.         | UNDECIDED | ------------> | COROUTINE_SUSPENDED | ---------> | RESUMED |
  19.         +-----------+               +---------------------+            +---------+
  20.               |
  21.               | resume
  22.               V
  23.         +------------+  getResult
  24.         | value/Fail | -----------+
  25.         +------------+            |
  26.               ^                   |
  27.               |                   |
  28.               +-------------------+
  29.      */
  30.  
  31.     override val context: CoroutineContext get() = delegate.context
  32.  
  33.     override val completion: Continuation<R> get() = this
  34.  
  35.     private inline fun doResume(value: () -> Any?, block: () -> Unit) {
  36.         check(isSelected) { "Must be selected first" }
  37.         _result.loop { result ->
  38.             when {
  39.                 result === UNDECIDED -> if (_result.compareAndSet(UNDECIDED, value())) return
  40.                 result === COROUTINE_SUSPENDED -> if (_result.compareAndSet(COROUTINE_SUSPENDED, RESUMED)) {
  41.                     block()
  42.                     return
  43.                 }
  44.                 else -> throw IllegalStateException("Already resumed")
  45.             }
  46.         }
  47.     }
  48.  
  49.     // Resumes in MODE_DIRECT
  50.     override fun resume(value: R) {
  51.         doResume({ value }) {
  52.             delegate.resumeDirect(value)
  53.         }
  54.     }
  55.  
  56.     // Resumes in MODE_DIRECT
  57.     override fun resumeWithException(exception: Throwable) {
  58.         doResume({ Fail(exception) }) {
  59.             delegate.resumeDirectWithException(exception)
  60.         }
  61.     }
  62.  
  63.     // Resumes in MODE_CANCELLABLE
  64.     override fun resumeSelectCancellableWithException(exception: Throwable) {
  65.         doResume({ Fail(exception) }) {
  66.             delegate.resumeCancellableWithException(exception)
  67.         }
  68.     }
  69.  
  70.     @PublishedApi
  71.     internal fun getResult(): Any? {
  72.         if (!isSelected) initCancellability()
  73.         var result = _result.value // atomic read
  74.         if (result === UNDECIDED) {
  75.             if (_result.compareAndSet(UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
  76.             result = _result.value // reread volatile var
  77.         }
  78.         when {
  79.             result === RESUMED -> throw IllegalStateException("Already resumed")
  80.             result is Fail -> throw result.exception
  81.             else -> return result // either COROUTINE_SUSPENDED or data
  82.         }
  83.     }
  84.  
  85.     private fun initCancellability() {
  86.         val parent = context[Job] ?: return
  87.         val newRegistration = parent.invokeOnCompletion(onCancelling = true, handler = SelectOnCancellation(parent))
  88.         parentHandle = newRegistration
  89.         // now check our state _after_ registering
  90.         if (isSelected) newRegistration.dispose()
  91.     }
  92.  
  93.     private inner class SelectOnCancellation(job: Job) : JobCancellationNode<Job>(job) {
  94.         // Note: may be invoked multiple times, but only the first trySelect succeeds anyway
  95.         override fun invoke(reason: Throwable?) {
  96.             if (trySelect(null))
  97.                 resumeSelectCancellableWithException(job.getCancellationException())
  98.         }
  99.         override fun toString(): String = "SelectOnCancellation[${this@SelectBuilderImpl}]"
  100.     }
  101.  
  102.     private val state: Any? get() {
  103.         _state.loop { state ->
  104.             if (state !is OpDescriptor) return state
  105.             state.perform(this)
  106.         }
  107.     }
  108.  
  109.     @PublishedApi
  110.     internal fun handleBuilderException(e: Throwable) {
  111.         if (trySelect(null))
  112.             resumeWithException(e)
  113.         else
  114.             handleCoroutineException(context, e)
  115.     }
  116.  
  117.     override val isSelected: Boolean get() = state !== this
  118.  
  119.     override fun disposeOnSelect(handle: DisposableHandle) {
  120.         val node = DisposeNode(handle)
  121.         while (true) { // lock-free loop on state
  122.             val state = this.state
  123.             if (state === this) {
  124.                 if (addLastIf(node, { this.state === this }))
  125.                     return
  126.             } else { // already selected
  127.                 handle.dispose()
  128.                 return
  129.             }
  130.         }
  131.     }
  132.  
  133.     private fun doAfterSelect() {
  134.         parentHandle?.dispose()
  135.         forEach<DisposeNode> {
  136.             it.handle.dispose()
  137.         }
  138.     }
  139.  
  140.     // it is just like start(), but support idempotent start
  141.     override fun trySelect(idempotent: Any?): Boolean {
  142.         check(idempotent !is OpDescriptor) { "cannot use OpDescriptor as idempotent marker"}
  143.         while (true) { // lock-free loop on state
  144.             val state = this.state
  145.             when {
  146.                 state === this -> {
  147.                     if (_state.compareAndSet(this, idempotent)) {
  148.                         doAfterSelect()
  149.                         return true
  150.                     }
  151.                 }
  152.                 // otherwise -- already selected
  153.                 idempotent == null -> return false // already selected
  154.                 state === idempotent -> return true // was selected with this marker
  155.                 else -> return false
  156.             }
  157.         }
  158.     }
  159.  
  160.     override fun performAtomicTrySelect(desc: AtomicDesc): Any? = AtomicSelectOp(desc, true).perform(null)
  161.     override fun performAtomicIfNotSelected(desc: AtomicDesc): Any? = AtomicSelectOp(desc, false).perform(null)
  162.  
  163.     private inner class AtomicSelectOp(
  164.         @JvmField val desc: AtomicDesc,
  165.         @JvmField val select: Boolean
  166.     ) : AtomicOp<Any?>() {
  167.         override fun prepare(affected: Any?): Any? {
  168.             // only originator of operation makes preparation move of installing descriptor into this selector's state
  169.             // helpers should never do it, or risk ruining progress when they come late
  170.             if (affected == null) {
  171.                 // we are originator (affected reference is not null if helping)
  172.                 prepareIfNotSelected()?.let { return it }
  173.             }
  174.             return desc.prepare(this)
  175.         }
  176.  
  177.         override fun complete(affected: Any?, failure: Any?) {
  178.             completeSelect(failure)
  179.             desc.complete(this, failure)
  180.         }
  181.  
  182.         fun prepareIfNotSelected(): Any? {
  183.             _state.loop { state ->
  184.                 when {
  185.                     state === this@AtomicSelectOp -> return null // already in progress
  186.                     state is OpDescriptor -> state.perform(this@SelectBuilderImpl) // help
  187.                     state === this@SelectBuilderImpl -> {
  188.                         if (_state.compareAndSet(this@SelectBuilderImpl, this@AtomicSelectOp))
  189.                             return null // success
  190.                     }
  191.                     else -> return ALREADY_SELECTED
  192.                 }
  193.             }
  194.         }
  195.  
  196.         private fun completeSelect(failure: Any?) {
  197.             val selectSuccess = select && failure == null
  198.             val update = if (selectSuccess) null else this@SelectBuilderImpl
  199.             if (_state.compareAndSet(this@AtomicSelectOp, update)) {
  200.                 if (selectSuccess)
  201.                     doAfterSelect()
  202.             }
  203.         }
  204.     }
  205.  
  206.     override fun SelectClause0.invoke(block: suspend () -> R) {
  207.         registerSelectClause0(this@SelectBuilderImpl, block)
  208.     }
  209.  
  210.     override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) {
  211.         registerSelectClause1(this@SelectBuilderImpl, block)
  212.     }
  213.  
  214.     override fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R) {
  215.         registerSelectClause2(this@SelectBuilderImpl, param, block)
  216.     }
  217.  
  218.     override fun onTimeout(time: Long, unit: TimeUnit, block: suspend () -> R) {
  219.         require(time >= 0) { "Timeout time $time cannot be negative" }
  220.         if (time == 0L) {
  221.             if (trySelect(null))
  222.                 block.startCoroutineUndispatched(completion)
  223.             return
  224.         }
  225.         val action = Runnable {
  226.             // todo: we could have replaced startCoroutine with startCoroutineUndispatched
  227.             // But we need a way to know that Delay.invokeOnTimeout had used the right thread
  228.             if (trySelect(null))
  229.                 block.startCoroutineCancellable(completion) // shall be cancellable while waits for dispatch
  230.         }
  231.         disposeOnSelect(context.delay.invokeOnTimeout(time, unit, action))
  232.     }
  233.  
  234.     private class DisposeNode(
  235.         @JvmField val handle: DisposableHandle
  236.     ) : LockFreeLinkedListNode()
  237.  
  238.     private class Fail(
  239.         @JvmField val exception: Throwable
  240.     )
  241. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement