This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| fun statefulIntegers() = Flowable.generate( | |
| Callable<Int> { 0 }, | |
| BiFunction<Int, Emitter<Int>, Int> { index, emitter -> | |
| if (index < 100) { | |
| emitter.onNext(index) | |
| } else { | |
| emitter.onComplete() | |
| } | |
| return@BiFunction index + 1 | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A single page of results returned by a paged endpoint.
 *
 * Every property is bound to the JSON field of the same name through [Json].
 *
 * @param T element type of the items carried by the page.
 * @property isLastPage value of the `isLastPage` JSON field.
 * @property values items carried by this page (the `values` JSON field).
 * @property nextPageStart value of the `nextPageStart` JSON field; presumably the `start`
 * offset to request for the following page (TODO confirm against the server API).
 */
data class PagedResponse<out T>(
    @Json(name = "isLastPage") val isLastPage: Boolean,
    @Json(name = "values") val values: List<T>,
    @Json(name = "nextPageStart") val nextPageStart: Int
)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * HTTP API exposing a paged list of strings.
 */
interface ServerApi {

    /**
     * Issues a GET to `/rest/api/1.0/strings/` for one page of strings.
     *
     * @param start sent as the `start` query parameter.
     * @param pageLimit sent as the `pageLimit` query parameter.
     * @return a [Flowable] of [PagedResponse] whose `values` are strings.
     */
    @GET("/rest/api/1.0/strings/")
    fun getStrings(
        @Query("start") start: Int,
        @Query("pageLimit") pageLimit: Int
    ): Flowable<PagedResponse<String>>
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| fun <TData> pagedDataOf(nextPageStartingFrom: (start: Int) -> Flowable<PagedResponse<TData>>): Flowable<List<TData>> { | |
| return Flowable.generate<List<TData>, Int>( | |
| Callable<Int> { 0 }, | |
| BiFunction<Int, Emitter<List<TData>>, Int> { index, emitter -> | |
| nextPageStartingFrom(index) | |
| .doOnNext { page -> | |
| if (!page.values.isEmpty()) { | |
| emitter.onNext(page.values) | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| fun <TData> pagedDataOf(nextPageStartingFrom: (start: Int) -> Flowable<PagedResponse<TData>>): Flowable<List<TData>> = | |
| Flowables.generateWith({ 0 }) { start, emitter -> | |
| nextPageStartingFrom(start) | |
| .doOnNext { | |
| it.apply { | |
| values.takeIf { it.isNotEmpty() }?.let { emitter.onNext(it) } | |
| isLastPage.takeIf { it }?.let { emitter.onComplete() } | |
| } | |
| } | |
| .map { it.nextPageStart } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Kotlin-friendly entry points for building [Flowable]s.
 *
 * The private constructor prevents instantiation; the helpers live on the companion object.
 */
class Flowables private constructor() {
    companion object {
        /**
         * Lambda-based wrapper around the two-argument `Flowable.generate`.
         *
         * The return type is declared explicitly so that the Java platform type coming out of
         * `Flowable.generate` does not leak into this public API.
         *
         * @param T type of the items pushed through the [Emitter].
         * @param S type of the state threaded from one generator invocation to the next.
         * @param initialState supplies the first state value; it is wrapped in a [Callable].
         * @param generator receives the current state and the [Emitter], and returns the state
         * handed to the next invocation.
         * @return the [Flowable] created by `Flowable.generate`.
         */
        inline fun <T, S> generateWith(
            noinline initialState: () -> S,
            crossinline generator: (S, Emitter<T>) -> S
        ): Flowable<T> =
            Flowable.generate(
                Callable<S>(initialState),
                BiFunction<S, Emitter<T>, S> { state, emitter -> generator(state, emitter) }
            )
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| fun sampleCall(api: ServerApi) { | |
| pagedDataOf { start -> api.getStrings(start, pageLimit = 20) } | |
| .subscribeOn(Schedulers.io()) | |
| .subscribe(object: Subscriber<List<String>> { | |
| override fun onSubscribe(subscription: Subscription) { | |
| subscription.request(2) | |
| } | |
| override fun onNext(nextPage: List<String>) = System.out.println(nextPage) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.github.luks91.streamnesting | |
| import android.util.Log | |
| import io.reactivex.Flowable | |
| import io.reactivex.schedulers.Schedulers | |
| import io.reactivex.subscribers.DefaultSubscriber | |
/**
 * A contact entry: first name, last name and phone number, all held as plain strings.
 *
 * @property firstName the contact's first name.
 * @property lastName the contact's last name.
 * @property phoneNumber the contact's phone number, stored as text.
 */
data class Contact(
    val firstName: String,
    val lastName: String,
    val phoneNumber: String
)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| fun save(contacts: Flowable<Contact>) = | |
| contacts | |
| .observeOn(Schedulers.io()) | |
| .subscribe(object : DefaultSubscriber<Contact>() { | |
| override fun onStart() = request(1) | |
| override fun onNext(contact: Contact) = | |
| openConnection<Contact>().use { | |
| it.write(contact) | |
| request(1) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| override fun onStart() = request(1) |
Older / Newer