Last active
December 20, 2018 22:29
-
-
Save f3401pal/79f8351d1bc1422bf25eedc76a4900df to your computer and use it in GitHub Desktop.
RxKotlin Flowable from staged completables (draft)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.reactivex.Completable | |
import io.reactivex.Flowable | |
object FlowableEx { | |
/** | |
* Create a Flowable by chaining multiple completables and execute them in order. | |
* Note that the completables will be run in sequense. If previous completable fails, the next one will NOT run. | |
* For example: | |
* FlowableEx.fromCompletables(developing, review, testing) { index -> | |
* when (inde) { | |
* 0 -> "done developing, PR created" | |
* 1 -> "PR review done and merged" | |
* 2 -> "testing done, released" | |
* } | |
* } | |
* When each completable completes, emit the result returned from stateProvider given the index of the completables passed in. | |
*/ | |
fun <R> fromCompletables(vararg completables: Completable, stateProvider: (Int) -> R): Flowable<R> { | |
class StageCompletable<R>(val completable: Completable, val stageResult: R) | |
val stages = completables.mapIndexed { index, completable -> | |
StageCompletable(completable, stateProvider(index)) | |
} | |
return Flowable.fromIterable(stages).map { stageCompletable -> | |
// this will block the current thread | |
stageCompletable.completable.blockingGet()?.let { | |
throw it | |
} ?: stageCompletable.stageResult | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.nhaarman.mockito_kotlin.mock | |
import com.nhaarman.mockito_kotlin.never | |
import com.nhaarman.mockito_kotlin.verify | |
import io.reactivex.Completable | |
import io.reactivex.schedulers.Schedulers | |
import io.reactivex.schedulers.TestScheduler | |
import org.junit.Test | |
import java.util.concurrent.TimeUnit | |
class RxExtensionTest { | |
private val testScheduler: TestScheduler = TestScheduler() | |
@Test | |
fun `fromCompletables@Flowable chains the complatables and return a result in each stage`() { | |
FlowableEx.fromCompletables( | |
Completable.complete(), | |
Completable.complete(), | |
Completable.complete() | |
) { it }.test() | |
.assertValues(0, 1, 2) | |
.assertComplete() | |
} | |
@Test | |
fun `fromCompletables@Flowable breaks out of the stream when one completable failed`() { | |
val error = RuntimeException() | |
FlowableEx.fromCompletables( | |
Completable.complete(), | |
Completable.error(error), | |
Completable.complete() | |
) { it }.test() | |
.assertValues(0) | |
.assertError(error) | |
} | |
@Test | |
fun `fromCompletables@Flowable executes the completables in order`() { | |
val scheduler = Schedulers.single() | |
FlowableEx.fromCompletables( | |
Completable.timer(5, TimeUnit.SECONDS, scheduler), | |
Completable.timer(2, TimeUnit.SECONDS, scheduler), | |
Completable.complete() | |
) { it }.test() | |
.assertValues(0, 1, 2) | |
.assertComplete() | |
} | |
@Test | |
fun `fromCompletables@Flowable stops execution when disposed`() { | |
var subscriber: TestSubscriber<Int>? = null | |
val dispose = fun () { subscriber?.dispose() } | |
subscriber = FlowableEx.fromCompletables( | |
Completable.fromCallable { dispose }, | |
Completable.fromCallable { fail() } | |
) { it }.test() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment