Last active
October 14, 2020 21:29
-
-
Save fluidsonic/01702dbab744595a8dbdd41befe6829c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* Output
Collecting 2 elements…
upstream -> hot
emit: 0
resumed
collect: 0
Expensive work on 0 (isPaused = false)…
Expensive work on 0 (isPaused = false)…
Expensive work on 0 (isPaused = false)…
Expensive work on 0 (isPaused = false)…
Expensive work on 0 (isPaused = false)…
emit: 1
collect: 1
Expensive work on 1 (isPaused = false)…
Done.
Waiting 10s to see that the Flow really pauses.
paused
Expensive work on 1 (isPaused = true)…
<nothing happening for a while>
Collecting 2 more elements…
resumed
collect: 1
Expensive work on 1 (isPaused = false)…
Expensive work on 1 (isPaused = false)…
Expensive work on 1 (isPaused = false)…
emit: 2
collect: 2
Expensive work on 2 (isPaused = false)…
Looks good?
paused
upstream -> cold
*/
import kotlinx.atomicfu.* | |
import kotlinx.coroutines.* | |
import kotlinx.coroutines.flow.* | |
/**
 * Demo driver for [pausableFlow].
 *
 * Builds a pausable upstream that emits an ever-increasing counter with five
 * one-second "expensive work" steps between emissions, then collects from it
 * twice with a ten-second gap in between. Each collection is terminated by
 * throwing a [NumberFormatException] from the collector, which is caught and
 * ignored. Finally all child coroutines of this scope are cancelled.
 */
suspend fun main(): Unit = coroutineScope {
    val flow = pausableFlow<Int> {
        println("upstream -> hot")
        try {
            onPause { println("paused") }
            onResume { println("resumed") }

            var value = 0
            while (true) {
                println("emit: $value")
                emit(value)

                // Simulated expensive work: five one-second steps, each wrapped in
                // joinPause so the step waits on the pause gate before and after.
                for (step in 1..5) {
                    joinPause {
                        delay(1_000)
                        println("Expensive work on $value (isPaused = $isPaused)…")
                    }
                }
                value += 1
            }
        } finally {
            println("upstream -> cold")
        }
    }

    // Collects from `flow`, printing every element; once `lastValue` is seen it
    // waits 1.5 s and aborts the collection with a NumberFormatException, which
    // is deliberately swallowed here.
    suspend fun collectThrough(lastValue: Int) {
        try {
            flow.collect { element ->
                println("collect: $element")
                if (element == lastValue) {
                    delay(1_500)
                    throw NumberFormatException()
                }
            }
        } catch (expected: NumberFormatException) {
            // Intentionally ignored: the exception is only used to stop collecting.
        }
    }

    println("Collecting 2 elements…")
    collectThrough(lastValue = 1)
    println("Done.")

    println("Waiting 10s to see that the Flow really pauses.")
    delay(10_000)

    println("Collecting 2 more elements…")
    collectThrough(lastValue = 2)
    println("Looks good?")

    // Stops the producer and the subscription watcher launched by pausableFlow.
    coroutineContext.cancelChildren()
}
/**
 * A [FlowCollector] handed to the producer block of a pausable flow.
 *
 * In addition to [emit], it exposes the current pause state, a way to wait
 * on the pause gate, and registration of pause/resume callbacks.
 */
interface PausableFlowCollector<T> : FlowCollector<T> {

    /** Whether the flow is currently marked as paused. */
    val isPaused: Boolean

    /** Suspends the caller on the pause gate; returns once the gate is open. */
    suspend fun joinPause()

    /** Registers [block] to be invoked when the flow transitions to paused. */
    fun onPause(block: suspend () -> Unit)

    /** Registers [block] to be invoked when the flow transitions to resumed. */
    fun onResume(block: suspend () -> Unit)
}
/**
 * Runs [block] bracketed by two waits on the pause gate.
 *
 * The receiver's [PausableFlowCollector.joinPause] is awaited before [block]
 * runs and again afterwards. The trailing wait sits in a `finally`, so it
 * also happens when [block] (or the leading wait) throws.
 *
 * @return whatever [block] returns.
 */
suspend inline fun <T, R> PausableFlowCollector<T>.joinPause(block: () -> R): R {
    try {
        // Leading wait is kept inside the try so the finally below still runs if it throws.
        joinPause()
        return block()
    } finally {
        joinPause()
    }
}
/**
 * Default [PausableFlowCollector] backed by a [MutableSharedFlow].
 *
 * The pause gate is a [CompletableDeferred]: while it is incomplete,
 * [joinPause] suspends; completing it opens the gate. Code outside this class
 * (see `pausableFlow`) flips [_isPaused], completes or replaces [pause], and
 * invokes the registered callbacks.
 */
private class PausableFlowCollectorImpl<T> : PausableFlowCollector<T> {

    // atomicfu requires atomic fields to be declared `val`; the AtomicBoolean
    // itself is mutated through getAndSet, the reference never changes.
    val _isPaused = atomic(true)

    // Downstream-facing flow; replay = 1 so a new subscriber receives the latest value.
    val flow: MutableSharedFlow<T> = MutableSharedFlow(replay = 1)

    // The gate is replaced by one coroutine and awaited by another, possibly on
    // different threads, so the reference must be published with @Volatile.
    @Volatile
    var pause: CompletableDeferred<Unit> = CompletableDeferred()

    // NOTE(review): these lists are appended to by the producer and iterated by
    // the subscription watcher without synchronisation — confirm callbacks are
    // always registered before the first pause/resume transition.
    val onPause: MutableList<suspend () -> Unit> = mutableListOf()
    val onResume: MutableList<suspend () -> Unit> = mutableListOf()

    override val isPaused: Boolean
        get() = _isPaused.value

    /** Emits [value] downstream, waiting on the pause gate before and after. */
    override suspend fun emit(value: T) {
        joinPause {
            flow.emit(value)
        }
    }

    /** Suspends until the current [pause] gate has been completed. */
    override suspend fun joinPause() {
        pause.await()
    }

    override fun onPause(block: suspend () -> Unit) {
        onPause += block
    }

    override fun onResume(block: suspend () -> Unit) {
        onResume += block
    }
}
/**
 * Creates a [SharedFlow] whose producer [block] is gated by subscriber presence.
 *
 * Two coroutines are launched in the receiving scope:
 *  - a watcher that collects the shared flow's `subscriptionCount` and, on a
 *    change of the paused flag, runs the registered resume/pause callbacks and
 *    then opens (completes) or re-arms (replaces) the pause gate;
 *  - the producer, which runs [block] against the collector.
 *
 * @param block producer body; it receives a [PausableFlowCollector].
 * @return the shared flow fed by [block].
 */
fun <T> CoroutineScope.pausableFlow(block: suspend PausableFlowCollector<T>.() -> Unit): SharedFlow<T> {
    val collector = PausableFlowCollectorImpl<T>()

    // Watcher: translates subscriber count changes into pause/resume transitions.
    launch {
        collector.flow.subscriptionCount.collect { subscribers ->
            if (subscribers > 0) {
                // getAndSet returns the previous flag, so callbacks fire only on a real transition.
                val wasPaused = collector._isPaused.getAndSet(false)
                if (wasPaused) {
                    for (callback in collector.onResume) callback()
                    collector.pause.complete(Unit)
                }
            } else if (subscribers == 0) {
                val wasPaused = collector._isPaused.getAndSet(true)
                if (!wasPaused) {
                    for (callback in collector.onPause) callback()
                    // Fresh, incomplete gate: subsequent joinPause() calls suspend.
                    collector.pause = CompletableDeferred()
                }
            }
        }
    }

    // Producer: runs the user-supplied block with the collector as receiver.
    launch {
        collector.block()
    }

    return collector.flow
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment