-
-
Save krebernisak/ba50705fbf919972fa44 to your computer and use it in GitHub Desktop.
RxJava Bindings for Firebase
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.firebase.client; | |
import com.firebase.client.AuthData; | |
import com.firebase.client.ChildEventListener; | |
import com.firebase.client.DataSnapshot; | |
import com.firebase.client.Firebase; | |
import com.firebase.client.FirebaseError; | |
import com.firebase.client.Query; | |
import com.firebase.client.ValueEventListener; | |
import com.firebase.client.core.view.Event; | |
import rx.Observable; | |
import rx.Subscriber; | |
import rx.functions.Action0; | |
import rx.functions.Func1; | |
import rx.subscriptions.Subscriptions; | |
import static com.firebase.client.core.view.Event.EventType.CHILD_ADDED; | |
import static com.firebase.client.core.view.Event.EventType.CHILD_CHANGED; | |
import static com.firebase.client.core.view.Event.EventType.CHILD_MOVED; | |
import static com.firebase.client.core.view.Event.EventType.CHILD_REMOVED; | |
public class RxFirebase { | |
/** | |
* Essentially a struct so that we can pass all children events through as a single object. | |
*/ | |
public static class FirebaseChildEvent { | |
public final DataSnapshot snapshot; | |
public final Event.EventType eventType; | |
public final String prevName; | |
public FirebaseChildEvent(DataSnapshot snapshot, Event.EventType eventType, String prevName) { | |
this.snapshot = snapshot; | |
this.eventType = eventType; | |
this.prevName = prevName; | |
} | |
} | |
public static class RxFirebaseException extends RuntimeException { | |
public final FirebaseError error; | |
private RxFirebaseException(FirebaseError error) { | |
super(error.getMessage(), error.toException()); | |
this.error = error; | |
} | |
public static RxFirebaseException from(FirebaseError error) { | |
return new RxFirebaseException(error); | |
} | |
} | |
public static Observable<FirebaseChildEvent> observeChildren(final Query ref) { | |
return Observable.create(new Observable.OnSubscribe<FirebaseChildEvent>() { | |
@Override | |
public void call(final Subscriber<? super FirebaseChildEvent> subscriber) { | |
final ChildEventListener listener = ref.addChildEventListener(new ChildEventListener() { | |
@Override | |
public void onChildAdded(DataSnapshot dataSnapshot, String prevName) { | |
subscriber.onNext(new FirebaseChildEvent(dataSnapshot, CHILD_ADDED, prevName)); | |
} | |
@Override | |
public void onChildChanged(DataSnapshot dataSnapshot, String prevName) { | |
subscriber.onNext(new FirebaseChildEvent(dataSnapshot, CHILD_CHANGED, prevName)); | |
} | |
@Override | |
public void onChildRemoved(DataSnapshot dataSnapshot) { | |
subscriber.onNext(new FirebaseChildEvent(dataSnapshot, CHILD_REMOVED, null)); | |
} | |
@Override | |
public void onChildMoved(DataSnapshot dataSnapshot, String prevName) { | |
subscriber.onNext(new FirebaseChildEvent(dataSnapshot, CHILD_MOVED, prevName)); | |
} | |
@Override | |
public void onCancelled(FirebaseError error) { | |
// Turn the FirebaseError into a throwable to conform to the API | |
subscriber.onError(RxFirebaseException.from(error)); | |
} | |
}); | |
// When the subscription is cancelled, remove the listener | |
subscriber.add(Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
ref.removeEventListener(listener); | |
} | |
})); | |
} | |
}); | |
} | |
private static Func1<FirebaseChildEvent, Boolean> makeEventFilter(final Event.EventType eventType) { | |
return new Func1<FirebaseChildEvent, Boolean>() { | |
@Override | |
public Boolean call(FirebaseChildEvent firebaseChildEvent) { | |
return firebaseChildEvent.eventType == eventType; | |
} | |
}; | |
} | |
public static Observable<FirebaseChildEvent> observeChildAdded(Query ref) { | |
return observeChildren(ref).filter(makeEventFilter(CHILD_ADDED)); | |
} | |
public static Observable<FirebaseChildEvent> observeChildChanged(Query ref) { | |
return observeChildren(ref).filter(makeEventFilter(CHILD_CHANGED)); | |
} | |
public static Observable<FirebaseChildEvent> observeChildMoved(Query ref) { | |
return observeChildren(ref).filter(makeEventFilter(CHILD_MOVED)); | |
} | |
public static Observable<FirebaseChildEvent> observeChildRemoved(Query ref) { | |
return observeChildren(ref).filter(makeEventFilter(CHILD_REMOVED)); | |
} | |
public static Observable<DataSnapshot> observe(final Query ref) { | |
return Observable.create(new Observable.OnSubscribe<DataSnapshot>() { | |
@Override | |
public void call(final Subscriber<? super DataSnapshot> subscriber) { | |
final ValueEventListener listener = ref.addValueEventListener(new ValueEventListener() { | |
@Override | |
public void onDataChange(DataSnapshot dataSnapshot) { | |
subscriber.onNext(dataSnapshot); | |
} | |
@Override | |
public void onCancelled(FirebaseError error) { | |
// Turn the FirebaseError into a throwable to conform to the API | |
subscriber.onError(RxFirebaseException.from(error)); | |
} | |
}); | |
// When the subscription is cancelled, remove the listener | |
subscriber.add(Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
ref.removeEventListener(listener); | |
} | |
})); | |
} | |
}); | |
} | |
public static Observable<DataSnapshot> once(final Query ref) { | |
return Observable.create(new Observable.OnSubscribe<DataSnapshot>() { | |
@Override | |
public void call(final Subscriber<? super DataSnapshot> subscriber) { | |
ref.addListenerForSingleValueEvent(new ValueEventListener() { | |
@Override | |
public void onDataChange(DataSnapshot dataSnapshot) { | |
subscriber.onNext(dataSnapshot); | |
} | |
@Override | |
public void onCancelled(FirebaseError error) { | |
// Turn the FirebaseError into a throwable to conform to the API | |
subscriber.onError(RxFirebaseException.from(error)); | |
} | |
}); | |
} | |
}); | |
} | |
public static Observable<Firebase> setValue(final Firebase ref, final Object value) { | |
return Observable.create(new Observable.OnSubscribe<Firebase>() { | |
@Override | |
public void call(final Subscriber<? super Firebase> subscriber) { | |
ref.setValue(value, new Firebase.CompletionListener() { | |
@Override | |
public void onComplete(FirebaseError error, Firebase firebase) { | |
if (error != null) { | |
subscriber.onError(RxFirebaseException.from(error)); | |
return; | |
} | |
subscriber.onNext(firebase); | |
subscriber.onCompleted(); | |
} | |
}); | |
} | |
}); | |
} | |
public static Observable<AuthData> observeAuthState(final Firebase ref) { | |
return Observable.create(new Observable.OnSubscribe<AuthData>() { | |
@Override | |
public void call(final Subscriber<? super AuthData> subscriber) { | |
final Firebase.AuthStateListener listener = ref.addAuthStateListener(new Firebase.AuthStateListener() { | |
@Override | |
public void onAuthStateChanged (AuthData authData) { | |
subscriber.onNext(authData); | |
} | |
}); | |
// When the subscription is cancelled, remove the listener | |
subscriber.add(Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
ref.removeAuthStateListener(listener); | |
} | |
})); | |
} | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment