Skip to content

Instantly share code, notes, and snippets.

@andrikeev
Created August 2, 2025 10:59
Show Gist options
  • Save andrikeev/068f8911dd1e8c23721d7d5e3445988f to your computer and use it in GitHub Desktop.
Save andrikeev/068f8911dd1e8c23721d7d5e3445988f to your computer and use it in GitHub Desktop.
ContentFlow
/**
 * Closed set of states a piece of loadable content can be in.
 *
 * [Data] is covariant, so the payload-free states ([Loading], [Error]) are usable
 * as `Content<T>` for any `T`.
 */
sealed interface Content<out Data> {

    /** No data is available yet. */
    data object Loading : Content<Nothing>

    /**
     * Data is available.
     *
     * @property data the loaded payload.
     * @property refresh state of a refresh running on top of [data];
     *   defaults to [LoadState.NotLoading].
     */
    data class Success<out Data>(
        val data: Data,
        val refresh: LoadState = LoadState.NotLoading,
    ) : Content<Data>

    /** Loading failed; this state carries no details about the failure. */
    data object Error : Content<Nothing>
}
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.MutableSharedFlow
/** Commands that can be sent to a content stream to drive its loading. */
enum class ContentAction {
    /** Request a load. */
    Load,

    /** Request another attempt. */
    Retry,

    /** Request a refresh. */
    Refresh,

    /** Request an update. */
    Update,
}
/** A stream of [ContentAction]s. */
typealias ContentActions = Flow<ContentAction>
/**
 * Factory named after the [ContentActions] alias.
 *
 * Returns a new [MutableSharedFlow] of [ContentAction] created with the library's
 * default arguments; the result can be both collected and emitted into.
 */
@Suppress("FunctionName")
fun ContentActions(): MutableSharedFlow<ContentAction> = MutableSharedFlow()
/** Emits [ContentAction.Load] into this collector. */
suspend fun FlowCollector<ContentAction>.load(): Unit = emit(ContentAction.Load)
/** Emits [ContentAction.Refresh] into this collector. */
suspend fun FlowCollector<ContentAction>.refresh(): Unit = emit(ContentAction.Refresh)
/** Emits [ContentAction.Update] into this collector. */
suspend fun FlowCollector<ContentAction>.update(): Unit = emit(ContentAction.Update)
/** Emits [ContentAction.Retry] into this collector. */
suspend fun FlowCollector<ContentAction>.retry(): Unit = emit(ContentAction.Retry)
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.transformLatest
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
/** A stream of [Content] states carrying a payload of type [T]. */
typealias ContentFlow<T> = Flow<Content<T>>
/**
 * Builds a [ContentFlow] whose state is produced by calling [fetch] in response to [actions].
 *
 * A [ContentAction.Load] is injected ahead of [actions], so loading begins as soon as the
 * returned flow is collected. Actions are consumed with `collectLatest`: a newly arrived
 * action cancels the handling of the previous one.
 *
 * @param fetch suspending source of the data, reporting its outcome as a [Result].
 * @param actions external commands that drive the loading.
 * @return a flow of the content states published while handling the actions.
 */
fun <T> contentFlow(
    fetch: suspend () -> Result<T>,
    actions: ContentActions,
): ContentFlow<T> = channelFlow {
    val loader = ContentLoader(fetch)
    val actionsWithInitialLoad = actions.onStart { emit(ContentAction.Load) }

    // Feed the actions into the loader; the latest action wins.
    launch {
        actionsWithInitialLoad.collectLatest { action -> loader.action(action) }
    }

    // Forward every state the loader publishes to the collector of this flow.
    launch {
        loader.content.collectLatest { content -> send(content) }
    }
}
/**
 * Maps the payload of every [Content.Success] with [transform], keeping its `refresh` state,
 * and passes [Content.Loading] and [Content.Error] through untouched.
 *
 * Built on `transformLatest`: when a new upstream value arrives, a [transform] call that is
 * still running for the previous value is cancelled.
 *
 * @param transform suspending conversion applied to the payload of each success state.
 */
@OptIn(ExperimentalCoroutinesApi::class)
inline fun <T, R> ContentFlow<T>.mapLatest(
    crossinline transform: suspend (T) -> R,
): ContentFlow<R> = transformLatest { content ->
    val mapped: Content<R> = when (content) {
        is Content.Success<T> -> Content.Success(transform(content.data), content.refresh)
        is Content.Loading -> content
        is Content.Error -> content
    }
    emit(mapped)
}
/**
 * Holds the current [Content] state and advances it in response to [ContentAction]s.
 *
 * The loader expects a single sequential driver. [contentFlow] feeds it through
 * `collectLatest`, so a new action cancels whichever action is still suspended in [fetch].
 * A cancelled action leaves its transient state behind: [Content.Loading], or a
 * [Content.Success] whose `refresh` is [LoadState.Loading]. When a handler starts and sees
 * such a state, no fetch is running for it any more, so the handler resumes the interrupted
 * work instead of ignoring it. Otherwise the state would stay "loading" forever.
 *
 * @param fetch suspending source of the data, reporting its outcome as a [Result].
 */
private class ContentLoader<Data>(
    private val fetch: suspend () -> Result<Data>,
) {
    private val mutableContent = MutableStateFlow<Content<Data>>(Content.Loading)

    /** Read-only view of the state held by this loader. */
    val content = mutableContent.asSharedFlow()

    /** Dispatches [action] to the matching handler and suspends until it has finished. */
    suspend fun action(action: ContentAction) {
        when (action) {
            ContentAction.Load -> load()
            ContentAction.Retry -> retry()
            ContentAction.Refresh -> refresh()
            ContentAction.Update -> update()
        }
    }

    /**
     * Full load: publishes [Content.Loading], then [Content.Success] or [Content.Error]
     * depending on the outcome of [fetch].
     */
    private suspend fun load() {
        mutableContent.update { Content.Loading }
        fetch()
            .onSuccess { data ->
                mutableContent.update { Content.Success(data) }
            }
            .onFailure { error ->
                // A cancellation is not a load failure: keep the state as it is.
                if (error !is CancellationException) {
                    mutableContent.update { Content.Error }
                }
            }
    }

    /** Retries a failed load, or restarts one that was interrupted; no-op when data is present. */
    private suspend fun retry() {
        when (mutableContent.value) {
            // Still Loading means the previous load was cancelled by this action: restart it.
            is Content.Loading -> load()
            is Content.Success<Data> -> Unit
            is Content.Error -> load()
        }
    }

    /**
     * Refreshes existing data while keeping it visible; without data it falls back to [load].
     *
     * On failure the previous data is kept and only `refresh` switches to [LoadState.Error].
     */
    private suspend fun refresh() {
        when (val state = mutableContent.value) {
            // Still Loading means the previous load was cancelled by this action: restart it.
            is Content.Loading -> load()
            is Content.Success<Data> -> {
                mutableContent.update { state.copy(refresh = LoadState.Loading) }
                fetch()
                    .onSuccess { data ->
                        mutableContent.update { Content.Success(data) }
                    }
                    .onFailure { error ->
                        if (error !is CancellationException) {
                            mutableContent.update { state.copy(refresh = LoadState.Error) }
                        }
                    }
            }
            is Content.Error -> load()
        }
    }

    /**
     * Background update: normally re-fetches without touching the state unless it succeeds.
     *
     * If this update pre-empted a load or a refresh, that work is resumed instead, so a failed
     * fetch still moves the state out of its transient "loading" form.
     */
    private suspend fun update() {
        when (val state = mutableContent.value) {
            is Content.Loading -> load()
            is Content.Success<Data> -> {
                if (state.refresh == LoadState.Loading) refresh() else silentUpdate()
            }
            is Content.Error -> silentUpdate()
        }
    }

    /** Re-fetches and publishes the data on success; failures are deliberately ignored. */
    private suspend fun silentUpdate() {
        fetch().onSuccess { data ->
            mutableContent.update { Content.Success(data) }
        }
    }
}
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.launch
/**
 * Example view model that observes the examples content and forwards user intents
 * (retry, pull-to-refresh) to the content stream as [ContentAction]s.
 */
class ExamplesViewModel(
    private val observeExamplesUseCase: ObserveExamplesUseCase,
) : ViewModel() {

    /** Actions emitted by this view model and consumed by the observed content stream. */
    private val actions = ContentActions()

    init {
        viewModelScope.launch {
            val examples = observeExamplesUseCase(actions)
            examples.collectLatest { content ->
                // Placeholders: each branch is still to be implemented.
                when (content) {
                    is Content.Loading -> TODO()
                    is Content.Success<List<Example>> -> TODO()
                    is Content.Error -> TODO()
                }
            }
        }
    }

    /** Emits a retry action in [viewModelScope]. */
    fun onRetryClick() {
        viewModelScope.launch { actions.retry() }
    }

    /** Emits a refresh action in [viewModelScope]. */
    fun onPullToRefresh() {
        viewModelScope.launch { actions.refresh() }
    }
}
/**
 * Exposes the list of examples as a [ContentFlow] that reacts both to caller-supplied actions
 * and to events published by [eventsServices].
 */
class ObserveExamplesUseCase(
    val repository: ExampleRepository,
    val eventsServices: ExampleEventsService,
) {
    /**
     * @param actions commands from the caller (e.g. retry, refresh).
     * @return content stream backed by [ExampleRepository.getExamples]; every event from
     *   [eventsServices] is turned into a [ContentAction.Update] and merged with [actions].
     */
    operator fun invoke(actions: ContentActions): ContentFlow<List<Example>> {
        val updatesFromEvents = eventsServices.events.mapLatest { ContentAction.Update }
        return contentFlow(
            fetch = { repository.getExamples() },
            actions = merge(actions, updatesFromEvents),
        )
    }
}
/** Updates an example and announces the change once the update has succeeded. */
class UpdateExampleUseCase(
    val repository: ExampleRepository,
    val eventsServices: ExampleEventsService,
) {
    /**
     * @param example the example to update.
     * @return the result of [ExampleRepository.update]; [ExampleEvent.Changed] is emitted
     *   through [eventsServices] only when that result is a success.
     */
    suspend operator fun invoke(example: Example): Result<Unit> {
        val result = repository.update(example)
        if (result.isSuccess) {
            eventsServices.emit(ExampleEvent.Changed)
        }
        return result
    }
}
/** Data source for [Example]s; every operation reports its outcome as a [Result]. */
interface ExampleRepository {

    /** Returns the list of examples, or a failure. */
    suspend fun getExamples(): Result<List<Example>>

    /** Updates [example]; the result carries no payload on success. */
    suspend fun update(example: Example): Result<Unit>
}
/** Channel for [ExampleEvent]s: a stream to observe plus a way to publish. */
interface ExampleEventsService {

    /** Stream of events. */
    val events: Flow<ExampleEvent>

    /** Publishes [event]. */
    suspend fun emit(event: ExampleEvent)
}
/** Kinds of change notifications exchanged through [ExampleEventsService]. */
enum class ExampleEvent {
    /** Signals a deletion. */
    Deleted,

    /** Signals an addition. */
    Added,

    /** Signals a change. */
    Changed,
}
/** Placeholder model used by the example code; declared as a singleton with no fields. */
data object Example
/** State of a secondary load, such as a refresh running on top of already loaded data. */
enum class LoadState {
    /** Nothing is in progress. */
    NotLoading,

    /** A load is in progress. */
    Loading,

    /** The load failed. */
    Error,
}
@D00mch
Copy link

D00mch commented Aug 2, 2025

It's so simple to read, it feels like I wrote it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment