-
-
Save benjchristensen/e19d64654ebbe82c5bb1 to your computer and use it in GitHub Desktop.
package perf.backend; | |
import rx.Observable; | |
import rx.Observable.Operator; | |
import rx.Subscriber; | |
import rx.subjects.AsyncSubject; | |
import rx.subjects.ReplaySubject; | |
import rx.subjects.Subject; | |
public class ChooseSubjectBasedOnFirstValue { | |
public static void main(String[] args) { | |
// Observable<Object> o = Observable.just(1); | |
Observable<Object> o = Observable.from("A", "B", "C"); | |
o.lift(new Operator<Subject<Object, Object>, Object>() { | |
@Override | |
public Subscriber<? super Object> call(Subscriber<? super Subject<Object, Object>> child) { | |
return new Subscriber<Object>(child) { | |
private Subject<Object, Object> s; | |
@Override | |
public void onCompleted() { | |
s.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
s.onError(e); | |
} | |
@Override | |
public void onNext(Object t) { | |
if (s == null) { | |
if (t instanceof Number) { | |
s = AsyncSubject.create(); | |
} else { | |
s = ReplaySubject.create(); | |
} | |
child.onNext(s); | |
child.onCompleted(); | |
} | |
s.onNext(t); | |
} | |
}; | |
} | |
}).flatMap((Subject s) -> { | |
System.out.println("Subject: " + s); | |
s.map(i -> "1) => " + i).subscribe(System.out::println); | |
s.map(i -> "2) => " + i).subscribe(System.out::println); | |
return s; | |
}).subscribe(); | |
} | |
} |
Thanks much for putting this together. I tried to digest it, but I have one issue in that I still don't know the first observable ("o") at creation time?
Let me give you more context to my use case - maybe this helps (maybe I just don't get it ;))
So the contract to the user is:
public <R extends CouchbaseResponse> Observable<R> send(CouchbaseRequest request)
The user gives me a request and I promise him an observable containing the response. Here is the current implementation, which always creates a ReplaySubject for lack of better knowledge. It then gets put into the ringbuffer and trickles down through my code and into netty until it hits the network:
/**
 * Accepts a request and returns the stream on which its response(s) will be delivered.
 *
 * <p>A fresh {@link ReplaySubject} is created for every request and attached to the
 * request via {@code request.observable(...)}. Internal and cluster requests are passed
 * to their dedicated handlers; every other request is offered to the request ring buffer.
 * If the ring buffer does not accept the event, the returned stream is failed with
 * {@code BACKPRESSURE_EXCEPTION}.
 *
 * @param request the request to dispatch
 * @param <R> the response type the caller expects (unchecked: the subject is cast to it)
 * @return the observable view of the subject attached to {@code request}
 */
public <R extends CouchbaseResponse> Observable<R> send(CouchbaseRequest request) {
    final Subject<CouchbaseResponse, CouchbaseResponse> responseSubject = ReplaySubject.create();
    // Same object, seen through the caller-facing type.
    final Observable<R> responses = (Observable<R>) responseSubject;
    request.observable(responseSubject);

    if (request instanceof InternalRequest) {
        handleInternalRequest(request);
        return responses;
    }
    if (request instanceof ClusterRequest) {
        handleClusterRequest(request);
        return responses;
    }

    // Everything else goes through the ring buffer; a rejected publish is reported
    // to the caller through the stream itself.
    if (!requestRingBuffer.tryPublishEvent(REQUEST_TRANSLATOR, request)) {
        responseSubject.onError(BACKPRESSURE_EXCEPTION);
    }
    return responses;
}
The observable here gets attached to the response internally.
What I do when a response arrives async from netty is that from my uppermost decoder I write the CouchbaseResponse object into a response ringbuffer and the handler at the other end ultimately completes my observable like this:
/**
 * Forwards a decoded response to the observable carried by the event.
 *
 * <p>A response with status {@code CHUNKED} or {@code SUCCESS} is emitted through
 * {@code onNext}; a {@code SUCCESS} status additionally completes the observable.
 * Handling of all other statuses is elided in this snippet.
 */
@Override
public void onEvent(final ResponseEvent event, long sequence, boolean endOfBatch) throws Exception {
    final CouchbaseResponse response = event.getResponse();
    final ResponseStatus status = response.status();
    final boolean isFinal = status == ResponseStatus.SUCCESS;
    final boolean isChunk = status == ResponseStatus.CHUNKED;

    if (isChunk || isFinal) {
        // This is the observable created in send(...), carried along with the request
        // and attached to the event.
        event.getObservable().onNext(response);
        if (isFinal) {
            event.getObservable().onCompleted();
        }
    } else {
        //....
    }
}
So if those messages indicate a CHUNKED response I just call onNext and the final one always by contract contains SUCCESS or ERROR so I know I need to call onCompleted() or onError().
So this works since I always know it's a ReplaySubject, but I found that AsyncSubject has much better performance and GC friendliness if I just need to respond with 1 onNext call.
Maybe the easiest way for me to handle this is just to inspect the CouchbaseRequest type and then do optimizations on that front? It would be hardcoded, but then I don't have to go through a level of indirection - what do you think?
For this code:
request.observable(observable);
Does this observable() method only take a Subject, or is it any Observer/Subscriber that it will emit to when it receives data?
Currently it is a Subject, but I can change that at will; it's only internal structures. This is not exposed to the user in any way.
I don't have time to finish this tonight, but this gives an idea ... it works for this example because everything is synchronous. If it were async it would fail ... the TODO section explains where it's not yet implemented.
I'll probably go with a state machine + CAS and combine the eventualSubject and array of subscribers into a single state.
Then we just need a proper name for this thing :-)
package test;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.subjects.AsyncSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
/**
 * Sketch of a request/response bridge whose backing {@link Subject} is chosen lazily
 * from the first event it receives.
 *
 * <p>This version only works when the response is produced synchronously, before anyone
 * subscribes; see the TODO in {@link ObservableObserver}.
 */
public class ChooseSubjectBasedOnFirstValue {

    public static void main(String[] args) {
        ChooseSubjectBasedOnFirstValue demo = new ChooseSubjectBasedOnFirstValue();
        // An odd argument yields the multi-value response; use demo.sendRequest(2)
        // to see the single-value case instead.
        Observable<Object> responses = demo.sendRequest(1);
        responses.subscribe(System.out::println);
    }

    /** Creates the bridge, feeds it with the response for {@code i} and returns it. */
    public Observable<Object> sendRequest(int i) {
        ObservableObserver bridge = ObservableObserver.create();
        doActualRequestAsynchronously(i, bridge);
        return bridge;
    }

    /**
     * Stand-in for the real backend call (it runs synchronously on the calling thread
     * here): an even argument is echoed back as a single value, an odd argument
     * produces "A", "B" and "C". The callback is completed in both cases.
     */
    public void doActualRequestAsynchronously(int requestArgs, Observer<Object> callback) {
        boolean odd = requestArgs % 2 != 0;
        if (odd) {
            for (String chunk : new String[] {"A", "B", "C"}) {
                callback.onNext(chunk);
            }
        } else {
            callback.onNext(requestArgs);
        }
        callback.onCompleted();
    }

    /**
     * An Observable that is also an Observer: events pushed into it are forwarded to a
     * subject that is created on the first event.
     */
    public static class ObservableObserver extends Observable<Object> implements Observer<Object> {

        /** Holds the backing subject; empty until the first event arrives. */
        private final AtomicReference<Subject<Object, Object>> eventualSubject;

        public static ObservableObserver create() {
            return new ObservableObserver(new AtomicReference<Subject<Object, Object>>());
        }

        private ObservableObserver(AtomicReference<Subject<Object, Object>> subjectRef) {
            super(new OnSubscribe<Object>() {
                @Override
                public void call(Subscriber<? super Object> child) {
                    if (subjectRef.get() == null) {
                        // TODO this is the tricky part ...
                        // If this needs to support multicast, it should .publish()
                        // otherwise it needs to also worry about everything in https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java
                        //
                        // Assuming it's one-to-one this then needs to deal with a Subscriber arriving before
                        // the subject has been created ... and do so in a thread-safe manner
                        //
                        // so either locking or a state machine
                        return;
                    }
                    subjectRef.get().subscribe(child);
                }
            });
            this.eventualSubject = subjectRef;
        }

        @Override
        public void onCompleted() {
            subjectForTerminalEvent().onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            subjectForTerminalEvent().onError(e);
        }

        @Override
        public void onNext(Object t) {
            if (eventualSubject.get() == null) {
                Subject<Object, Object> chosen;
                if (t instanceof Number) {
                    System.out.println("Using an AsyncSubject");
                    chosen = AsyncSubject.create();
                } else {
                    System.out.println("Using a ReplaySubject");
                    chosen = ReplaySubject.create();
                }
                eventualSubject.set(chosen);
            }
            eventualSubject.get().onNext(t);
        }

        /**
         * Returns the backing subject, installing an AsyncSubject first when a terminal
         * event is the very first thing this observer receives.
         */
        private Subject<Object, Object> subjectForTerminalEvent() {
            if (eventualSubject.get() == null) {
                eventualSubject.set(AsyncSubject.create());
            }
            return eventualSubject.get();
        }
    }
}
Try this:
package test;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
/**
 * Demo of a {@link ProxySubject}: a subject whose real implementation is chosen when the
 * first event arrives, while still accepting subscribers before that point.
 */
public class ChooseSubjectBasedOnFirstValue {

    /**
     * Worker on which the simulated responses are emitted. It is a Subscription and is
     * released by {@link #main} once the demo is finished.
     */
    final Scheduler.Worker worker = Schedulers.computation().createWorker();

    public static void main(String[] args) {
        ChooseSubjectBasedOnFirstValue c = new ChooseSubjectBasedOnFirstValue();
        try {
            Observable<Object> o = c.sendRequest(1);
            o.toBlockingObservable().forEach(System.out::println);
            Observable<Object> o2 = c.sendRequest(2);
            o2.toBlockingObservable().forEach(System.out::println);
        } finally {
            // Release the scheduler worker instead of leaving it allocated.
            c.worker.unsubscribe();
        }
    }

    /**
     * Starts a simulated request and returns the stream of its response values.
     *
     * <p>The backing subject is an AsyncSubject when the first event is a Number or a
     * terminal event (the factory is then called with {@code null}), and a ReplaySubject
     * otherwise.
     */
    public Observable<Object> sendRequest(int i) {
        ProxySubject<Object> observableObserver = ProxySubject.createSubject((t) -> {
            if (t == null || t instanceof Number) {
                System.out.println("Using an AsyncSubject");
                return AsyncSubject.create();
            } else {
                System.out.println("Using a ReplaySubject");
                return ReplaySubject.create();
            }
        });
        doActualRequestAsynchronously(i, observableObserver);
        return observableObserver;
    }

    /**
     * Stand-in for the real backend call, executed on {@link #worker}: an even argument
     * is echoed back as a single value, an odd argument produces "A", "B" and "C". The
     * callback is completed in both cases.
     */
    public void doActualRequestAsynchronously(int requestArgs, Observer<Object> callback) {
        worker.schedule(() -> {
            if (requestArgs % 2 == 0) {
                callback.onNext(requestArgs);
                callback.onCompleted();
            } else {
                callback.onNext("A");
                callback.onNext("B");
                callback.onNext("C");
                callback.onCompleted();
            }
        });
    }

    /**
     * A Subject that delegates to another subject created lazily by a factory.
     *
     * <p>Until the first event arrives, subscribers are parked in an immutable
     * {@link State} that is swapped with compare-and-set. When the first event arrives
     * the factory is asked for the real subject, the parked subscribers are subscribed
     * to it, and every later subscriber is subscribed to it directly.
     *
     * <p>The event-side methods (onNext / onError / onCompleted) are written for a single
     * serialized producer; a second concurrent attempt to install a subject fails with
     * an IllegalStateException.
     *
     * @param <T> the value type
     */
    public static class ProxySubject<T> extends Subject<T, T> {

        private final AtomicReference<State<T>> state;
        private final Func1<T, Subject<T, T>> subjectFactory;

        /**
         * @param subjectFactory receives the first value, or {@code null} when the first
         *        event is a terminal one, and returns the subject to delegate to
         */
        public static <T> ProxySubject<T> createSubject(Func1<T, Subject<T, T>> subjectFactory) {
            return new ProxySubject<T>(new AtomicReference<State<T>>(State.<T>pending()), subjectFactory);
        }

        /** Immutable snapshot: either parked subscribers (subject == null) or the subject. */
        private static final class State<T> {
            private static final Subscriber<?>[] EMPTY = new Subscriber[0];

            private final Subject<T, T> subject;
            private final Subscriber<? super T>[] subscribers;

            private State(Subject<T, T> subject, Subscriber<? super T>[] subscribers) {
                this.subject = subject;
                this.subscribers = subscribers;
            }

            /** The shared zero-length array, viewed with the requested element type. */
            @SuppressWarnings("unchecked") // safe: the array is empty and never written to
            private static <U> Subscriber<? super U>[] noSubscribers() {
                return (Subscriber<? super U>[]) EMPTY;
            }

            /** Initial state: no subject, nobody waiting. */
            private static <U> State<U> pending() {
                return new State<U>(null, State.<U>noSubscribers());
            }

            /** Final state: subject installed, parked subscribers no longer referenced. */
            private static <U> State<U> resolved(Subject<U, U> subject) {
                return new State<U>(subject, State.<U>noSubscribers());
            }

            /** Copy of this (pending) state with one more parked subscriber appended. */
            @SuppressWarnings("unchecked") // generic array creation; only Subscriber<? super T> is stored
            private State<T> withSubscriber(Subscriber<? super T> child) {
                Subscriber<? super T>[] grown = new Subscriber[subscribers.length + 1];
                System.arraycopy(subscribers, 0, grown, 0, subscribers.length);
                grown[subscribers.length] = child;
                return new State<T>(null, grown);
            }
        }

        private ProxySubject(AtomicReference<State<T>> state, Func1<T, Subject<T, T>> subjectFactory) {
            super(new OnSubscribe<T>() {
                @Override
                public void call(Subscriber<? super T> child) {
                    State<T> current;
                    State<T> next;
                    do {
                        current = state.get();
                        if (current.subject != null) {
                            // Subject already chosen: subscribe directly, state unchanged.
                            current.subject.subscribe(child);
                            return;
                        }
                        next = current.withSubscriber(child);
                    } while (!state.compareAndSet(current, next));
                }
            });
            this.state = state;
            this.subjectFactory = subjectFactory;
        }

        @Override
        public void onCompleted() {
            subjectFor(null).onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            subjectFor(null).onError(e);
        }

        @Override
        public void onNext(T t) {
            subjectFor(t).onNext(t);
        }

        /**
         * Returns the delegate subject, creating and installing it from the factory on
         * the first event.
         *
         * @param firstValue the value passed to the factory if the subject still has to
         *        be created; {@code null} for terminal events
         */
        private Subject<T, T> subjectFor(T firstValue) {
            Subject<T, T> current = state.get().subject;
            if (current == null) {
                setSubject(subjectFactory.call(firstValue));
                current = state.get().subject;
            }
            return current;
        }

        /**
         * Installs the delegate subject and subscribes every parked subscriber to it.
         *
         * @throws NullPointerException if {@code s} is null
         * @throws IllegalStateException if a subject has already been installed
         */
        private void setSubject(Subject<T, T> s) {
            if (s == null) {
                // Fail here with a clear message rather than later on a null delegate.
                throw new NullPointerException("subjectFactory returned null");
            }
            State<T> resolved = State.resolved(s);
            State<T> previous;
            do {
                previous = state.get();
                if (previous.subject != null) {
                    throw new IllegalStateException("only 1 subject can be defined");
                }
            } while (!state.compareAndSet(previous, resolved));

            /* we subscribe any subscribers that were already there */
            // The winning CAS fixes the parked set: anyone subscribing afterwards sees
            // the subject and subscribes to it directly.
            for (Subscriber<? super T> subscriber : previous.subscribers) {
                System.out.println("Subscribing to subject with Subscriber that existed before Subject => " + subscriber);
                s.subscribe(subscriber);
            }
        }
    }
}
Wow thanks much for your efforts!
Will try it tomorrow morning and report success or failure :)
or