Skip to content

Instantly share code, notes, and snippets.

@hadilq
Last active October 27, 2018 21:00
Show Gist options
  • Save hadilq/ebbb311cb70f3de7df386aaad3498498 to your computer and use it in GitHub Desktop.
Save hadilq/ebbb311cb70f3de7df386aaad3498498 to your computer and use it in GitHub Desktop.
/**
 * The base repository that holds functionality shared by concrete repositories.
 *
 * @param E the entity this repository is related to. Every repository is responsible for
 * one and only one entity.
 */
abstract class BaseRepository<E : Entity> {

    /**
     * Combines a database stream with a one-shot network refresh into a single stream of
     * [ResultState] values.
     *
     * Behaviour, as implemented below:
     * - The first list emitted by [databaseFlowable] starts the network call. If that list is
     *   non-empty, its first element is emitted as [ResultState.Loading] before the call starts.
     * - The value produced by [netSingle] is handed to [persist] and is NOT emitted directly.
     *   Presumably persisting makes [databaseFlowable] emit again — confirm against the
     *   data source in use.
     * - Every later non-empty list from [databaseFlowable] is emitted as [ResultState.Success]
     *   carrying its first element; later empty lists emit nothing.
     * - A failure of [netSingle] or [persist] is emitted as a [ResultState.Error] item. A
     *   failure of the merged stream itself is also converted to a [ResultState.Error] item,
     *   after which the returned flowable stays open without emitting further items.
     *
     * NOTE(review): splitting a `share()`d source with `take(1)` / `skip(1)` assumes that both
     * branches are subscribed before the first database emission, or that the source re-emits
     * its current value on re-subscription — confirm for the concrete [databaseFlowable].
     *
     * @param D a data type
     * @param databaseFlowable a flowable to load an entity from database.
     * @param netSingle a single to load an entity from server.
     * @param persist a high order function to persist an entity.
     * @return a flowable of loading / success / error states for the requested data.
     */
    protected fun <D> perform(
        databaseFlowable: Flowable<List<D>>,
        netSingle: Single<D>,
        persist: (D) -> Unit
    ): Flowable<ResultState<D>> {
        // Last first-row seen from the database; attached to Error states as fallback data.
        val cachedData = AtomicReference<D?>()
        // A source that never emits and never terminates; appended to keep the stream open.
        val keepOpen = Flowable.never<ResultState<D>>()
        val rows = databaseFlowable
            .doOnNext { cachedData.set(it.firstOrNull()) }
            .share()

        // First database emission: optionally report the cached row, then refresh from network.
        val initialLoad = rows.take(1).flatMap { firstRows ->
            if (firstRows.isEmpty()) {
                handleNetSingle(netSingle, persist, cachedData)
            } else {
                concatJustFlowable(
                    ResultState.Loading(firstRows.first()),
                    handleNetSingle(netSingle, persist, cachedData)
                )
            }
        }

        // Later database emissions. The inner flowable never terminates, so switchMap is used:
        // it cancels the previous inner subscription when a new list arrives. With flatMap each
        // emission would add one more never-ending inner subscription for the stream's lifetime.
        val updates = rows.skip(1).switchMap { laterRows ->
            if (laterRows.isNotEmpty()) {
                concatJustFlowable(ResultState.Success(laterRows.first()), keepOpen)
            } else {
                keepOpen
            }
        }

        // The explicit SAM constructor selects the Function overload of onErrorResumeNext.
        return Flowable.merge(initialLoad, updates)
            .onErrorResumeNext(io.reactivex.functions.Function { error ->
                concatJustFlowable(ResultState.Error(error, cachedData.get()), keepOpen)
            })
    }

    /**
     * Returns a flowable that emits [d] first and then continues with [flowable].
     */
    private fun <D> concatJustFlowable(
        d: ResultState<D>,
        flowable: Flowable<ResultState<D>>
    ) = Flowable.concat(Flowable.just<ResultState<D>>(d), flowable)

    /**
     * Runs [netSingle] and passes its value to [persist] without emitting it downstream.
     *
     * On success the returned flowable completes empty. If [netSingle] or [persist] fails, a
     * single [ResultState.Error] carrying the throwable and the current [cachedData] value is
     * emitted instead of an error signal.
     */
    private fun <D> handleNetSingle(
        netSingle: Single<D>,
        persist: (D) -> Unit,
        cachedData: AtomicReference<D?>
    ): Flowable<ResultState<D>> = netSingle.toFlowable().flatMap { fetched ->
        persist(fetched)
        Flowable.empty<ResultState<D>>()
    }.onErrorReturn { error ->
        ResultState.Error(error, cachedData.get())
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment