Last active
December 15, 2015 06:59
-
-
Save jroper/5220203 to your computer and use it in GitHub Desktop.
Demonstration of HTTP pipelining support in Netty.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.jboss.netty.buffer.ChannelBuffer; | |
import org.jboss.netty.channel.*; | |
import org.jboss.netty.handler.codec.http.*; | |
import java.net.SocketAddress; | |
import java.util.LinkedList; | |
import java.util.concurrent.Callable; | |
public class HttpPipeliningHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler { | |
private final LinkedList<Channel> channelQueue = new LinkedList<Channel>(); | |
private long currentResponseContentLength = -1; | |
private boolean isChunked; | |
@Override | |
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { | |
if (e instanceof MessageEvent) { | |
MessageEvent msgEvent = (MessageEvent) e; | |
Object message = msgEvent.getMessage(); | |
if (message instanceof HttpResponse) { | |
HttpResponse response = (HttpResponse) message; | |
if (response.getStatus() == HttpResponseStatus.NO_CONTENT || response.getStatus() == HttpResponseStatus.NOT_MODIFIED) { | |
currentResponseContentLength = 0; | |
} else { | |
currentResponseContentLength = HttpHeaders.getContentLength(response, -1); | |
} | |
isChunked = response.isChunked(); | |
if (!isChunked) { | |
observeContent(response.getContent(), e.getFuture()); | |
} | |
} else if (message instanceof HttpChunk) { | |
HttpChunk chunk = (HttpChunk) message; | |
if (isChunked) { | |
if (chunk.isLast()) { | |
popChannelQueue(e.getFuture()); | |
} | |
} else { | |
observeContent(chunk.getContent(), e.getFuture()); | |
} | |
} else if (message instanceof ChannelBuffer) { | |
observeContent((ChannelBuffer) message, e.getFuture()); | |
} | |
} | |
ctx.sendDownstream(e); | |
} | |
private void observeContent(ChannelBuffer buffer, ChannelFuture future) { | |
if (currentResponseContentLength >= 0) { | |
currentResponseContentLength -= buffer.readableBytes(); | |
if (currentResponseContentLength <= 0) { | |
popChannelQueue(future); | |
currentResponseContentLength = -1; | |
} | |
} | |
} | |
private void popChannelQueue(ChannelFuture future) { | |
future.addListener(new ChannelFutureListener() { | |
@Override | |
public void operationComplete(ChannelFuture future) throws Exception { | |
Channel next; | |
synchronized (channelQueue) { | |
channelQueue.pop(); | |
next = channelQueue.peek(); | |
} | |
if (next instanceof HttpPipeliningChannel) { | |
if (future.isCancelled()) { | |
((HttpPipeliningChannel) next).cancel(); | |
} else { | |
((HttpPipeliningChannel) next).proceed(); | |
} | |
} | |
} | |
}); | |
} | |
@Override | |
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { | |
// A messages has come from upstream (ie, part of a request) | |
if (e instanceof MessageEvent) { | |
MessageEvent msgEvent = (MessageEvent) e; | |
Object msg = msgEvent.getMessage(); | |
if (msg instanceof HttpRequest) { | |
synchronized (channelQueue) { | |
if (channelQueue.size() > 0) { | |
// This request is pipelined behind another | |
channelQueue.add(new HttpPipeliningChannel(e.getChannel())); | |
} else { | |
// No pipelining, so no need to use the pipelining channel | |
channelQueue.add(e.getChannel()); | |
} | |
} | |
} | |
ctx.sendUpstream(new UpstreamMessageEvent(channelQueue.getLast(), msg, msgEvent.getRemoteAddress())); | |
} else { | |
ctx.sendUpstream(e); | |
} | |
} | |
} | |
/** | |
* Delegates operations to the underlying channel, but only once told to proceed | |
*/ | |
class HttpPipeliningChannel implements Channel { | |
private final Channel delegate; | |
private final ChannelFuture future = new DefaultChannelFuture(this, false); | |
private Object attachment; | |
HttpPipeliningChannel(Channel delegate) { | |
this.delegate = delegate; | |
} | |
public void proceed() { | |
future.setSuccess(); | |
} | |
public void cancel() { | |
future.cancel(); | |
} | |
@Override | |
public Integer getId() { | |
return delegate.getId(); | |
} | |
@Override | |
public ChannelFactory getFactory() { | |
return delegate.getFactory(); | |
} | |
@Override | |
public Channel getParent() { | |
return delegate.getParent(); | |
} | |
@Override | |
public ChannelConfig getConfig() { | |
return delegate.getConfig(); | |
} | |
@Override | |
public ChannelPipeline getPipeline() { | |
return delegate.getPipeline(); | |
} | |
@Override | |
public boolean isOpen() { | |
return delegate.isOpen(); | |
} | |
@Override | |
public boolean isBound() { | |
return delegate.isBound(); | |
} | |
@Override | |
public boolean isConnected() { | |
return delegate.isConnected(); | |
} | |
@Override | |
public SocketAddress getLocalAddress() { | |
return delegate.getLocalAddress(); | |
} | |
@Override | |
public SocketAddress getRemoteAddress() { | |
return delegate.getRemoteAddress(); | |
} | |
@Override | |
public ChannelFuture write(final Object message) { | |
return chainFutureTo(new Callable<ChannelFuture>() { | |
@Override | |
public ChannelFuture call() throws Exception { | |
return delegate.write(message); | |
} | |
}); | |
} | |
@Override | |
public ChannelFuture write(final Object message, final SocketAddress remoteAddress) { | |
return chainFutureTo(new Callable<ChannelFuture>() { | |
@Override | |
public ChannelFuture call() throws Exception { | |
return delegate.write(message, remoteAddress); | |
} | |
}); | |
} | |
@Override | |
public ChannelFuture bind(final SocketAddress localAddress) { | |
return chainFutureTo(new Callable<ChannelFuture>() { | |
@Override | |
public ChannelFuture call() throws Exception { | |
return delegate.bind(localAddress); | |
} | |
}); | |
} | |
@Override | |
public ChannelFuture connect(final SocketAddress remoteAddress) { | |
return chainFutureTo(new Callable<ChannelFuture>() { | |
@Override | |
public ChannelFuture call() throws Exception { | |
return delegate.connect(remoteAddress); | |
} | |
}); | |
} | |
@Override | |
public ChannelFuture disconnect() { | |
return chainFutureTo(new Callable<ChannelFuture>() { | |
@Override | |
public ChannelFuture call() throws Exception { | |
return delegate.disconnect(); | |
} | |
}); | |
} | |
@Override | |
public ChannelFuture unbind() { | |
return chainFutureTo(new Callable<ChannelFuture>() { | |
@Override | |
public ChannelFuture call() throws Exception { | |
return delegate.unbind(); | |
} | |
}); | |
} | |
@Override | |
public ChannelFuture close() { | |
return chainFutureTo(new Callable<ChannelFuture>() { | |
@Override | |
public ChannelFuture call() throws Exception { | |
return delegate.close(); | |
} | |
}); | |
} | |
@Override | |
public ChannelFuture getCloseFuture() { | |
return chainFutureTo(new Callable<ChannelFuture>() { | |
@Override | |
public ChannelFuture call() throws Exception { | |
return delegate.getCloseFuture(); | |
} | |
}); | |
} | |
@Override | |
public int getInterestOps() { | |
return delegate.getInterestOps(); | |
} | |
@Override | |
public boolean isReadable() { | |
if (future.isDone()) { | |
return delegate.isReadable(); | |
} else { | |
return false; | |
} | |
} | |
@Override | |
public boolean isWritable() { | |
if (future.isDone()) { | |
return delegate.isWritable(); | |
} else { | |
return false; | |
} | |
} | |
@Override | |
public ChannelFuture setInterestOps(final int interestOps) { | |
return chainFutureTo(new Callable<ChannelFuture>() { | |
@Override | |
public ChannelFuture call() throws Exception { | |
return delegate.setInterestOps(interestOps); | |
} | |
}); | |
} | |
@Override | |
public ChannelFuture setReadable(final boolean readable) { | |
return chainFutureTo(new Callable<ChannelFuture>() { | |
@Override | |
public ChannelFuture call() throws Exception { | |
return delegate.setReadable(readable); | |
} | |
}); | |
} | |
@Override | |
public Object getAttachment() { | |
return attachment; | |
} | |
@Override | |
public void setAttachment(Object attachment) { | |
this.attachment = attachment; | |
} | |
@Override | |
public int compareTo(Channel channel) { | |
return delegate.compareTo(channel); | |
} | |
private ChannelFuture chainFutureTo(final Callable<ChannelFuture> op) { | |
final ChannelFuture chained = new DefaultChannelFuture(this, true); | |
future.addListener(new ChannelFutureListener() { | |
@Override | |
public void operationComplete(ChannelFuture future) throws Exception { | |
if (future.isCancelled()) { | |
chained.cancel(); | |
} else { | |
op.call().addListener(new ChannelFutureListener() { | |
@Override | |
public void operationComplete(ChannelFuture future) throws Exception { | |
if (future.isSuccess()) { | |
chained.setSuccess(); | |
} else if (future.isCancelled()) { | |
chained.cancel(); | |
} else { | |
chained.setFailure(future.getCause()); | |
} | |
} | |
}); | |
} | |
} | |
}); | |
return chained; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.net.InetSocketAddress; | |
import java.util.*; | |
import java.util.concurrent.Executors; | |
import org.jboss.netty.bootstrap.ServerBootstrap; | |
import org.jboss.netty.buffer.ChannelBuffer; | |
import org.jboss.netty.buffer.ChannelBuffers; | |
import org.jboss.netty.channel.*; | |
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; | |
import org.jboss.netty.handler.codec.http.*; | |
import org.jboss.netty.util.CharsetUtil; | |
import static org.jboss.netty.channel.Channels.*; | |
import static org.jboss.netty.handler.codec.http.HttpHeaders.*; | |
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.*; | |
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*; | |
import static org.jboss.netty.handler.codec.http.HttpVersion.*; | |
public class HttpSnoopServer { | |
private final int port; | |
public HttpSnoopServer(int port) { | |
this.port = port; | |
} | |
public void run() { | |
// Configure the server. | |
ServerBootstrap bootstrap = new ServerBootstrap( | |
new NioServerSocketChannelFactory( | |
Executors.newCachedThreadPool(), | |
Executors.newCachedThreadPool())); | |
// Set up the event pipeline factory. | |
bootstrap.setPipelineFactory(new HttpSnoopServerPipelineFactory()); | |
// Bind and start to accept incoming connections. | |
bootstrap.bind(new InetSocketAddress(port)); | |
} | |
public static void main(String[] args) { | |
int port; | |
if (args.length > 0) { | |
port = Integer.parseInt(args[0]); | |
} else { | |
port = 8080; | |
} | |
new HttpSnoopServer(port).run(); | |
} | |
} | |
class HttpSnoopServerPipelineFactory implements ChannelPipelineFactory { | |
public ChannelPipeline getPipeline() throws Exception { | |
// Create a default pipeline implementation. | |
ChannelPipeline pipeline = pipeline(); | |
pipeline.addLast("decoder", new HttpRequestDecoder()); | |
// Uncomment the following line if you don't want to handle HttpChunks. | |
//pipeline.addLast("aggregator", new HttpChunkAggregator(1048576)); | |
pipeline.addLast("encoder", new HttpResponseEncoder()); | |
// Remove the following line if you don't want automatic content compression. | |
pipeline.addLast("pipeliner", new HttpPipeliningHandler()); | |
pipeline.addLast("deflater", new HttpContentCompressor()); | |
pipeline.addLast("handler", new HttpSnoopServerHandler()); | |
return pipeline; | |
} | |
} | |
class RequestState { | |
RequestState(HttpRequest request) { | |
this.request = request; | |
} | |
final HttpRequest request; | |
final StringBuilder buf = new StringBuilder(); | |
static RequestState get(Channel channel) { | |
Object attachment = channel.getAttachment(); | |
if (attachment instanceof RequestState) { | |
return (RequestState) attachment; | |
} else { | |
throw new IllegalStateException("Attachment is not request state: " + attachment); | |
} | |
} | |
} | |
class HttpSnoopServerHandler extends SimpleChannelUpstreamHandler { | |
private static Timer timer = new Timer(); | |
private boolean readingChunks; | |
@Override | |
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { | |
if (!readingChunks) { | |
HttpRequest request = (HttpRequest) e.getMessage(); | |
RequestState state = new RequestState(request); | |
StringBuilder buf = state.buf; | |
e.getChannel().setAttachment(state); | |
System.out.println("Received request: " + request.getUri()); | |
if (is100ContinueExpected(request)) { | |
send100Continue(e); | |
} | |
buf.setLength(0); | |
buf.append("WELCOME TO THE WILD WILD WEB SERVER\r\n"); | |
buf.append("===================================\r\n"); | |
buf.append("VERSION: " + request.getProtocolVersion() + "\r\n"); | |
buf.append("HOSTNAME: " + getHost(request, "unknown") + "\r\n"); | |
buf.append("REQUEST_URI: " + request.getUri() + "\r\n\r\n"); | |
for (Map.Entry<String, String> h: request.getHeaders()) { | |
buf.append("HEADER: " + h.getKey() + " = " + h.getValue() + "\r\n"); | |
} | |
buf.append("\r\n"); | |
QueryStringDecoder queryStringDecoder = new QueryStringDecoder(request.getUri()); | |
Map<String, List<String>> params = queryStringDecoder.getParameters(); | |
if (!params.isEmpty()) { | |
for (Map.Entry<String, List<String>> p: params.entrySet()) { | |
String key = p.getKey(); | |
List<String> vals = p.getValue(); | |
for (String val : vals) { | |
buf.append("PARAM: " + key + " = " + val + "\r\n"); | |
} | |
} | |
buf.append("\r\n"); | |
} | |
if (request.isChunked()) { | |
readingChunks = true; | |
} else { | |
ChannelBuffer content = request.getContent(); | |
if (content.readable()) { | |
buf.append("CONTENT: " + content.toString(CharsetUtil.UTF_8) + "\r\n"); | |
} | |
writeResponse(e); | |
} | |
} else { | |
HttpChunk chunk = (HttpChunk) e.getMessage(); | |
StringBuilder buf = RequestState.get(e.getChannel()).buf; | |
if (chunk.isLast()) { | |
readingChunks = false; | |
buf.append("END OF CONTENT\r\n"); | |
HttpChunkTrailer trailer = (HttpChunkTrailer) chunk; | |
if (!trailer.getHeaderNames().isEmpty()) { | |
buf.append("\r\n"); | |
for (String name: trailer.getHeaderNames()) { | |
for (String value: trailer.getHeaders(name)) { | |
buf.append("TRAILING HEADER: " + name + " = " + value + "\r\n"); | |
} | |
} | |
buf.append("\r\n"); | |
} | |
writeResponse(e); | |
} else { | |
buf.append("CHUNK: " + chunk.getContent().toString(CharsetUtil.UTF_8) + "\r\n"); | |
} | |
} | |
} | |
private void writeResponse(final MessageEvent e) { | |
if (RequestState.get(e.getChannel()).request.getUri().startsWith("/delayed")) { | |
timer.schedule(new TimerTask() { | |
@Override | |
public void run() { | |
doWriteResponse(e); | |
} | |
}, 2000); | |
} else { | |
doWriteResponse(e); | |
} | |
} | |
private void doWriteResponse(MessageEvent e) { | |
RequestState state = RequestState.get(e.getChannel()); | |
// Decide whether to close the connection or not. | |
boolean keepAlive = isKeepAlive(state.request); | |
// Build the response object. | |
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); | |
response.setContent(ChannelBuffers.copiedBuffer(state.buf.toString(), CharsetUtil.UTF_8)); | |
response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); | |
if (keepAlive) { | |
// Add 'Content-Length' header only for a keep-alive connection. | |
response.setHeader(CONTENT_LENGTH, response.getContent().readableBytes()); | |
// Add keep alive header as per: | |
// - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection | |
response.setHeader(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); | |
} | |
// Encode the cookie. | |
String cookieString = state.request.getHeader(COOKIE); | |
if (cookieString != null) { | |
CookieDecoder cookieDecoder = new CookieDecoder(); | |
Set<Cookie> cookies = cookieDecoder.decode(cookieString); | |
if (!cookies.isEmpty()) { | |
// Reset the cookies if necessary. | |
CookieEncoder cookieEncoder = new CookieEncoder(true); | |
for (Cookie cookie : cookies) { | |
cookieEncoder.addCookie(cookie); | |
response.addHeader(SET_COOKIE, cookieEncoder.encode()); | |
} | |
} | |
} else { | |
// Browser sent no cookie. Add some. | |
CookieEncoder cookieEncoder = new CookieEncoder(true); | |
cookieEncoder.addCookie("key1", "value1"); | |
response.addHeader(SET_COOKIE, cookieEncoder.encode()); | |
cookieEncoder.addCookie("key2", "value2"); | |
response.addHeader(SET_COOKIE, cookieEncoder.encode()); | |
} | |
// Write the response. | |
ChannelFuture future = e.getChannel().write(response); | |
// Close the non-keep-alive connection after the write operation is done. | |
if (!keepAlive) { | |
future.addListener(ChannelFutureListener.CLOSE); | |
} | |
} | |
private static void send100Continue(MessageEvent e) { | |
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, CONTINUE); | |
e.getChannel().write(response); | |
} | |
@Override | |
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) | |
throws Exception { | |
e.getCause().printStackTrace(); | |
e.getChannel().close(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment