Last active
April 8, 2023 21:39
-
-
Save psteiger/7c48daf5d669469ff0b9d28bb842d8fa to your computer and use it in GitHub Desktop.
Accumulating Kotlin Flow
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Gates upstream flow emissions by [passThrough]. | |
* | |
* When upstream flow emits a value, either emits the value downstream if the last value emitted by [passThrough] | |
* was `true`, or accumulates it for later emission if `false`. Accumulated values are emitted once [passThrough] | |
* emits `true`. | |
*/ | |
@OptIn(FlowPreview::class) // produceIn is stable since kotlinx-coroutines 1.7.0-Beta | |
internal fun <T> Flow<T>.accumulateBy(passThrough: Flow<Boolean>) = flow<List<T>> { | |
coroutineScope { | |
val valueChannel = produceIn(this) | |
val passThroughFlow = passThrough.materializeCompletion().stateIn(this).dematerializeCompletion() | |
while (true) { | |
val firstValue = | |
valueChannel.receiveCatching().getOrNull() ?: break // null -> upstream value flow completed | |
val valuesToEmit = mutableListOf(firstValue) | |
passThroughFlow.firstOrNull { it } ?: error( | |
"Gate flow terminated with passThrough != true. Unsent elements: ${valuesToEmit + valueChannel.toList()}" | |
) | |
while (true) { | |
val value = | |
valueChannel.tryReceive().getOrNull() ?: break // null -> consumed all upstream values at this point in time | |
valuesToEmit.add(value) | |
} | |
emit(valuesToEmit.toList()) | |
} | |
coroutineContext.cancelChildren() | |
} | |
} | |
private fun <T> Flow<T>.materializeCompletion(): Flow<ValueOrCompletion<T>> = flow { | |
val result = runCatching { collect { emit(Value(it)) } } | |
emit(Completion(result.exceptionOrNull())) | |
} | |
private fun <T> Flow<ValueOrCompletion<T>>.dematerializeCompletion(): Flow<T> = transformWhile { valueOrCompletion -> | |
when (valueOrCompletion) { | |
is Value -> { | |
emit(valueOrCompletion.value) | |
true | |
} | |
is Completion -> { | |
valueOrCompletion.throwable?.let { throw it } | |
false | |
} | |
} | |
} | |
private sealed interface ValueOrCompletion<out T> | |
private data class Value<T>(val value: T) : ValueOrCompletion<T> | |
private data class Completion(val throwable: Throwable?) : ValueOrCompletion<Nothing> | |
// test | |
@Test | |
fun testAccumulateBy() = runTest { | |
val passThroughFlow = flow { | |
emit(true) | |
delay(100) | |
emit(false) | |
delay(100) | |
emit(true) | |
} | |
val valueFlow = (1..10).asFlow().onEach { value -> | |
when (value) { | |
3, 5 -> { | |
delay(100) | |
runCurrent() | |
} | |
else -> {} | |
} | |
} | |
val result = | |
valueFlow.accumulateBy(passThroughFlow) | |
.flowOn(UnconfinedTestDispatcher(testScheduler)) | |
.toList() | |
assertThat(result).isEqualTo( | |
listOf(listOf(1), listOf(2), listOf(3), listOf(4, 5), listOf(6), listOf(7), listOf(8, 9, 10)) | |
) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment