Created
May 7, 2014 05:53
-
-
Save benjchristensen/e19d64654ebbe82c5bb1 to your computer and use it in GitHub Desktop.
ChooseSubjectBasedOnFirstValue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package perf.backend; | |
import rx.Observable; | |
import rx.Observable.Operator; | |
import rx.Subscriber; | |
import rx.subjects.AsyncSubject; | |
import rx.subjects.ReplaySubject; | |
import rx.subjects.Subject; | |
public class ChooseSubjectBasedOnFirstValue { | |
public static void main(String[] args) { | |
// Observable<Object> o = Observable.just(1); | |
Observable<Object> o = Observable.from("A", "B", "C"); | |
o.lift(new Operator<Subject<Object, Object>, Object>() { | |
@Override | |
public Subscriber<? super Object> call(Subscriber<? super Subject<Object, Object>> child) { | |
return new Subscriber<Object>(child) { | |
private Subject<Object, Object> s; | |
@Override | |
public void onCompleted() { | |
s.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
s.onError(e); | |
} | |
@Override | |
public void onNext(Object t) { | |
if (s == null) { | |
if (t instanceof Number) { | |
s = AsyncSubject.create(); | |
} else { | |
s = ReplaySubject.create(); | |
} | |
child.onNext(s); | |
child.onCompleted(); | |
} | |
s.onNext(t); | |
} | |
}; | |
} | |
}).flatMap((Subject s) -> { | |
System.out.println("Subject: " + s); | |
s.map(i -> "1) => " + i).subscribe(System.out::println); | |
s.map(i -> "2) => " + i).subscribe(System.out::println); | |
return s; | |
}).subscribe(); | |
} | |
} |
I don't have time to finish this tonight, but this gives an idea. It works for this example because everything is synchronous; with an asynchronous source it would fail. The TODO
section explains what is not yet implemented.
I'll probably go with a state machine + CAS and combine the eventualSubject
and array of subscribers into a single state.
Then we just need a proper name for this thing :-)
package test;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.subjects.AsyncSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
/**
 * Sketch of a request API whose returned {@code Observable} is backed by a {@link Subject}
 * chosen from the first callback event: an {@link AsyncSubject} when the first value is a
 * {@link Number} (or when the request terminates without a value), otherwise a
 * {@link ReplaySubject}.
 */
public class ChooseSubjectBasedOnFirstValue {

    public static void main(String[] args) {
        ChooseSubjectBasedOnFirstValue c = new ChooseSubjectBasedOnFirstValue();
        Observable<Object> o = c.sendRequest(1);
        // Observable<Object> o = c.sendRequest(2);
        o.subscribe(System.out::println);
    }

    /**
     * Issues the request and returns the stream of its results.
     *
     * @param i request argument; see {@link #doActualRequestAsynchronously(int, Observer)}
     * @return an {@code Observable} that replays whatever the request pushed into it
     */
    public Observable<Object> sendRequest(int i) {
        ObservableObserver observableObserver = ObservableObserver.create();
        doActualRequestAsynchronously(i, observableObserver);
        return observableObserver;
    }

    /**
     * Stand-in for a real request. In this version the callback is invoked directly on the
     * calling thread: an even argument yields the argument itself, an odd one yields
     * {@code "A"}, {@code "B"}, {@code "C"}; both then complete.
     *
     * @param requestArgs request argument selecting which response is produced
     * @param callback    receiver of the response events
     */
    public void doActualRequestAsynchronously(int requestArgs, Observer<Object> callback) {
        if (requestArgs % 2 == 0) {
            callback.onNext(requestArgs);
            callback.onCompleted();
        } else {
            callback.onNext("A");
            callback.onNext("B");
            callback.onNext("C");
            callback.onCompleted();
        }
    }

    /**
     * An {@code Observable} that is also the {@code Observer} feeding it. Incoming events are
     * forwarded to a subject that is created when the first event arrives.
     */
    public static class ObservableObserver extends Observable<Object> implements Observer<Object> {
        private final AtomicReference<Subject<Object, Object>> eventualSubject;

        public static ObservableObserver create() {
            return new ObservableObserver(new AtomicReference<Subject<Object, Object>>());
        }

        private ObservableObserver(final AtomicReference<Subject<Object, Object>> eventualSubject) {
            super(new OnSubscribe<Object>() {
                @Override
                public void call(Subscriber<? super Object> child) {
                    Subject<Object, Object> subject = eventualSubject.get();
                    if (subject != null) {
                        subject.subscribe(child);
                    } else {
                        // TODO this is the tricky part ...
                        // If this needs to support multicast, it should .publish()
                        // otherwise it needs to also worry about everything in https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java
                        //
                        // Assuming it's one-to-one this then needs to deal with a Subscriber arriving before
                        // the subject has been created ... and do so in a thread-safe manner
                        //
                        // so either locking or a state machine
                        //
                        // Until that is implemented, fail loudly: otherwise this subscriber would
                        // never receive any event at all.
                        child.onError(new IllegalStateException(
                                "subscribing before the first event has arrived is not supported yet"));
                    }
                }
            });
            this.eventualSubject = eventualSubject;
        }

        /**
         * Installs {@code candidate} only if no subject has been installed yet.
         *
         * @param candidate subject to use when none exists
         * @return the subject that is installed after the call (the candidate or the earlier one)
         */
        private Subject<Object, Object> installIfAbsent(Subject<Object, Object> candidate) {
            // compareAndSet keeps an already-installed subject from being overwritten.
            return eventualSubject.compareAndSet(null, candidate) ? candidate : eventualSubject.get();
        }

        @Override
        public void onCompleted() {
            Subject<Object, Object> subject = eventualSubject.get();
            if (subject == null) {
                // Terminated without any value: fall back to an AsyncSubject.
                subject = installIfAbsent(AsyncSubject.create());
            }
            subject.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            Subject<Object, Object> subject = eventualSubject.get();
            if (subject == null) {
                // Failed without any value: fall back to an AsyncSubject.
                subject = installIfAbsent(AsyncSubject.create());
            }
            subject.onError(e);
        }

        @Override
        public void onNext(Object t) {
            Subject<Object, Object> subject = eventualSubject.get();
            if (subject == null) {
                // The first value decides the subject implementation.
                if (t instanceof Number) {
                    System.out.println("Using an AsyncSubject");
                    subject = installIfAbsent(AsyncSubject.create());
                } else {
                    System.out.println("Using a ReplaySubject");
                    subject = installIfAbsent(ReplaySubject.create());
                }
            }
            subject.onNext(t);
        }
    }
}
Try this:
package test;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
/**
 * Request API whose returned {@code Observable} is backed by a {@link ProxySubject}: the real
 * subject is chosen by a factory when the first callback event arrives, and subscribers that
 * arrive earlier are parked until then.
 */
public class ChooseSubjectBasedOnFirstValue {

    public static void main(String[] args) {
        ChooseSubjectBasedOnFirstValue c = new ChooseSubjectBasedOnFirstValue();
        Observable<Object> o = c.sendRequest(1);
        o.toBlockingObservable().forEach(System.out::println);
        Observable<Object> o2 = c.sendRequest(2);
        o2.toBlockingObservable().forEach(System.out::println);
    }

    /**
     * Issues the request and returns the stream of its results.
     *
     * @param i request argument; see {@link #doActualRequestAsynchronously(int, Observer)}
     * @return a subject-backed {@code Observable}; an {@link AsyncSubject} is used when the first
     *         value is a {@link Number} or there is no first value, else a {@link ReplaySubject}
     */
    public Observable<Object> sendRequest(int i) {
        ProxySubject<Object> observableObserver = ProxySubject.createSubject((t) -> {
            // t is null when the factory is invoked from onCompleted/onError (no first value).
            if (t == null || t instanceof Number) {
                System.out.println("Using an AsyncSubject");
                return AsyncSubject.create();
            } else {
                System.out.println("Using a ReplaySubject");
                return ReplaySubject.create();
            }
        });
        doActualRequestAsynchronously(i, observableObserver);
        return observableObserver;
    }

    // NOTE(review): nothing in this class unsubscribes this worker - confirm whether it needs cleanup.
    Scheduler.Worker worker = Schedulers.computation().createWorker();

    /**
     * Schedules the fake response on {@link #worker}: an even argument yields the argument
     * itself, an odd one yields {@code "A"}, {@code "B"}, {@code "C"}; both then complete.
     *
     * @param requestArgs request argument selecting which response is produced
     * @param callback    receiver of the response events
     */
    public void doActualRequestAsynchronously(int requestArgs, Observer<Object> callback) {
        worker.schedule(() -> {
            if (requestArgs % 2 == 0) {
                callback.onNext(requestArgs);
                callback.onCompleted();
            } else {
                callback.onNext("A");
                callback.onNext("B");
                callback.onNext("C");
                callback.onCompleted();
            }
        });
    }

    /**
     * A {@link Subject} that delegates to another subject created on the first incoming event.
     * Subscribers arriving before that event are kept in an immutable {@link State} that is
     * swapped with compare-and-set, and are subscribed to the real subject once it exists.
     *
     * @param <T> type of the values passed through
     */
    public static class ProxySubject<T> extends Subject<T, T> {
        private final AtomicReference<State<T>> state;
        private final Func1<T, Subject<T, T>> subjectFactory;

        /**
         * @param subjectFactory called once with the first value, or with {@code null} when the
         *                       stream terminates before any value, to create the real subject
         * @param <T>            type of the values passed through
         * @return a new proxy with no subject and no subscribers
         */
        public static <T> ProxySubject<T> createSubject(Func1<T, Subject<T, T>> subjectFactory) {
            return new ProxySubject<T>(new AtomicReference<State<T>>(State.<T>initial()), subjectFactory);
        }

        /** Immutable snapshot: either the real subject, or the subscribers waiting for it. */
        private static class State<T> {
            private static final Subscriber<?>[] EMPTY = new Subscriber<?>[0];

            private final Subject<T, T> subject;
            private final Subscriber<? super T>[] subscribers;

            private State(Subject<T, T> subject, Subscriber<? super T>[] subscribers) {
                this.subject = subject;
                this.subscribers = subscribers;
            }

            /** State with no subject and no waiting subscribers. */
            @SuppressWarnings("unchecked") // EMPTY has length 0, so no element can have the wrong type
            private static <T> State<T> initial() {
                return new State<T>(null, (Subscriber<? super T>[]) EMPTY);
            }

            /** Copy of this state with {@code child} appended to the waiting subscribers. */
            private State<T> withSubscriber(Subscriber<? super T> child) {
                Subscriber<? super T>[] grown = Arrays.copyOf(subscribers, subscribers.length + 1);
                grown[grown.length - 1] = child;
                return new State<T>(null, grown);
            }

            /** Copy of this state with the real subject set and the same waiting subscribers. */
            private State<T> withSubject(Subject<T, T> s) {
                return new State<T>(s, subscribers);
            }
        }

        private ProxySubject(final AtomicReference<State<T>> state, Func1<T, Subject<T, T>> subjectFactory) {
            super(new OnSubscribe<T>() {
                @Override
                public void call(Subscriber<? super T> child) {
                    while (true) {
                        State<T> current = state.get();
                        if (current.subject != null) {
                            // The real subject exists: subscribe directly, state is unchanged.
                            current.subject.subscribe(child);
                            return;
                        }
                        // No subject yet: park the subscriber; retry if the state changed meanwhile.
                        if (state.compareAndSet(current, current.withSubscriber(child))) {
                            return;
                        }
                    }
                }
            });
            this.state = state;
            this.subjectFactory = subjectFactory;
        }

        @Override
        public void onCompleted() {
            resolveSubject(null).onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            resolveSubject(null).onError(e);
        }

        @Override
        public void onNext(T t) {
            resolveSubject(t).onNext(t);
        }

        /**
         * Returns the real subject, creating it through the factory if it does not exist yet.
         *
         * @param firstValue value handed to the factory; {@code null} when there is none
         */
        private Subject<T, T> resolveSubject(T firstValue) {
            if (state.get().subject == null) {
                setSubject(subjectFactory.call(firstValue));
            }
            return state.get().subject;
        }

        /**
         * Installs the real subject and subscribes every parked subscriber to it.
         *
         * @param s the subject to delegate to from now on
         * @throws IllegalStateException if a subject has already been installed
         */
        private void setSubject(Subject<T, T> s) {
            State<T> current;
            State<T> next;
            do {
                current = state.get();
                if (current.subject != null) {
                    throw new IllegalStateException("only 1 subject can be defined");
                }
                next = current.withSubject(s);
            } while (!state.compareAndSet(current, next));
            /* we subscribe any subscribers that were already there */
            for (Subscriber<? super T> subscriber : next.subscribers) {
                System.out.println("Subscribing to subject with Subscriber that existed before Subject => " + subscriber);
                next.subject.subscribe(subscriber);
            }
        }
    }
}
Wow, thanks so much for your efforts!
Will try it tomorrow morning and report success or failure :)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Currently it is a subject, but I can change that at will; it's only an internal structure. It is not exposed to the user in any way.