Skip to content

Instantly share code, notes, and snippets.

@cbeyls
Created May 28, 2022 16:32
Show Gist options
  • Save cbeyls/5e0489bdc8fabcaa7a796757237d86f2 to your computer and use it in GitHub Desktop.
Save cbeyls/5e0489bdc8fabcaa7a796757237d86f2 to your computer and use it in GitHub Desktop.
Synchronize Flow emissions with SharedFlow's subscriptionCount
package be.digitalia.flow
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingCommand
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.launch
inline fun <T> stateFlow(
scope: CoroutineScope,
initialValue: T,
producer: (subscriptionCount: StateFlow<Int>) -> Flow<T>
): StateFlow<T> {
val state = MutableStateFlow(initialValue)
producer(state.subscriptionCount).launchIn(scope, state)
return state.asStateFlow()
}
inline fun <T> sharedFlow(
scope: CoroutineScope,
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
producer: (subscriptionCount: StateFlow<Int>) -> Flow<T>
): SharedFlow<T> {
val shared = MutableSharedFlow<T>(replay, extraBufferCapacity, onBufferOverflow)
producer(shared.subscriptionCount).launchIn(scope, shared)
return shared.asSharedFlow()
}
fun <T> Flow<T>.launchIn(scope: CoroutineScope, collector: FlowCollector<T>): Job = scope.launch {
collect(collector)
}
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.flowWhileShared(
subscriptionCount: StateFlow<Int>,
started: SharingStarted
): Flow<T> {
return started.command(subscriptionCount)
.distinctUntilChanged()
.flatMapLatest {
when (it) {
SharingCommand.START -> this
SharingCommand.STOP,
SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> emptyFlow()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment