Skip to content

Instantly share code, notes, and snippets.

@Andrew0000
Created October 15, 2024 11:46
Show Gist options
  • Save Andrew0000/6e0cdb22ac0eae620ab1113c087be881 to your computer and use it in GitHub Desktop.
Save Andrew0000/6e0cdb22ac0eae620ab1113c087be881 to your computer and use it in GitHub Desktop.
SingleEventStateFlow. A flow that uses the 'Single Event' pattern without losing events.
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.update
/**
* A flow that uses the 'Single Event' pattern without losing events.
*/
class SingleEventStateFlow<T>(
scope: CoroutineScope = CoroutineScope(SupervisorJob()),
) : Flow<T> {
private val flow: MutableStateFlow<SingleEventStateFlowElement<T>> =
MutableStateFlow(SingleEventStateFlowElement.Empty())
private val shared = flow.shareIn(scope, SharingStarted.WhileSubscribed(), replay = 1)
private var uid = 0
var value: T?
set(value) {
if (value != null) {
flow.update {
SingleEventStateFlowElement.Data(value = value, uid = uid++)
}
}
}
get() = null
override suspend fun collect(collector: FlowCollector<T>) {
shared.collect {
if (it is SingleEventStateFlowElement.Data) {
collector.emit(it.value)
notifyCollected(it)
}
}
}
private fun notifyCollected(element: SingleEventStateFlowElement.Data<T>) {
flow.update {
if (it is SingleEventStateFlowElement.Data && it == element) {
SingleEventStateFlowElement.Empty()
} else {
it
}
}
}
}
private sealed interface SingleEventStateFlowElement<T> {
class Empty<T> : SingleEventStateFlowElement<T>
data class Data<T>(
val value: T,
internal val uid: Int,
) : SingleEventStateFlowElement<T>
}
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.yield
import org.junit.Assert.assertArrayEquals
import org.junit.Test
class SingleEventStateFlowTest {
@OptIn(ExperimentalCoroutinesApi::class)
private val asyncDispatcher = UnconfinedTestDispatcher()
private val flow = SingleEventStateFlow<Int>(CoroutineScope(asyncDispatcher))
@Test
fun `When 2 collectors - Then both receive ongoing events and one before the first subscription`() = runTest {
val result1 = mutableListOf<Int>()
val result2 = mutableListOf<Int>()
flow.value = 1
flow.value = 2
val job1 = launch(asyncDispatcher) {
flow.collect { result1 += it }
}
flow.value = 3
asyncDispatcher.scheduler.advanceUntilIdle()
val job2 = launch(asyncDispatcher) {
flow.collect { result2 += it }
}
flow.value = 4
flow.value = 5
asyncDispatcher.scheduler.advanceUntilIdle()
job2.cancel()
flow.value = 6
flow.value = 7
asyncDispatcher.scheduler.advanceUntilIdle()
job1.cancel()
println("result1: $result1")
println("result2: $result2")
assertArrayEquals(
intArrayOf(2, 3, 4, 5, 6, 7),
result1.toIntArray()
)
assertArrayEquals(
intArrayOf(4, 5),
result2.toIntArray()
)
}
@Test
fun `When collector subscribes several times - Then events are collected and no duplication`() = runTest {
val result1 = mutableListOf<Int>()
flow.value = 1
flow.value = 2
var job = launch(asyncDispatcher) {
flow.collect { result1 += it }
}
flow.value = 3
asyncDispatcher.scheduler.advanceUntilIdle()
job.cancel()
yield()
job = launch(asyncDispatcher) {
flow.collect { result1 += it }
}
flow.value = 4
flow.value = 5
asyncDispatcher.scheduler.advanceUntilIdle()
job.cancel()
yield()
flow.value = 6
flow.value = 7
job = launch(asyncDispatcher) {
flow.collect { result1 += it }
}
flow.value = 8
flow.value = 9
asyncDispatcher.scheduler.advanceUntilIdle()
job.cancel()
yield()
flow.value = 10
println("result1: $result1")
assertArrayEquals(
intArrayOf(2, 3, 4, 5, 7, 8, 9),
result1.toIntArray()
)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment