@NiteshKant
Created March 4, 2015 19:59
/*
 * Copyright 2014 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.reactivex.netty.examples;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import rx.Observable;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
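
/**
 * Demonstrates manipulating HTTP request headers from a Netty pipeline handler with RxNetty.
 * A local server on port 8999 echoes back the value of the "Count" request header, while the
 * client appends a {@link ChannelDuplexHandler} that stamps every outgoing request with an
 * incrementing "Count" header and measures the time between writing the request headers and
 * reading the response headers.
 */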
public class ManipulateRequestHeaders {

    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {

        RxNetty.createHttpServer(8999, (request, response) -> response
                .writeStringAndFlush("hello" + request.getHeaders().get("Count"))).start();
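
        /*
         * Build a client whose channel pipeline is extended with a duplex handler. The handler
         * intercepts outbound writes (to tag requests and record the start time) and inbound
         * reads (to compute the elapsed time once the response headers arrive).
         */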
        HttpClient<ByteBuf, ByteBuf> client =
                RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder("127.0.0.1", 8999)
                       .appendPipelineConfigurator(pipeline -> {
                           pipeline.addLast(new ChannelDuplexHandler() {

                               private long headerWriteStartTimeNanos;
                               private long count;

                               @Override
                               public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                   if (msg instanceof HttpResponse) {
                                       /*
                                        * Since we only use a single connection for one request at a time, the
                                        * response header read can simply use the start time set by the request
                                        * write.
                                        */
                                       long endTime = System.nanoTime();
                                       long timeTaken = endTime - headerWriteStartTimeNanos;
                                       System.out.println(hashCode() + ". Request count: " + count
                                                          + ", time taken: " + timeTaken);
                                   }
                                   super.channelRead(ctx, msg);
                               }

                               @Override
                               public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
                                       throws Exception {
                                   if (msg instanceof HttpRequest) {
                                       /*
                                        * Every time we write a request, we reset the start time.
                                        * If RxNetty used request pipelining (multiple requests on a single
                                        * connection), we would have to store these times in a queue, but that
                                        * is not the case, so a single value suffices.
                                        */
                                       headerWriteStartTimeNanos = System.nanoTime();
                                       HttpRequest request = (HttpRequest) msg;
                                       request.headers().add("Count", ++count);
                                       System.out.println(hashCode() + ": Request count: " + count
                                                          + ", start time: " + headerWriteStartTimeNanos);
                                   }
                                   super.write(ctx, msg, promise);
                               }
                           });
                       })
                       .build();
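
        /*
         * Issue 10 GET requests sequentially (concatMap subscribes to each request only after the
         * previous one completes), print every response body and block until all requests finish.
         */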
        Observable.range(1, 10)
                  .concatMap(anInt -> client.submit(HttpClientRequest.createGet("/"))
                                            .flatMap(response -> response.getContent())
                                            .map(bb -> {
                                                System.out.println(bb.toString(Charset.defaultCharset()));
                                                return null;
                                            })
                                            .ignoreElements())
                  .toBlocking()
                  .lastOrDefault(null);
    }
}