Maybe I will write article about this
internal fun <T, R> Flow<T>.retryMap(
map: suspend (T) -> R,
predicate: suspend FlowCollector<R>.(value: T, result: R, attempt: Int) -> Boolean
): Flow<R> {
var retryCount = 1
suspend fun FlowCollector<R>.collector(
value: T,
result: R
) {
if (predicate(value, result, retryCount)) {
retryCount++
collector(value, map(value))
} else {
emit(result)
}
}
return flow {
collect { request ->
collector(request, map(request))
}
}
}
internal fun <T, R> Flow<T>.retryMapFlow(
map: Flow<T>.() -> Flow<R>,
predicate: suspend FlowCollector<R>.(value: T, result: R, attempt: Int) -> Boolean
): Flow<R> {
return retryMap(
map = { request -> flowOf(request).map().first() },
predicate = predicate
)
}
data class Request(val id: String)
sealed class Result<out T> {
data class Success<T>(val data: T) : Result<T>()
data class Error(val cause: Throwable) : Result<Nothing>()
data class Loading(val retryCount: Int = 0) : Result<Nothing>()
}
suspend fun getResult(request : Request): Result<Unit> {
delay(500)
return Result.Error(Exception("Error while getting result for [${request.id}]"))
}
fun main() = runBlocking {
flowOf(Request("id1"))
.retryRequest(
getResult = ::getResult,
predicate = { _, result, attempt ->
val shouldRetry = result is Result.Error && count < 3
if (shouldRetry) {
delay(1000)
emit(Result.Loading(retryCount = attempt))
}
shouldRetry
}
)
.stateIn(this, started = SharingStarted.Eagerly, initialValue = Result.Loading())
.collect { println(it) }
}
Prints:
Loading(retryCount=0)
Loading(retryCount=1)
Loading(retryCount=2)
Loading(retryCount=3)
Error(cause=java.lang.Exception: Error while getting result for [id1])