-
-
Save gsoltis/86210e3259dcc6998801 to your computer and use it in GitHub Desktop.
package com.firebase.client; | |
import com.firebase.client.core.Constants; | |
import rx.Observable; | |
import rx.Subscriber; | |
import rx.functions.Action0; | |
import rx.functions.Func1; | |
import rx.subscriptions.Subscriptions; | |
public class RxFirebase { | |
/** | |
* Essentially a struct so that we can pass all children events through as a single object. | |
*/ | |
public static class FirebaseChildEvent { | |
public DataSnapshot snapshot; | |
public Constants.EventType eventType; | |
public String prevName; | |
public FirebaseChildEvent(DataSnapshot snapshot, Constants.EventType eventType, String prevName) { | |
this.snapshot = snapshot; | |
this.eventType = eventType; | |
this.prevName = prevName; | |
} | |
} | |
public static Observable<FirebaseChildEvent> observeChildren(final Query ref) { | |
return Observable.create(new Observable.OnSubscribe<FirebaseChildEvent>() { | |
@Override | |
public void call(final Subscriber<? super FirebaseChildEvent> subscriber) { | |
final ChildEventListener listener = ref.addChildEventListener(new ChildEventListener() { | |
@Override | |
public void onChildAdded(DataSnapshot dataSnapshot, String prevName) { | |
subscriber.onNext(new FirebaseChildEvent(dataSnapshot, Constants.EventType.CHILD_ADDED, prevName)); | |
} | |
@Override | |
public void onChildChanged(DataSnapshot dataSnapshot, String prevName) { | |
subscriber.onNext(new FirebaseChildEvent(dataSnapshot, Constants.EventType.CHILD_CHANGED, prevName)); | |
} | |
@Override | |
public void onChildRemoved(DataSnapshot dataSnapshot) { | |
subscriber.onNext(new FirebaseChildEvent(dataSnapshot, Constants.EventType.CHILD_REMOVED, null)); | |
} | |
@Override | |
public void onChildMoved(DataSnapshot dataSnapshot, String prevName) { | |
subscriber.onNext(new FirebaseChildEvent(dataSnapshot, Constants.EventType.CHILD_MOVED, prevName)); | |
} | |
@Override | |
public void onCancelled(FirebaseError error) { | |
// Turn the FirebaseError into a throwable to conform to the API | |
subscriber.onError(new FirebaseException(error.getMessage())); | |
} | |
}); | |
// When the subscription is cancelled, remove the listener | |
subscriber.add(Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
ref.removeEventListener(listener); | |
} | |
})); | |
} | |
}); | |
} | |
private static Func1<FirebaseChildEvent, Boolean> makeEventFilter(final Constants.EventType eventType) { | |
return new Func1<FirebaseChildEvent, Boolean>() { | |
@Override | |
public Boolean call(FirebaseChildEvent firebaseChildEvent) { | |
return firebaseChildEvent.eventType == eventType; | |
} | |
}; | |
} | |
public static Observable<FirebaseChildEvent> observeChildAdded(Query ref) { | |
return observeChildren(ref).filter(makeEventFilter(Constants.EventType.CHILD_ADDED)); | |
} | |
public static Observable<FirebaseChildEvent> observeChildChanged(Query ref) { | |
return observeChildren(ref).filter(makeEventFilter(Constants.EventType.CHILD_CHANGED)); | |
} | |
public static Observable<FirebaseChildEvent> observeChildMoved(Query ref) { | |
return observeChildren(ref).filter(makeEventFilter(Constants.EventType.CHILD_MOVED)); | |
} | |
public static Observable<FirebaseChildEvent> observeChildRemoved(Query ref) { | |
return observeChildren(ref).filter(makeEventFilter(Constants.EventType.CHILD_REMOVED)); | |
} | |
public static Observable<DataSnapshot> observe(final Query ref) { | |
return Observable.create(new Observable.OnSubscribe<DataSnapshot>() { | |
@Override | |
public void call(final Subscriber<? super DataSnapshot> subscriber) { | |
final ValueEventListener listener = ref.addValueEventListener(new ValueEventListener() { | |
@Override | |
public void onDataChange(DataSnapshot dataSnapshot) { | |
subscriber.onNext(dataSnapshot); | |
} | |
@Override | |
public void onCancelled(FirebaseError error) { | |
// Turn the FirebaseError into a throwable to conform to the API | |
subscriber.onError(new FirebaseException(error.getMessage())); | |
} | |
}); | |
// When the subscription is cancelled, remove the listener | |
subscriber.add(Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
ref.removeEventListener(listener); | |
} | |
})); | |
} | |
}); | |
} | |
} |
Hi. I'm not familiar with Firebase but here are my comments about the RxJava usage.
L32-58: are the callbacks coming from a single thread? If not you should wrap the child subscriber with SerializedSubscriber and route calls through it. Same for L103-115.
FirebaseError: you could add the entire error object to the exception, not just the message.
@zsxwing Good to know, thanks!
@akarnokd The callbacks do come from a single thread by default. It's possible for users to override that behavior, but I think they will then be on their own to modify these bindings.
Adding the FirebaseError to the exception is a good idea, I'll look into making that change. It might take a bit to do that one since I'll have to modify the Firebase SDK where FirebaseException is defined.
This looks really useful!
I was wondering if you could add some kind of licence if you are happy for other people to use this in their work?
Where are you getting Constants.EventType from? Is there someplace I could get access to those variables?
For anyone who should be interested, here is the missing enum declaration:
enum EventType {
CHILD_ADDED, CHILD_CHANGED, CHILD_REMOVED, CHILD_MOVED
}
If anyone is interested I have forked this and made my own adjustments here: https://gist.github.com/TinasheMzondiwa/3b330c72a5b6625d66c2e1b63abaac9b
Will the "new Observable.OnSubscribe" block cause a memory leak?
After calling .subscribe(), I can unsubscribe the subscription in onDestroy(). What then happens to the running "new Observable.OnSubscribe" block?
Hi, nice share @gssoltis.
But how would I do this in plain Java? This is the Android way.
Thanks.
@gsoltis Your code is right. The blocking `OperatorToObservableFuture` is a compromise, since `Future` does not provide a callback API.