Skip to content

Instantly share code, notes, and snippets.

override fun onNext(contact: Contact) {
openConnection<Contact>().use {
it.write(contact)
}
request(1)
}
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
val source = Flowable.interval(200, TimeUnit.MILLISECONDS)
.map { Contact("FirstName $it", "LastName $it", "$it") }
Repository().save(source)
fun save(contacts: Flowable<Contact>) =
contacts
.observeOn(Schedulers.io())
.window(30)
.concatMap { window -> window.toList().toFlowable() }
.subscribe(object : DefaultSubscriber<List<Contact>>() {
override fun onStart() = request(1)
override fun onNext(contacts: List<Contact>) {
openConnection<Contact>().use {
inline fun <T : Any> ContentResolver.itemsFor(
crossinline queryRawData: ContentResolver.() -> Cursor,
crossinline mapRawData: (Cursor) -> T): Flowable<T> =
Flowable.generate<T, Cursor>(
Callable<Cursor> { queryRawData(this) },
BiFunction<Cursor, Emitter<T>, Cursor> { cursor, emitter ->
if (cursor.moveToNext()) {
emitter.onNext(mapRawData(cursor))
} else {
emitter.onComplete()
inline fun <T> ContentResolver.uriChangesOf(
crossinline onUriChanged: ContentResolver.() -> T,
vararg uris: Uri, notifyForDescendants: Boolean = false): Flowable<T> =
Flowable.create<T>({ emitter ->
val contentObserver = object : ContentObserver(Handler()) {
override fun onChange(selfChange: Boolean, uri: Uri?) {
emitter.onNext(onUriChanged())
}
}
uris.forEach {
data class SimpleContact(val displayName: String, val isStarred: Boolean) {
companion object {
fun from(cursor: Cursor) =
SimpleContact(
cursor.getString(cursor.getColumnIndex(Contacts.DISPLAY_NAME)),
cursor.getInt(cursor.getColumnIndex(Contacts.STARRED)) == 1)
}
}
fun contacts(contentResolver: ContentResolver): Flowable<SimpleContact> =
data class Credentials(val username: String, val password: String)
interface ContactsApi {
fun getContacts(credentials: Credentials): Flowable<SimpleContact>
companion object {
fun createFor(serverUrl: String) = object: ContactsApi {
override fun getContacts(credentials: Credentials):
Flowable<SimpleContact> {
Log.i("ContactsApi",
fun obtainContacts(credentials: Flowable<Credentials>,
serverUrls: Flowable<String>) =
Flowables.combineLatest(
credentials, serverUrls,
{ secrets, url ->
ApiWithCredentials(ContactsApi.createFor(url), secrets)
}
).switchMap { with(it) { api.getContacts(secrets) } }
private data class ApiWithCredentials(
fun obtainContacts(credentials: Flowable<Credentials>,
serverUrls: Flowable<String>) =
Flowables.combineLatest(
credentials, serverUrls,
{ secrets, url ->
ContactsApi.createFor(url).getContacts(secrets)
}
).switchMap { it }
import io.reactivex.Flowable
import io.reactivex.rxkotlin.withLatestFrom
fun combine(triggers: Flowable<Boolean>,
gatekeepers: Flowable<Boolean>) =
triggers.withLatestFrom(gatekeepers,
{ trigger, gateIsOpen ->
if (gateIsOpen) {
Flowable.just(trigger)
} else {