Last active
August 19, 2024 06:56
-
-
Save elizarov/f27400a55c1502aacc35b4a3b2f5c9af to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// UPDATE: For everyone finding this gist via Google!
// Modern kotlinx.coroutines has out-of-the-box support for asyncLazy with the following expression:
//     val myLazyValue = async(start = CoroutineStart.LAZY) { ... }
// Use myLazyValue.await() when you need the value.
// ---------------- public api ---------------- | |
/**
 * A lazily computed value whose computation may suspend.
 *
 * Instances are obtained from [asyncLazy].
 */
public interface AsyncLazy<out T> {
    /**
     * Returns the computed value, suspending the caller while it is not yet available.
     *
     * In the implementation in this file the first caller starts the initializer, and a failure
     * of the initializer is rethrown to every caller.
     */
    public suspend fun value(): T

    /**
     * Reports whether a result has already been produced.
     *
     * In the implementation in this file a stored failure counts as a result as well.
     */
    public fun isInitialized(): Boolean
}
/**
 * Creates an [AsyncLazy] backed by the given suspending [initializer].
 *
 * The initializer is not invoked here; it is only handed over to the implementation.
 */
public fun <T> asyncLazy(initializer: suspend () -> T): AsyncLazy<T> {
    val lazyValue = AsyncLazyImpl(initializer)
    return lazyValue
}
// ---------------- implementation ---------------- | |
/**
 * Marker wrapper stored in place of a value when the initializer completed with [exception].
 */
private class Fail(
    val exception: Throwable
)
/**
 * Node of a singly linked list of suspended callers.
 *
 * @property cont continuation to resume once the result is known (nullable: the sentinel node has none)
 * @property next the node that was at the head of the list before this one, or `null`
 */
private open class Waiter<T>(
    val cont: Continuation<T>?,
    val next: Waiter<T>?
)
/** Sentinel node marking the state in which nobody has requested the value yet. */
private val UNINITIALIZED: Waiter<Any> = Waiter(cont = null, next = null)
/*
   The first waiter is special. The first coroutine to request the value starts the initializer coroutine, which may
   produce a result without suspension in the same thread. We are using CoroutineIntrinsics.suspendCoroutineOrReturn
   and we cannot allow the following sequence of events to happen on the stack:
     1. Invoke AsyncLazyImpl.value()
     2. Invoke CoroutineIntrinsics.suspendCoroutineOrReturn to get continuation "cont"
     3. Invoke initializer.startCoroutine(...)
     4. Invoke AsyncLazyImpl.resume(...)
     5. Invoke AsyncLazyImpl.signalValue(...)
     6. Invoke AsyncLazyImpl.resumeWithValue(cont, ...)
     7. Invoke cont.resume(...) <-- forbidden resume in the same stack frame as suspendCoroutineOrReturn
   So, the first waiter uses a separate consensus to prevent this from happening (similarly to SafeContinuation).
*/
// States of the first waiter's consensus. Declared as `const val` so that they are
// compile-time constants inlined at use sites instead of properties read through getters.

/** The initializer has been started; neither side has decided the outcome yet. */
private const val C_COMPUTING = 0

/** The first caller won the race and suspended; it must be resumed explicitly. */
private const val C_SUSPENDED = 1

/** The result arrived before the first caller suspended; it returns the result directly. */
private const val C_SIGNALLED = 2
/**
 * The waiter created by the very first caller, i.e. the one that starts the initializer.
 *
 * It is always the last node of the waiter list (its `next` is `null`) and carries an extra
 * [consensus] flag that decides whether this caller suspended or received the result directly.
 */
private class FirstWaiter<T>(cont: Continuation<T>) : Waiter<T>(cont = cont, next = null) {
    /** One of the `C_*` constants; modified only through [CONSENSUS]. */
    @Volatile
    var consensus: Int = C_COMPUTING

    companion object {
        /** Atomic updater used to compare-and-set the [consensus] field. */
        @JvmStatic
        val CONSENSUS = AtomicIntegerFieldUpdater.newUpdater(FirstWaiter::class.java, "consensus")
    }
}
/**
 * Lock-free implementation of [AsyncLazy].
 *
 * The instance is also passed as the completion continuation of [initializer], so the
 * initializer's outcome arrives through [resume] / [resumeWithException].
 *
 * The single [state] field holds one of:
 *  - `UNINITIALIZED`: nobody has requested the value yet;
 *  - a [Waiter] (head of a linked list ending in a [FirstWaiter]): computation is in progress;
 *  - anything else: the final result, either the value itself or a [Fail] wrapping the exception.
 */
private class AsyncLazyImpl<T>(val initializer: suspend () -> T) : AsyncLazy<T>, Continuation<T> { | |
// Current state as described in the class comment; changed only via compare-and-set on VALUE.
@Volatile | |
var state: Any? = UNINITIALIZED // Note: UNINITIALIZED is Waiter<Any> | |
companion object { | |
// Atomic updater used for compare-and-set on the `state` field.
@JvmStatic | |
val VALUE = AtomicReferenceFieldUpdater.newUpdater(AsyncLazyImpl::class.java, Any::class.java, "state") | |
} | |
// True once `state` no longer holds a Waiter, i.e. a result (value or Fail) has been stored.
override fun isInitialized(): Boolean = state !is Waiter<*> | |
/**
 * Returns the result if it is already stored; otherwise registers the caller as a waiter and suspends.
 * The caller that replaces UNINITIALIZED starts the initializer and may get the result without suspending.
 * A stored [Fail] is rethrown as its exception.
 */
@Suppress("UNCHECKED_CAST") | |
suspend override fun value(): T = | |
CoroutineIntrinsics.suspendCoroutineOrReturn sc@ { cont -> | |
while (true) { // lock-free loop on state | |
val state = this.state // volatile read | |
if (state !is Waiter<*>) return@sc unwrapValue(state) | |
if (state == UNINITIALIZED) { | |
// special case for the first waiter -- it computes the value; on a lost CAS retry the loop
val node = FirstWaiter(cont) | |
if (!VALUE.compareAndSet(this, state, node)) continue | |
// start computation; this object receives the initializer's outcome as its completion
initializer.startCoroutine(completion = this) | |
// try to suspend: succeeds only if signalFirstWaiter has not already moved consensus away from C_COMPUTING
if (FirstWaiter.CONSENSUS.compareAndSet(node, C_COMPUTING, C_SUSPENDED)) | |
return@sc CoroutineIntrinsics.SUSPENDED | |
// if failed, then the result must have been already produced!
return@sc unwrapValue(this.state) // reread state! | |
} else { | |
// other waiters: prepend a node to the waiter list; on a lost CAS retry the loop
val node = Waiter(cont, state as Waiter<T>) | |
if (VALUE.compareAndSet(this, state, node)) return@sc CoroutineIntrinsics.SUSPENDED | |
} | |
} | |
} | |
// Resumes `cont` with the stored result: with the exception if `value` is a Fail, otherwise with the value.
@Suppress("UNCHECKED_CAST") | |
fun resumeWithValue(cont: Continuation<T>, value: Any?) = | |
if (value is Fail) cont.resumeWithException(value.exception) else cont.resume(value as T) | |
// Converts a stored result into a return value: throws the wrapped exception for a Fail, otherwise casts to T.
// The `require` fails (IllegalArgumentException) if `value` is still a Waiter, i.e. not a result at all.
@Suppress("UNCHECKED_CAST") | |
fun unwrapValue(value: Any?): T { | |
require(value !is Waiter<*>) | |
return if (value is Fail) throw value.exception else value as T | |
} | |
/**
 * Publishes the result (a value or a [Fail]) and resumes everybody who is waiting for it.
 * Throws IllegalStateException if `state` no longer holds a Waiter, i.e. a result was already published.
 */
@Suppress("UNCHECKED_CAST") | |
fun signalValue(value: Any?) { | |
while (true) { // lock-free loop of state | |
var state = this.state as? Waiter<T> ?: throw IllegalStateException("Value is already set") | |
if (!VALUE.compareAndSet(this, state, value)) continue | |
// the list was detached atomically above; walk it from the most recently added waiter towards the first one
while (state !is FirstWaiter<*>) { | |
resumeWithValue(state.cont!!, value) | |
state = state.next!! | |
} | |
// process first waiter
signalFirstWaiter(state as FirstWaiter<T>, value) | |
break | |
} | |
} | |
// Settles the race with value() of the first waiter through its consensus field.
fun signalFirstWaiter(waiter: FirstWaiter<T>, value: Any?) { | |
// if the following CAS succeeds, then value() has not returned yet, don't need to resume
if (FirstWaiter.CONSENSUS.compareAndSet(waiter, C_COMPUTING, C_SIGNALLED)) return | |
// otherwise, value() had suspended -> need to resume first waiter
resumeWithValue(waiter.cont!!, value) | |
} | |
// Completion callbacks of the initializer coroutine: store the value, or the exception wrapped in a Fail.
override fun resume(value: T) = signalValue(value) | |
override fun resumeWithException(exception: Throwable) = signalValue(Fail(exception)) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@elizarov Hello,
I have a doubt about the out-of-the-box support: let's say that we have a class with a private async lazy property: private val lazyValue = coroutineScope.async(start = CoroutineStart.LAZY) { /* code with suspension points here */ }
and a public method to get it:
suspend fun computeValue() = lazyValue.await()
Is this approach thread-safe because of the way CoroutineStart.LAZY is implemented - the same way that the
by lazy {}
is for non-coroutine usage - or do we need to wrap the await call within a mutex lock, if we know that the computeValue()
will be called concurrently by many coroutines and we really want to be sure the calculation is computed only once? Thanks for clarifying. I understood it is thread-safe because of the Deferred interface: "All functions on this interface and on all interfaces derived from it are thread-safe and can be safely invoked from concurrent coroutines without external synchronization."