-
-
Save djodjoni/e63cc2b0dce0c65427645c837f5c0033 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import rx.*; | |
import rx.Observable.Operator; | |
import rx.Observable; | |
import rx.observers.TestSubscriber; | |
import rx.subjects.*; | |
public class RxEventBus<T> { | |
private final Subject<Notification<T>, Notification<T>> subject; | |
private final Observable<T> observable; | |
public RxEventBus() { | |
subject = PublishSubject.<Notification<T>>create().toSerialized(); | |
observable = subject.<T>dematerialize().lift(new ClientErrorBounceBack<T>()); | |
} | |
public void post(T value) { | |
subject.onNext(Notification.createOnNext(value)); | |
} | |
public void postError(Throwable e) { | |
subject.onNext(Notification.createOnError(e)); | |
} | |
public Observable<T> observe() { | |
return observable; | |
} | |
/** | |
* This operator captures exceptions thrown by a downstream onNext so it doesn't disrupt | |
* the upstream PublishSubject. | |
* @param <T> | |
*/ | |
static final class ClientErrorBounceBack<T> implements Operator<T, T> { | |
@Override | |
public Subscriber<? super T> call(final Subscriber<? super T> child) { | |
Subscriber<T> result = new Subscriber<T>(child, false) { | |
boolean done; | |
@Override | |
public void onNext(T t) { | |
if (!done) { | |
try { | |
child.onNext(t); | |
} catch (Throwable e) { | |
onError(e); | |
return; | |
} | |
} | |
} | |
@Override | |
public void onError(Throwable e) { | |
if (!done) { | |
done = true; | |
try { | |
child.onError(e); | |
} finally { | |
unsubscribe(); | |
} | |
} | |
} | |
@Override | |
public void onCompleted() { | |
if (!done) { | |
done = true; | |
try { | |
child.onCompleted(); | |
} finally { | |
unsubscribe(); | |
} | |
} | |
} | |
}; | |
child.add(result); | |
return result; | |
} | |
} | |
public static void main(String[] args) { | |
RxEventBus<Integer> eb = new RxEventBus<>(); | |
TestSubscriber<Integer> ts1 = new TestSubscriber<>(); | |
TestSubscriber<Integer> ts2 = new TestSubscriber<>(); | |
eb.observe().subscribe(ts1); | |
eb.observe().subscribe(ts2); | |
eb.post(1); | |
eb.postError(new Exception()); | |
// See if both received the value and the error | |
ts1.assertTerminalEvent(); | |
ts1.assertReceivedOnNext(Arrays.asList(1)); | |
if (ts1.getOnErrorEvents().isEmpty()) { | |
throw new AssertionError("No exception received?"); | |
} | |
ts2.assertTerminalEvent(); | |
ts2.assertReceivedOnNext(Arrays.asList(1)); | |
if (ts2.getOnErrorEvents().isEmpty()) { | |
throw new AssertionError("No exception received?"); | |
} | |
// see if further subscribers aren't affected by former errors post | |
TestSubscriber<Integer> ts3 = new TestSubscriber<>(); | |
eb.observe().subscribe(ts3); | |
eb.post(2); | |
ts3.assertReceivedOnNext(Arrays.asList(2)); | |
// see if a client error doesn't affect others | |
TestSubscriber<Integer> ts4 = new TestSubscriber<Integer>() { | |
@Override | |
public void onNext(Integer t) { | |
throw new RuntimeException(); | |
} | |
}; | |
eb.observe().subscribe(ts4); | |
eb.post(3); | |
eb.post(4); | |
ts3.assertReceivedOnNext(Arrays.asList(2, 3, 4)); | |
ts4.assertTerminalEvent(); | |
ts4.assertReceivedOnNext(Collections.emptyList()); | |
if (ts4.getOnErrorEvents().isEmpty()) { | |
throw new AssertionError("No errors received!"); | |
} | |
System.out.println("Success!"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment