diff --git a/core/reflection.jvm/src/kotlin/reflect/jvm/internal/ReflectionFactoryImpl.java b/core/reflection.jvm/src/kotlin/reflect/jvm/internal/ReflectionFactoryImpl.java index 5da0e5fc514..96550bd033c 100644 --- a/core/reflection.jvm/src/kotlin/reflect/jvm/internal/ReflectionFactoryImpl.java +++ b/core/reflection.jvm/src/kotlin/reflect/jvm/internal/ReflectionFactoryImpl.java @@ -51,7 +51,7 @@ public class ReflectionFactoryImpl extends ReflectionFactory { } @Override - public String renderLambdaToString(Lambda lambda) { + public String renderLambdaToString(FunctionBase lambda) { KFunction kFunction = ReflectLambdaKt.reflect(lambda); if (kFunction != null) { KFunctionImpl impl = UtilKt.asKFunctionImpl(kFunction); diff --git a/libraries/stdlib/coroutines/common/src/kotlin/CoroutinesH.kt b/libraries/stdlib/coroutines/common/src/kotlin/CoroutinesH.kt index 9ed8cb5eeee..c368bc79750 100644 --- a/libraries/stdlib/coroutines/common/src/kotlin/CoroutinesH.kt +++ b/libraries/stdlib/coroutines/common/src/kotlin/CoroutinesH.kt @@ -14,9 +14,8 @@ internal expect class SafeContinuation : Continuation { internal constructor(delegate: Continuation) @PublishedApi - internal fun getResult(): Any? + internal fun getOrThrow(): Any? override val context: CoroutineContext - override fun resume(value: T): Unit - override fun resumeWithException(exception: Throwable): Unit + override fun resumeWith(result: SuccessOrFailure): Unit } diff --git a/libraries/stdlib/coroutines/common/src/kotlin/CoroutinesIntrinsicsH.kt b/libraries/stdlib/coroutines/common/src/kotlin/CoroutinesIntrinsicsH.kt index d7bb59f78d7..912c2a46209 100644 --- a/libraries/stdlib/coroutines/common/src/kotlin/CoroutinesIntrinsicsH.kt +++ b/libraries/stdlib/coroutines/common/src/kotlin/CoroutinesIntrinsicsH.kt @@ -33,12 +33,28 @@ public expect inline fun (suspend R.() -> T).startCoroutineUninterceptedO ): Any? @SinceKotlin("1.3") +// todo: Drop this function public expect fun (suspend () -> T).createCoroutineUnchecked( completion: Continuation ): Continuation @SinceKotlin("1.3") +// todo: Drop this function public expect fun (suspend R.() -> T).createCoroutineUnchecked( receiver: R, completion: Continuation ): Continuation + +@SinceKotlin("1.3") +public expect fun (suspend () -> T).createCoroutineUnintercepted( + completion: Continuation +): Continuation + +@SinceKotlin("1.3") +public expect fun (suspend R.() -> T).createCoroutineUnintercepted( + receiver: R, + completion: Continuation +): Continuation + +@SinceKotlin("1.3") +public expect fun Continuation.intercepted(): Continuation diff --git a/libraries/stdlib/coroutines/jvm/src/kotlin/coroutines/SafeContinuationJvm.kt b/libraries/stdlib/coroutines/jvm/src/kotlin/coroutines/SafeContinuationJvm.kt index a6814b89f90..8cd9204045f 100644 --- a/libraries/stdlib/coroutines/jvm/src/kotlin/coroutines/SafeContinuationJvm.kt +++ b/libraries/stdlib/coroutines/jvm/src/kotlin/coroutines/SafeContinuationJvm.kt @@ -6,7 +6,7 @@ package kotlin.coroutines import java.util.concurrent.atomic.AtomicReferenceFieldUpdater import kotlin.* -import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED +import kotlin.coroutines.intrinsics.CoroutineSingletons.* @PublishedApi @SinceKotlin("1.3") @@ -15,7 +15,6 @@ internal actual constructor( private val delegate: Continuation, initialResult: Any? ) : Continuation { - @PublishedApi internal actual constructor(delegate: Continuation) : this(delegate, UNDECIDED) @@ -25,10 +24,7 @@ internal actual constructor( @Volatile private var result: Any? = initialResult - companion object { - private val UNDECIDED: Any? = Any() - private val RESUMED: Any? = Any() - + private companion object { @Suppress("UNCHECKED_CAST") @JvmStatic private val RESULT = AtomicReferenceFieldUpdater.newUpdater, Any?>( @@ -36,29 +32,13 @@ internal actual constructor( ) } - private class Fail(val exception: Throwable) - - actual override fun resume(value: T) { + public actual override fun resumeWith(result: SuccessOrFailure) { while (true) { // lock-free loop - val result = this.result // atomic read + val cur = this.result // atomic read when { - result === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, value)) return - result === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) { - delegate.resume(value) - return - } - else -> throw IllegalStateException("Already resumed") - } - } - } - - actual override fun resumeWithException(exception: Throwable) { - while (true) { // lock-free loop - val result = this.result // atomic read - when { - result === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, Fail(exception))) return - result === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) { - delegate.resumeWithException(exception) + cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result)) return + cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) { + delegate.resumeWith(result) return } else -> throw IllegalStateException("Already resumed") @@ -67,16 +47,16 @@ internal actual constructor( } @PublishedApi - internal actual fun getResult(): Any? { + internal actual fun getOrThrow(): Any? { var result = this.result // atomic read if (result === UNDECIDED) { if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED result = this.result // reread volatile var } - when { - result === RESUMED -> return COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream - result is Fail -> throw result.exception - else -> return result // either COROUTINE_SUSPENDED or data + return when { + result === RESUMED -> COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream + result is Failure -> throw result.exception + else -> result // either COROUTINE_SUSPENDED or data } } } diff --git a/libraries/stdlib/coroutines/jvm/src/kotlin/coroutines/intrinsics/IntrinsicsJvm.kt b/libraries/stdlib/coroutines/jvm/src/kotlin/coroutines/intrinsics/IntrinsicsJvm.kt index c9a80e1d962..07e8cee164e 100644 --- a/libraries/stdlib/coroutines/jvm/src/kotlin/coroutines/intrinsics/IntrinsicsJvm.kt +++ b/libraries/stdlib/coroutines/jvm/src/kotlin/coroutines/intrinsics/IntrinsicsJvm.kt @@ -5,22 +5,31 @@ @file:kotlin.jvm.JvmName("IntrinsicsKt") @file:kotlin.jvm.JvmMultifileClass -@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") +@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER", "UNCHECKED_CAST") package kotlin.coroutines.intrinsics import kotlin.coroutines.* +import kotlin.coroutines.jvm.internal.ContinuationImpl +import kotlin.coroutines.jvm.internal.RestrictedContinuationImpl +import kotlin.coroutines.jvm.internal.SuspendFunction0 +import kotlin.coroutines.jvm.internal.SuspendFunction1 +import kotlin.internal.InlineOnly /** * Starts unintercepted coroutine without receiver and with result type [T] and executes it until its first suspension. * Returns the result of the coroutine or throws its exception if it does not suspend or [COROUTINE_SUSPENDED] if it suspends. * In the later case, the [completion] continuation is invoked when coroutine completes with result or exception. - * This function is designed to be used from inside of [suspendCoroutineOrReturn] to resume the execution of suspended + * + * The coroutine is started directly in the invoker's thread without going through the [ContinuationInterceptor] that might + * be present in the completion's [CoroutineContext]. It is invoker's responsibility to ensure that the proper invocation + * context is established. + * + * This function is designed to be used from inside of [suspendCoroutineUninterceptedOrReturn] to resume the execution of suspended * coroutine using a reference to the suspending function. */ @SinceKotlin("1.3") -@Suppress("UNCHECKED_CAST") -@kotlin.internal.InlineOnly +@InlineOnly public actual inline fun (suspend () -> T).startCoroutineUninterceptedOrReturn( completion: Continuation ): Any? = (this as Function1, Any?>).invoke(completion) @@ -29,12 +38,16 @@ public actual inline fun (suspend () -> T).startCoroutineUninterceptedOrRetu * Starts unintercepted coroutine with receiver type [R] and result type [T] and executes it until its first suspension. * Returns the result of the coroutine or throws its exception if it does not suspend or [COROUTINE_SUSPENDED] if it suspends. * In the later case, the [completion] continuation is invoked when coroutine completes with result or exception. - * This function is designed to be used from inside of [suspendCoroutineOrReturn] to resume the execution of suspended + * + * The coroutine is started directly in the invoker's thread without going through the [ContinuationInterceptor] that might + * be present in the completion's [CoroutineContext]. It is invoker's responsibility to ensure that the proper invocation + * context is established. + * + * This function is designed to be used from inside of [suspendCoroutineUninterceptedOrReturn] to resume the execution of suspended * coroutine using a reference to the suspending function. */ @SinceKotlin("1.3") -@Suppress("UNCHECKED_CAST") -@kotlin.internal.InlineOnly +@InlineOnly public actual inline fun (suspend R.() -> T).startCoroutineUninterceptedOrReturn( receiver: R, completion: Continuation @@ -54,16 +67,11 @@ public actual inline fun (suspend R.() -> T).startCoroutineUninterceptedO * state machine of the coroutine and may result in arbitrary behaviour or exception. */ @SinceKotlin("1.3") +// todo: Drop this function public actual fun (suspend () -> T).createCoroutineUnchecked( completion: Continuation ): Continuation = - if (this !is kotlin.coroutines.jvm.internal.CoroutineImpl) - buildContinuationByInvokeCall(completion) { - @Suppress("UNCHECKED_CAST") - (this as Function1, Any?>).invoke(completion) - } - else - (this.create(completion) as kotlin.coroutines.jvm.internal.CoroutineImpl).facade + createCoroutineUnintercepted(completion).intercepted() /** * Creates a coroutine with receiver type [R] and result type [T]. @@ -76,37 +84,95 @@ public actual fun (suspend () -> T).createCoroutineUnchecked( * state machine of the coroutine and may result in arbitrary behaviour or exception. */ @SinceKotlin("1.3") +// todo: Drop this function public actual fun (suspend R.() -> T).createCoroutineUnchecked( receiver: R, completion: Continuation ): Continuation = - if (this !is kotlin.coroutines.jvm.internal.CoroutineImpl) - buildContinuationByInvokeCall(completion) { - @Suppress("UNCHECKED_CAST") + createCoroutineUnintercepted(receiver, completion).intercepted() + +/** + * Creates unintercepted coroutine without receiver and with result type [T]. + * This function creates a new, fresh instance of suspendable computation every time it is invoked. + * + * To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance. + * The [completion] continuation is invoked when coroutine completes with result or exception. + * + * This function returns unintercepted continuation. + * Invocation of `resume(Unit)` starts coroutine directly in the invoker's thread without going through the + * [ContinuationInterceptor] that might be present in the completion's [CoroutineContext]. + * It is invoker's responsibility to ensure that the proper invocation context is established. + * [Continuation.intercepted] can be used to acquire the intercepted continuation. + * + * Repeated invocation of any resume function on the resulting continuation corrupts the + * state machine of the coroutine and may result in arbitrary behaviour or exception. + */ +@SinceKotlin("1.3") +public actual fun (suspend () -> T).createCoroutineUnintercepted( + completion: Continuation +): Continuation = + if (this is SuspendFunction0) + create(completion) + else + createCoroutineFromSuspendFunction(completion) { + (this as Function1, Any?>).invoke(completion) + } + +/** + * Creates unintercepted coroutine with receiver type [R] and result type [T]. + * This function creates a new, fresh instance of suspendable computation every time it is invoked. + * + * To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance. + * The [completion] continuation is invoked when coroutine completes with result or exception. + * + * This function returns unintercepted continuation. + * Invocation of `resume(Unit)` starts coroutine directly in the invoker's thread without going through the + * [ContinuationInterceptor] that might be present in the completion's [CoroutineContext]. + * It is invoker's responsibility to ensure that the proper invocation context is established. + * [Continuation.intercepted] can be used to acquire the intercepted continuation. + * + * Repeated invocation of any resume function on the resulting continuation corrupts the + * state machine of the coroutine and may result in arbitrary behaviour or exception. + */ +@SinceKotlin("1.3") +public actual fun (suspend R.() -> T).createCoroutineUnintercepted( + receiver: R, + completion: Continuation +): Continuation = + if (this is SuspendFunction1) + create(receiver, completion) + else { + createCoroutineFromSuspendFunction(completion) { (this as Function2, Any?>).invoke(receiver, completion) } - else - (this.create(receiver, completion) as kotlin.coroutines.jvm.internal.CoroutineImpl).facade + } + +/** + * Intercepts this continuation with [ContinuationInterceptor]. + */ +@SinceKotlin("1.3") +public actual fun Continuation.intercepted(): Continuation = + (this as? ContinuationImpl)?.intercepted() ?: this // INTERNAL DEFINITIONS -private inline fun buildContinuationByInvokeCall( +private inline fun createCoroutineFromSuspendFunction( completion: Continuation, crossinline block: () -> Any? ): Continuation { - val continuation = - object : Continuation { - override val context: CoroutineContext - get() = completion.context - - override fun resume(value: Unit) { - processBareContinuationResume(completion, block) - } - - override fun resumeWithException(exception: Throwable) { - completion.resumeWithException(exception) + val context = completion.context + return if (context === EmptyCoroutineContext) + object : RestrictedContinuationImpl(completion as Continuation) { + override fun invokeSuspend(result: SuccessOrFailure): Any? { + result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by ContinuationImpl.resumeWith + return block() // run the block + } + } + else + object : ContinuationImpl(completion as Continuation, context) { + override fun invokeSuspend(result: SuccessOrFailure): Any? { + result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by ContinuationImpl.resumeWith + return block() // run the block } } - - return kotlin.coroutines.jvm.internal.interceptContinuationIfNeeded(completion.context, continuation) } diff --git a/libraries/stdlib/coroutines/jvm/src/kotlin/coroutines/jvm/internal/ContinuationImpl.kt b/libraries/stdlib/coroutines/jvm/src/kotlin/coroutines/jvm/internal/ContinuationImpl.kt new file mode 100644 index 00000000000..b0ff4dbb278 --- /dev/null +++ b/libraries/stdlib/coroutines/jvm/src/kotlin/coroutines/jvm/internal/ContinuationImpl.kt @@ -0,0 +1,117 @@ +/* + * Copyright 2010-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license + * that can be found in the license/LICENSE.txt file. + */ + +package kotlin.coroutines.jvm.internal + +import java.io.Serializable +import kotlin.coroutines.* +import kotlin.coroutines.intrinsics.CoroutineSingletons +import kotlin.jvm.internal.FunctionBase +import kotlin.jvm.internal.Reflection + +@SinceKotlin("1.3") +// State machines for named restricted suspend functions extend from this class +internal abstract class RestrictedContinuationImpl protected constructor( + @JvmField + protected val completion: Continuation? +) : Continuation, Serializable { + public override val context: CoroutineContext + get() = EmptyCoroutineContext + + public override fun resumeWith(result: SuccessOrFailure) { + try { + val outcome = invokeSuspend(result) + if (outcome === CoroutineSingletons.COROUTINE_SUSPENDED) return + completion!!.resume(outcome) + } catch (exception: Throwable) { + completion!!.resumeWithException(exception) + } + } + + protected abstract fun invokeSuspend(result: SuccessOrFailure): Any? +} + +@SinceKotlin("1.3") +// State machines for named suspend functions extend from this class +internal abstract class ContinuationImpl protected constructor( + completion: Continuation?, + private val _context: CoroutineContext? +) : RestrictedContinuationImpl(completion) { + protected constructor(completion: Continuation?) : this(completion, completion?.context) + + override val context: CoroutineContext + get() = _context!! + + @Transient + private var intercepted: Continuation? = null + + public fun intercepted(): Continuation = + intercepted + ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this) + .also { intercepted = this } + + public override fun resumeWith(result: SuccessOrFailure) { + try { + val outcome = invokeSuspend(result) + if (outcome === CoroutineSingletons.COROUTINE_SUSPENDED) return + disposeIntercepted() + completion!!.resume(outcome) + } catch (exception: Throwable) { + disposeIntercepted() + completion!!.resumeWithException(exception) + } + } + + private fun disposeIntercepted() { + val intercepted = intercepted + if (intercepted != null && intercepted != this) { + context[ContinuationInterceptor]!!.disposeContinuation(intercepted) + } + this.intercepted = CompletedContinuation // just in case + } +} + +// todo: Do we really need it? +internal object CompletedContinuation : Continuation { + override val context: CoroutineContext + get() = error("This continuation is already complete") + + override fun resumeWith(result: SuccessOrFailure) { + error("This continuation is already complete") + } +} + +internal abstract class RestrictedSuspendLambda protected constructor( + private val arity: Int, + completion: Continuation? +) : RestrictedContinuationImpl(completion), FunctionBase { + protected constructor(arity: Int) : this(arity, null) + + public override fun getArity(): Int = arity + + public override fun toString(): String = Reflection.renderLambdaToString(this) +} + +internal abstract class SuspendLambda protected constructor( + private val arity: Int, + completion: Continuation? +) : ContinuationImpl(completion), FunctionBase { + protected constructor(arity: Int) : this(arity, null) + + public override fun getArity(): Int = arity + + public override fun toString(): String = Reflection.renderLambdaToString(this) +} + +@SinceKotlin("1.3") +internal interface SuspendFunction0 : Function1, Any?> { + fun create(completion: Continuation<*>): Continuation +} + +@SinceKotlin("1.3") +internal interface SuspendFunction1 : Function2, Any?> { + fun create(receiver: Any?, completion: Continuation<*>): Continuation +} + diff --git a/libraries/stdlib/coroutines/jvm/src/kotlin/coroutines/jvm/internal/CoroutineImpl.kt b/libraries/stdlib/coroutines/jvm/src/kotlin/coroutines/jvm/internal/CoroutineImpl.kt deleted file mode 100644 index 2e81d0b1e7a..00000000000 --- a/libraries/stdlib/coroutines/jvm/src/kotlin/coroutines/jvm/internal/CoroutineImpl.kt +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright 2010-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license - * that can be found in the license/LICENSE.txt file. - */ - -package kotlin.coroutines.jvm.internal - -import java.lang.IllegalStateException -import kotlin.coroutines.Continuation -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.processBareContinuationResume -import kotlin.jvm.internal.Lambda - -/** - * @suppress - */ -@SinceKotlin("1.3") -public abstract class CoroutineImpl( - arity: Int, - @JvmField - protected var completion: Continuation? -) : Lambda(arity), Continuation { - - // label == -1 when coroutine cannot be started (it is just a factory object) or has already finished execution - // label == 0 in initial part of the coroutine - @JvmField - protected var label: Int = if (completion != null) 0 else -1 - - private val _context: CoroutineContext? = completion?.context - - override val context: CoroutineContext - get() = _context!! - - private var _facade: Continuation? = null - - val facade: Continuation get() { - if (_facade == null) _facade = interceptContinuationIfNeeded(_context!!, this) - return _facade!! - } - - override fun resume(value: Any?) { - processBareContinuationResume(completion!!) { - doResume(value, null) - } - } - - override fun resumeWithException(exception: Throwable) { - processBareContinuationResume(completion!!) { - doResume(null, exception) - } - } - - protected abstract fun doResume(data: Any?, exception: Throwable?): Any? - - open fun create(completion: Continuation<*>): Continuation { - throw IllegalStateException("create(Continuation) has not been overridden") - } - - open fun create(value: Any?, completion: Continuation<*>): Continuation { - throw IllegalStateException("create(Any?;Continuation) has not been overridden") - } -} diff --git a/libraries/stdlib/coroutines/jvm/src/kotlin/coroutines/jvm/internal/CoroutineIntrinsics.kt b/libraries/stdlib/coroutines/jvm/src/kotlin/coroutines/jvm/internal/CoroutineIntrinsics.kt deleted file mode 100644 index d2e3531aeef..00000000000 --- a/libraries/stdlib/coroutines/jvm/src/kotlin/coroutines/jvm/internal/CoroutineIntrinsics.kt +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright 2010-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license - * that can be found in the license/LICENSE.txt file. - */ - -@file:JvmName("CoroutineIntrinsics") - -package kotlin.coroutines.jvm.internal - -import kotlin.coroutines.Continuation -import kotlin.coroutines.ContinuationInterceptor -import kotlin.coroutines.CoroutineContext - -/** - * @suppress - */ -@SinceKotlin("1.3") -public fun normalizeContinuation(continuation: Continuation): Continuation = - (continuation as? CoroutineImpl)?.facade ?: continuation - -@SinceKotlin("1.3") -internal fun interceptContinuationIfNeeded( - context: CoroutineContext, - continuation: Continuation -) = context[ContinuationInterceptor]?.interceptContinuation(continuation) ?: continuation diff --git a/libraries/stdlib/coroutines/src/kotlin/SuccessOrFailure.kt b/libraries/stdlib/coroutines/src/kotlin/SuccessOrFailure.kt new file mode 100644 index 00000000000..6091c9b55b5 --- /dev/null +++ b/libraries/stdlib/coroutines/src/kotlin/SuccessOrFailure.kt @@ -0,0 +1,113 @@ +/* + * Copyright 2010-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license + * that can be found in the license/LICENSE.txt file. + */ + +@file:Suppress( + "UNCHECKED_CAST", + "RedundantVisibilityModifier", + "NON_PUBLIC_PRIMARY_CONSTRUCTOR_OF_INLINE_CLASS", + "UNSUPPORTED_FEATURE", + "INVISIBLE_REFERENCE", + "INVISIBLE_MEMBER" +) + +package kotlin + +import kotlin.internal.InlineOnly +import kotlin.jvm.JvmField + +@SinceKotlin("1.3") +public inline class SuccessOrFailure @PublishedApi internal constructor(private val _value: Any?) { + // discovery + + public val isSuccess: Boolean get() = _value !is Failure + public val isFailure: Boolean get() = _value is Failure + + // value retrieval + + public fun getOrThrow(): T = when (_value) { + is Failure -> throw _value.exception + else -> _value as T + } + + public fun getOrNull(): T? = when (_value) { + is Failure -> null + else -> _value as T + } + + // exception retrieval + + public fun exceptionOrNull(): Throwable? = when (_value) { + is Failure -> _value.exception + else -> null + } + + // internal API for inline functions + + @PublishedApi internal val exception: Throwable get() = (_value as Failure).exception + @PublishedApi internal val value: T get() = _value as T + + // companion with constructors + + public companion object { + @InlineOnly public inline fun success(value: T): SuccessOrFailure = + SuccessOrFailure(value) + + @InlineOnly public inline fun failure(exception: Throwable): SuccessOrFailure = + SuccessOrFailure(Failure(exception)) + } +} + +// top-Level internal failure-marker class +// todo: maybe move it to another kotlin.internal package? +@SinceKotlin("1.3") +@PublishedApi +internal class Failure @PublishedApi internal constructor( + @JvmField + val exception: Throwable +) : Serializable + +@SinceKotlin("1.3") +@InlineOnly public inline fun runOrCatch(block: () -> T): SuccessOrFailure = + try { + SuccessOrFailure.success(block()) + } catch (e: Throwable) { + SuccessOrFailure.failure(e) + } + +// -- extensions --- + +@SinceKotlin("1.3") +@InlineOnly public inline fun SuccessOrFailure.getOrElse(default: () -> R): R = when { + isFailure -> default() + else -> value +} + +// transformation + +@SinceKotlin("1.3") +@InlineOnly public inline fun SuccessOrFailure.map(block: (T) -> U): SuccessOrFailure = + if (isFailure) this as SuccessOrFailure + else runOrCatch { block(value) } + +@SinceKotlin("1.3") +@InlineOnly public inline fun SuccessOrFailure.handle(block: (Throwable) -> U): SuccessOrFailure = + if (isFailure) runOrCatch { block(exception) } + else this + +// "peek" onto value/exception and pipe + +@SinceKotlin("1.3") +@InlineOnly public inline fun SuccessOrFailure.onFailure(block: (Throwable) -> Unit): SuccessOrFailure { + if (isFailure) block(exception) + return this +} + +@SinceKotlin("1.3") +@InlineOnly public inline fun SuccessOrFailure.onSuccess(block: (T) -> Unit): SuccessOrFailure { + if (isSuccess) block(value) + return this +} + +// ------------------- \ No newline at end of file diff --git a/libraries/stdlib/coroutines/src/kotlin/coroutines/CoroutinesLibrary.kt b/libraries/stdlib/coroutines/src/kotlin/coroutines/Continuation.kt similarity index 62% rename from libraries/stdlib/coroutines/src/kotlin/coroutines/CoroutinesLibrary.kt rename to libraries/stdlib/coroutines/src/kotlin/coroutines/Continuation.kt index aa228cca064..16ea5fa7979 100644 --- a/libraries/stdlib/coroutines/src/kotlin/coroutines/CoroutinesLibrary.kt +++ b/libraries/stdlib/coroutines/src/kotlin/coroutines/Continuation.kt @@ -4,41 +4,67 @@ */ @file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") -@file:kotlin.jvm.JvmName("CoroutinesKt") package kotlin.coroutines -import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED -import kotlin.coroutines.intrinsics.createCoroutineUnchecked -import kotlin.coroutines.intrinsics.suspendCoroutineOrReturn +import kotlin.coroutines.intrinsics.* import kotlin.internal.InlineOnly /** - * Starts coroutine with receiver type [R] and result type [T]. - * This function creates and start a new, fresh instance of suspendable computation every time it is invoked. - * The [completion] continuation is invoked when coroutine completes with result or exception. + * Interface representing a continuation after a suspension point that returns value of type `T`. */ @SinceKotlin("1.3") -@Suppress("UNCHECKED_CAST") -public fun (suspend R.() -> T).startCoroutine( - receiver: R, - completion: Continuation -) { - createCoroutineUnchecked(receiver, completion).resume(Unit) +public interface Continuation { + /** + * Context of the coroutine that corresponds to this continuation. + */ + // todo: shall we provide default impl with EmptyCoroutineContext? + public val context: CoroutineContext + + /** + * Resumes the execution of the corresponding coroutine passing successful or failed [result] as the + * return value of the last suspension point. + */ + public fun resumeWith(result: SuccessOrFailure) } /** - * Starts coroutine without receiver and with result type [T]. - * This function creates and start a new, fresh instance of suspendable computation every time it is invoked. + * Classes and interfaces marked with this annotation are restricted when used as receivers for extension + * `suspend` functions. These `suspend` extensions can only invoke other member or extension `suspend` functions on this particular + * receiver only and are restricted from calling arbitrary suspension functions. + */ +@SinceKotlin("1.3") +@Target(AnnotationTarget.CLASS) +@Retention(AnnotationRetention.BINARY) +public annotation class RestrictsSuspension + +/** + * Resumes the execution of the corresponding coroutine passing [value] as the return value of the last suspension point. + */ +@InlineOnly public inline fun Continuation.resume(value: T): Unit = + resumeWith(SuccessOrFailure.success(value)) + +/** + * Resumes the execution of the corresponding coroutine so that the [exception] is re-thrown right after the + * last suspension point. + */ +@InlineOnly public inline fun Continuation.resumeWithException(exception: Throwable): Unit = + resumeWith(SuccessOrFailure.failure(exception)) + +/** + * Creates a coroutine without receiver and with result type [T]. + * This function creates a new, fresh instance of suspendable computation every time it is invoked. + * + * To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance. * The [completion] continuation is invoked when coroutine completes with result or exception. + * Repeated invocation of any resume function on the resulting continuation produces [IllegalStateException]. */ @SinceKotlin("1.3") @Suppress("UNCHECKED_CAST") -public fun (suspend () -> T).startCoroutine( +public fun (suspend () -> T).createCoroutine( completion: Continuation -) { - createCoroutineUnchecked(completion).resume(Unit) -} +): Continuation = + SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED) /** * Creates a coroutine with receiver type [R] and result type [T]. @@ -53,21 +79,35 @@ public fun (suspend () -> T).startCoroutine( public fun (suspend R.() -> T).createCoroutine( receiver: R, completion: Continuation -): Continuation = SafeContinuation(createCoroutineUnchecked(receiver, completion), COROUTINE_SUSPENDED) +): Continuation = + SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED) /** - * Creates a coroutine without receiver and with result type [T]. - * This function creates a new, fresh instance of suspendable computation every time it is invoked. - * - * To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance. + * Starts coroutine without receiver and with result type [T]. + * This function creates and start a new, fresh instance of suspendable computation every time it is invoked. * The [completion] continuation is invoked when coroutine completes with result or exception. - * Repeated invocation of any resume function on the resulting continuation produces [IllegalStateException]. */ @SinceKotlin("1.3") @Suppress("UNCHECKED_CAST") -public fun (suspend () -> T).createCoroutine( +public fun (suspend () -> T).startCoroutine( completion: Continuation -): Continuation = SafeContinuation(createCoroutineUnchecked(completion), COROUTINE_SUSPENDED) +) { + createCoroutineUnintercepted(completion).intercepted().resume(Unit) +} + +/** + * Starts coroutine with receiver type [R] and result type [T]. + * This function creates and start a new, fresh instance of suspendable computation every time it is invoked. + * The [completion] continuation is invoked when coroutine completes with result or exception. + */ +@SinceKotlin("1.3") +@Suppress("UNCHECKED_CAST") +public fun (suspend R.() -> T).startCoroutine( + receiver: R, + completion: Continuation +) { + createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit) +} /** * Obtains the current continuation instance inside suspend functions and suspends @@ -78,11 +118,12 @@ public fun (suspend () -> T).createCoroutine( * from a different thread of execution. Repeated invocation of any resume function produces [IllegalStateException]. */ @SinceKotlin("1.3") +@InlineOnly public suspend inline fun suspendCoroutine(crossinline block: (Continuation) -> Unit): T = - suspendCoroutineOrReturn { c: Continuation -> - val safe = SafeContinuation(c) + suspendCoroutineUninterceptedOrReturn { c: Continuation -> + val safe = SafeContinuation(c.intercepted()) block(safe) - safe.getResult() + safe.getOrThrow() } /** @@ -100,19 +141,3 @@ public suspend inline val coroutineContext: CoroutineContext get() { throw NotImplementedError("Implemented as intrinsic") } - -// INTERNAL DECLARATIONS - -@SinceKotlin("1.3") -@kotlin.internal.InlineOnly -internal inline fun processBareContinuationResume(completion: Continuation<*>, block: () -> Any?) { - try { - val result = block() - if (result !== COROUTINE_SUSPENDED) { - @Suppress("UNCHECKED_CAST") - (completion as Continuation).resume(result) - } - } catch (t: Throwable) { - completion.resumeWithException(t) - } -} diff --git a/libraries/stdlib/coroutines/src/kotlin/coroutines/ContinuationInterceptor.kt b/libraries/stdlib/coroutines/src/kotlin/coroutines/ContinuationInterceptor.kt index 33d9bd5dfa5..8b3b8b2f309 100644 --- a/libraries/stdlib/coroutines/src/kotlin/coroutines/ContinuationInterceptor.kt +++ b/libraries/stdlib/coroutines/src/kotlin/coroutines/ContinuationInterceptor.kt @@ -22,9 +22,22 @@ public interface ContinuationInterceptor : CoroutineContext.Element { * This function is invoked by coroutines framework when needed and the resulting continuations are * cached internally per each instance of the original [continuation]. * - * By convention, implementations that install themselves as *the* interceptor in the context with - * the [Key] shall also scan the context for other element that implement [ContinuationInterceptor] interface - * and use their [interceptContinuation] functions, too. + * This function may simply return `this` if it does not want to intercept this particular continuation. + * + * When the original [continuation] completes coroutine framework invokes [disposeContinuation] + * with the resulting continuation if it was intercepted. */ public fun interceptContinuation(continuation: Continuation): Continuation + + /** + * Invoked for the continuation instance returned by [interceptContinuation] when the original + * continuation completes and will not be used anymore. + * + * Default implementation does nothing. + * + * @param continuation Continuation instance returned by this interceptor's [interceptContinuation] invocation. + */ + public fun disposeContinuation(continuation: Continuation<*>) { + /* do nothing by default */ + } } diff --git a/libraries/stdlib/coroutines/src/kotlin/coroutines/CoroutineContext.kt b/libraries/stdlib/coroutines/src/kotlin/coroutines/CoroutineContext.kt index 73283d5f685..615a7f85d02 100644 --- a/libraries/stdlib/coroutines/src/kotlin/coroutines/CoroutineContext.kt +++ b/libraries/stdlib/coroutines/src/kotlin/coroutines/CoroutineContext.kt @@ -51,6 +51,12 @@ public interface CoroutineContext { */ public fun minusKey(key: Key<*>): CoroutineContext + /** + * Key for the elements of [CoroutineContext]. [E] is a type of element with this key. + * Keys in the context are compared _by reference_. + */ + public interface Key + /** * An element of the [CoroutineContext]. An element of the coroutine context is a singleton context by itself. */ @@ -70,10 +76,4 @@ public interface CoroutineContext { public override fun minusKey(key: Key<*>): CoroutineContext = if (this.key === key) EmptyCoroutineContext else this } - - /** - * Key for the elements of [CoroutineContext]. [E] is a type of element with this key. - * Keys in the context are compared _by reference_. - */ - public interface Key } diff --git a/libraries/stdlib/coroutines/src/kotlin/coroutines/CoroutineContextImpl.kt b/libraries/stdlib/coroutines/src/kotlin/coroutines/CoroutineContextImpl.kt index 6129885c123..59426749fd0 100644 --- a/libraries/stdlib/coroutines/src/kotlin/coroutines/CoroutineContextImpl.kt +++ b/libraries/stdlib/coroutines/src/kotlin/coroutines/CoroutineContextImpl.kt @@ -3,9 +3,12 @@ * that can be found in the license/LICENSE.txt file. */ +@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") + package kotlin.coroutines import kotlin.coroutines.CoroutineContext.* +import kotlin.io.Serializable /** * Base class for [CoroutineContext.Element] implementations. @@ -17,7 +20,10 @@ public abstract class AbstractCoroutineContextElement(public override val key: K * An empty coroutine context. */ @SinceKotlin("1.3") -public object EmptyCoroutineContext : CoroutineContext { +public object EmptyCoroutineContext : CoroutineContext, Serializable { + private const val serialVersionUID: Long = 0 + private fun readResolve(): Any = this + public override fun get(key: Key): E? = null public override fun fold(initial: R, operation: (R, Element) -> R): R = initial public override fun plus(context: CoroutineContext): CoroutineContext = context @@ -31,7 +37,11 @@ public object EmptyCoroutineContext : CoroutineContext { // this class is not exposed, but is hidden inside implementations // this is a left-biased list, so that `plus` works naturally @SinceKotlin("1.3") -internal class CombinedContext(val left: CoroutineContext, val element: Element) : CoroutineContext { +internal class CombinedContext( + private val left: CoroutineContext, + private val element: Element +) : CoroutineContext, Serializable { + override fun get(key: Key): E? { var cur = this while (true) { @@ -58,8 +68,14 @@ internal class CombinedContext(val left: CoroutineContext, val element: Element) } } - private fun size(): Int = - if (left is CombinedContext) left.size() + 1 else 2 + private fun size(): Int { + var cur = this + var size = 2 + while (true) { + cur = cur.left as? CombinedContext ?: return size + size++ + } + } private fun contains(element: Element): Boolean = get(element.key) == element @@ -84,6 +100,22 @@ internal class CombinedContext(val left: CoroutineContext, val element: Element) override fun toString(): String = "[" + fold("") { acc, element -> - if (acc.isEmpty()) element.toString() else acc + ", " + element + if (acc.isEmpty()) element.toString() else "$acc, $element" } + "]" + + private fun writeReplace(): Any { + val list = ArrayList(size()) + fold(Unit) { _, element -> + if (element is Serializable) list += element + } + return Serialized(list.toTypedArray()) + } + + private class Serialized(val elements: Array) : Serializable { + companion object { + private const val serialVersionUID: Long = 0L + } + + private fun readResolve(): Any = elements.fold(EmptyCoroutineContext, CoroutineContext::plus) + } } diff --git a/libraries/stdlib/coroutines/src/kotlin/coroutines/Coroutines.kt b/libraries/stdlib/coroutines/src/kotlin/coroutines/Coroutines.kt deleted file mode 100644 index e488b90b451..00000000000 --- a/libraries/stdlib/coroutines/src/kotlin/coroutines/Coroutines.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2010-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license - * that can be found in the license/LICENSE.txt file. - */ - -package kotlin.coroutines - -/** - * Interface representing a continuation after a suspension point that returns value of type `T`. - */ -@SinceKotlin("1.3") -public interface Continuation { - /** - * Context of the coroutine that corresponds to this continuation. - */ - public val context: CoroutineContext - - /** - * Resumes the execution of the corresponding coroutine passing [value] as the return value of the last suspension point. - */ - public fun resume(value: T) - - /** - * Resumes the execution of the corresponding coroutine so that the [exception] is re-thrown right after the - * last suspension point. - */ - public fun resumeWithException(exception: Throwable) -} - -/** - * Classes and interfaces marked with this annotation are restricted when used as receivers for extension - * `suspend` functions. These `suspend` extensions can only invoke other member or extension `suspend` functions on this particular - * receiver only and are restricted from calling arbitrary suspension functions. - */ -@SinceKotlin("1.3") -@Target(AnnotationTarget.CLASS) -@Retention(AnnotationRetention.BINARY) -public annotation class RestrictsSuspension diff --git a/libraries/stdlib/coroutines/src/kotlin/coroutines/intrinsics/Intrinsics.kt b/libraries/stdlib/coroutines/src/kotlin/coroutines/intrinsics/Intrinsics.kt index 67842918049..6f22b8c30b4 100644 --- a/libraries/stdlib/coroutines/src/kotlin/coroutines/intrinsics/Intrinsics.kt +++ b/libraries/stdlib/coroutines/src/kotlin/coroutines/intrinsics/Intrinsics.kt @@ -10,6 +10,7 @@ package kotlin.coroutines.intrinsics import kotlin.coroutines.* +import kotlin.internal.InlineOnly /** * Obtains the current continuation instance inside suspend functions and either suspends @@ -29,8 +30,9 @@ import kotlin.coroutines.* * continuation instance. */ @SinceKotlin("1.3") -@kotlin.internal.InlineOnly +@InlineOnly @Suppress("UNUSED_PARAMETER") +// todo: Drop this function public suspend inline fun suspendCoroutineOrReturn(crossinline block: (Continuation) -> Any?): T = suspendCoroutineUninterceptedOrReturn { cont -> block(cont.intercepted()) } @@ -38,25 +40,35 @@ public suspend inline fun suspendCoroutineOrReturn(crossinline block: (Conti * Obtains the current continuation instance inside suspend functions and either suspends * currently running coroutine or returns result immediately without suspension. * - * Unlike [suspendCoroutineOrReturn] it does not intercept continuation. + * If the [block] returns the special [COROUTINE_SUSPENDED] value, it means that suspend function did suspend the execution and will + * not return any result immediately. In this case, the [Continuation] provided to the [block] shall be + * resumed by invoking [Continuation.resumeWith] at some moment in the + * future when the result becomes available to resume the computation. + * + * Otherwise, the return value of the [block] must have a type assignable to [T] and represents the result of this suspend function. + * It means that the execution was not suspended and the [Continuation] provided to the [block] shall not be invoked. + * As the result type of the [block] is declared as `Any?` and cannot be correctly type-checked, + * its proper return type remains on the conscience of the suspend function's author. + * + * Invocation of [Continuation.resumeWith] resumes coroutine directly in the invoker's thread without going through the + * [ContinuationInterceptor] that might be present in the coroutine's [CoroutineContext]. + * It is invoker's responsibility to ensure that the proper invocation context is established. + * [Continuation.intercepted] can be used to acquire the intercepted continuation. + * + * Note that it is not recommended to call either [Continuation.resume] nor [Continuation.resumeWithException] functions synchronously + * in the same stackframe where suspension function is run. Use [suspendCoroutine] as a safer way to obtain current + * continuation instance. */ @SinceKotlin("1.3") -@kotlin.internal.InlineOnly +@InlineOnly public suspend inline fun suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation) -> Any?): T = throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic") -/** - * Intercept continuation with [ContinuationInterceptor]. - */ -@SinceKotlin("1.3") -@kotlin.internal.InlineOnly -public inline fun Continuation.intercepted(): Continuation = - throw NotImplementedError("Implementation of intercepted is intrinsic") - /** * This value is used as a return value of [suspendCoroutineOrReturn] `block` argument to state that * the execution was suspended and will not return any result immediately. */ @SinceKotlin("1.3") -public val COROUTINE_SUSPENDED: Any = Any() +public val COROUTINE_SUSPENDED: Any = CoroutineSingletons.COROUTINE_SUSPENDED +internal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED } \ No newline at end of file diff --git a/libraries/stdlib/coroutines/src/kotlin/sequences/SequenceBuilder.kt b/libraries/stdlib/coroutines/src/kotlin/sequences/SequenceBuilder.kt index 68dce820370..e7956fa4aa8 100644 --- a/libraries/stdlib/coroutines/src/kotlin/sequences/SequenceBuilder.kt +++ b/libraries/stdlib/coroutines/src/kotlin/sequences/SequenceBuilder.kt @@ -32,7 +32,7 @@ public fun buildSequence(builderAction: suspend SequenceBuilder.() -> Uni @SinceKotlin("1.3") public fun buildIterator(builderAction: suspend SequenceBuilder.() -> Unit): Iterator { val iterator = SequenceBuilderIterator() - iterator.nextStep = builderAction.createCoroutineUnchecked(receiver = iterator, completion = iterator) + iterator.nextStep = builderAction.createCoroutineUnintercepted(receiver = iterator, completion = iterator) return iterator } @@ -152,34 +152,31 @@ private class SequenceBuilderIterator : SequenceBuilder(), Iterator, Co } - suspend override fun yield(value: T) { + override suspend fun yield(value: T) { nextValue = value state = State_Ready - return suspendCoroutineOrReturn { c -> + return suspendCoroutineUninterceptedOrReturn { c -> nextStep = c COROUTINE_SUSPENDED } } - suspend override fun yieldAll(iterator: Iterator) { + override suspend fun yieldAll(iterator: Iterator) { if (!iterator.hasNext()) return nextIterator = iterator state = State_ManyReady - return suspendCoroutineOrReturn { c -> + return suspendCoroutineUninterceptedOrReturn { c -> nextStep = c COROUTINE_SUSPENDED } } // Completion continuation implementation - override fun resume(value: Unit) { + override fun resumeWith(result: SuccessOrFailure) { + result.getOrThrow() // just rethrow exception if it is there state = State_Done } - override fun resumeWithException(exception: Throwable) { - throw exception // just rethrow - } - override val context: CoroutineContext get() = EmptyCoroutineContext } diff --git a/libraries/stdlib/jvm/runtime/kotlin/jvm/internal/Reflection.java b/libraries/stdlib/jvm/runtime/kotlin/jvm/internal/Reflection.java index feacc35d577..24e7f6fb206 100644 --- a/libraries/stdlib/jvm/runtime/kotlin/jvm/internal/Reflection.java +++ b/libraries/stdlib/jvm/runtime/kotlin/jvm/internal/Reflection.java @@ -69,6 +69,11 @@ public class Reflection { return factory.renderLambdaToString(lambda); } + @SinceKotlin(version = "1.3") + public static String renderLambdaToString(FunctionBase lambda) { + return factory.renderLambdaToString(lambda); + } + // Functions public static KFunction function(FunctionReference f) { diff --git a/libraries/stdlib/jvm/runtime/kotlin/jvm/internal/ReflectionFactory.java b/libraries/stdlib/jvm/runtime/kotlin/jvm/internal/ReflectionFactory.java index 54d2cbae10a..f7f02e9b4b3 100644 --- a/libraries/stdlib/jvm/runtime/kotlin/jvm/internal/ReflectionFactory.java +++ b/libraries/stdlib/jvm/runtime/kotlin/jvm/internal/ReflectionFactory.java @@ -33,6 +33,11 @@ public class ReflectionFactory { @SinceKotlin(version = "1.1") public String renderLambdaToString(Lambda lambda) { + return renderLambdaToString((FunctionBase) lambda); + } + + @SinceKotlin(version = "1.3") + public String renderLambdaToString(FunctionBase lambda) { String result = lambda.getClass().getGenericInterfaces()[0].toString(); return result.startsWith(KOTLIN_JVM_FUNCTIONS) ? result.substring(KOTLIN_JVM_FUNCTIONS.length()) : result; }