Skip to content

Instantly share code, notes, and snippets.

@YusukeIwaki
Created April 27, 2017 12:57
Show Gist options
  • Save YusukeIwaki/f9a302dd6e42f487530d49707161f70b to your computer and use it in GitHub Desktop.
Save YusukeIwaki/f9a302dd6e42f487530d49707161f70b to your computer and use it in GitHub Desktop.
RealmResults#asObservableをRxJava2対応に
package io.github.yusukeiwaki.realm_java_helpers;
import java.util.IdentityHashMap;
import java.util.Map;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.android.MainThreadDisposable;
import io.realm.RealmChangeListener;
import io.realm.RealmModel;
import io.realm.RealmResults;
/**
 * RxJava 2 {@link Observable} that emits a {@link RealmResults} once on subscription and
 * again every time Realm reports a change for it.
 *
 * <p>Each subscriber gets its own change listener, which is removed again when the
 * subscriber disposes. The stream never calls {@code onComplete} or {@code onError}.
 *
 * <p>NOTE(review): the per-subscriber disposable extends {@link MainThreadDisposable},
 * which is documented to run its cleanup on the Android main thread. This class therefore
 * appears to assume it is subscribed on the main thread with main-thread Realm results —
 * confirm against callers.
 *
 * @param <T> the Realm model type contained in the observed results
 */
public class RealmResultsObservable<T extends RealmModel> extends Observable<RealmResults<T>> {
    private final RealmResults<T> results;

    /**
     * @param results the query results to observe; emitted as-is to every subscriber
     */
    public RealmResultsObservable(RealmResults<T> results) {
        this.results = results;
    }

    /**
     * Hands the observer its disposable first, then starts listening and emits the
     * current results.
     */
    @Override
    protected void subscribeActual(Observer<? super RealmResults<T>> observer) {
        MyRealmChangeListener<T> listener = new MyRealmChangeListener<>(results, observer);
        observer.onSubscribe(listener);
        // The observer may already have disposed inside onSubscribe; enable() checks for that.
        listener.enable();
    }

    // https://github.com/realm/realm-java/blob/721753ab6e73537424e36f33c8d8e407ab72d883/realm/realm-library/src/main/java/io/realm/rx/RealmObservableFactory.java#L371
    /**
     * Identity-keyed reference counter: an object stays in the map (and is therefore
     * strongly reachable from it) for as long as its count is at least one.
     *
     * <p>Not synchronized; each instance is meant to be used from a single thread.
     *
     * @param <K> type of the counted objects
     */
    private static class StrongReferenceCounter<K> {
        // IdentityHashMap: objects are matched by reference, never by equals()/hashCode().
        private final Map<K, Integer> references = new IdentityHashMap<K, Integer>();

        /** Increments the count for {@code object}, starting at one if it is not tracked yet. */
        public void acquireReference(K object) {
            Integer count = references.get(object);
            if (count == null) {
                references.put(object, 1);
            } else {
                references.put(object, count + 1);
            }
        }

        /**
         * Decrements the count for {@code object} and stops tracking it when the count
         * reaches zero.
         *
         * @throws IllegalStateException if {@code object} is not tracked, or its stored
         *     count is not positive
         */
        public void releaseReference(K object) {
            Integer count = references.get(object);
            if (count == null) {
                throw new IllegalStateException("Object does not have any references: " + object);
            } else if (count > 1) {
                references.put(object, count - 1);
            } else if (count == 1) {
                references.remove(object);
            } else {
                throw new IllegalStateException("Invalid reference count: " + count);
            }
        }
    }

    /**
     * Per-subscription bridge: it is both the Realm change listener that forwards results
     * to the observer and the disposable that unregisters that listener.
     */
    private static class MyRealmChangeListener<T extends RealmModel> extends MainThreadDisposable
            implements RealmChangeListener<RealmResults<T>> {
        private final RealmResults<T> origResults;
        private final Observer<? super RealmResults<T>> observer;
        // One counter per thread that touches this listener.
        private final ThreadLocal<StrongReferenceCounter<RealmResults<T>>> resultsRefs =
                new ThreadLocal<StrongReferenceCounter<RealmResults<T>>>() {
                    @Override
                    protected StrongReferenceCounter<RealmResults<T>> initialValue() {
                        return new StrongReferenceCounter<>();
                    }
                };
        // True only between a successful enable() and the matching onDispose(), so that
        // onDispose() never releases a reference or removes a listener that was never set up.
        private boolean listening;

        private MyRealmChangeListener(RealmResults<T> origResults,
                Observer<? super RealmResults<T>> observer) {
            this.origResults = origResults;
            this.observer = observer;
        }

        /**
         * Starts listening for changes and emits the current results once.
         *
         * <p>Does nothing if the subscription was already disposed (for example from
         * within the observer's {@code onSubscribe}); otherwise a listener would be
         * registered that nothing ever removes.
         */
        public void enable() {
            if (isDisposed()) {
                return;
            }
            resultsRefs.get().acquireReference(origResults);
            origResults.addChangeListener(this);
            // Set before onNext: the observer may dispose synchronously while handling
            // the first item, and onDispose() must then see that there is work to undo.
            listening = true;
            observer.onNext(origResults);
        }

        /** Forwards changed results to the observer unless the subscription is disposed. */
        @Override
        public void onChange(RealmResults<T> element) {
            if (!isDisposed()) {
                observer.onNext(element);
            }
        }

        /** Undoes {@link #enable()}; a no-op when enable() never registered anything. */
        @Override
        protected void onDispose() {
            if (!listening) {
                return;
            }
            listening = false;
            resultsRefs.get().releaseReference(origResults);
            origResults.removeChangeListener(this);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment