Skip to content

Instantly share code, notes, and snippets.

@NiteshKant
Last active August 29, 2016 18:26
Show Gist options
  • Save NiteshKant/8704182 to your computer and use it in GitHub Desktop.
Save NiteshKant/8704182 to your computer and use it in GitHub Desktop.
HttpRedirect with RxNetty
package io.reactivex.netty.examples.java;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.reactivex.netty.ObservableConnection;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.HttpServer;
import rx.Observer;
import rx.util.functions.Action1;
/**
* @author Nitesh Kant
*/
public final class HttpWelcomeServer {
public static void main(final String[] args) throws InterruptedException {
final int port = 8080;
HttpServer<FullHttpRequest, FullHttpResponse> server = RxNetty.createHttpServer(port);
server.start(new Action1<ObservableConnection<FullHttpRequest, FullHttpResponse>>() {
@Override
public void call(final ObservableConnection<FullHttpRequest, FullHttpResponse> connection) {
connection.getInput().subscribe(new Observer<FullHttpRequest>() {
@Override
public void onCompleted() {
System.out.println("Request/response completed.");
}
@Override
public void onError(Throwable e) {
System.out.println("Error while reading request. Error: ");
e.printStackTrace(System.out);
}
@Override
public void onNext(FullHttpRequest httpRequest) {
System.out.println("New request recieved: " + httpRequest);
FullHttpResponse response;
if (httpRequest.getUri().contains("redirect")) {
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
} else {
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.TEMPORARY_REDIRECT);
response.headers().add(HttpHeaders.Names.LOCATION, "/redirect");
}
response.content().writeBytes("Welcome! \n\n".getBytes());
// writing to the connection is the only place where anything is remote
connection.write(response).subscribe(new Observer<Void>() {
@Override
public void onCompleted() {
System.out.println("Response write successful.");
}
@Override
public void onError(Throwable e) {
System.out.println("Response write failed. Error: ");
e.printStackTrace(System.out);
}
@Override
public void onNext(Void args) {
// No op.
}
});
}
});
}
});
server.waitTillShutdown();
}
}
package io.reactivex.netty.examples.java;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.HttpClient;
import io.reactivex.netty.protocol.http.ObservableHttpResponse;
import rx.Observable;
import rx.util.functions.Action1;
import rx.util.functions.Func1;
/**
* @author Nitesh Kant
*/
public class HttpRedirect {
public static void main(String[] args) {
HttpClient<FullHttpRequest, HttpObject> client = RxNetty.createStreamingHttpClient("localhost", 8080);
submitWithRedirect(client, new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"))
.flatMap(new Func1<ObservableHttpResponse<HttpObject>, Observable<HttpObject>>() {
@Override
public Observable<HttpObject> call(ObservableHttpResponse<HttpObject> observableHttpResponse) {
return observableHttpResponse.content();
}
}).doOnNext(new Action1<HttpObject>() {
@Override
public void call(HttpObject httpObject) {
System.out.println("Content: " + httpObject);
}
}).subscribe();
}
private static Observable<ObservableHttpResponse<HttpObject>> submitWithRedirect(
final HttpClient<FullHttpRequest, HttpObject> client,
final DefaultFullHttpRequest request) {
final Observable<ObservableHttpResponse<HttpObject>> responseObsvble = client.submit(request);
return responseObsvble.flatMap(
new Func1<ObservableHttpResponse<HttpObject>, Observable<ObservableHttpResponse<HttpObject>>>() {
@Override
public Observable<ObservableHttpResponse<HttpObject>> call(
final ObservableHttpResponse<HttpObject> observableHttpResponse) {
return observableHttpResponse.header().flatMap(
new Func1<HttpResponse, Observable<ObservableHttpResponse<HttpObject>>>() {
@Override
public Observable<ObservableHttpResponse<HttpObject>> call(HttpResponse response) {
if (isRedirect(response.getStatus().code())) {
String location = response.headers().get(HttpHeaders.Names.LOCATION);
return submitWithRedirect(client, new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.GET, location));
} else {
return responseObsvble;
}
}
});
}
});
}
private static boolean isRedirect(int code) {
return code == HttpResponseStatus.TEMPORARY_REDIRECT.code();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment