Skip to content

Instantly share code, notes, and snippets.

@mhgrove
Created August 20, 2013 11:52
Show Gist options
  • Save mhgrove/6280443 to your computer and use it in GitHub Desktop.
Save mhgrove/6280443 to your computer and use it in GitHub Desktop.
Netty HTTP Streaming Bug
// Copyright (c) 2010 - 2013, Clark & Parsia, LLC. <http://www.clarkparsia.com>
// For more information about licensing and copyright of this software, please contact
// [email protected] or visit http://stardog.com
package netty;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.nio.channels.ReadableByteChannel;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Locale;
import java.util.TimeZone;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.activation.MimetypesFileTypeMap;
import com.complexible.common.base.Memory;
import com.complexible.common.netty.buffer.ChannelBufferAggregateInputStream;
import com.complexible.stardog.StardogExecutor;
import com.google.common.io.ByteStreams;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelFutureProgressListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpChunkTrailer;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.stream.ChunkedNioStream;
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
import org.jboss.netty.util.CharsetUtil;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.DATE;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.EXPIRES;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.IF_MODIFIED_SINCE;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.LAST_MODIFIED;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Values;
import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
import static org.jboss.netty.handler.codec.http.HttpMethod.POST;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_MODIFIED;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
public class Server2 {
/** TCP port this server binds to when {@link #run()} is invoked. */
private final int port;
/**
 * Creates a server that will listen on the given port once {@link #run()} is called.
 *
 * @param port port number used to build the bind address in {@link #run()}
 */
public Server2(int port) {
this.port = port;
}
/**
 * Starts the server: builds an NIO server channel factory backed by two cached
 * thread pools, installs {@link HttpStaticFileServerPipelineFactory} as the
 * pipeline factory and binds to the port given at construction time.
 */
public void run() {
    // The channel factory receives two independent cached thread pools.
    final NioServerSocketChannelFactory channelFactory = new NioServerSocketChannelFactory(
            Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool());
    final ServerBootstrap server = new ServerBootstrap(channelFactory);

    // Every accepted connection gets its pipeline from this factory.
    server.setPipelineFactory(new HttpStaticFileServerPipelineFactory());

    // Start accepting connections on the configured port.
    server.bind(new InetSocketAddress(port));
}
/**
 * Command-line entry point.
 *
 * @param args optional arguments; when present, {@code args[0]} is parsed as the
 *             port number, otherwise port 8080 is used
 */
public static void main(String[] args) {
    final int port = args.length > 0 ? Integer.parseInt(args[0]) : 8080;
    new Server2(port).run();
}
/**
 * Pipeline factory for the demo server. Each pipeline consists of, in order:
 * an HTTP request decoder, an HTTP response encoder, a chunked-write handler
 * and the {@link HttpStaticFileServerHandler}. No chunk aggregator is installed.
 */
static class HttpStaticFileServerPipelineFactory implements ChannelPipelineFactory {
    public ChannelPipeline getPipeline() throws Exception {
        final ChannelPipeline p = Channels.pipeline();

        // To serve HTTPS, enable the following three lines:
        //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
        //engine.setUseClientMode(false);
        //p.addLast("ssl", new SslHandler(engine));

        // Third decoder limit is 64 * Memory.KB, narrowed to int.
        final int maxChunkSize = 64 * (int) Memory.KB;
        p.addLast("decoder", new HttpRequestDecoder(4096, 8192, maxChunkSize));
        // Aggregation is intentionally left disabled:
        // p.addLast("aggregator", new HttpChunkAggregator(65536));
        p.addLast("encoder", new HttpResponseEncoder());
        p.addLast("chunkedWriter", new ChunkedWriteHandler());
        p.addLast("handler", new HttpStaticFileServerHandler());
        return p;
    }
}
static class HttpStaticFileServerHandler extends SimpleChannelUpstreamHandler {
/** {@link SimpleDateFormat} pattern used when formatting HTTP date headers. */
public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";
/** Time zone id applied to the date formatter before formatting HTTP date headers. */
public static final String HTTP_DATE_GMT_TIMEZONE = "GMT";
/** Cache lifetime in seconds; used for the Expires offset and the Cache-Control max-age value. */
public static final int HTTP_CACHE_SECONDS = 60;
/**
 * Answers a GET request with a generated 4096-byte body sent with
 * {@code Transfer-Encoding: chunked}.
 *
 * <p>Non-GET requests receive 405, and URIs rejected by {@code sanitizeUri}
 * receive 403. Messages that are not an {@link HttpRequest} (for example
 * {@link HttpChunk}s of a request body, which can arrive because the pipeline
 * has no aggregator) are ignored instead of failing with a
 * {@link ClassCastException}.
 *
 * @param ctx handler context, used for error responses
 * @param e   event carrying the decoded message and the channel to reply on
 * @throws Exception propagated to {@code exceptionCaught}
 */
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    final Object message = e.getMessage();
    if (!(message instanceof HttpRequest)) {
        // This handler never consumes request bodies, so stray body chunks
        // (or any other message type) are simply dropped.
        return;
    }
    final HttpRequest request = (HttpRequest) message;

    if (!GET.equals(request.getMethod())) {
        sendError(ctx, METHOD_NOT_ALLOWED);
        return;
    }

    final String path = sanitizeUri(request.getUri());
    if (path == null) {
        sendError(ctx, FORBIDDEN);
        return;
    }

    // Generated content: byte i holds the low 8 bits of i.
    final byte[] content = new byte[4096];
    for (int i = 0; i < content.length; i++) {
        content[i] = (byte) i;
    }

    final HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
    // setContentLength(response, 4096);
    // setContentTypeHeader(response, file);
    // setDateAndCacheHeaders(response, file);
    response.setChunked(true);
    response.setHeader(Names.TRANSFER_ENCODING, Values.CHUNKED);

    final Channel ch = e.getChannel();

    // Write the initial line and the headers.
    ch.write(response);

    // NOTE(review): the body is handed over as a ChunkedNioStream, which yields
    // plain buffers rather than HttpChunk messages. Confirm that the configured
    // HttpResponseEncoder applies chunked framing (including the terminating
    // last chunk) to them; otherwise clients will see a malformed chunked body.
    final ReadableByteChannel aIn =
            java.nio.channels.Channels.newChannel(new ByteArrayInputStream(content));
    final ChannelFuture writeFuture = ch.write(new ChunkedNioStream(aIn));
    writeFuture.addListener(new ChannelFutureProgressListener() {
        public void operationComplete(ChannelFuture future) {
            try {
                aIn.close();
            }
            catch (IOException e1) {
                // Best effort: the response has already been handled, so only report it.
                e1.printStackTrace();
            }
        }

        public void operationProgressed(
                ChannelFuture future, long amount, long current, long total) {
            System.out.printf("%s: %d / %d (+%d)%n", path, current, total, amount);
        }
    });

    // Decide whether to close the connection or not.
    if (!isKeepAlive(request)) {
        // Close the connection when the whole content is written out.
        writeFuture.addListener(ChannelFutureListener.CLOSE);
    }
}
/**
 * Reports a pipeline failure: prints the stack trace, then answers with 400 when
 * the cause is a {@link TooLongFrameException}, or with 500 for any other cause
 * provided the channel is still connected.
 *
 * @param ctx handler context used to send the error response
 * @param e   event carrying the failure cause and its channel
 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
        throws Exception {
    final Channel channel = e.getChannel();
    final Throwable failure = e.getCause();
    failure.printStackTrace();

    if (failure instanceof TooLongFrameException) {
        sendError(ctx, BAD_REQUEST);
    } else if (channel.isConnected()) {
        sendError(ctx, INTERNAL_SERVER_ERROR);
    }
}
/**
 * Decodes a request URI and maps it to an absolute path under the JVM working
 * directory ({@code user.dir}).
 *
 * @param uri raw request URI; may be percent-encoded
 * @return the absolute path, or {@code null} when the URI is {@code null}, cannot
 *         be percent-decoded, or fails the dot-segment check below
 */
private static String sanitizeUri(String uri) {
    if (uri == null) {
        return null;
    }

    // Decode the path; UTF-8 first, ISO-8859-1 as a fallback charset.
    String decoded;
    try {
        try {
            decoded = URLDecoder.decode(uri, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            try {
                decoded = URLDecoder.decode(uri, "ISO-8859-1");
            } catch (UnsupportedEncodingException e1) {
                throw new Error("Neither UTF-8 nor ISO-8859-1 is supported by this JVM", e1);
            }
        }
    } catch (IllegalArgumentException malformedEscape) {
        // Broken percent-escapes (e.g. "%zz" or a trailing "%") are client errors:
        // reject the URI like any other unacceptable path instead of throwing.
        return null;
    }

    // Convert file separators.
    decoded = decoded.replace('/', File.separatorChar);

    // Simplistic dumb security check.
    // You will have to do something serious in the production environment.
    if (decoded.contains(File.separator + '.') ||
        decoded.contains('.' + File.separator) ||
        decoded.startsWith(".") || decoded.endsWith(".")) {
        return null;
    }

    // Convert to absolute path.
    return System.getProperty("user.dir") + File.separator + decoded;
}
/**
 * Sends a plain-text error response whose body is {@code "Failure: <status>"}
 * followed by CRLF, and closes the channel once the write completes.
 *
 * @param ctx    context whose channel receives the response
 * @param status HTTP status to report
 */
private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
    final HttpResponse errorResponse = new DefaultHttpResponse(HTTP_1_1, status);
    errorResponse.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");

    final String body = "Failure: " + status.toString() + "\r\n";
    errorResponse.setContent(ChannelBuffers.copiedBuffer(body, CharsetUtil.UTF_8));

    // The connection is not reused after an error: close it when the write is done.
    final ChannelFuture sent = ctx.getChannel().write(errorResponse);
    sent.addListener(ChannelFutureListener.CLOSE);
}
/**
 * Replies with "304 Not Modified" (intended for the case where the file timestamp
 * matches what the browser sent), carrying only a Date header, and closes the
 * channel once the response has been written.
 *
 * @param ctx
 *            context whose channel receives the response
 */
private static void sendNotModified(ChannelHandlerContext ctx) {
    final HttpResponse notModified = new DefaultHttpResponse(HTTP_1_1, NOT_MODIFIED);
    setDateHeader(notModified);

    // Close the connection as soon as the response is sent.
    final ChannelFuture sent = ctx.getChannel().write(notModified);
    sent.addListener(ChannelFutureListener.CLOSE);
}
/**
 * Stamps the response with a Date header holding the current time, formatted
 * with {@code HTTP_DATE_FORMAT} in the {@code HTTP_DATE_GMT_TIMEZONE} zone.
 *
 * @param response
 *            HTTP response to modify
 */
private static void setDateHeader(HttpResponse response) {
    final SimpleDateFormat httpDate = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
    httpDate.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE));

    final String now = httpDate.format(new GregorianCalendar().getTime());
    response.setHeader(DATE, now);
}
/**
 * Writes the Date, Expires, Cache-Control and Last-Modified headers.
 * Expires is the current time plus {@code HTTP_CACHE_SECONDS}; Last-Modified is
 * taken from the file's modification time.
 *
 * @param response
 *            HTTP response to modify
 * @param fileToCache
 *            file whose last-modified time is reported
 */
private static void setDateAndCacheHeaders(HttpResponse response, File fileToCache) {
    final SimpleDateFormat httpDate = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
    httpDate.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE));

    // Date: now.
    final Calendar clock = new GregorianCalendar();
    final String nowText = httpDate.format(clock.getTime());
    response.setHeader(DATE, nowText);

    // Expires: now + cache lifetime (the same calendar is advanced in place).
    clock.add(Calendar.SECOND, HTTP_CACHE_SECONDS);
    final String expiresText = httpDate.format(clock.getTime());
    response.setHeader(EXPIRES, expiresText);

    response.setHeader(CACHE_CONTROL, "private, max-age=" + HTTP_CACHE_SECONDS);

    // Last-Modified: the file's own timestamp.
    final Date modified = new Date(fileToCache.lastModified());
    response.setHeader(LAST_MODIFIED, httpDate.format(modified));
}
/**
 * Sets the Content-Type header to the MIME type that a default
 * {@link MimetypesFileTypeMap} reports for the file's path.
 *
 * @param response
 *            HTTP response to modify
 * @param file
 *            file whose path determines the content type
 */
private static void setContentTypeHeader(HttpResponse response, File file) {
    final String mimeType = new MimetypesFileTypeMap().getContentType(file.getPath());
    response.setHeader(CONTENT_TYPE, mimeType);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment