协程咖啡厅 · 构造魔法
协程为 Kotlin 提供了更优雅的异步编程方式,但由于 Jvm 本身并没有提供协程支持,因此 Kotlin/Jvm 中的协程仿佛如魔法一般的存在。
这一次我们将对协程的实现原理进行解析。
挂起函数
函数声明转换
Kotlin 从语言层面提供了 suspend 关键字,用于表示该函数为挂起函数,但在 Jvm 中并没有可挂起的概念。
以下面的一个挂起函数声明为例:
suspend fun suspendingFunction(a: Int, vararg b: String)
如果通过 IDEA 将其编译的 Class 文件进行反编译,可以得到一个与其 Jvm 实现等价的 Java 方法声明:
public static final Object suspendingFunction(int a, @NotNull String[] b, @NotNull Continuation $completion)
挂起函数的 suspend 关键字转换成了一个额外的参数,其类型为 Continuation。这一操作是 Kotlin 编译器在编译期执行的,称为 CPS(Continuation-Passing Style,续体传递风格)转换,而在下一步分析协程实现原理之前,需要先对 “续体” 这一概念进行说明。
续体
采用回调的方法获取耗时操作的结果一般会类似如下例子中的形式:
request(arguments) { result ->
runTask(result)
}
在上述例子中,Lambda 作为这一代码片段中 request() 函数执行完毕的后续操作,这一部分被称为 续体(Continuation)。
协程中同样拥有续体的概念,Kotlin 中的定义为 挂起点之后的剩余应执行的代码,如在以下例子中,runTask() 便作为挂起函数 request() 这一挂起点的续体。
val result = request(arguments)
runTask(result)
Continuation 便是续体在 Kotlin 协程中的表现形式,它是一个用于回调的接口。而 CPS 转换,在命名上听起来非常高深,但本质上即将协程代码转换为等价的回调形式,如前文的 suspendingFunction() 函数声明会被编译器转换为以下形式:
fun suspendingFunction(a: Int, vararg b: String, $completion: Continuation<Unit>): Any?
就这?
仿佛一切魔法都被打破一般,协程的神奇之处仅此而已?但是,如果协程仅被单纯地转换为回调的形式,会有一个严重的问题:
如果每个续体都要创建一个类,那该如何解决类加载的大量资源损耗?
状态机
我们用上文提及的方法,看看下面这段代码的 Jvm 实现是什么:
suspend fun suspendingFunction(argument: String): Boolean {
val result = mockRequest(argument)
return result == 0
}
suspend fun mockRequest(argument: String): Int {
delay(1000)
return argument.length;
}
这里以 suspendingFunction() 为例进行分析,转换所得的 Java 代码如下:
@Nullable
public static final Object suspendingFunction(@NotNull String argument, @NotNull Continuation $completion) {
Object $continuation;
label25: {
if ($completion instanceof <undefinedtype>) {
$continuation = (<undefinedtype>)$completion;
if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
break label25;
}
}
$continuation = new ContinuationImpl($completion) {
// $FF: synthetic field
Object result;
int label;
Object L$0;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return SampleKt.suspendingFunction((String)null, this);
}
};
}
Object $result = ((<undefinedtype>)$continuation).result;
Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
Object var10000;
switch(((<undefinedtype>)$continuation).label) {
case 0:
ResultKt.throwOnFailure($result);
((<undefinedtype>)$continuation).L$0 = argument;
((<undefinedtype>)$continuation).label = 1;
var10000 = mockRequest(argument, (Continuation)$continuation);
if (var10000 == var5) {
return var5;
}
break;
case 1:
argument = (String)((<undefinedtype>)$continuation).L$0;
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
int result = ((Number)var10000).intValue();
return Boxing.boxBoolean(result == 0);
}
这段 Java 代码还存在一些转换问题,因为 Kotlin 虽然生成的字节码符合 Jvm 规范,但不一定可以用 Java 生成同样的字节码。在结合字节码实际内容,并将生成代码的命名替换为可读性更强的形式之后,可以得出与 Jvm 实现等价的 Kotlin 代码。
fun suspendingFunction(argument: String, completion: Continuation): Any? {
val continuation: Continuation =
if (completion is SuspendingFunctionStateMachine) completion.apply {
if ((label or Int.MIN_VALUE) != 0) label -= Int.MIN_VALUE
} else SuspendingFunctionStateMachine(completion)
val result = continuation.result
val resultTemp: Any? = null
when (continuation.label) {
0 -> run {
throwOnFailure(result)
continuation.argumentState = argument
continuation.label = 1
resultTemp = mockRequest(argument, continuation)
if (resultTemp == COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
}
1 -> run {
argument = continuation.argumentState
throwOnFailure(result)
resultTemp = result
}
else -> throw IllegalStateException("call to 'resume' before 'invoke' with coroutine")
}
return (resultTemp as Int) == 0
}
class SuspendingFunctionStateMachine(
completion: Continuation
) : ContinuationImpl(completion) {
var result: Any? = null
var label: Int = 0
var argumentState: Any? = null
override fun invokeSuspend(result: Any): Any? {
this.result = result
this.label = (this.label or Int.MIN_VALUE)
return suspendingFunction(null, this)
}
}
可能你会惊讶仅两行代码的挂起函数转换之后包含这么多内容,不必担心,整段代码可以拆分成两个部分进行解析,先来看看第一部分。
fun suspendingFunction(argument: String, completion: Continuation): Any? {
val continuation: Continuation =
if (completion is SuspendingFunctionStateMachine) completion.apply {
if ((label or Int.MIN_VALUE) != 0) label -= Int.MIN_VALUE
} else SuspendingFunctionStateMachine(completion)
// ..
}
class SuspendingFunctionStateMachine(
completion: Continuation
) : ContinuationImpl(completion) {
var result: Any? = null
var label: Int = 0
var argumentState: Any? = null
override fun invokeSuspend(result: Any): Any? {
this.result = result
this.label = (this.label or Int.MIN_VALUE)
return suspendingFunction(null, this)
}
}
这里与一般的回调相比会略显奇特,回调部分会调用函数自身,在形式上更类似于递归。这一部分的代码便是续体的构建过程:如果该函数被上层函数所调用,则构建一个持有上层函数传入续体的新续体(这里是一个重点,但在此先不深入了解 ContinuationImpl 的实现);如果是被自己的续体所调用,则直接在续体上进行修改操作即可。
接下来便是第二部分,也是挂起函数实现的重点。
fun suspendingFunction(argument: String, completion: Continuation): Any? {
// ..
val result = continuation.result
val resultTemp: Any? = null
when (continuation.label) {
0 -> run {
throwOnFailure(result)
continuation.argumentState = argument
continuation.label = 1
resultTemp = mockRequest(argument, continuation)
if (resultTemp == COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
}
1 -> run {
argument = continuation.argumentState
throwOnFailure(result)
resultTemp = result
}
else -> throw IllegalStateException("call to 'resume' before 'invoke' with coroutine")
}
return (resultTemp as Int) == 0
}
函数中原本的多个原子操作被拆分成了 when 语句的多个分支,通过续体中的标签对分支进行选择,同时续体对局部变量的值进行了保存与恢复(在一定程度上类似于操作系统中进程调度时对进程状态的暂存)。可见,函数与续体构成了一个有限状态机(FSM,即 Finite-State Machine),至此,协程精妙地解决了创建回调类过多的问题。
经过 CPS 转换后,挂起函数的返回值发生了变化,在状态机中可以看到,返回值新增了一个类型 COROUTINE_SUSPENDED,用于表示函数当前处于挂起状态,同时这个状态也会层层上传,直到最顶层的挂起函数。
上述的 Kotlin 代码应当作伪代码,因为在续体内调用函数的过程中传入了一个 null,而该参数是非空的,这是因为 Kotlin/Jvm 中的非空实现是
Intrinsics.checkParameterIsNotNull()函数进行了断言,而 CPS 的过程中并没有这一步操作,续体中对函数的调用仅仅是为了运行下一个状态,不需要原本的参数。
创建协程
上一部分完成对 Jvm 中 “不存在的事物” 的解构,所以协程最接近魔法的一层已经不复存在,但现在疑问反而更多了
-
挂起函数只能在挂起函数中调用,因为挂起函数会获取到上层传入的续体,并在自己的续体中持有,但最上层的续体是从哪来的?
-
谁调用了续体?
不过,如果进行一定的思考,做出一些合理的假设,之后的解析再进行证明或推翻,可以帮助理清思路。通过 CPS 转换的结果,可以大致猜测:
-
ContinuationImpl.invokeSuspend()的返回值如果不是COROUTINE_SUSPENDED,即挂起函数执行完毕不再挂起,则续体将调用自己持有的上层续体,并将ContinuationImpl.invokeSuspend()的返回结果传给上层,从而做到下层挂起函数执行完成后唤醒上层挂起函数。 -
因为
withContext()等线程切换相关的函数存在,续体中会持有关于线程调度相关的元素,比如协程调度器 Dispatcher。
打起精神,解析上述问题将会进入协程解析中最核心的部分。
回看续体
在此之前先来看看 Continuation。
/**
* Interface representing a continuation after a suspension point that returns a value of type `T`.
*/
@SinceKotlin("1.3")
public interface Continuation<in T> {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}
context 便是协程上下文,CoroutineContext 是一个接口,根据我们的假设可以大致猜测与线程调度相关。
而 resumeWith() 便是续体的回调函数了,与我们所假设的 invokeSuspend() 有些出入,可以猜测在 resumeWith() 会调用到 invokeSuspend()。继续查看可以发现这个猜测是正确的。
@SinceKotlin("1.3")
internal abstract class BaseContinuationImpl(
// This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet
// it has a public getter (since even untrusted code is allowed to inspect its call stack).
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
// This implementation is final. This fact is used to unroll resumeWith recursion.
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
protected open fun releaseIntercepted() {
// does nothing here, overridden in ContinuationImpl
}
// ..
}
@SinceKotlin("1.3")
// State machines for named suspend functions extend from this class
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
public override val context: CoroutineContext
get() = _context!!
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
protected override fun releaseIntercepted() {
val intercepted = intercepted
if (intercepted != null && intercepted !== this) {
context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
}
this.intercepted = CompletedContinuation // just in case
}
}
在 BaseContinuationImpl.resumeWith() 中可见,当 invokeSuspend() 返回的结果不再是 COROUTINE_SUSPENDED 时,如果上层续体是由挂起函数生成的,则其 invokeSuspend() 会被调用。
注意,是 invokeSuspend() 被调用,而不是 resumeWith() 被调用。
代码中存在一个 while (true) 循环,循环获取上层续体并执行其 invokeSuspend(),直至出现挂起。之所以不直接采用类似递归的调用形式,是为了避免和递归类似的问题 —— 调用栈过深。
这一问题在代码注释中也提及到。
This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
在代码中还出现了 intercepted() 与 releaseIntercepted() 两个函数,releaseIntercepted() 的调用时机在 invokeSuspend() 返回真正的结果,可作出假设,存在一个续体拦截机制,这两个函数用于在该机制下,用于对续体作进一步处理。
续体拦截机制暂且不进行展开,先来看看协程是如何被启动的。
启动协程
通过 CoroutineScope.launch() 函数可以启动一个协程,并在协程对应的调度器中执行一个可挂起的扩展函数
/**
* Launches a new coroutine without blocking the current thread and returns a reference to the coroutine as a [Job].
* The coroutine is cancelled when the resulting job is [cancelled][Job.cancel].
*
* The coroutine context is inherited from a [CoroutineScope]. Additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
* with a corresponding [context] element.
*
* By default, the coroutine is immediately scheduled for execution.
* Other start options can be specified via `start` parameter. See [CoroutineStart] for details.
* An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case,
* the coroutine [Job] is created in _new_ state. It can be explicitly started with [start][Job.start] function
* and will be started implicitly on the first invocation of [join][Job.join].
*
* Uncaught exceptions in this coroutine cancel the parent job in the context by default
* (unless [CoroutineExceptionHandler] is explicitly specified), which means that when `launch` is used with
* the context of another coroutine, then any uncaught exception leads to the cancellation of the parent coroutine.
*
* See [newCoroutineContext] for a description of debugging facilities that are available for a newly created coroutine.
*
* @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
* @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
* @param block the coroutine code which will be invoked in the context of the provided scope.
**/
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
如果暂且不考虑更具体的实现,我们可大致得出,函数创建了一个协程,启动之后将其返回,而且协程实现了 Job 接口,查看协程的代码实现,可以惊喜地发现:最顶层的续体实现就是协程自身
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
// ..
}
/**
* Abstract base class for implementation of coroutines in coroutine builders.
*
* This class implements completion [Continuation], [Job], and [CoroutineScope] interfaces.
* It stores the result of continuation in the state of the job.
* This coroutine waits for children coroutines to finish before completing and
* fails through an intermediate _failing_ state.
*
* The following methods are available for override:
*
* * [onStart] is invoked when the coroutine was created in non-active state and is being [started][Job.start].
* * [onCancelling] is invoked as soon as the coroutine starts being cancelled for any reason (or completes).
* * [onCompleted] is invoked when the coroutine completes with a value.
* * [onCancelled] in invoked when the coroutine completes with an exception (cancelled).
*
* @param parentContext the context of the parent coroutine.
* @param active when `true` (by default), the coroutine is created in the _active_ state, otherwise it is created in the _new_ state.
* See [Job] for details.
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public abstract class AbstractCoroutine<in T>(
/**
* The context of the parent coroutine.
*/
@JvmField
protected val parentContext: CoroutineContext,
active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
// ..
/**
* Starts this coroutine with the given code [block] and [start] strategy.
* This function shall be invoked at most once on this coroutine.
*
* First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it
* during construction. Second, it starts the coroutine based on [start] parameter:
*
* * [DEFAULT] uses [startCoroutineCancellable].
* * [ATOMIC] uses [startCoroutine].
* * [UNDISPATCHED] uses [startCoroutineUndispatched].
* * [LAZY] does nothing.
*/
public fun start(start: CoroutineStart, block: suspend () -> T) {
initParentJob()
start(block, this)
}
/**
* Starts this coroutine with the given code [block] and [start] strategy.
* This function shall be invoked at most once on this coroutine.
*
* First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it
* during construction. Second, it starts the coroutine based on [start] parameter:
*
* * [DEFAULT] uses [startCoroutineCancellable].
* * [ATOMIC] uses [startCoroutine].
* * [UNDISPATCHED] uses [startCoroutineUndispatched].
* * [LAZY] does nothing.
*/
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}
}
源码中涉及了大量的扩展函数与操作符重载,
start()是CoroutineStart.invoke()操作符重载的形式。
为了之后的思路不至于乱成线球,在这里强调上文代码中推断所得的结论:
在语言层面中,协程 Coroutine 是续体 Continuation 的一个实现,协程创建完毕之后将通过 CoroutineStart.invoke() 函数,让挂起函数运行,并将协程作为最顶层的续体进行传递
以默认的 CoroutineStart.DEFAULT 为例,继续解析
/**
* Defines start options for coroutines builders.
* It is used in `start` parameter of [launch][CoroutineScope.launch], [async][CoroutineScope.async], and other coroutine builder functions.
*
* The summary of coroutine start options is:
* * [DEFAULT] -- immediately schedules coroutine for execution according to its context;
* * [LAZY] -- starts coroutine lazily, only when it is needed;
* * [ATOMIC] -- atomically (in a non-cancellable way) schedules coroutine for execution according to its context;
* * [UNDISPATCHED] -- immediately executes coroutine until its first suspension point _in the current thread_.
*/
public enum class CoroutineStart {
// ..
/**
* Starts the corresponding block as a coroutine with this coroutine's start strategy.
*
* * [DEFAULT] uses [startCoroutineCancellable].
* * [ATOMIC] uses [startCoroutine].
* * [UNDISPATCHED] uses [startCoroutineUndispatched].
* * [LAZY] does nothing.
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>) =
when (this) {
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(completion)
// ..
}
/**
* Starts the corresponding block with receiver as a coroutine with this coroutine start strategy.
*
* * [DEFAULT] uses [startCoroutineCancellable].
* * [ATOMIC] uses [startCoroutine].
* * [UNDISPATCHED] uses [startCoroutineUndispatched].
* * [LAZY] does nothing.
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
when (this) {
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
// ..
}
/**
* Returns `true` when [LAZY].
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public val isLazy: Boolean get() = this === LAZY
}
再看 startCoroutineCancellable()。
/**
* Use this function to start coroutine in a cancellable way, so that it can be cancelled
* while waiting to be dispatched.
*/
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
在 createCoroutineUnintercepted() 中,挂起函数以强制转换的形式变成了最原本的形式,最后到达最终的目的地 createCoroutineFromSuspendFunction(),用一个新的续体对在 launch() 中传入的可挂起 Lambda 进行包装。
/**
* Creates unintercepted coroutine without receiver and with result type [T].
* This function creates a new, fresh instance of suspendable computation every time it is invoked.
*
* To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance.
* The [completion] continuation is invoked when coroutine completes with result or exception.
*
* This function returns unintercepted continuation.
* Invocation of `resume(Unit)` starts coroutine immediately in the invoker's call stack without going through the
* [ContinuationInterceptor] that might be present in the completion's [CoroutineContext].
* It is the invoker's responsibility to ensure that a proper invocation context is established.
* Note that [completion] of this function may get invoked in an arbitrary context.
*
* [Continuation.intercepted] can be used to acquire the intercepted continuation.
* Invocation of `resume(Unit)` on intercepted continuation guarantees that execution of
* both the coroutine and [completion] happens in the invocation context established by
* [ContinuationInterceptor].
*
* Repeated invocation of any resume function on the resulting continuation corrupts the
* state machine of the coroutine and may result in arbitrary behaviour or exception.
*/
@SinceKotlin("1.3")
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (/* .. */)
// ..
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}
/**
* This function is used when [createCoroutineUnintercepted] encounters suspending lambda that does not extend BaseContinuationImpl.
*
* It happens in two cases:
* 1. Callable reference to suspending function,
* 2. Suspending function reference implemented by Java code.
*
* We must wrap it into an instance that extends [BaseContinuationImpl], because that is an expectation of all coroutines machinery.
* As an optimization we use lighter-weight [RestrictedContinuationImpl] base class (it has less fields) if the context is
* [EmptyCoroutineContext], and a full-blown [ContinuationImpl] class otherwise.
*
* The instance of [BaseContinuationImpl] is passed to the [block] so that it can be passed to the corresponding invocation.
*/
@SinceKotlin("1.3")
private inline fun <T> createCoroutineFromSuspendFunction(
completion: Continuation<T>,
crossinline block: (Continuation<T>) -> Any?
): Continuation<Unit> {
val context = completion.context
// label == 0 when coroutine is not started yet (initially) or label == 1 when it was
return if (context === EmptyCoroutineContext)
object : RestrictedContinuationImpl(completion as Continuation<Any?>) {
private var label = 0
override fun invokeSuspend(result: Result<Any?>): Any? =
when (label) {
0 -> {
label = 1
result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith
block(this) // run the block, may return or suspend
}
1 -> {
label = 2
result.getOrThrow() // this is the result if the block had suspended
}
else -> error("This coroutine had already completed")
}
}
else
object : ContinuationImpl(completion as Continuation<Any?>, context) {
private var label = 0
override fun invokeSuspend(result: Result<Any?>): Any? =
when (label) {
0 -> {
label = 1
result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith
block(this) // run the block, may return or suspend
}
1 -> {
label = 2
result.getOrThrow() // this is the result if the block had suspended
}
else -> error("This coroutine had already completed")
}
}
}
协程上下文
上文实际上还留着一个坑,只不过为了保证思路不偏离并没有进行展开。
在解析 Continuation 接口时,我们猜测协程调度器实现的 CoroutineContext 是进行线程调度的主体,但无论是 ContinuationImpl 中的 intercepted() 与 releaseIntercepted(),还是 startCoroutineCancellable 中的 intercepted(),最终都将我们引向了另一个接口:ContinuationInterceptor。
更有意思的是,ContinuationInterceptor 的实现是通过 ConroutineContext 的实现获取的。
好吧,在看这一部分代码之前,我认为先讲结论会更合适一点。
协程上下文 CoroutineContext 用于记录当前协程所持有的信息,是协程运行中一个重要的数据对象。
-
CoroutineContext是一个CoroutineContext.Element的索引集,可通过CoroutineContext.Key获取对应的CoroutineContext.Element。 -
CoroutineContext.Element继承CoroutineContext。 -
ContinuationInterceptor继承CoroutineContext.Element。 -
Job、CoroutineName、CoroutineId均继承CoroutineContext.Element。
接下来展示的是协程之中这一秀到令人头皮发麻的设计。
/**
* Persistent context for the coroutine. It is an indexed set of [Element] instances.
* An indexed set is a mix between a set and a map.
* Every element in this set has a unique [Key].
*/
@SinceKotlin("1.3")
public interface CoroutineContext {
/**
* Returns the element with the given [key] from this context or `null`.
*/
public operator fun <E : Element> get(key: Key<E>): E?
// ..
/**
* Key for the elements of [CoroutineContext]. [E] is a type of element with this key.
*/
public interface Key<E : Element>
/**
* An element of the [CoroutineContext]. An element of the coroutine context is a singleton context by itself.
*/
public interface Element : CoroutineContext {
/**
* A key of this coroutine context element.
*/
public val key: Key<*>
public override operator fun <E : Element> get(key: Key<E>): E? =
@Suppress("UNCHECKED_CAST")
if (this.key == key) this as E else null
// ..
}
}
再看看 ContinuationInterceptor 是如何 “获取自身” 的:以伴生对象为 Key,通过重写 get() 函数将实例自身(可以是子类的实例)作为续体拦截器返回。
/**
* Marks coroutine context element that intercepts coroutine continuations.
* The coroutines framework uses [ContinuationInterceptor.Key] to retrieve the interceptor and
* intercepts all coroutine continuations with [interceptContinuation] invocations.
*
* [ContinuationInterceptor] behaves like a [polymorphic element][AbstractCoroutineContextKey], meaning that
* its implementation delegates [get][CoroutineContext.Element.get] and [minusKey][CoroutineContext.Element.minusKey]
* to [getPolymorphicElement] and [minusPolymorphicKey] respectively.
* [ContinuationInterceptor] subtypes can be extracted from the coroutine context using either [ContinuationInterceptor.Key]
* or subtype key if it extends [AbstractCoroutineContextKey].
*/
@SinceKotlin("1.3")
public interface ContinuationInterceptor : CoroutineContext.Element {
/**
* The key that defines *the* context interceptor.
*/
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
public override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? {
// ..
@Suppress("UNCHECKED_CAST")
return if (ContinuationInterceptor === key) this as E else null
}
// ..
}
续体拦截机制
聊完了 CoroutineContext 与 ContinuationInterceptor 的关系便可以解析线程调度的实现了。回到我们前面所提到的续体拦截机制,所有代码最终都指向了 ContinuationInterceptor。
/**
* Marks coroutine context element that intercepts coroutine continuations.
* The coroutines framework uses [ContinuationInterceptor.Key] to retrieve the interceptor and
* intercepts all coroutine continuations with [interceptContinuation] invocations.
*
* [ContinuationInterceptor] behaves like a [polymorphic element][AbstractCoroutineContextKey], meaning that
* its implementation delegates [get][CoroutineContext.Element.get] and [minusKey][CoroutineContext.Element.minusKey]
* to [getPolymorphicElement] and [minusPolymorphicKey] respectively.
* [ContinuationInterceptor] subtypes can be extracted from the coroutine context using either [ContinuationInterceptor.Key]
* or subtype key if it extends [AbstractCoroutineContextKey].
*/
@SinceKotlin("1.3")
public interface ContinuationInterceptor : CoroutineContext.Element {
// ..
/**
* Returns continuation that wraps the original [continuation], thus intercepting all resumptions.
* This function is invoked by coroutines framework when needed and the resulting continuations are
* cached internally per each instance of the original [continuation].
*
* This function may simply return original [continuation] if it does not want to intercept this particular continuation.
*
* When the original [continuation] completes, coroutine framework invokes [releaseInterceptedContinuation]
* with the resulting continuation if it was intercepted, that is if `interceptContinuation` had previously
* returned a different continuation instance.
*/
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
/**
* Invoked for the continuation instance returned by [interceptContinuation] when the original
* continuation completes and will not be used anymore. This function is invoked only if [interceptContinuation]
* had returned a different continuation instance from the one it was invoked with.
*
* Default implementation does nothing.
*
* @param continuation Continuation instance returned by this interceptor's [interceptContinuation] invocation.
*/
public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
/* do nothing by default */
}
// ..
}
续体拦截器用于拦截一个续体(通常在 launch() 或 withContext() 等对可线程进行调度的函数进行)。最常见的续体拦截器即协程调度器,它们均继承自 CoroutineDispatcher。
/**
* Base class to be extended by all coroutine dispatcher implementations.
*
* The following standard implementations are provided by `kotlinx.coroutines` as properties on
* the [Dispatchers] object:
*
* * [Dispatchers.Default] — is used by all standard builders if no dispatcher or any other [ContinuationInterceptor]
* is specified in their context. It uses a common pool of shared background threads.
* This is an appropriate choice for compute-intensive coroutines that consume CPU resources.
* * [Dispatchers.IO] — uses a shared pool of on-demand created threads and is designed for offloading of IO-intensive _blocking_
* operations (like file I/O and blocking socket I/O).
* * [Dispatchers.Unconfined] — starts coroutine execution in the current call-frame until the first suspension,
* whereupon the coroutine builder function returns.
* The coroutine will later resume in whatever thread used by the
* corresponding suspending function, without confining it to any specific thread or pool.
* **The `Unconfined` dispatcher should not normally be used in code**.
* * Private thread pools can be created with [newSingleThreadContext] and [newFixedThreadPoolContext].
* * An arbitrary [Executor][java.util.concurrent.Executor] can be converted to a dispatcher with the [asCoroutineDispatcher] extension function.
*
* This class ensures that debugging facilities in [newCoroutineContext] function work properly.
*/
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
/** @suppress */
@ExperimentalStdlibApi
public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
ContinuationInterceptor,
{ it as? CoroutineDispatcher })
/**
* Returns `true` if the execution of the coroutine should be performed with [dispatch] method.
* The default behavior for most dispatchers is to return `true`.
*
* If this method returns `false`, the coroutine is resumed immediately in the current thread,
* potentially forming an event-loop to prevent stack overflows.
* The event loop is an advanced topic and its implications can be found in [Dispatchers.Unconfined] documentation.
*
* A dispatcher can override this method to provide a performance optimization and avoid paying a cost of an unnecessary dispatch.
* E.g. [MainCoroutineDispatcher.immediate] checks whether we are already in the required UI thread in this method and avoids
* an additional dispatch when it is not required.
*
* While this approach can be more efficient, it is not chosen by default to provide a consistent dispatching behaviour
* so that users won't observe unexpected and non-consistent order of events by default.
*
* Coroutine builders like [launch][CoroutineScope.launch] and [async][CoroutineScope.async] accept an optional [CoroutineStart]
* parameter that allows one to optionally choose the [undispatched][CoroutineStart.UNDISPATCHED] behavior to start coroutine immediately,
* but to be resumed only in the provided dispatcher.
*
* This method should generally be exception-safe. An exception thrown from this method
* may leave the coroutines that use this dispatcher in the inconsistent and hard to debug state.
*/
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
/**
* Dispatches execution of a runnable [block] onto another thread in the given [context].
* This method should guarantee that the given [block] will be eventually invoked,
* otherwise the system may reach a deadlock state and never leave it.
* Cancellation mechanism is transparent for [CoroutineDispatcher] and is managed by [block] internals.
*
* This method should generally be exception-safe. An exception thrown from this method
* may leave the coroutines that use this dispatcher in the inconsistent and hard to debug state.
*
* This method must not immediately call [block]. Doing so would result in [StackOverflowError]
* when [yield] is repeatedly called from a loop. However, an implementation that returns `false` from
* [isDispatchNeeded] can delegate this function to `dispatch` method of [Dispatchers.Unconfined], which is
* integrated with [yield] to avoid this problem.
*/
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
// ..
/**
* Returns a continuation that wraps the provided [continuation], thus intercepting all resumptions.
*
* This method should generally be exception-safe. An exception thrown from this method
* may leave the coroutines that use this dispatcher in the inconsistent and hard to debug state.
*/
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
@InternalCoroutinesApi
public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
(continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
}
// ..
}
调度器将原有的续体包装为一个新类型的续体:DispatchedContinuation,并在其 resumeWith() 被调用时通过 CoroutineDispatcher.dispatch() 函数将代码以 Runnable 的形式推送到指定线程运行。
作为优化,
resumeWith()调用时会通过CoroutineDispatcher.isDispatchNeeded()判断是否需要进行调度。以 Android 中的Dispatchers.Main为例,如果在调度前的续体已经在主线程运行,则不必再进行调度工作。
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
// ..
override fun resumeWith(result: Result<T>) {
val context = continuation.context
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_ATOMIC_DEFAULT
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
withCoroutineContext(this.context, countOrElement) {
continuation.resumeWith(result)
}
}
}
}
// ..
}