package com.firebase.client;

import com.firebase.client.core.Constants;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.subscriptions.Subscriptions;

public class RxFirebase {

    /**
     * Essentially a struct so that we can pass all children events through as a single object.
     */
    public static class FirebaseChildEvent {
        public DataSnapshot snapshot;
        public Constants.EventType eventType;
        public String prevName;

        public FirebaseChildEvent(DataSnapshot snapshot, Constants.EventType eventType, String prevName) {
            this.snapshot = snapshot;
            this.eventType = eventType;
            this.prevName = prevName;
        }
    }

    public static Observable<FirebaseChildEvent> observeChildren(final Query ref) {
        return Observable.create(new Observable.OnSubscribe<FirebaseChildEvent>() {
            @Override
            public void call(final Subscriber<? super FirebaseChildEvent> subscriber) {
                final ChildEventListener listener = ref.addChildEventListener(new ChildEventListener() {
                    @Override
                    public void onChildAdded(DataSnapshot dataSnapshot, String prevName) {
                        subscriber.onNext(new FirebaseChildEvent(dataSnapshot, Constants.EventType.CHILD_ADDED, prevName));
                    }

                    @Override
                    public void onChildChanged(DataSnapshot dataSnapshot, String prevName) {
                        subscriber.onNext(new FirebaseChildEvent(dataSnapshot, Constants.EventType.CHILD_CHANGED, prevName));
                    }

                    @Override
                    public void onChildRemoved(DataSnapshot dataSnapshot) {
                        subscriber.onNext(new FirebaseChildEvent(dataSnapshot, Constants.EventType.CHILD_REMOVED, null));
                    }

                    @Override
                    public void onChildMoved(DataSnapshot dataSnapshot, String prevName) {
                        subscriber.onNext(new FirebaseChildEvent(dataSnapshot, Constants.EventType.CHILD_MOVED, prevName));
                    }

                    @Override
                    public void onCancelled(FirebaseError error) {
                        // Turn the FirebaseError into a throwable to conform to the API
                        subscriber.onError(new FirebaseException(error.getMessage()));
                    }
                });

                // When the subscription is cancelled, remove the listener
                subscriber.add(Subscriptions.create(new Action0() {
                    @Override
                    public void call() {
                        ref.removeEventListener(listener);
                    }
                }));
            }
        });
    }

    private static Func1<FirebaseChildEvent, Boolean> makeEventFilter(final Constants.EventType eventType) {
        return new Func1<FirebaseChildEvent, Boolean>() {
            @Override
            public Boolean call(FirebaseChildEvent firebaseChildEvent) {
                return firebaseChildEvent.eventType == eventType;
            }
        };
    }

    public static Observable<FirebaseChildEvent> observeChildAdded(Query ref) {
        return observeChildren(ref).filter(makeEventFilter(Constants.EventType.CHILD_ADDED));
    }

    public static Observable<FirebaseChildEvent> observeChildChanged(Query ref) {
        return observeChildren(ref).filter(makeEventFilter(Constants.EventType.CHILD_CHANGED));
    }

    public static Observable<FirebaseChildEvent> observeChildMoved(Query ref) {
        return observeChildren(ref).filter(makeEventFilter(Constants.EventType.CHILD_MOVED));
    }

    public static Observable<FirebaseChildEvent> observeChildRemoved(Query ref) {
        return observeChildren(ref).filter(makeEventFilter(Constants.EventType.CHILD_REMOVED));
    }

    public static Observable<DataSnapshot> observe(final Query ref) {
        return Observable.create(new Observable.OnSubscribe<DataSnapshot>() {
            @Override
            public void call(final Subscriber<? super DataSnapshot> subscriber) {
                final ValueEventListener listener = ref.addValueEventListener(new ValueEventListener() {
                    @Override
                    public void onDataChange(DataSnapshot dataSnapshot) {
                        subscriber.onNext(dataSnapshot);
                    }

                    @Override
                    public void onCancelled(FirebaseError error) {
                        // Turn the FirebaseError into a throwable to conform to the API
                        subscriber.onError(new FirebaseException(error.getMessage()));
                    }
                });

                // When the subscription is cancelled, remove the listener
                subscriber.add(Subscriptions.create(new Action0() {
                    @Override
                    public void call() {
                        ref.removeEventListener(listener);
                    }
                }));
            }
        });
    }
}
Is ref.removeEventListener(listener); thread-safe? Subscription.unsubscribe can be called from another thread.
And if observeChildren can be called from any thread, ref.addChildEventListener should also be thread-safe.
Not sure what you mean by "convert FirebaseException to a Throwable" - RxJava doesn't wrap the Exception so you can still handle the downstream Exception as FirebaseException (but you also need to handle / re-throw other Throwable types)
@zsxwing Both methods are threadsafe, they append to a queue on a separate, internal thread managed by the library. I was mainly confused because it looks like code that wraps a Future into an observable blocks if you don't specify a TimeUnit. This code will not block ever, as all actual processing takes place on the internal queue thread.
@petermd FirebaseError, the type returned by the Firebase API in the case of a problem, is not a subclass of throwable. We do have FirebaseException that's used as well, hence the conversion.
But presumably the end user gets a Throwable in their onError callback and has to do some instanceof checks to separate out different classes of errors?
I was mainly confused because it looks like code that wraps a Future into an observable blocks if you don't specify a TimeUnit.
@gsoltis can you give an example? I cannot find Future in your code.
@zsxwing Sorry, I wasn't clear. I meant in contrast with the official handling of Futures by the RxJava library. Specifically: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/operators/OperatorToObservableFuture.java#L65
It appears to block on the result of the future when a subscriber is added, and then fire the callback on the same thread when the value is available. My code above will not do this, which I think is right given that it is a stream of values, rather than a single value. My question was around the expected behavior though. One option, similar to what the Future Observable is doing, would be to suspend the subscribing thread and only wake it up to fire events. The code, as currently written will instead return immediately from the subscribe call and will fire events on a separate thread.
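The non-blocking behaviour described above can be modelled without Firebase or RxJava. The following is only a rough sketch: `EventSource` and the thread name `internal-event-thread` are hypothetical stand-ins for the client's internal queue thread, not Firebase APIs. It shows `subscribe` returning straight away while the callback fires on the other thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class NonBlockingSubscribeSketch {

    /** Hypothetical stand-in for a client that fires events on its own internal thread. */
    static final class EventSource {
        private final ExecutorService eventThread = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "internal-event-thread");
            t.setDaemon(true); // do not keep the JVM alive for this sketch
            return t;
        });

        /** Returns immediately; the callback runs later on the internal thread. */
        void subscribe(Consumer<String> onNext) {
            eventThread.execute(() -> onNext.accept("value"));
        }
    }

    /** Subscribes, then reports the name of the thread that delivered the event. */
    static String deliveringThreadName() {
        final String[] seen = new String[1];
        final CountDownLatch delivered = new CountDownLatch(1);
        new EventSource().subscribe(value -> {
            seen[0] = Thread.currentThread().getName();
            delivered.countDown();
        });
        // subscribe() has already returned; we wait here only to observe the result.
        try {
            if (!delivered.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no event delivered");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("caller thread:   " + Thread.currentThread().getName());
        System.out.println("callback thread: " + deliveringThreadName());
    }
}
```

The alternative discussed above, suspending the subscribing thread until events arrive, would correspond to doing the work inside `subscribe` itself instead of handing it to the executor.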
@gsoltis Your code is right. The blocking OperatorToObservableFuture is a compromise, since Future does not provide a callback API.
Hi. I'm not familiar with Firebase but here are my comments about the RxJava usage.
L32-58: are the callbacks coming from a single thread? If not you should wrap the child subscriber with SerializedSubscriber and route calls through it. Same for L103-115.
FirebaseError: you could add the entire error object to the exception, not just the message.
@zsxwing Good to know, thanks!
@akarnokd The callbacks do come from a single thread by default. It's possible for users to override that behavior, but I think they will then be on their own to modify these bindings.
Adding the FirebaseError to the exception is a good idea, I'll look into making that change. It might take a bit to do that one since I'll have to modify the Firebase SDK where FirebaseException is defined.
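To make the suggestion concrete, here is a minimal sketch of what carrying the whole error object could look like, and how a downstream onError handler might tell such errors apart from other Throwables. None of these types are the Firebase SDK's: `ClientError` and `ClientErrorException` are hypothetical stand-ins for FirebaseError and FirebaseException, and the code `-3` is an arbitrary value chosen for illustration.

```java
final class ErrorWrappingSketch {

    /** Hypothetical stand-in for FirebaseError: a plain object that is NOT a Throwable. */
    static final class ClientError {
        final int code;
        final String message;

        ClientError(int code, String message) {
            this.code = code;
            this.message = message;
        }
    }

    /** Hypothetical exception that keeps the whole error object, not just its message. */
    static final class ClientErrorException extends RuntimeException {
        final ClientError error;

        ClientErrorException(ClientError error) {
            super(error.message);
            this.error = error;
        }
    }

    /** Mimics an onError handler: separates wrapped client errors from other Throwables. */
    static String describe(Throwable t) {
        if (t instanceof ClientErrorException) {
            ClientError e = ((ClientErrorException) t).error;
            return "client error " + e.code + ": " + e.message;
        }
        return "unexpected: " + t;
    }

    public static void main(String[] args) {
        // Arbitrary example values; not real Firebase error codes.
        System.out.println(describe(new ClientErrorException(new ClientError(-3, "Permission denied"))));
        System.out.println(describe(new IllegalStateException("boom")));
    }
}
```

With the error object attached, the handler can branch on the code rather than parsing the message string.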
This looks really useful!
I was wondering if you could add some kind of licence if you are happy for other people to use this in their work?
Where are you getting Constants.EventType from? Is there someplace I could get access to those variables?
For anyone who should be interested, here is the missing enum declaration:
enum EventType {
CHILD_ADDED, CHILD_CHANGED, CHILD_REMOVED, CHILD_MOVED
}
If anyone is interested I have forked this and made my own adjustments here: https://gist.github.com/TinasheMzondiwa/3b330c72a5b6625d66c2e1b63abaac9b
Will the "new Observable.OnSubscribe" block cause a memory leak?
After calling .subscribe(), I can unsubscribe the subscription in onDestroy(). What then happens to the running "new Observable.OnSubscribe" block?
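One way to reason about this question is with a small model of the pattern the gist uses: subscribing registers a listener, and unsubscribing removes it again, so the source no longer holds a reference to it. This is a sketch under assumptions, not RxJava or Firebase code: `Source` is a hypothetical stand-in for the Query, and the returned `Runnable` plays the role of `Subscription.unsubscribe`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class UnsubscribeSketch {

    /** Hypothetical stand-in for a Query: holds listeners and pushes values to them. */
    static final class Source {
        private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

        /** Registers a listener; the returned Runnable plays the role of unsubscribe(). */
        Runnable subscribe(Consumer<String> listener) {
            listeners.add(listener);
            return () -> listeners.remove(listener);
        }

        /** Delivers a value to every listener that is still registered. */
        void emit(String value) {
            for (Consumer<String> listener : listeners) {
                listener.accept(value);
            }
        }

        int listenerCount() {
            return listeners.size();
        }
    }

    public static void main(String[] args) {
        Source source = new Source();
        List<String> received = new ArrayList<>();

        Runnable unsubscribe = source.subscribe(received::add);
        source.emit("a");   // delivered
        unsubscribe.run();  // e.g. called from onDestroy()
        source.emit("b");   // not delivered, the listener is gone

        System.out.println(received + ", listeners left: " + source.listenerCount());
    }
}
```

In this model, once the listener is removed nothing in the source refers to it any more, so it can be garbage collected. Whether the real client releases everything in the same way is not something this sketch can confirm.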
Hi, nice share @gssoltis.
But how do I do this in Java? This is the Android way.
Thanks.
I'd love some feedback from anyone more familiar with RxJava. In particular, converting FirebaseError to a throwable seems to lose some semantic meaning (is a throwable the only way a stream can error out?). Also, any thoughts on threading would be greatly appreciated. The Firebase client will spin up its own thread to fire events, and on Android it will use the main Looper. I don't know what Rx devs normally expect in terms of what thread a particular callback will run on, so advice there would be great.