Skip to content

Instantly share code, notes, and snippets.

@jdmichaud
Last active September 26, 2017 10:25
Show Gist options
  • Save jdmichaud/3d413328cf91dc5aa83b3edc749fcaae to your computer and use it in GitHub Desktop.
Save jdmichaud/3d413328cf91dc5aa83b3edc749fcaae to your computer and use it in GitHub Desktop.
Some explanation of how reactivex Observable works
// An observable is an helper class that creates a Subscriber and pass it
// an OnScubscribe method.
public class Observable<T> {
public Observable(OnSubscribe<T> func) {
onSubscribe = func;
}
public Subscription subscribe(Observer<? super T> observer) {
if (observer instance Subscriber) {
return subscribe((Subscriber<? super T>) observer);
}
return subscribe(new Subscriber<T>() {
@Override public void onNext(T args) {
observer.onNext(args);
}
@Override public void onCompleted() {
observer.onCompleted();
}
@Override public void onError(Throwable e) {
observer.onError(e);
}
});
}
// public Subscription subscribe() -> same thing as above
// public Subscription subscribe(Action1<? super T> onNext) -> same thing as above
public Subscription subscribe(Subscriber<? super T> subscriber) {
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}
onSubscribe.call(subscriber);
return subscriber;
}
public static <T> Observable<T> just(final T value) {
return new Observable<T>(new OnSubscribe<T>() {
@Override public void call(Subscriber<? super T> s) {
s.onNext(value);
s.onComplete();
}
});
});
// An operator convert the output of the previous Subscriber
// to the input of the next Subscriber
public interface Operator<R, T> extends Func1<Subscriber<? super R>,
Subscriber<? super T>> {}
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override public void call(Subsciber<? super R> sr) {
Subscriber<? super T> st = operator.call(sr);
onSubscribe.call(st);
})
});
}
// Operators
public <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment