Draft Kotlin Coroutines 1.3 ABI & API:

* SuccessOrFailure inline class is introduced
* Continuation.resumeWith(SuccessOrFailure)
* createCoroutineUnintercepted
* [Restricted]ContinuationImpl as named suspending function base
* [Restricted]SuspendLambda as suspending lambda base
* SuspendFunction[01] interfaces for efficient createCoroutine
* Serializable coroutine classes
This commit is contained in:
Roman Elizarov
2018-06-19 20:29:44 +03:00
committed by Denis Zharkov
parent fb5e11dfc6
commit 8bbd78ca9a
18 changed files with 531 additions and 276 deletions
@@ -51,7 +51,7 @@ public class ReflectionFactoryImpl extends ReflectionFactory {
}
@Override
public String renderLambdaToString(Lambda lambda) {
public String renderLambdaToString(FunctionBase lambda) {
KFunction kFunction = ReflectLambdaKt.reflect(lambda);
if (kFunction != null) {
KFunctionImpl impl = UtilKt.asKFunctionImpl(kFunction);
@@ -14,9 +14,8 @@ internal expect class SafeContinuation<in T> : Continuation<T> {
internal constructor(delegate: Continuation<T>)
@PublishedApi
internal fun getResult(): Any?
internal fun getOrThrow(): Any?
override val context: CoroutineContext
override fun resume(value: T): Unit
override fun resumeWithException(exception: Throwable): Unit
override fun resumeWith(result: SuccessOrFailure<T>): Unit
}
@@ -33,12 +33,28 @@ public expect inline fun <R, T> (suspend R.() -> T).startCoroutineUninterceptedO
): Any?
@SinceKotlin("1.3")
// todo: Drop this function
public expect fun <T> (suspend () -> T).createCoroutineUnchecked(
completion: Continuation<T>
): Continuation<Unit>
@SinceKotlin("1.3")
// todo: Drop this function
public expect fun <R, T> (suspend R.() -> T).createCoroutineUnchecked(
receiver: R,
completion: Continuation<T>
): Continuation<Unit>
@SinceKotlin("1.3")
public expect fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit>
@SinceKotlin("1.3")
public expect fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit>
@SinceKotlin("1.3")
public expect fun <T> Continuation<T>.intercepted(): Continuation<T>
@@ -6,7 +6,7 @@ package kotlin.coroutines
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
import kotlin.*
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.CoroutineSingletons.*
@PublishedApi
@SinceKotlin("1.3")
@@ -15,7 +15,6 @@ internal actual constructor(
private val delegate: Continuation<T>,
initialResult: Any?
) : Continuation<T> {
@PublishedApi
internal actual constructor(delegate: Continuation<T>) : this(delegate, UNDECIDED)
@@ -25,10 +24,7 @@ internal actual constructor(
@Volatile
private var result: Any? = initialResult
companion object {
private val UNDECIDED: Any? = Any()
private val RESUMED: Any? = Any()
private companion object {
@Suppress("UNCHECKED_CAST")
@JvmStatic
private val RESULT = AtomicReferenceFieldUpdater.newUpdater<SafeContinuation<*>, Any?>(
@@ -36,29 +32,13 @@ internal actual constructor(
)
}
private class Fail(val exception: Throwable)
actual override fun resume(value: T) {
public actual override fun resumeWith(result: SuccessOrFailure<T>) {
while (true) { // lock-free loop
val result = this.result // atomic read
val cur = this.result // atomic read
when {
result === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, value)) return
result === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
delegate.resume(value)
return
}
else -> throw IllegalStateException("Already resumed")
}
}
}
actual override fun resumeWithException(exception: Throwable) {
while (true) { // lock-free loop
val result = this.result // atomic read
when {
result === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, Fail(exception))) return
result === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
delegate.resumeWithException(exception)
cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result)) return
cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
delegate.resumeWith(result)
return
}
else -> throw IllegalStateException("Already resumed")
@@ -67,16 +47,16 @@ internal actual constructor(
}
@PublishedApi
internal actual fun getResult(): Any? {
internal actual fun getOrThrow(): Any? {
var result = this.result // atomic read
if (result === UNDECIDED) {
if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
result = this.result // reread volatile var
}
when {
result === RESUMED -> return COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream
result is Fail -> throw result.exception
else -> return result // either COROUTINE_SUSPENDED or data
return when {
result === RESUMED -> COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream
result is Failure -> throw result.exception
else -> result // either COROUTINE_SUSPENDED or data
}
}
}
@@ -5,22 +5,31 @@
@file:kotlin.jvm.JvmName("IntrinsicsKt")
@file:kotlin.jvm.JvmMultifileClass
@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER", "UNCHECKED_CAST")
package kotlin.coroutines.intrinsics
import kotlin.coroutines.*
import kotlin.coroutines.jvm.internal.ContinuationImpl
import kotlin.coroutines.jvm.internal.RestrictedContinuationImpl
import kotlin.coroutines.jvm.internal.SuspendFunction0
import kotlin.coroutines.jvm.internal.SuspendFunction1
import kotlin.internal.InlineOnly
/**
* Starts unintercepted coroutine without receiver and with result type [T] and executes it until its first suspension.
* Returns the result of the coroutine or throws its exception if it does not suspend or [COROUTINE_SUSPENDED] if it suspends.
* In the later case, the [completion] continuation is invoked when coroutine completes with result or exception.
* This function is designed to be used from inside of [suspendCoroutineOrReturn] to resume the execution of suspended
*
* The coroutine is started directly in the invoker's thread without going through the [ContinuationInterceptor] that might
* be present in the completion's [CoroutineContext]. It is invoker's responsibility to ensure that the proper invocation
* context is established.
*
* This function is designed to be used from inside of [suspendCoroutineUninterceptedOrReturn] to resume the execution of suspended
* coroutine using a reference to the suspending function.
*/
@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
@kotlin.internal.InlineOnly
@InlineOnly
public actual inline fun <T> (suspend () -> T).startCoroutineUninterceptedOrReturn(
completion: Continuation<T>
): Any? = (this as Function1<Continuation<T>, Any?>).invoke(completion)
@@ -29,12 +38,16 @@ public actual inline fun <T> (suspend () -> T).startCoroutineUninterceptedOrRetu
* Starts unintercepted coroutine with receiver type [R] and result type [T] and executes it until its first suspension.
* Returns the result of the coroutine or throws its exception if it does not suspend or [COROUTINE_SUSPENDED] if it suspends.
* In the later case, the [completion] continuation is invoked when coroutine completes with result or exception.
* This function is designed to be used from inside of [suspendCoroutineOrReturn] to resume the execution of suspended
*
* The coroutine is started directly in the invoker's thread without going through the [ContinuationInterceptor] that might
* be present in the completion's [CoroutineContext]. It is invoker's responsibility to ensure that the proper invocation
* context is established.
*
* This function is designed to be used from inside of [suspendCoroutineUninterceptedOrReturn] to resume the execution of suspended
* coroutine using a reference to the suspending function.
*/
@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
@kotlin.internal.InlineOnly
@InlineOnly
public actual inline fun <R, T> (suspend R.() -> T).startCoroutineUninterceptedOrReturn(
receiver: R,
completion: Continuation<T>
@@ -54,16 +67,11 @@ public actual inline fun <R, T> (suspend R.() -> T).startCoroutineUninterceptedO
* state machine of the coroutine and may result in arbitrary behaviour or exception.
*/
@SinceKotlin("1.3")
// todo: Drop this function
public actual fun <T> (suspend () -> T).createCoroutineUnchecked(
completion: Continuation<T>
): Continuation<Unit> =
if (this !is kotlin.coroutines.jvm.internal.CoroutineImpl)
buildContinuationByInvokeCall(completion) {
@Suppress("UNCHECKED_CAST")
(this as Function1<Continuation<T>, Any?>).invoke(completion)
}
else
(this.create(completion) as kotlin.coroutines.jvm.internal.CoroutineImpl).facade
createCoroutineUnintercepted(completion).intercepted()
/**
* Creates a coroutine with receiver type [R] and result type [T].
@@ -76,37 +84,95 @@ public actual fun <T> (suspend () -> T).createCoroutineUnchecked(
* state machine of the coroutine and may result in arbitrary behaviour or exception.
*/
@SinceKotlin("1.3")
// todo: Drop this function
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnchecked(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> =
if (this !is kotlin.coroutines.jvm.internal.CoroutineImpl)
buildContinuationByInvokeCall(completion) {
@Suppress("UNCHECKED_CAST")
createCoroutineUnintercepted(receiver, completion).intercepted()
/**
* Creates unintercepted coroutine without receiver and with result type [T].
* This function creates a new, fresh instance of suspendable computation every time it is invoked.
*
* To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance.
* The [completion] continuation is invoked when coroutine completes with result or exception.
*
* This function returns unintercepted continuation.
* Invocation of `resume(Unit)` starts coroutine directly in the invoker's thread without going through the
* [ContinuationInterceptor] that might be present in the completion's [CoroutineContext].
* It is invoker's responsibility to ensure that the proper invocation context is established.
* [Continuation.intercepted] can be used to acquire the intercepted continuation.
*
* Repeated invocation of any resume function on the resulting continuation corrupts the
* state machine of the coroutine and may result in arbitrary behaviour or exception.
*/
@SinceKotlin("1.3")
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit> =
if (this is SuspendFunction0)
create(completion)
else
createCoroutineFromSuspendFunction(completion) {
(this as Function1<Continuation<T>, Any?>).invoke(completion)
}
/**
* Creates unintercepted coroutine with receiver type [R] and result type [T].
* This function creates a new, fresh instance of suspendable computation every time it is invoked.
*
* To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance.
* The [completion] continuation is invoked when coroutine completes with result or exception.
*
* This function returns unintercepted continuation.
* Invocation of `resume(Unit)` starts coroutine directly in the invoker's thread without going through the
* [ContinuationInterceptor] that might be present in the completion's [CoroutineContext].
* It is invoker's responsibility to ensure that the proper invocation context is established.
* [Continuation.intercepted] can be used to acquire the intercepted continuation.
*
* Repeated invocation of any resume function on the resulting continuation corrupts the
* state machine of the coroutine and may result in arbitrary behaviour or exception.
*/
@SinceKotlin("1.3")
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> =
if (this is SuspendFunction1)
create(receiver, completion)
else {
createCoroutineFromSuspendFunction(completion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, completion)
}
else
(this.create(receiver, completion) as kotlin.coroutines.jvm.internal.CoroutineImpl).facade
}
/**
* Intercepts this continuation with [ContinuationInterceptor].
*/
@SinceKotlin("1.3")
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
// INTERNAL DEFINITIONS
private inline fun <T> buildContinuationByInvokeCall(
private inline fun <T> createCoroutineFromSuspendFunction(
completion: Continuation<T>,
crossinline block: () -> Any?
): Continuation<Unit> {
val continuation =
object : Continuation<Unit> {
override val context: CoroutineContext
get() = completion.context
override fun resume(value: Unit) {
processBareContinuationResume(completion, block)
}
override fun resumeWithException(exception: Throwable) {
completion.resumeWithException(exception)
val context = completion.context
return if (context === EmptyCoroutineContext)
object : RestrictedContinuationImpl(completion as Continuation<Any?>) {
override fun invokeSuspend(result: SuccessOrFailure<Any?>): Any? {
result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by ContinuationImpl.resumeWith
return block() // run the block
}
}
else
object : ContinuationImpl(completion as Continuation<Any?>, context) {
override fun invokeSuspend(result: SuccessOrFailure<Any?>): Any? {
result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by ContinuationImpl.resumeWith
return block() // run the block
}
}
return kotlin.coroutines.jvm.internal.interceptContinuationIfNeeded(completion.context, continuation)
}
@@ -0,0 +1,117 @@
/*
* Copyright 2010-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license
* that can be found in the license/LICENSE.txt file.
*/
package kotlin.coroutines.jvm.internal
import java.io.Serializable
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.CoroutineSingletons
import kotlin.jvm.internal.FunctionBase
import kotlin.jvm.internal.Reflection
@SinceKotlin("1.3")
// State machines for named restricted suspend functions extend from this class
internal abstract class RestrictedContinuationImpl protected constructor(
@JvmField
protected val completion: Continuation<Any?>?
) : Continuation<Any?>, Serializable {
public override val context: CoroutineContext
get() = EmptyCoroutineContext
public override fun resumeWith(result: SuccessOrFailure<Any?>) {
try {
val outcome = invokeSuspend(result)
if (outcome === CoroutineSingletons.COROUTINE_SUSPENDED) return
completion!!.resume(outcome)
} catch (exception: Throwable) {
completion!!.resumeWithException(exception)
}
}
protected abstract fun invokeSuspend(result: SuccessOrFailure<Any?>): Any?
}
@SinceKotlin("1.3")
// State machines for named suspend functions extend from this class
internal abstract class ContinuationImpl protected constructor(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : RestrictedContinuationImpl(completion) {
protected constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
override val context: CoroutineContext
get() = _context!!
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = this }
public override fun resumeWith(result: SuccessOrFailure<Any?>) {
try {
val outcome = invokeSuspend(result)
if (outcome === CoroutineSingletons.COROUTINE_SUSPENDED) return
disposeIntercepted()
completion!!.resume(outcome)
} catch (exception: Throwable) {
disposeIntercepted()
completion!!.resumeWithException(exception)
}
}
private fun disposeIntercepted() {
val intercepted = intercepted
if (intercepted != null && intercepted != this) {
context[ContinuationInterceptor]!!.disposeContinuation(intercepted)
}
this.intercepted = CompletedContinuation // just in case
}
}
// todo: Do we really need it?
internal object CompletedContinuation : Continuation<Any?> {
override val context: CoroutineContext
get() = error("This continuation is already complete")
override fun resumeWith(result: SuccessOrFailure<Any?>) {
error("This continuation is already complete")
}
}
internal abstract class RestrictedSuspendLambda protected constructor(
private val arity: Int,
completion: Continuation<Any?>?
) : RestrictedContinuationImpl(completion), FunctionBase {
protected constructor(arity: Int) : this(arity, null)
public override fun getArity(): Int = arity
public override fun toString(): String = Reflection.renderLambdaToString(this)
}
internal abstract class SuspendLambda protected constructor(
private val arity: Int,
completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase {
protected constructor(arity: Int) : this(arity, null)
public override fun getArity(): Int = arity
public override fun toString(): String = Reflection.renderLambdaToString(this)
}
@SinceKotlin("1.3")
internal interface SuspendFunction0 : Function1<Continuation<Any?>, Any?> {
fun create(completion: Continuation<*>): Continuation<Unit>
}
@SinceKotlin("1.3")
internal interface SuspendFunction1 : Function2<Any?, Continuation<Any?>, Any?> {
fun create(receiver: Any?, completion: Continuation<*>): Continuation<Unit>
}
@@ -1,62 +0,0 @@
/*
* Copyright 2010-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license
* that can be found in the license/LICENSE.txt file.
*/
package kotlin.coroutines.jvm.internal
import java.lang.IllegalStateException
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.processBareContinuationResume
import kotlin.jvm.internal.Lambda
/**
* @suppress
*/
@SinceKotlin("1.3")
public abstract class CoroutineImpl(
arity: Int,
@JvmField
protected var completion: Continuation<Any?>?
) : Lambda(arity), Continuation<Any?> {
// label == -1 when coroutine cannot be started (it is just a factory object) or has already finished execution
// label == 0 in initial part of the coroutine
@JvmField
protected var label: Int = if (completion != null) 0 else -1
private val _context: CoroutineContext? = completion?.context
override val context: CoroutineContext
get() = _context!!
private var _facade: Continuation<Any?>? = null
val facade: Continuation<Any?> get() {
if (_facade == null) _facade = interceptContinuationIfNeeded(_context!!, this)
return _facade!!
}
override fun resume(value: Any?) {
processBareContinuationResume(completion!!) {
doResume(value, null)
}
}
override fun resumeWithException(exception: Throwable) {
processBareContinuationResume(completion!!) {
doResume(null, exception)
}
}
protected abstract fun doResume(data: Any?, exception: Throwable?): Any?
open fun create(completion: Continuation<*>): Continuation<Unit> {
throw IllegalStateException("create(Continuation) has not been overridden")
}
open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
throw IllegalStateException("create(Any?;Continuation) has not been overridden")
}
}
@@ -1,25 +0,0 @@
/*
* Copyright 2010-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license
* that can be found in the license/LICENSE.txt file.
*/
@file:JvmName("CoroutineIntrinsics")
package kotlin.coroutines.jvm.internal
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
/**
* @suppress
*/
@SinceKotlin("1.3")
public fun <T> normalizeContinuation(continuation: Continuation<T>): Continuation<T> =
(continuation as? CoroutineImpl)?.facade ?: continuation
@SinceKotlin("1.3")
internal fun <T> interceptContinuationIfNeeded(
context: CoroutineContext,
continuation: Continuation<T>
) = context[ContinuationInterceptor]?.interceptContinuation(continuation) ?: continuation
@@ -0,0 +1,113 @@
/*
* Copyright 2010-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license
* that can be found in the license/LICENSE.txt file.
*/
@file:Suppress(
"UNCHECKED_CAST",
"RedundantVisibilityModifier",
"NON_PUBLIC_PRIMARY_CONSTRUCTOR_OF_INLINE_CLASS",
"UNSUPPORTED_FEATURE",
"INVISIBLE_REFERENCE",
"INVISIBLE_MEMBER"
)
package kotlin
import kotlin.internal.InlineOnly
import kotlin.jvm.JvmField
@SinceKotlin("1.3")
public inline class SuccessOrFailure<out T> @PublishedApi internal constructor(private val _value: Any?) {
// discovery
public val isSuccess: Boolean get() = _value !is Failure
public val isFailure: Boolean get() = _value is Failure
// value retrieval
public fun getOrThrow(): T = when (_value) {
is Failure -> throw _value.exception
else -> _value as T
}
public fun getOrNull(): T? = when (_value) {
is Failure -> null
else -> _value as T
}
// exception retrieval
public fun exceptionOrNull(): Throwable? = when (_value) {
is Failure -> _value.exception
else -> null
}
// internal API for inline functions
@PublishedApi internal val exception: Throwable get() = (_value as Failure).exception
@PublishedApi internal val value: T get() = _value as T
// companion with constructors
public companion object {
@InlineOnly public inline fun <T> success(value: T): SuccessOrFailure<T> =
SuccessOrFailure(value)
@InlineOnly public inline fun <T> failure(exception: Throwable): SuccessOrFailure<T> =
SuccessOrFailure<T>(Failure(exception))
}
}
// top-Level internal failure-marker class
// todo: maybe move it to another kotlin.internal package?
@SinceKotlin("1.3")
@PublishedApi
internal class Failure @PublishedApi internal constructor(
@JvmField
val exception: Throwable
) : Serializable
@SinceKotlin("1.3")
@InlineOnly public inline fun <T> runOrCatch(block: () -> T): SuccessOrFailure<T> =
try {
SuccessOrFailure.success(block())
} catch (e: Throwable) {
SuccessOrFailure.failure(e)
}
// -- extensions ---
@SinceKotlin("1.3")
@InlineOnly public inline fun <R, T : R> SuccessOrFailure<T>.getOrElse(default: () -> R): R = when {
isFailure -> default()
else -> value
}
// transformation
@SinceKotlin("1.3")
@InlineOnly public inline fun <U, T> SuccessOrFailure<T>.map(block: (T) -> U): SuccessOrFailure<U> =
if (isFailure) this as SuccessOrFailure<U>
else runOrCatch { block(value) }
@SinceKotlin("1.3")
@InlineOnly public inline fun <U, T: U> SuccessOrFailure<T>.handle(block: (Throwable) -> U): SuccessOrFailure<U> =
if (isFailure) runOrCatch { block(exception) }
else this
// "peek" onto value/exception and pipe
@SinceKotlin("1.3")
@InlineOnly public inline fun <T> SuccessOrFailure<T>.onFailure(block: (Throwable) -> Unit): SuccessOrFailure<T> {
if (isFailure) block(exception)
return this
}
@SinceKotlin("1.3")
@InlineOnly public inline fun <T> SuccessOrFailure<T>.onSuccess(block: (T) -> Unit): SuccessOrFailure<T> {
if (isSuccess) block(value)
return this
}
// -------------------
@@ -4,41 +4,67 @@
*/
@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
@file:kotlin.jvm.JvmName("CoroutinesKt")
package kotlin.coroutines
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.createCoroutineUnchecked
import kotlin.coroutines.intrinsics.suspendCoroutineOrReturn
import kotlin.coroutines.intrinsics.*
import kotlin.internal.InlineOnly
/**
* Starts coroutine with receiver type [R] and result type [T].
* This function creates and start a new, fresh instance of suspendable computation every time it is invoked.
* The [completion] continuation is invoked when coroutine completes with result or exception.
* Interface representing a continuation after a suspension point that returns value of type `T`.
*/
@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <R, T> (suspend R.() -> T).startCoroutine(
receiver: R,
completion: Continuation<T>
) {
createCoroutineUnchecked(receiver, completion).resume(Unit)
public interface Continuation<in T> {
/**
* Context of the coroutine that corresponds to this continuation.
*/
// todo: shall we provide default impl with EmptyCoroutineContext?
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: SuccessOrFailure<T>)
}
/**
* Starts coroutine without receiver and with result type [T].
* This function creates and start a new, fresh instance of suspendable computation every time it is invoked.
* Classes and interfaces marked with this annotation are restricted when used as receivers for extension
* `suspend` functions. These `suspend` extensions can only invoke other member or extension `suspend` functions on this particular
* receiver only and are restricted from calling arbitrary suspension functions.
*/
@SinceKotlin("1.3")
@Target(AnnotationTarget.CLASS)
@Retention(AnnotationRetention.BINARY)
public annotation class RestrictsSuspension
/**
* Resumes the execution of the corresponding coroutine passing [value] as the return value of the last suspension point.
*/
@InlineOnly public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(SuccessOrFailure.success(value))
/**
* Resumes the execution of the corresponding coroutine so that the [exception] is re-thrown right after the
* last suspension point.
*/
@InlineOnly public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
resumeWith(SuccessOrFailure.failure(exception))
/**
* Creates a coroutine without receiver and with result type [T].
* This function creates a new, fresh instance of suspendable computation every time it is invoked.
*
* To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance.
* The [completion] continuation is invoked when coroutine completes with result or exception.
* Repeated invocation of any resume function on the resulting continuation produces [IllegalStateException].
*/
@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <T> (suspend () -> T).startCoroutine(
public fun <T> (suspend () -> T).createCoroutine(
completion: Continuation<T>
) {
createCoroutineUnchecked(completion).resume(Unit)
}
): Continuation<Unit> =
SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)
/**
* Creates a coroutine with receiver type [R] and result type [T].
@@ -53,21 +79,35 @@ public fun <T> (suspend () -> T).startCoroutine(
public fun <R, T> (suspend R.() -> T).createCoroutine(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> = SafeContinuation(createCoroutineUnchecked(receiver, completion), COROUTINE_SUSPENDED)
): Continuation<Unit> =
SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED)
/**
* Creates a coroutine without receiver and with result type [T].
* This function creates a new, fresh instance of suspendable computation every time it is invoked.
*
* To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance.
* Starts coroutine without receiver and with result type [T].
* This function creates and start a new, fresh instance of suspendable computation every time it is invoked.
* The [completion] continuation is invoked when coroutine completes with result or exception.
* Repeated invocation of any resume function on the resulting continuation produces [IllegalStateException].
*/
@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <T> (suspend () -> T).createCoroutine(
public fun <T> (suspend () -> T).startCoroutine(
completion: Continuation<T>
): Continuation<Unit> = SafeContinuation(createCoroutineUnchecked(completion), COROUTINE_SUSPENDED)
) {
createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}
/**
* Starts coroutine with receiver type [R] and result type [T].
* This function creates and start a new, fresh instance of suspendable computation every time it is invoked.
* The [completion] continuation is invoked when coroutine completes with result or exception.
*/
@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <R, T> (suspend R.() -> T).startCoroutine(
receiver: R,
completion: Continuation<T>
) {
createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit)
}
/**
* Obtains the current continuation instance inside suspend functions and suspends
@@ -78,11 +118,12 @@ public fun <T> (suspend () -> T).createCoroutine(
* from a different thread of execution. Repeated invocation of any resume function produces [IllegalStateException].
*/
@SinceKotlin("1.3")
@InlineOnly
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =
suspendCoroutineOrReturn { c: Continuation<T> ->
val safe = SafeContinuation(c)
suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
val safe = SafeContinuation(c.intercepted())
block(safe)
safe.getResult()
safe.getOrThrow()
}
/**
@@ -100,19 +141,3 @@ public suspend inline val coroutineContext: CoroutineContext
get() {
throw NotImplementedError("Implemented as intrinsic")
}
// INTERNAL DECLARATIONS
@SinceKotlin("1.3")
@kotlin.internal.InlineOnly
internal inline fun processBareContinuationResume(completion: Continuation<*>, block: () -> Any?) {
try {
val result = block()
if (result !== COROUTINE_SUSPENDED) {
@Suppress("UNCHECKED_CAST")
(completion as Continuation<Any?>).resume(result)
}
} catch (t: Throwable) {
completion.resumeWithException(t)
}
}
@@ -22,9 +22,22 @@ public interface ContinuationInterceptor : CoroutineContext.Element {
* This function is invoked by coroutines framework when needed and the resulting continuations are
* cached internally per each instance of the original [continuation].
*
* By convention, implementations that install themselves as *the* interceptor in the context with
* the [Key] shall also scan the context for other element that implement [ContinuationInterceptor] interface
* and use their [interceptContinuation] functions, too.
* This function may simply return `this` if it does not want to intercept this particular continuation.
*
* When the original [continuation] completes coroutine framework invokes [disposeContinuation]
* with the resulting continuation if it was intercepted.
*/
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
/**
* Invoked for the continuation instance returned by [interceptContinuation] when the original
* continuation completes and will not be used anymore.
*
* Default implementation does nothing.
*
* @param continuation Continuation instance returned by this interceptor's [interceptContinuation] invocation.
*/
public fun disposeContinuation(continuation: Continuation<*>) {
/* do nothing by default */
}
}
@@ -51,6 +51,12 @@ public interface CoroutineContext {
*/
public fun minusKey(key: Key<*>): CoroutineContext
/**
* Key for the elements of [CoroutineContext]. [E] is a type of element with this key.
* Keys in the context are compared _by reference_.
*/
public interface Key<E : Element>
/**
* An element of the [CoroutineContext]. An element of the coroutine context is a singleton context by itself.
*/
@@ -70,10 +76,4 @@ public interface CoroutineContext {
public override fun minusKey(key: Key<*>): CoroutineContext =
if (this.key === key) EmptyCoroutineContext else this
}
/**
* Key for the elements of [CoroutineContext]. [E] is a type of element with this key.
* Keys in the context are compared _by reference_.
*/
public interface Key<E : Element>
}
@@ -3,9 +3,12 @@
* that can be found in the license/LICENSE.txt file.
*/
@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
package kotlin.coroutines
import kotlin.coroutines.CoroutineContext.*
import kotlin.io.Serializable
/**
* Base class for [CoroutineContext.Element] implementations.
@@ -17,7 +20,10 @@ public abstract class AbstractCoroutineContextElement(public override val key: K
* An empty coroutine context.
*/
@SinceKotlin("1.3")
public object EmptyCoroutineContext : CoroutineContext {
public object EmptyCoroutineContext : CoroutineContext, Serializable {
private const val serialVersionUID: Long = 0
private fun readResolve(): Any = this
public override fun <E : Element> get(key: Key<E>): E? = null
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial
public override fun plus(context: CoroutineContext): CoroutineContext = context
@@ -31,7 +37,11 @@ public object EmptyCoroutineContext : CoroutineContext {
// this class is not exposed, but is hidden inside implementations
// this is a left-biased list, so that `plus` works naturally
@SinceKotlin("1.3")
internal class CombinedContext(val left: CoroutineContext, val element: Element) : CoroutineContext {
internal class CombinedContext(
private val left: CoroutineContext,
private val element: Element
) : CoroutineContext, Serializable {
override fun <E : Element> get(key: Key<E>): E? {
var cur = this
while (true) {
@@ -58,8 +68,14 @@ internal class CombinedContext(val left: CoroutineContext, val element: Element)
}
}
private fun size(): Int =
if (left is CombinedContext) left.size() + 1 else 2
private fun size(): Int {
var cur = this
var size = 2
while (true) {
cur = cur.left as? CombinedContext ?: return size
size++
}
}
private fun contains(element: Element): Boolean =
get(element.key) == element
@@ -84,6 +100,22 @@ internal class CombinedContext(val left: CoroutineContext, val element: Element)
override fun toString(): String =
"[" + fold("") { acc, element ->
if (acc.isEmpty()) element.toString() else acc + ", " + element
if (acc.isEmpty()) element.toString() else "$acc, $element"
} + "]"
private fun writeReplace(): Any {
val list = ArrayList<CoroutineContext>(size())
fold(Unit) { _, element ->
if (element is Serializable) list += element
}
return Serialized(list.toTypedArray())
}
private class Serialized(val elements: Array<CoroutineContext>) : Serializable {
companion object {
private const val serialVersionUID: Long = 0L
}
private fun readResolve(): Any = elements.fold(EmptyCoroutineContext, CoroutineContext::plus)
}
}
@@ -1,38 +0,0 @@
/*
* Copyright 2010-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license
* that can be found in the license/LICENSE.txt file.
*/
package kotlin.coroutines
/**
* Interface representing a continuation after a suspension point that returns value of type `T`.
*/
@SinceKotlin("1.3")
public interface Continuation<in T> {
/**
* Context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing [value] as the return value of the last suspension point.
*/
public fun resume(value: T)
/**
* Resumes the execution of the corresponding coroutine so that the [exception] is re-thrown right after the
* last suspension point.
*/
public fun resumeWithException(exception: Throwable)
}
/**
* Classes and interfaces marked with this annotation are restricted when used as receivers for extension
* `suspend` functions. These `suspend` extensions can only invoke other member or extension `suspend` functions on this particular
* receiver only and are restricted from calling arbitrary suspension functions.
*/
@SinceKotlin("1.3")
@Target(AnnotationTarget.CLASS)
@Retention(AnnotationRetention.BINARY)
public annotation class RestrictsSuspension
@@ -10,6 +10,7 @@
package kotlin.coroutines.intrinsics
import kotlin.coroutines.*
import kotlin.internal.InlineOnly
/**
* Obtains the current continuation instance inside suspend functions and either suspends
@@ -29,8 +30,9 @@ import kotlin.coroutines.*
* continuation instance.
*/
@SinceKotlin("1.3")
@kotlin.internal.InlineOnly
@InlineOnly
@Suppress("UNUSED_PARAMETER")
// todo: Drop this function
public suspend inline fun <T> suspendCoroutineOrReturn(crossinline block: (Continuation<T>) -> Any?): T =
suspendCoroutineUninterceptedOrReturn { cont -> block(cont.intercepted()) }
@@ -38,25 +40,35 @@ public suspend inline fun <T> suspendCoroutineOrReturn(crossinline block: (Conti
* Obtains the current continuation instance inside suspend functions and either suspends
* currently running coroutine or returns result immediately without suspension.
*
* Unlike [suspendCoroutineOrReturn] it does not intercept continuation.
* If the [block] returns the special [COROUTINE_SUSPENDED] value, it means that suspend function did suspend the execution and will
* not return any result immediately. In this case, the [Continuation] provided to the [block] shall be
* resumed by invoking [Continuation.resumeWith] at some moment in the
* future when the result becomes available to resume the computation.
*
* Otherwise, the return value of the [block] must have a type assignable to [T] and represents the result of this suspend function.
* It means that the execution was not suspended and the [Continuation] provided to the [block] shall not be invoked.
* As the result type of the [block] is declared as `Any?` and cannot be correctly type-checked,
* its proper return type remains on the conscience of the suspend function's author.
*
* Invocation of [Continuation.resumeWith] resumes coroutine directly in the invoker's thread without going through the
* [ContinuationInterceptor] that might be present in the coroutine's [CoroutineContext].
* It is invoker's responsibility to ensure that the proper invocation context is established.
* [Continuation.intercepted] can be used to acquire the intercepted continuation.
*
* Note that it is not recommended to call either [Continuation.resume] nor [Continuation.resumeWithException] functions synchronously
* in the same stackframe where suspension function is run. Use [suspendCoroutine] as a safer way to obtain current
* continuation instance.
*/
@SinceKotlin("1.3")
@kotlin.internal.InlineOnly
@InlineOnly
public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T =
throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic")
/**
* Intercept continuation with [ContinuationInterceptor].
*/
@SinceKotlin("1.3")
@kotlin.internal.InlineOnly
public inline fun <T> Continuation<T>.intercepted(): Continuation<T> =
throw NotImplementedError("Implementation of intercepted is intrinsic")
/**
* This value is used as a return value of [suspendCoroutineOrReturn] `block` argument to state that
* the execution was suspended and will not return any result immediately.
*/
@SinceKotlin("1.3")
public val COROUTINE_SUSPENDED: Any = Any()
public val COROUTINE_SUSPENDED: Any = CoroutineSingletons.COROUTINE_SUSPENDED
internal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED }
@@ -32,7 +32,7 @@ public fun <T> buildSequence(builderAction: suspend SequenceBuilder<T>.() -> Uni
@SinceKotlin("1.3")
public fun <T> buildIterator(builderAction: suspend SequenceBuilder<T>.() -> Unit): Iterator<T> {
val iterator = SequenceBuilderIterator<T>()
iterator.nextStep = builderAction.createCoroutineUnchecked(receiver = iterator, completion = iterator)
iterator.nextStep = builderAction.createCoroutineUnintercepted(receiver = iterator, completion = iterator)
return iterator
}
@@ -152,34 +152,31 @@ private class SequenceBuilderIterator<T> : SequenceBuilder<T>(), Iterator<T>, Co
}
suspend override fun yield(value: T) {
override suspend fun yield(value: T) {
nextValue = value
state = State_Ready
return suspendCoroutineOrReturn { c ->
return suspendCoroutineUninterceptedOrReturn { c ->
nextStep = c
COROUTINE_SUSPENDED
}
}
suspend override fun yieldAll(iterator: Iterator<T>) {
override suspend fun yieldAll(iterator: Iterator<T>) {
if (!iterator.hasNext()) return
nextIterator = iterator
state = State_ManyReady
return suspendCoroutineOrReturn { c ->
return suspendCoroutineUninterceptedOrReturn { c ->
nextStep = c
COROUTINE_SUSPENDED
}
}
// Completion continuation implementation
override fun resume(value: Unit) {
override fun resumeWith(result: SuccessOrFailure<Unit>) {
result.getOrThrow() // just rethrow exception if it is there
state = State_Done
}
override fun resumeWithException(exception: Throwable) {
throw exception // just rethrow
}
override val context: CoroutineContext
get() = EmptyCoroutineContext
}
@@ -69,6 +69,11 @@ public class Reflection {
return factory.renderLambdaToString(lambda);
}
@SinceKotlin(version = "1.3")
public static String renderLambdaToString(FunctionBase lambda) {
return factory.renderLambdaToString(lambda);
}
// Functions
public static KFunction function(FunctionReference f) {
@@ -33,6 +33,11 @@ public class ReflectionFactory {
@SinceKotlin(version = "1.1")
public String renderLambdaToString(Lambda lambda) {
return renderLambdaToString((FunctionBase) lambda);
}
@SinceKotlin(version = "1.3")
public String renderLambdaToString(FunctionBase lambda) {
String result = lambda.getClass().getGenericInterfaces()[0].toString();
return result.startsWith(KOTLIN_JVM_FUNCTIONS) ? result.substring(KOTLIN_JVM_FUNCTIONS.length()) : result;
}