Skip to content

Instantly share code, notes, and snippets.

@jroper
Last active December 15, 2015 06:59
Show Gist options
  • Save jroper/5220203 to your computer and use it in GitHub Desktop.
Save jroper/5220203 to your computer and use it in GitHub Desktop.
Demonstration of HTTP pipelining support in Netty.
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.*;
import org.jboss.netty.handler.codec.http.*;
import java.net.SocketAddress;
import java.util.LinkedList;
import java.util.concurrent.Callable;
public class HttpPipeliningHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler {
private final LinkedList<Channel> channelQueue = new LinkedList<Channel>();
private long currentResponseContentLength = -1;
private boolean isChunked;
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof MessageEvent) {
MessageEvent msgEvent = (MessageEvent) e;
Object message = msgEvent.getMessage();
if (message instanceof HttpResponse) {
HttpResponse response = (HttpResponse) message;
if (response.getStatus() == HttpResponseStatus.NO_CONTENT || response.getStatus() == HttpResponseStatus.NOT_MODIFIED) {
currentResponseContentLength = 0;
} else {
currentResponseContentLength = HttpHeaders.getContentLength(response, -1);
}
isChunked = response.isChunked();
if (!isChunked) {
observeContent(response.getContent(), e.getFuture());
}
} else if (message instanceof HttpChunk) {
HttpChunk chunk = (HttpChunk) message;
if (isChunked) {
if (chunk.isLast()) {
popChannelQueue(e.getFuture());
}
} else {
observeContent(chunk.getContent(), e.getFuture());
}
} else if (message instanceof ChannelBuffer) {
observeContent((ChannelBuffer) message, e.getFuture());
}
}
ctx.sendDownstream(e);
}
private void observeContent(ChannelBuffer buffer, ChannelFuture future) {
if (currentResponseContentLength >= 0) {
currentResponseContentLength -= buffer.readableBytes();
if (currentResponseContentLength <= 0) {
popChannelQueue(future);
currentResponseContentLength = -1;
}
}
}
private void popChannelQueue(ChannelFuture future) {
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Channel next;
synchronized (channelQueue) {
channelQueue.pop();
next = channelQueue.peek();
}
if (next instanceof HttpPipeliningChannel) {
if (future.isCancelled()) {
((HttpPipeliningChannel) next).cancel();
} else {
((HttpPipeliningChannel) next).proceed();
}
}
}
});
}
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
// A messages has come from upstream (ie, part of a request)
if (e instanceof MessageEvent) {
MessageEvent msgEvent = (MessageEvent) e;
Object msg = msgEvent.getMessage();
if (msg instanceof HttpRequest) {
synchronized (channelQueue) {
if (channelQueue.size() > 0) {
// This request is pipelined behind another
channelQueue.add(new HttpPipeliningChannel(e.getChannel()));
} else {
// No pipelining, so no need to use the pipelining channel
channelQueue.add(e.getChannel());
}
}
}
ctx.sendUpstream(new UpstreamMessageEvent(channelQueue.getLast(), msg, msgEvent.getRemoteAddress()));
} else {
ctx.sendUpstream(e);
}
}
}
/**
 * Delegates operations to the underlying channel, but only once told to proceed.
 *
 * <p>Operations that return a {@link ChannelFuture} are deferred until
 * {@link #proceed()} is called; if {@link #cancel()} is called instead, the
 * futures they returned are cancelled and the operations never reach the
 * delegate. Simple accessors are forwarded immediately. The attachment is kept
 * on this wrapper rather than on the delegate.
 */
class HttpPipeliningChannel implements Channel {

    private final Channel delegate;

    /**
     * Gate that every deferred operation waits on. It is created cancellable so
     * that {@link #cancel()} can actually complete it; a non-cancellable
     * DefaultChannelFuture rejects cancel(), which would leave deferred
     * operations pending forever.
     */
    private final ChannelFuture proceedFuture = new DefaultChannelFuture(this, true);

    private Object attachment;

    /**
     * @param delegate the real channel that operations are eventually applied to
     */
    HttpPipeliningChannel(Channel delegate) {
        this.delegate = delegate;
    }

    /** Opens the gate: all deferred (and future) operations are run against the delegate. */
    public void proceed() {
        proceedFuture.setSuccess();
    }

    /** Cancels the gate: all deferred (and future) operations have their futures cancelled. */
    public void cancel() {
        proceedFuture.cancel();
    }

    @Override
    public Integer getId() {
        return delegate.getId();
    }

    @Override
    public ChannelFactory getFactory() {
        return delegate.getFactory();
    }

    @Override
    public Channel getParent() {
        return delegate.getParent();
    }

    @Override
    public ChannelConfig getConfig() {
        return delegate.getConfig();
    }

    @Override
    public ChannelPipeline getPipeline() {
        return delegate.getPipeline();
    }

    @Override
    public boolean isOpen() {
        return delegate.isOpen();
    }

    @Override
    public boolean isBound() {
        return delegate.isBound();
    }

    @Override
    public boolean isConnected() {
        return delegate.isConnected();
    }

    @Override
    public SocketAddress getLocalAddress() {
        return delegate.getLocalAddress();
    }

    @Override
    public SocketAddress getRemoteAddress() {
        return delegate.getRemoteAddress();
    }

    @Override
    public ChannelFuture write(final Object message) {
        return chainFutureTo(new Callable<ChannelFuture>() {
            @Override
            public ChannelFuture call() throws Exception {
                return delegate.write(message);
            }
        });
    }

    @Override
    public ChannelFuture write(final Object message, final SocketAddress remoteAddress) {
        return chainFutureTo(new Callable<ChannelFuture>() {
            @Override
            public ChannelFuture call() throws Exception {
                return delegate.write(message, remoteAddress);
            }
        });
    }

    @Override
    public ChannelFuture bind(final SocketAddress localAddress) {
        return chainFutureTo(new Callable<ChannelFuture>() {
            @Override
            public ChannelFuture call() throws Exception {
                return delegate.bind(localAddress);
            }
        });
    }

    @Override
    public ChannelFuture connect(final SocketAddress remoteAddress) {
        return chainFutureTo(new Callable<ChannelFuture>() {
            @Override
            public ChannelFuture call() throws Exception {
                return delegate.connect(remoteAddress);
            }
        });
    }

    @Override
    public ChannelFuture disconnect() {
        return chainFutureTo(new Callable<ChannelFuture>() {
            @Override
            public ChannelFuture call() throws Exception {
                return delegate.disconnect();
            }
        });
    }

    @Override
    public ChannelFuture unbind() {
        return chainFutureTo(new Callable<ChannelFuture>() {
            @Override
            public ChannelFuture call() throws Exception {
                return delegate.unbind();
            }
        });
    }

    @Override
    public ChannelFuture close() {
        return chainFutureTo(new Callable<ChannelFuture>() {
            @Override
            public ChannelFuture call() throws Exception {
                return delegate.close();
            }
        });
    }

    @Override
    public ChannelFuture getCloseFuture() {
        return chainFutureTo(new Callable<ChannelFuture>() {
            @Override
            public ChannelFuture call() throws Exception {
                return delegate.getCloseFuture();
            }
        });
    }

    @Override
    public int getInterestOps() {
        return delegate.getInterestOps();
    }

    /** Reports {@code false} until the gate has completed, then defers to the delegate. */
    @Override
    public boolean isReadable() {
        if (proceedFuture.isDone()) {
            return delegate.isReadable();
        } else {
            return false;
        }
    }

    /** Reports {@code false} until the gate has completed, then defers to the delegate. */
    @Override
    public boolean isWritable() {
        if (proceedFuture.isDone()) {
            return delegate.isWritable();
        } else {
            return false;
        }
    }

    @Override
    public ChannelFuture setInterestOps(final int interestOps) {
        return chainFutureTo(new Callable<ChannelFuture>() {
            @Override
            public ChannelFuture call() throws Exception {
                return delegate.setInterestOps(interestOps);
            }
        });
    }

    @Override
    public ChannelFuture setReadable(final boolean readable) {
        return chainFutureTo(new Callable<ChannelFuture>() {
            @Override
            public ChannelFuture call() throws Exception {
                return delegate.setReadable(readable);
            }
        });
    }

    @Override
    public Object getAttachment() {
        return attachment;
    }

    @Override
    public void setAttachment(Object attachment) {
        this.attachment = attachment;
    }

    @Override
    public int compareTo(Channel channel) {
        return delegate.compareTo(channel);
    }

    /**
     * Defers {@code op} until the gate completes and returns a future that
     * mirrors the outcome of the deferred operation.
     *
     * @param op the operation to run against the delegate once allowed to proceed
     * @return a future that succeeds, fails or is cancelled along with the
     *         operation's own future; it is cancelled if the gate is cancelled
     *         and failed if {@code op} itself throws
     */
    private ChannelFuture chainFutureTo(final Callable<ChannelFuture> op) {
        final ChannelFuture chained = new DefaultChannelFuture(this, true);
        proceedFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture gate) throws Exception {
                if (gate.isCancelled()) {
                    chained.cancel();
                    return;
                }

                ChannelFuture actual;
                try {
                    actual = op.call();
                } catch (Exception ex) {
                    // Without this the caller's future would never complete.
                    chained.setFailure(ex);
                    return;
                }

                actual.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture result) throws Exception {
                        if (result.isSuccess()) {
                            chained.setSuccess();
                        } else if (result.isCancelled()) {
                            chained.cancel();
                        } else {
                            chained.setFailure(result.getCause());
                        }
                    }
                });
            }
        });
        return chained;
    }
}
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.*;
import org.jboss.netty.util.CharsetUtil;
import static org.jboss.netty.channel.Channels.*;
import static org.jboss.netty.handler.codec.http.HttpHeaders.*;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.*;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
import static org.jboss.netty.handler.codec.http.HttpVersion.*;
/**
 * Minimal HTTP server that wires up the snoop pipeline and listens on a
 * single port.
 */
public class HttpSnoopServer {

    private final int port;

    /**
     * @param port the TCP port to listen on
     */
    public HttpSnoopServer(int port) {
        this.port = port;
    }

    /**
     * Creates the NIO server bootstrap, installs the snoop pipeline factory
     * and binds to the configured port.
     */
    public void run() {
        // Two cached thread pools are handed to the channel factory.
        NioServerSocketChannelFactory channelFactory = new NioServerSocketChannelFactory(
                Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool());
        ServerBootstrap server = new ServerBootstrap(channelFactory);

        // Every accepted connection gets a pipeline from this factory.
        server.setPipelineFactory(new HttpSnoopServerPipelineFactory());

        // Start accepting incoming connections.
        server.bind(new InetSocketAddress(port));
    }

    /**
     * Starts the server on the port given as the first argument, or on 8080
     * when no argument is supplied.
     */
    public static void main(String[] args) {
        int listenPort = args.length > 0 ? Integer.parseInt(args[0]) : 8080;
        new HttpSnoopServer(listenPort).run();
    }
}
/**
 * Builds the per-connection pipeline: request decoder, response encoder,
 * pipelining handler, content compressor and the snoop handler, in that order.
 */
class HttpSnoopServerPipelineFactory implements ChannelPipelineFactory {

    /**
     * @return a freshly assembled pipeline with new handler instances
     */
    public ChannelPipeline getPipeline() throws Exception {
        // Start from an empty default pipeline.
        ChannelPipeline stages = pipeline();

        stages.addLast("decoder", new HttpRequestDecoder());
        // To avoid handling HttpChunks yourself, enable the aggregator below.
        //stages.addLast("aggregator", new HttpChunkAggregator(1048576));
        stages.addLast("encoder", new HttpResponseEncoder());

        // Hands each request its own channel so responses can be ordered.
        stages.addLast("pipeliner", new HttpPipeliningHandler());

        // Remove the following line if you don't want automatic content compression.
        stages.addLast("deflater", new HttpContentCompressor());

        stages.addLast("handler", new HttpSnoopServerHandler());
        return stages;
    }
}
/**
 * Per-request state stored as a channel attachment: the request itself and
 * the text buffer that accumulates the response body.
 */
class RequestState {

    final HttpRequest request;
    final StringBuilder buf = new StringBuilder();

    /**
     * @param request the request this state belongs to
     */
    RequestState(HttpRequest request) {
        this.request = request;
    }

    /**
     * Fetches the state attached to {@code channel}.
     *
     * @param channel the channel whose attachment is read
     * @return the attached state
     * @throws IllegalStateException if the attachment is not a {@code RequestState}
     *         (including when there is no attachment)
     */
    static RequestState get(Channel channel) {
        Object attached = channel.getAttachment();
        if (!(attached instanceof RequestState)) {
            throw new IllegalStateException("Attachment is not request state: " + attached);
        }
        return (RequestState) attached;
    }
}
class HttpSnoopServerHandler extends SimpleChannelUpstreamHandler {
private static Timer timer = new Timer();
private boolean readingChunks;
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
if (!readingChunks) {
HttpRequest request = (HttpRequest) e.getMessage();
RequestState state = new RequestState(request);
StringBuilder buf = state.buf;
e.getChannel().setAttachment(state);
System.out.println("Received request: " + request.getUri());
if (is100ContinueExpected(request)) {
send100Continue(e);
}
buf.setLength(0);
buf.append("WELCOME TO THE WILD WILD WEB SERVER\r\n");
buf.append("===================================\r\n");
buf.append("VERSION: " + request.getProtocolVersion() + "\r\n");
buf.append("HOSTNAME: " + getHost(request, "unknown") + "\r\n");
buf.append("REQUEST_URI: " + request.getUri() + "\r\n\r\n");
for (Map.Entry<String, String> h: request.getHeaders()) {
buf.append("HEADER: " + h.getKey() + " = " + h.getValue() + "\r\n");
}
buf.append("\r\n");
QueryStringDecoder queryStringDecoder = new QueryStringDecoder(request.getUri());
Map<String, List<String>> params = queryStringDecoder.getParameters();
if (!params.isEmpty()) {
for (Map.Entry<String, List<String>> p: params.entrySet()) {
String key = p.getKey();
List<String> vals = p.getValue();
for (String val : vals) {
buf.append("PARAM: " + key + " = " + val + "\r\n");
}
}
buf.append("\r\n");
}
if (request.isChunked()) {
readingChunks = true;
} else {
ChannelBuffer content = request.getContent();
if (content.readable()) {
buf.append("CONTENT: " + content.toString(CharsetUtil.UTF_8) + "\r\n");
}
writeResponse(e);
}
} else {
HttpChunk chunk = (HttpChunk) e.getMessage();
StringBuilder buf = RequestState.get(e.getChannel()).buf;
if (chunk.isLast()) {
readingChunks = false;
buf.append("END OF CONTENT\r\n");
HttpChunkTrailer trailer = (HttpChunkTrailer) chunk;
if (!trailer.getHeaderNames().isEmpty()) {
buf.append("\r\n");
for (String name: trailer.getHeaderNames()) {
for (String value: trailer.getHeaders(name)) {
buf.append("TRAILING HEADER: " + name + " = " + value + "\r\n");
}
}
buf.append("\r\n");
}
writeResponse(e);
} else {
buf.append("CHUNK: " + chunk.getContent().toString(CharsetUtil.UTF_8) + "\r\n");
}
}
}
private void writeResponse(final MessageEvent e) {
if (RequestState.get(e.getChannel()).request.getUri().startsWith("/delayed")) {
timer.schedule(new TimerTask() {
@Override
public void run() {
doWriteResponse(e);
}
}, 2000);
} else {
doWriteResponse(e);
}
}
private void doWriteResponse(MessageEvent e) {
RequestState state = RequestState.get(e.getChannel());
// Decide whether to close the connection or not.
boolean keepAlive = isKeepAlive(state.request);
// Build the response object.
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
response.setContent(ChannelBuffers.copiedBuffer(state.buf.toString(), CharsetUtil.UTF_8));
response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
if (keepAlive) {
// Add 'Content-Length' header only for a keep-alive connection.
response.setHeader(CONTENT_LENGTH, response.getContent().readableBytes());
// Add keep alive header as per:
// - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
response.setHeader(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
}
// Encode the cookie.
String cookieString = state.request.getHeader(COOKIE);
if (cookieString != null) {
CookieDecoder cookieDecoder = new CookieDecoder();
Set<Cookie> cookies = cookieDecoder.decode(cookieString);
if (!cookies.isEmpty()) {
// Reset the cookies if necessary.
CookieEncoder cookieEncoder = new CookieEncoder(true);
for (Cookie cookie : cookies) {
cookieEncoder.addCookie(cookie);
response.addHeader(SET_COOKIE, cookieEncoder.encode());
}
}
} else {
// Browser sent no cookie. Add some.
CookieEncoder cookieEncoder = new CookieEncoder(true);
cookieEncoder.addCookie("key1", "value1");
response.addHeader(SET_COOKIE, cookieEncoder.encode());
cookieEncoder.addCookie("key2", "value2");
response.addHeader(SET_COOKIE, cookieEncoder.encode());
}
// Write the response.
ChannelFuture future = e.getChannel().write(response);
// Close the non-keep-alive connection after the write operation is done.
if (!keepAlive) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
private static void send100Continue(MessageEvent e) {
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, CONTINUE);
e.getChannel().write(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
e.getCause().printStackTrace();
e.getChannel().close();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment