Skip to content

Instantly share code, notes, and snippets.

@nesterchung
Last active May 24, 2018 16:46
Show Gist options
  • Save nesterchung/22bfc9c98ed54a9c99f41880dca3cdff to your computer and use it in GitHub Desktop.
Save nesterchung/22bfc9c98ed54a9c99f41880dca3cdff to your computer and use it in GitHub Desktop.
RxKotlin: execute calls in parallel and collect the results into a Single
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import java.util.*
/**
 * Demonstrates running several slow, blocking calls concurrently with RxJava
 * and gathering their results, in source order, into one [Result].
 */
class Demo {
    /** Holder for the three values collected by [start], in emission order. */
    data class Result(val a: String, val b: String, val c: String)

    /**
     * Runs the slow call for "A", "B" and "C" concurrently on the IO scheduler,
     * collects the three results in source order, blocks until all are
     * available and prints the resulting [Result] with a timestamp.
     */
    fun start() {
        val a = Flowable.just("A")
        val b = Flowable.just("B")
        val c = Flowable.just("C")

        // Stand-in for a blocking remote call: sleeps 300 ms, then echoes its input.
        fun slowApiCall(a: String): String {
            Thread.sleep(300)
            return a
        }

        // Defers the blocking call until subscription so it can be moved to another scheduler.
        fun asyncApiCall(a: String): Flowable<String> = Flowable.fromCallable { slowApiCall(a) }

        // Upper bound on the number of inner sources subscribed at the same time.
        val maxConcurrency = 3
        // Number of items requested up front from each inner source (library default).
        val prefetch = Flowable.bufferSize()

        // Parallel execution, ordered collection into a Single.
        // concatMap would subscribe to one inner source at a time (its int
        // argument is a prefetch hint, not a concurrency limit), making the
        // calls run back to back. concatMapEager subscribes to up to
        // maxConcurrency inner sources at once while still emitting their
        // results in the original source order.
        val data = Flowable.concat(a, b, c)
            .concatMapEager(
                { asyncApiCall(it).subscribeOn(Schedulers.io()) },
                maxConcurrency,
                prefetch
            )
            .collect({ mutableListOf<String>() }, { acc, value ->
                println(value) // A, B, C
                acc.add(value)
            })
            .map { (first, second, third) -> Result(first, second, third) }
            .blockingGet()
        println("async collect ${Date().toInstant()}: $data")
    }
}
/**
 * Program entry point: creates a [Demo] and invokes its `start()` method.
 *
 * @param args command-line arguments; not used.
 */
fun main(args: Array<String>) {
    val demo = Demo()
    demo.start()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment