Skip to content

Instantly share code, notes, and snippets.

@NiteshKant
Last active August 29, 2015 14:06
Show Gist options
  • Save NiteshKant/2128b5e0337ed7249545 to your computer and use it in GitHub Desktop.
Save NiteshKant/2128b5e0337ed7249545 to your computer and use it in GitHub Desktop.
Jersey Response proposal
package com.netflix.karyon.examples.hellonoss.server.jersey;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Func1;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import java.util.concurrent.TimeUnit;
@Path("example")
public class ExampleJersey {
@GET
@Path("/hello/sse")
public Response<ServerSentEvent> sse() {
return Response.from(Observable.interval(1, TimeUnit.SECONDS).map(new Func1<Long, ServerSentEvent>() {
@Override
public ServerSentEvent call(Long interval) {
return new ServerSentEvent(String.valueOf(interval), "notification", "hello " + interval);
}
}));
}
@GET
@Path("/hello/sync")
public Response<String> sync() {
return Response.just("Just one entity");
}
@GET
@Path("/hello/raw")
public Response<Void> raw() {
final Response.RawResponse<ServerSentEvent> rawResponse = Response.raw();
Observable.interval(1, TimeUnit.SECONDS).flatMap(new Func1<Long, Observable<Void>>() {
@Override
public Observable<Void> call(Long interval) {
rawResponse.addEntity(new ServerSentEvent(String.valueOf(interval), "notification", "hello " + interval));
if (flushable()) {
return rawResponse.flush();
} else {
return Observable.empty();
}
}
}).ignoreElements().onErrorResumeNext(Observable.<Void>empty())
.finallyDo(new Action0() {
@Override
public void call() {
rawResponse.close();
}
}).subscribe();
return rawResponse;
}
}
package com.netflix.karyon.examples.hellonoss.server.jersey;
import io.reactivex.netty.protocol.http.UnicastContentSubject;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;
/**
* A response object that can be returned from a Jersey resource irrespective of the request processing model i.e.
* synchronous or asynchronous.
*/
public class Response<T> {
protected final Observable<T> source;
public Response(Observable<T> source) {
this.source = source;
}
/**
* Returns this response as an {@link Observable}
*
* @return This response as an {@link Observable}
*/
public Observable<T> asObservable() {
return source;
}
/**
* Response is flushed on every onNext()
*/
public static <T> Response<T> from(Observable<T> source) {
return new Response<T>(source);
}
/**
* Response is flushed onCompleted()
*/
public static <T> Response<T> just(T singleEntity) {
return new Response<T>(Observable.just(singleEntity));
}
/**
* User controls flushing.
*/
public static <T> RawResponse<T> raw() {
return new RawResponse<T>();
}
public static final class RawResponse<T> extends Response<Void> {
/**
* TODO: This must be replaced by a subject that can store a composite object encompassing write/flush/close
* which buffers only when there is no subscription and passes through when there is a subscription. This is
* just like
* https://github.com/ReactiveX/RxNetty/blob/0.x/rx-netty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java
* but providing support for flush
*/
private final UnicastContentSubject<T> writeSubject;
public RawResponse() {
super(PublishSubject.<Void>create());
writeSubject = UnicastContentSubject.createWithoutNoSubscriptionTimeout();
}
public void addEntity(T entity) {
writeSubject.onNext(entity);
}
public Observable<Void> flush() {
//writeSubject.flush(); Subject should provide a flush()
}
public Observable<Void> close() {
((Subject<Void, Void>)source).onCompleted();
// return writeSubject.close(); // Subject should provide close which bridges HttpServerResponse.close()
}
/**
* This must be called by the framework to associate the RawResponse with a HttpServerResponse.
*/
/*Package private to be used by framework alone*/ RawResponse<T> connect(HttpServerResponse<T> connectTo) {
// TODO: Set the response to the subject so that the notifications convert to the HttpServerResponse calls.
return this;
}
}
}
@elandau
Copy link

elandau commented Nov 19, 2014

I've been trying to incorporate this and have run into several issues

  1. UnicastContentSubject would be adding unnecessary complexity. We can simply use inversion of control to provide a RawResponse. For example,
public Response<String> foo() {
   final Observable<String> source = ... ; // This might be a query to cassandra or other async source
   return Response.ok(new Func1<RawResponse<String>, Observable<Void>>() {
            public Observable<Void> call(final RawResponse<String> response) {
                return source.flatMap(new Func1<String, Observable<Void>>() {
                            @Override
                            public Observable<Void> call(T event) {
                                response.addEntity(event);
                                return response.flush();
                            }
                        });
            }
   }).header("foo", "bar").build();
}

This approach looks kinda ugly and doesn't really give us much (other than transparent entity mapping) beyond returning Observable and using HttpServerResponse directly.

  1. Custom error handling seems awkward. In the above example, I'm showing how a custom header can be specified with the ok response. But what should we do if source emits an error? What if the user wanted to return a completely different success response or a redirect. We could potentially solve this by returning an Observable<Response>. That would allow us to implement the failover logic in rx. But, to echo the previous comment, I'm not convinced that this added complexity is much better than just using HttpServerResponse. I did consider adding an onErrorResumeNext method to Response but that solution is not longer composable.
  2. We need to have a consistent and straight forward fallback mechanism. I'm finding it difficult to implement such a thing because of the async nature of Observables. In blocking Jersey the rest endpoint makes blocking calls and based on the result returns a different Response object. With async we have more error cases to deal with and can't really return a Response object until more is known about the response. This brings me back to returning Observable<Response> (Yuck!). The use cases we have to deal with for async errors are

a) Successful response
b) Fallback for completely failed response. Switch to a different Response
c) How to deal with errors after the headers and first entity have been written

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment