Created
September 14, 2017 22:08
-
-
Save magillus/c0795671c10d191001d1f61d9db66ec5 to your computer and use it in GitHub Desktop.
RxRealm Flowable wrappers with Looper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.playground; | |
import android.os.HandlerThread | |
import android.os.Process | |
/** | |
* Looper based BackgroundThread handler for REalm executions. | |
*/ | |
class BackgroundThread : HandlerThread("Scheduler-Realm-BackgroundThread", Process.THREAD_PRIORITY_BACKGROUND) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.playground; | |
import android.os.Handler | |
import io.reactivex.BackpressureStrategy | |
import io.reactivex.Flowable | |
import io.reactivex.FlowableEmitter | |
import io.reactivex.FlowableOnSubscribe | |
import io.reactivex.disposables.Disposables | |
import io.realm.* | |
/** | |
* Rx wrapper for Realm object changes. | |
* Wraps all Realm actions in ThreadHandler with Looper | |
* | |
* Copyright 2017 Mateusz Perlak - http://www.apache.org/licenses/LICENSE-2.0 | |
*/ | |
class RxRealmObjectFlowable<T : RealmObject>( | |
private val fetchManageObject: (Realm) -> T?, | |
private val changeProperties: List<String>? = null, | |
private val realmConfig: RealmConfiguration?) : FlowableOnSubscribe<T> { | |
var backgroundThread: BackgroundThread? = null | |
private var handler = prepareHandler() | |
fun prepareHandler(): Handler { | |
if (backgroundThread == null) { | |
backgroundThread = BackgroundThread() | |
backgroundThread!!.start() | |
} | |
return Handler(backgroundThread!!.looper) | |
} | |
companion object { | |
fun <T : RealmObject> create(fetchManageObject: (Realm) -> T?, realmConfig: RealmConfiguration? = null): Flowable<T> { | |
return Flowable.create(RxRealmObjectFlowable(fetchManageObject, null, realmConfig), BackpressureStrategy.LATEST) | |
} | |
fun <T : RealmObject> create(fetchManageObject: (Realm) -> T?, changeProperties: List<String>?, realmConfig: RealmConfiguration? = null): Flowable<T> { | |
return Flowable.create(RxRealmObjectFlowable(fetchManageObject, changeProperties, realmConfig), BackpressureStrategy.LATEST) | |
} | |
} | |
override fun subscribe(e: FlowableEmitter<T>?) { | |
e?.let { | |
handler.post { | |
var realm = Realm.getInstance(realmConfig) | |
val changeObservable = RealmObjectChangeListener<T> { t, changeSet -> | |
var gotValidChange = true | |
changeProperties?.let { | |
gotValidChange = false | |
it.forEach { trackChangeProperty -> | |
gotValidChange = gotValidChange or changeSet.isFieldChanged(trackChangeProperty) | |
} | |
} | |
if (gotValidChange) { | |
if (realm != null) { | |
e.onNext(realm.copyFromRealm(t)) | |
} else { | |
e.onNext(t) | |
} | |
} | |
} | |
val changeObject = fetchManageObject(realm) | |
if (changeObject == null) { | |
e.onComplete() | |
} else { | |
changeObject.addChangeListener(changeObservable) | |
e.onNext(realm.copyFromRealm(changeObject)) | |
e.setDisposable(Disposables.fromRunnable({ | |
handler.post { | |
changeObject.removeChangeListener(changeObservable) | |
realm.close() | |
} | |
})) | |
} | |
} | |
} | |
} | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.playground; | |
import android.os.Handler | |
import io.reactivex.BackpressureStrategy | |
import io.reactivex.Flowable | |
import io.reactivex.FlowableEmitter | |
import io.reactivex.FlowableOnSubscribe | |
import io.reactivex.disposables.Disposables | |
import io.realm.* | |
/** | |
* Rx wrapper for Realm query results changes. | |
* All Realm actions happen on same Looper | |
* Copyright 2017 Mateusz Perlak - http://www.apache.org/licenses/LICENSE-2.0 | |
*/ | |
class RxRealmQueryFlowable<T : RealmObject>(val fetchQuery: (Realm) -> RealmQuery<T>, val realmConfiguration: RealmConfiguration?) : FlowableOnSubscribe<List<T>> { | |
companion object { | |
fun <T : RealmObject> create(fetchQuery: (Realm) -> RealmQuery<T>, realmConfig: RealmConfiguration?): Flowable<List<T>> { | |
return Flowable.create(RxRealmQueryFlowable(fetchQuery, realmConfig), BackpressureStrategy.LATEST) | |
} | |
} | |
var backgroundThread: BackgroundThread? = null | |
private var handler = prepareHandler() | |
fun prepareHandler(): Handler { | |
if (backgroundThread == null) { | |
backgroundThread = BackgroundThread() | |
backgroundThread!!.start() | |
} | |
return Handler(backgroundThread!!.looper) | |
} | |
override fun subscribe(emitter: FlowableEmitter<List<T>>?) { | |
emitter?.let { emitter -> | |
handler.post { | |
var realm = if (realmConfiguration == null) Realm.getDefaultInstance() else Realm.getInstance(realmConfiguration) | |
val queryObserver = RealmChangeListener<RealmResults<T>> { list -> | |
if (realm != null) { | |
emitter.onNext(realm.copyFromRealm(list)) | |
} else { | |
emitter.onNext(list) | |
} | |
} | |
var realmResult = fetchQuery.invoke(realm).findAll() | |
realmResult.addChangeListener(queryObserver) | |
emitter.onNext(realm.copyFromRealm(realmResult)) | |
emitter.setDisposable(Disposables.fromRunnable({ | |
handler.post { | |
realmResult.removeChangeListener(queryObserver) | |
realm.close() | |
} | |
})) | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment