Created
August 31, 2019 20:41
-
-
Save DrMetallius/ff6a3cad8e4e4d81621f7d955f52e199 to your computer and use it in GitHub Desktop.
Element ordering in Rx operators
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.reactivex.Observable | |
import io.reactivex.schedulers.Schedulers | |
import java.util.concurrent.TimeUnit | |
import kotlin.random.Random | |
fun main() { | |
/* | |
* Output: | |
* | |
* 0 | |
* 1 | |
* 2 | |
* 3 | |
* 4 | |
*/ | |
println("map") | |
Observable.interval(100, TimeUnit.MILLISECONDS) | |
.take(5) | |
.map { | |
Thread.sleep(Random.nextLong(1000)) | |
it.toString() | |
} | |
.subscribeOn(Schedulers.io()) | |
.doOnNext(::println) | |
.blockingSubscribe() | |
/* | |
* Sample output: | |
* | |
* 2a | |
* 3a | |
* 1a | |
* 3b | |
* 0a | |
* 4a | |
* 1b | |
* 2b | |
* 4b | |
* 0b | |
*/ | |
println("flatMap") | |
Observable.interval(100, TimeUnit.MILLISECONDS) | |
.take(5) | |
.flatMap { value -> | |
Observable.fromArray("a", "b") | |
.map { | |
Thread.sleep(Random.nextLong(1000)) | |
"$value$it" | |
} | |
.subscribeOn(Schedulers.io()) | |
} | |
.subscribeOn(Schedulers.io()) | |
.doOnNext(::println) | |
.blockingSubscribe() | |
/* | |
* Output: | |
* | |
* 0a | |
* 0b | |
* 1a | |
* 1b | |
* 2a | |
* 2b | |
* 3a | |
* 3b | |
* 4a | |
* 4b | |
*/ | |
println("concatMap") | |
Observable.interval(100, TimeUnit.MILLISECONDS) | |
.take(5) | |
.concatMap { value -> | |
Observable.fromArray("a", "b") | |
.map { | |
Thread.sleep(Random.nextLong(1000)) | |
"$value$it" | |
} | |
.subscribeOn(Schedulers.io()) | |
} | |
.subscribeOn(Schedulers.io()) | |
.doOnNext(::println) | |
.blockingSubscribe() | |
/* | |
* Sample output: | |
* | |
* Skipped 0a | |
* Skipped 1a | |
* 2a | |
* Skipped 2b | |
* 3a | |
* 3b | |
* 4a | |
* 4b | |
*/ | |
println("switchMap") | |
Observable.interval(100, TimeUnit.MILLISECONDS) | |
.take(5) | |
.switchMap { value -> | |
Observable.fromArray("a", "b") | |
.map { | |
try { | |
Thread.sleep(Random.nextLong(200)) | |
} catch (e: Exception) { | |
println("Skipped $value$it") | |
Thread.currentThread().interrupt() | |
} | |
"$value$it" | |
} | |
.subscribeOn(Schedulers.io()) | |
} | |
.subscribeOn(Schedulers.io()) | |
.doOnNext(::println) | |
.blockingSubscribe() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment