Created
October 15, 2024 11:46
-
-
Save Andrew0000/6e0cdb22ac0eae620ab1113c087be881 to your computer and use it in GitHub Desktop.
SingleEventStateFlow. A flow that uses the 'Single Event' pattern without losing events.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.update
import java.util.concurrent.atomic.AtomicInteger
/**
 * A [Flow] that uses the 'Single Event' pattern without losing events.
 *
 * An event assigned through [value] is stored in an internal [MutableStateFlow] and stays
 * there until a collector has processed it; only then is the slot reset to "empty".
 * Because the slot holds a single element, a newer event replaces an older one that has
 * not been consumed yet.
 *
 * @param scope scope in which the internal shared flow is started. Sharing uses
 * [SharingStarted.WhileSubscribed], so the upstream is only collected while there is
 * at least one collector.
 */
class SingleEventStateFlow<T>(
    scope: CoroutineScope = CoroutineScope(SupervisorJob()),
) : Flow<T> {

    /** Single slot holding either the pending event or an `Empty` marker. */
    private val flow: MutableStateFlow<SingleEventStateFlowElement<T>> =
        MutableStateFlow(SingleEventStateFlowElement.Empty())

    /** Shared view of [flow] so that every active collector observes the same element. */
    private val shared = flow.shareIn(scope, SharingStarted.WhileSubscribed(), replay = 1)

    /**
     * Source of per-event ids. The id makes two events carrying equal payloads unequal,
     * so the state flow does not conflate them. It is atomic because setters may run on
     * different threads and a duplicated id would make such events equal again.
     */
    private val nextUid = AtomicInteger(0)

    /**
     * Write-only entry point for publishing events.
     *
     * Assigning a non-null value publishes it as a new event; assigning `null` is ignored.
     * Reading this property always yields `null`.
     */
    var value: T?
        get() = null
        set(value) {
            if (value != null) {
                // The id is taken outside of any `update { }` lambda: such a lambda may be
                // re-run under contention and must therefore stay free of side effects.
                // The previous state is irrelevant here, so a plain assignment is enough.
                flow.value = SingleEventStateFlowElement.Data(
                    value = value,
                    uid = nextUid.getAndIncrement(),
                )
            }
        }

    /**
     * Forwards every pending event to [collector] and, once `emit` has returned,
     * marks that event as consumed. `Empty` markers are skipped.
     */
    override suspend fun collect(collector: FlowCollector<T>) {
        shared.collect { element ->
            if (element is SingleEventStateFlowElement.Data) {
                collector.emit(element.value)
                notifyCollected(element)
            }
        }
    }

    /**
     * Clears the slot, but only if it still holds [element]; a newer event that has
     * arrived in the meantime is left untouched.
     */
    private fun notifyCollected(element: SingleEventStateFlowElement.Data<T>) {
        flow.update { current ->
            if (current == element) SingleEventStateFlowElement.Empty() else current
        }
    }
}
/**
 * Content of the single slot backing [SingleEventStateFlow]:
 * either a pending event ([Data]) or nothing ([Empty]).
 */
private sealed interface SingleEventStateFlowElement<T> {

    /**
     * A pending event carrying [value].
     *
     * [uid] is part of the generated `equals`/`hashCode`, so two events with equal
     * payloads but different ids are not equal to each other.
     */
    data class Data<T>(
        val value: T,
        internal val uid: Int,
    ) : SingleEventStateFlowElement<T>

    /**
     * Marker for "no pending event". It is a regular class without `equals`,
     * so two instances are only equal when they are the same object.
     */
    class Empty<T> : SingleEventStateFlowElement<T>
}
import kotlinx.coroutines.CoroutineScope | |
import kotlinx.coroutines.ExperimentalCoroutinesApi | |
import kotlinx.coroutines.launch | |
import kotlinx.coroutines.test.UnconfinedTestDispatcher | |
import kotlinx.coroutines.test.runTest | |
import kotlinx.coroutines.yield | |
import org.junit.Assert.assertArrayEquals | |
import org.junit.Test | |
/**
 * Tests for [SingleEventStateFlow]. Collectors run on an unconfined test dispatcher,
 * and its scheduler is drained explicitly between steps.
 */
class SingleEventStateFlowTest {

    @OptIn(ExperimentalCoroutinesApi::class)
    private val asyncDispatcher = UnconfinedTestDispatcher()

    private val flow = SingleEventStateFlow<Int>(CoroutineScope(asyncDispatcher))

    @Test
    fun `When 2 collectors - Then both receive ongoing events and one before the first subscription`() = runTest {
        val seenByFirst = mutableListOf<Int>()
        val seenBySecond = mutableListOf<Int>()

        // Two events before anyone subscribes.
        flow.value = 1
        flow.value = 2

        val firstJob = collectInto(seenByFirst)
        flow.value = 3
        drainAsyncWork()

        val secondJob = collectInto(seenBySecond)
        flow.value = 4
        flow.value = 5
        drainAsyncWork()

        // Only the first collector remains for the last two events.
        secondJob.cancel()
        flow.value = 6
        flow.value = 7
        drainAsyncWork()
        firstJob.cancel()

        println("result1: $seenByFirst")
        println("result2: $seenBySecond")

        val expectedFirst = intArrayOf(2, 3, 4, 5, 6, 7)
        val expectedSecond = intArrayOf(4, 5)
        assertArrayEquals(expectedFirst, seenByFirst.toIntArray())
        assertArrayEquals(expectedSecond, seenBySecond.toIntArray())
    }

    @Test
    fun `When collector subscribes several times - Then events are collected and no duplication`() = runTest {
        val seen = mutableListOf<Int>()

        // Two events before the first subscription.
        flow.value = 1
        flow.value = 2

        var subscription = collectInto(seen)
        flow.value = 3
        drainAsyncWork()
        subscription.cancel()
        yield()

        subscription = collectInto(seen)
        flow.value = 4
        flow.value = 5
        drainAsyncWork()
        subscription.cancel()
        yield()

        // Two events while nobody is subscribed.
        flow.value = 6
        flow.value = 7

        subscription = collectInto(seen)
        flow.value = 8
        flow.value = 9
        drainAsyncWork()
        subscription.cancel()
        yield()

        // Published after the last collector is gone.
        flow.value = 10

        println("result1: $seen")

        val expected = intArrayOf(2, 3, 4, 5, 7, 8, 9)
        assertArrayEquals(expected, seen.toIntArray())
    }

    /** Starts a collector on [asyncDispatcher] that appends every received event to [sink]. */
    private fun CoroutineScope.collectInto(sink: MutableList<Int>) = launch(asyncDispatcher) {
        flow.collect { sink += it }
    }

    /** Runs everything currently scheduled on [asyncDispatcher]'s scheduler. */
    private fun drainAsyncWork() = asyncDispatcher.scheduler.advanceUntilIdle()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment