Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- @PublishedApi
- internal class SelectBuilderImpl<in R>( // builder and instance of one select expression; also the Continuation its clause blocks complete into (see `completion`)
- private val delegate: Continuation<R> // receives the final value/exception; also supplies the coroutine context
- ) : LockFreeLinkedListHead(), SelectBuilder<R>, SelectInstance<R>, Continuation<R> { // as a list head it holds the DisposeNodes added by disposeOnSelect
- // Selection state: holds "this" (the list of nodes) while unselected and is replaced by the idempotent marker (or null) once selected; prepareIfNotSelected may temporarily install an AtomicSelectOp here.
- private val _state = atomic<Any?>(this)
- // Result slot, used like our own SafeContinuation: holds UNDECIDED, COROUTINE_SUSPENDED, RESUMED, a plain value, or a Fail wrapper (see the state machine below).
- private val _result = atomic<Any?>(UNDECIDED)
- // Cancellability support: the registration made on the parent Job by initCancellability; disposed in doAfterSelect.
- @Volatile
- private var parentHandle: DisposableHandle? = null
- /* Result state machine
-
-     +-----------+   getResult   +---------------------+   resume   +---------+
-     | UNDECIDED | ------------> | COROUTINE_SUSPENDED | ---------> | RESUMED |
-     +-----------+               +---------------------+            +---------+
-           |
-           | resume
-           V
-     +------------+  getResult
-     | value/Fail | -----------+
-     +------------+            |
-           ^                   |
-           |                   |
-           +-------------------+
-
-   getResult first: UNDECIDED -> COROUTINE_SUSPENDED; a later resume moves to RESUMED and resumes the delegate.
-   resume first:    UNDECIDED -> value/Fail; getResult then returns the value or throws the exception held by Fail.
-   A resume attempt in any other state throws IllegalStateException("Already resumed").
- */
- override val context: CoroutineContext get() = delegate.context
- override val completion: Continuation<R> get() = this // clause blocks are started with this object as their completion (see onTimeout)
- private inline fun doResume(value: () -> Any?, block: () -> Unit) { // shared resume logic: value() yields what to store in _result, block() resumes the delegate
- check(isSelected) { "Must be selected first" }
- _result.loop { result -> // retried until one of the CAS attempts below succeeds or the else branch throws
- when {
- result === UNDECIDED -> if (_result.compareAndSet(UNDECIDED, value())) return // getResult has not run yet: store the outcome for it to pick up
- result === COROUTINE_SUSPENDED -> if (_result.compareAndSet(COROUTINE_SUSPENDED, RESUMED)) { // getResult already returned COROUTINE_SUSPENDED: resume the delegate
- block()
- return
- }
- else -> throw IllegalStateException("Already resumed") // RESUMED, or a value/Fail is already stored
- }
- }
- }
- // Resumes the delegate via resumeDirect (MODE_DIRECT), or stores the value if getResult has not run yet
- override fun resume(value: R) {
- doResume({ value }) {
- delegate.resumeDirect(value)
- }
- }
- // Resumes the delegate via resumeDirectWithException (MODE_DIRECT), or stores a Fail if getResult has not run yet
- override fun resumeWithException(exception: Throwable) {
- doResume({ Fail(exception) }) {
- delegate.resumeDirectWithException(exception)
- }
- }
- // Resumes the delegate via resumeCancellableWithException (MODE_CANCELLABLE), or stores a Fail if getResult has not run yet
- override fun resumeSelectCancellableWithException(exception: Throwable) {
- doResume({ Fail(exception) }) {
- delegate.resumeCancellableWithException(exception)
- }
- }
- @PublishedApi
- internal fun getResult(): Any? { // returns COROUTINE_SUSPENDED or the stored value; rethrows the exception of a stored Fail
- if (!isSelected) initCancellability() // nothing selected yet: register on the parent Job before possibly suspending
- var result = _result.value // atomic read
- if (result === UNDECIDED) {
- if (_result.compareAndSet(UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED // no result stored yet: mark as suspended
- result = _result.value // CAS failed, so the slot changed concurrently: reread it
- }
- when {
- result === RESUMED -> throw IllegalStateException("Already resumed")
- result is Fail -> throw result.exception // stored failure: rethrow to the caller
- else -> return result // either COROUTINE_SUSPENDED or data
- }
- }
- private fun initCancellability() { // registers a SelectOnCancellation handler on the context's Job, if there is one
- val parent = context[Job] ?: return // no Job in the context: nothing to register on
- val newRegistration = parent.invokeOnCompletion(onCancelling = true, handler = SelectOnCancellation(parent)) // NOTE(review): onCancelling = true presumably fires the handler when cancellation starts rather than at completion - confirm
- parentHandle = newRegistration
- // now check our state _after_ registering: a selection that happened in between could have run doAfterSelect before parentHandle was assigned
- if (isSelected) newRegistration.dispose()
- }
- private inner class SelectOnCancellation(job: Job) : JobCancellationNode<Job>(job) { // handler that initCancellability registers on the parent Job
- // Note: may be invoked multiple times, but only the first trySelect succeeds anyway
- override fun invoke(reason: Throwable?) {
- if (trySelect(null)) // null marker: plain, non-idempotent selection
- resumeSelectCancellableWithException(job.getCancellationException())
- }
- override fun toString(): String = "SelectOnCancellation[${this@SelectBuilderImpl}]"
- }
- private val state: Any? get() { // current value of _state that is not an OpDescriptor
- _state.loop { state ->
- if (state !is OpDescriptor) return state
- state.perform(this) // help the pending operation, then loop to re-read
- }
- }
- @PublishedApi
- internal fun handleBuilderException(e: Throwable) { // NOTE(review): presumably called when the select builder lambda throws - caller is not visible here
- if (trySelect(null))
- resumeWithException(e) // this call won the selection: complete the select with e
- else
- handleCoroutineException(context, e) // already selected: pass e to handleCoroutineException instead
- }
- override val isSelected: Boolean get() = state !== this // any state other than "this" counts as selected
- override fun disposeOnSelect(handle: DisposableHandle) { // disposes handle once this instance is selected, or right away if it already is
- val node = DisposeNode(handle)
- while (true) { // lock-free loop on state
- val state = this.state
- if (state === this) {
- if (addLastIf(node, { this.state === this })) // enqueue only while still unselected; otherwise loop and re-check
- return
- } else { // already selected
- handle.dispose()
- return
- }
- }
- }
- private fun doAfterSelect() { // cleanup after a successful selection: drop the parent registration and dispose every queued handle
- parentHandle?.dispose()
- forEach<DisposeNode> {
- it.handle.dispose()
- }
- }
- // It is just like start(), but supports idempotent start: returns true if this call selected the instance, or if it was already selected with the same non-null marker
- override fun trySelect(idempotent: Any?): Boolean {
- check(idempotent !is OpDescriptor) { "cannot use OpDescriptor as idempotent marker"}
- while (true) { // lock-free loop on state
- val state = this.state
- when {
- state === this -> {
- if (_state.compareAndSet(this, idempotent)) {
- doAfterSelect()
- return true
- }
- }
- // otherwise -- already selected (a failed CAS above falls out of the when and retries the loop)
- idempotent == null -> return false // already selected
- state === idempotent -> return true // was selected with this marker
- else -> return false // selected with a different marker (or with none)
- }
- }
- }
- override fun performAtomicTrySelect(desc: AtomicDesc): Any? = AtomicSelectOp(desc, true).perform(null) // select = true: a successful operation leaves this instance selected
- override fun performAtomicIfNotSelected(desc: AtomicDesc): Any? = AtomicSelectOp(desc, false).perform(null) // select = false: the state is put back to unselected afterwards
- private inner class AtomicSelectOp( // atomic operation that runs desc while this op is installed in _state
- @JvmField val desc: AtomicDesc,
- @JvmField val select: Boolean // true: mark the instance selected when the operation succeeds; false: always restore the unselected state
- ) : AtomicOp<Any?>() {
- override fun prepare(affected: Any?): Any? {
- // only the originator of the operation makes the preparation move of installing the descriptor into this selector's state
- // helpers should never do it, or they risk ruining progress when they come late
- if (affected == null) {
- // we are the originator (the affected reference is not null when helping)
- prepareIfNotSelected()?.let { return it } // a non-null result (ALREADY_SELECTED) is returned without calling desc.prepare
- }
- return desc.prepare(this)
- }
- override fun complete(affected: Any?, failure: Any?) {
- completeSelect(failure) // settle this selector's state first, then let the descriptor complete
- desc.complete(this, failure)
- }
- fun prepareIfNotSelected(): Any? { // returns null once this op is installed in _state, or ALREADY_SELECTED
- _state.loop { state ->
- when {
- state === this@AtomicSelectOp -> return null // already in progress
- state is OpDescriptor -> state.perform(this@SelectBuilderImpl) // help
- state === this@SelectBuilderImpl -> {
- if (_state.compareAndSet(this@SelectBuilderImpl, this@AtomicSelectOp))
- return null // success
- }
- else -> return ALREADY_SELECTED
- }
- }
- }
- private fun completeSelect(failure: Any?) { // replaces this op in _state with the final state
- val selectSuccess = select && failure == null
- val update = if (selectSuccess) null else this@SelectBuilderImpl // null = selected without a marker; "this" = back to unselected
- if (_state.compareAndSet(this@AtomicSelectOp, update)) { // fails when _state no longer holds this op
- if (selectSuccess)
- doAfterSelect()
- }
- }
- }
- override fun SelectClause0.invoke(block: suspend () -> R) { // registration is delegated to the clause itself
- registerSelectClause0(this@SelectBuilderImpl, block)
- }
- override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) { // registration is delegated to the clause itself
- registerSelectClause1(this@SelectBuilderImpl, block)
- }
- override fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R) { // registration is delegated to the clause, with param passed through
- registerSelectClause2(this@SelectBuilderImpl, param, block)
- }
- override fun onTimeout(time: Long, unit: TimeUnit, block: suspend () -> R) { // adds a timeout clause; a negative time fails the require below (IllegalArgumentException)
- require(time >= 0) { "Timeout time $time cannot be negative" }
- if (time == 0L) { // zero timeout: try to select immediately instead of scheduling anything
- if (trySelect(null))
- block.startCoroutineUndispatched(completion)
- return
- }
- val action = Runnable { // runs when the timeout fires; does nothing if another clause was selected first
- // todo: we could have replaced startCoroutine with startCoroutineUndispatched
- // But we need a way to know that Delay.invokeOnTimeout had used the right thread
- if (trySelect(null))
- block.startCoroutineCancellable(completion) // shall be cancellable while waits for dispatch
- }
- disposeOnSelect(context.delay.invokeOnTimeout(time, unit, action)) // the timeout handle is disposed as soon as any clause is selected
- }
- private class DisposeNode( // list node carrying a handle that doAfterSelect disposes
- @JvmField val handle: DisposableHandle
- ) : LockFreeLinkedListNode()
- private class Fail( // wrapper that marks a stored exception in _result (see getResult)
- @JvmField val exception: Throwable
- )
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement