Last active
August 29, 2015 14:06
-
-
Save NiteshKant/2128b5e0337ed7249545 to your computer and use it in GitHub Desktop.
Jersey Response proposal
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.netflix.karyon.examples.hellonoss.server.jersey; | |
import io.reactivex.netty.protocol.text.sse.ServerSentEvent; | |
import rx.Observable; | |
import rx.functions.Action0; | |
import rx.functions.Func1; | |
import javax.ws.rs.GET; | |
import javax.ws.rs.Path; | |
import java.util.concurrent.TimeUnit; | |
@Path("example") | |
public class ExampleJersey { | |
@GET | |
@Path("/hello/sse") | |
public Response<ServerSentEvent> sse() { | |
return Response.from(Observable.interval(1, TimeUnit.SECONDS).map(new Func1<Long, ServerSentEvent>() { | |
@Override | |
public ServerSentEvent call(Long interval) { | |
return new ServerSentEvent(String.valueOf(interval), "notification", "hello " + interval); | |
} | |
})); | |
} | |
@GET | |
@Path("/hello/sync") | |
public Response<String> sync() { | |
return Response.just("Just one entity"); | |
} | |
@GET | |
@Path("/hello/raw") | |
public Response<Void> raw() { | |
final Response.RawResponse<ServerSentEvent> rawResponse = Response.raw(); | |
Observable.interval(1, TimeUnit.SECONDS).flatMap(new Func1<Long, Observable<Void>>() { | |
@Override | |
public Observable<Void> call(Long interval) { | |
rawResponse.addEntity(new ServerSentEvent(String.valueOf(interval), "notification", "hello " + interval)); | |
if (flushable()) { | |
return rawResponse.flush(); | |
} else { | |
return Observable.empty(); | |
} | |
} | |
}).ignoreElements().onErrorResumeNext(Observable.<Void>empty()) | |
.finallyDo(new Action0() { | |
@Override | |
public void call() { | |
rawResponse.close(); | |
} | |
}).subscribe(); | |
return rawResponse; | |
} | |
} | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.netflix.karyon.examples.hellonoss.server.jersey; | |
import io.reactivex.netty.protocol.http.UnicastContentSubject; | |
import io.reactivex.netty.protocol.http.server.HttpServerResponse; | |
import rx.Observable; | |
import rx.subjects.PublishSubject; | |
import rx.subjects.Subject; | |
/** | |
* A response object that can be returned from a Jersey resource irrespective of the request processing model i.e. | |
* synchronous or asynchronous. | |
*/ | |
public class Response<T> { | |
protected final Observable<T> source; | |
public Response(Observable<T> source) { | |
this.source = source; | |
} | |
/** | |
* Returns this response as an {@link Observable} | |
* | |
* @return This response as an {@link Observable} | |
*/ | |
public Observable<T> asObservable() { | |
return source; | |
} | |
/** | |
* Response is flushed on every onNext() | |
*/ | |
public static <T> Response<T> from(Observable<T> source) { | |
return new Response<T>(source); | |
} | |
/** | |
* Response is flushed onCompleted() | |
*/ | |
public static <T> Response<T> just(T singleEntity) { | |
return new Response<T>(Observable.just(singleEntity)); | |
} | |
/** | |
* User controls flushing. | |
*/ | |
public static <T> RawResponse<T> raw() { | |
return new RawResponse<T>(); | |
} | |
public static final class RawResponse<T> extends Response<Void> { | |
/** | |
* TODO: This must be replaced by a subject that can store a composite object encompassing write/flush/close | |
* which buffers only when there is no subscription and passes through when there is a subscription. This is | |
* just like | |
* https://github.com/ReactiveX/RxNetty/blob/0.x/rx-netty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java | |
* but providing support for flush | |
*/ | |
private final UnicastContentSubject<T> writeSubject; | |
public RawResponse() { | |
super(PublishSubject.<Void>create()); | |
writeSubject = UnicastContentSubject.createWithoutNoSubscriptionTimeout(); | |
} | |
public void addEntity(T entity) { | |
writeSubject.onNext(entity); | |
} | |
public Observable<Void> flush() { | |
//writeSubject.flush(); Subject should provide a flush() | |
} | |
public Observable<Void> close() { | |
((Subject<Void, Void>)source).onCompleted(); | |
// return writeSubject.close(); // Subject should provide close which bridges HttpServerResponse.close() | |
} | |
/** | |
* This must be called by the framework to associate the RawResponse with a HttpServerResponse. | |
*/ | |
/*Package private to be used by framework alone*/ RawResponse<T> connect(HttpServerResponse<T> connectTo) { | |
// TODO: Set the response to the subject so that the notifications convert to the HttpServerResponse calls. | |
return this; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I've been trying to incorporate this and have run into several issues
This approach looks kinda ugly and doesn't really give us much (other than transparent entity mapping) beyond returning Observable and using HttpServerResponse directly.
a) Successful response
b) Fallback for completely failed response. Switch to a different Response
c) How to deal with errors after the headers and first entity have been written