Last active
August 29, 2016 18:26
-
-
Save NiteshKant/8704182 to your computer and use it in GitHub Desktop.
HttpRedirect with RxNetty
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.reactivex.netty.examples.java; | |
import io.netty.handler.codec.http.DefaultFullHttpResponse; | |
import io.netty.handler.codec.http.FullHttpRequest; | |
import io.netty.handler.codec.http.FullHttpResponse; | |
import io.netty.handler.codec.http.HttpHeaders; | |
import io.netty.handler.codec.http.HttpResponseStatus; | |
import io.netty.handler.codec.http.HttpVersion; | |
import io.reactivex.netty.ObservableConnection; | |
import io.reactivex.netty.RxNetty; | |
import io.reactivex.netty.protocol.http.HttpServer; | |
import rx.Observer; | |
import rx.util.functions.Action1; | |
/** | |
* @author Nitesh Kant | |
*/ | |
public final class HttpWelcomeServer { | |
public static void main(final String[] args) throws InterruptedException { | |
final int port = 8080; | |
HttpServer<FullHttpRequest, FullHttpResponse> server = RxNetty.createHttpServer(port); | |
server.start(new Action1<ObservableConnection<FullHttpRequest, FullHttpResponse>>() { | |
@Override | |
public void call(final ObservableConnection<FullHttpRequest, FullHttpResponse> connection) { | |
connection.getInput().subscribe(new Observer<FullHttpRequest>() { | |
@Override | |
public void onCompleted() { | |
System.out.println("Request/response completed."); | |
} | |
@Override | |
public void onError(Throwable e) { | |
System.out.println("Error while reading request. Error: "); | |
e.printStackTrace(System.out); | |
} | |
@Override | |
public void onNext(FullHttpRequest httpRequest) { | |
System.out.println("New request recieved: " + httpRequest); | |
FullHttpResponse response; | |
if (httpRequest.getUri().contains("redirect")) { | |
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); | |
} else { | |
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.TEMPORARY_REDIRECT); | |
response.headers().add(HttpHeaders.Names.LOCATION, "/redirect"); | |
} | |
response.content().writeBytes("Welcome! \n\n".getBytes()); | |
// writing to the connection is the only place where anything is remote | |
connection.write(response).subscribe(new Observer<Void>() { | |
@Override | |
public void onCompleted() { | |
System.out.println("Response write successful."); | |
} | |
@Override | |
public void onError(Throwable e) { | |
System.out.println("Response write failed. Error: "); | |
e.printStackTrace(System.out); | |
} | |
@Override | |
public void onNext(Void args) { | |
// No op. | |
} | |
}); | |
} | |
}); | |
} | |
}); | |
server.waitTillShutdown(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.reactivex.netty.examples.java; | |
import io.netty.handler.codec.http.DefaultFullHttpRequest; | |
import io.netty.handler.codec.http.FullHttpRequest; | |
import io.netty.handler.codec.http.HttpHeaders; | |
import io.netty.handler.codec.http.HttpMethod; | |
import io.netty.handler.codec.http.HttpObject; | |
import io.netty.handler.codec.http.HttpResponse; | |
import io.netty.handler.codec.http.HttpResponseStatus; | |
import io.netty.handler.codec.http.HttpVersion; | |
import io.reactivex.netty.RxNetty; | |
import io.reactivex.netty.protocol.http.HttpClient; | |
import io.reactivex.netty.protocol.http.ObservableHttpResponse; | |
import rx.Observable; | |
import rx.util.functions.Action1; | |
import rx.util.functions.Func1; | |
/** | |
* @author Nitesh Kant | |
*/ | |
public class HttpRedirect { | |
public static void main(String[] args) { | |
HttpClient<FullHttpRequest, HttpObject> client = RxNetty.createStreamingHttpClient("localhost", 8080); | |
submitWithRedirect(client, new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/")) | |
.flatMap(new Func1<ObservableHttpResponse<HttpObject>, Observable<HttpObject>>() { | |
@Override | |
public Observable<HttpObject> call(ObservableHttpResponse<HttpObject> observableHttpResponse) { | |
return observableHttpResponse.content(); | |
} | |
}).doOnNext(new Action1<HttpObject>() { | |
@Override | |
public void call(HttpObject httpObject) { | |
System.out.println("Content: " + httpObject); | |
} | |
}).subscribe(); | |
} | |
private static Observable<ObservableHttpResponse<HttpObject>> submitWithRedirect( | |
final HttpClient<FullHttpRequest, HttpObject> client, | |
final DefaultFullHttpRequest request) { | |
final Observable<ObservableHttpResponse<HttpObject>> responseObsvble = client.submit(request); | |
return responseObsvble.flatMap( | |
new Func1<ObservableHttpResponse<HttpObject>, Observable<ObservableHttpResponse<HttpObject>>>() { | |
@Override | |
public Observable<ObservableHttpResponse<HttpObject>> call( | |
final ObservableHttpResponse<HttpObject> observableHttpResponse) { | |
return observableHttpResponse.header().flatMap( | |
new Func1<HttpResponse, Observable<ObservableHttpResponse<HttpObject>>>() { | |
@Override | |
public Observable<ObservableHttpResponse<HttpObject>> call(HttpResponse response) { | |
if (isRedirect(response.getStatus().code())) { | |
String location = response.headers().get(HttpHeaders.Names.LOCATION); | |
return submitWithRedirect(client, new DefaultFullHttpRequest( | |
HttpVersion.HTTP_1_1, HttpMethod.GET, location)); | |
} else { | |
return responseObsvble; | |
} | |
} | |
}); | |
} | |
}); | |
} | |
private static boolean isRedirect(int code) { | |
return code == HttpResponseStatus.TEMPORARY_REDIRECT.code(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment