Skip to content

Instantly share code, notes, and snippets.

@belinwu
Forked from cbeyls/FlowExt.kt
Created February 17, 2023 01:21
Show Gist options
  • Save belinwu/0222a5cd835e6b9b75d731a1f8f40193 to your computer and use it in GitHub Desktop.
Save belinwu/0222a5cd835e6b9b75d731a1f8f40193 to your computer and use it in GitHub Desktop.
Synchronize Flow emissions with SharedFlow's subscriptionCount
package be.digitalia.flow
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingCommand
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.launch
inline fun <T> stateFlow(
scope: CoroutineScope,
initialValue: T,
producer: (subscriptionCount: StateFlow<Int>) -> Flow<T>
): StateFlow<T> {
val state = MutableStateFlow(initialValue)
producer(state.subscriptionCount).launchIn(scope, state)
return state.asStateFlow()
}
inline fun <T> sharedFlow(
scope: CoroutineScope,
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
producer: (subscriptionCount: StateFlow<Int>) -> Flow<T>
): SharedFlow<T> {
val shared = MutableSharedFlow<T>(replay, extraBufferCapacity, onBufferOverflow)
producer(shared.subscriptionCount).launchIn(scope, shared)
return shared.asSharedFlow()
}
fun <T> Flow<T>.launchIn(scope: CoroutineScope, collector: FlowCollector<T>): Job = scope.launch {
collect(collector)
}
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.flowWhileShared(
subscriptionCount: StateFlow<Int>,
started: SharingStarted
): Flow<T> {
return started.command(subscriptionCount)
.distinctUntilChanged()
.flatMapLatest {
when (it) {
SharingCommand.START -> this
SharingCommand.STOP,
SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> emptyFlow()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment