Skip to content

Instantly share code, notes, and snippets.

@datikaa
Last active September 28, 2024 18:05
Show Gist options
  • Save datikaa/b111927c58422c01a3298a914d7ab164 to your computer and use it in GitHub Desktop.
Save datikaa/b111927c58422c01a3298a914d7ab164 to your computer and use it in GitHub Desktop.
Updatign a Kotlin MutableStateFlow with another flow lifecycle-aware
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.runningReduce
import kotlinx.coroutines.flow.update
/**
* Tracks whether the [MutableStateFlow] has active subscribers.
*
* @return A [Flow] emitting [Subscribed] objects indicating whether the flow has active subscribers.
*/
fun <T> MutableStateFlow<T>.hasSubscribers(): Flow<Subscribed> =
subscriptionCount
.map { Subscribed(it > 0) }
.distinctUntilChanged()
/**
* Delays emissions when there are no active subscribers.
*
* @param delay The delay duration in milliseconds to apply when there are no subscribers. Default is 5000ms.
* @return A [Flow] that delays emissions based on subscriber activity.
*/
fun Flow<Subscribed>.delayWhenNoSubscribers(delay: Long? = 5000): Flow<Subscribed> =
runningReduce { prev, next ->
if (delay != null && prev.value && !next.value) {
delay(delay)
}
next
}
/**
* Maps the current [Flow] to another [Flow] while there are active subscribers.
*
* @param other The [Flow] to emit values from when there are active subscribers.
* @return A [Flow] emitting values from the `other` flow when there are subscribers, otherwise emitting an empty flow.
*/
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<Subscribed>.flatMapWhileSubscribed(other: Flow<T>): Flow<T> =
flatMapLatest { if (it.value) other else emptyFlow() }
/**
* Updates the state of a [MutableStateFlow] based on another flow's emissions
* while there are active subscribers.
*
* @param other The [Flow] whose emissions will be used to update the state.
* @param delay The delay duration in milliseconds to apply when there are no subscribers. Default is 5000ms.
* @param function The function to apply for updating the state based on the current state and incoming emissions.
*/
fun <S, O> MutableStateFlow<S>.whileHasSubscriberUpdateWith(
other: Flow<O>,
delay: Long? = 5000,
function: (S, O) -> S,
) = hasSubscribers()
.delayWhenNoSubscribers(delay)
.flatMapWhileSubscribed(other)
.onEach { other -> update { state -> function(state, other) } }
/**
* Represents whether there are active subscribers to a flow.
*
* @property value A boolean indicating whether there are active subscribers.
*/
@JvmInline
value class Subscribed(val value: Boolean)
import android.os.Bundle
import android.util.Log
import androidx.activity.ComponentActivity
import androidx.activity.compose.setContent
import androidx.activity.enableEdgeToEdge
import androidx.activity.viewModels
import androidx.compose.foundation.layout.fillMaxSize
import androidx.compose.foundation.layout.padding
import androidx.compose.material3.Scaffold
import androidx.compose.material3.Text
import androidx.compose.runtime.Composable
import androidx.compose.runtime.Immutable
import androidx.compose.ui.Modifier
import androidx.compose.runtime.getValue
import androidx.compose.ui.tooling.preview.Preview
import androidx.lifecycle.ViewModel
import androidx.lifecycle.compose.collectAsStateWithLifecycle
import androidx.lifecycle.viewModelScope
import com.example.myapplication.ui.theme.MyApplicationTheme
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
// Tag used for logging
const val TAG = "Example"
@OptIn(ExperimentalCoroutinesApi::class)
class MainViewModel : ViewModel() {
// Private mutable state for the UI state
private val _uiState = MutableStateFlow<UiState>(UiState())
// Public immutable state exposed to the UI
val uiState: StateFlow<UiState> = _uiState.asStateFlow()
init {
// Updates the _uiState with the current time from stateUpdatingFlow
// whenever there are active subscribers to _uiState
_uiState.whileHasSubscriberUpdateWith(stateUpdatingFlow, 1000L) { oldState, other ->
// Update the UI state with the new timestamp
UiState("Updated At: $other")
}.launchIn(viewModelScope) // Launch the flow in the ViewModel's scope
}
}
// A flow that emits the current time in milliseconds every second
private val stateUpdatingFlow: Flow<Long> = flow {
while (true) {
emit(System.currentTimeMillis()) // Emit the current time
delay(1000) // Delay for 1 second
}
}
.onStart { Log.d(TAG, "Updater started") } // Log when the flow starts
.onCompletion { Log.d(TAG, "Updater completed") } // Log when the flow completes
// Data class representing the UI
@Immutable
data class UiState(
val timeStamp: String = "Inited At: ${System.currentTimeMillis()}",
) {
init {
Log.d(TAG, timeStamp)
}
}
class MainActivity : ComponentActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContent {
MyApplicationTheme {
Scaffold(modifier = Modifier.fillMaxSize()) { innerPadding ->
val viewModel by viewModels<MainViewModel>()
val text by viewModel.uiState.collectAsStateWithLifecycle()
// Display the timestamp from the UI state
Text(text.timeStamp)
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment