Created
          August 12, 2013 22:13 
        
      - 
      
- 
        Save breznik/6215834 to your computer and use it in GitHub Desktop. 
    Example code that works when the HttpPostRequestDecoder's setDiscardThreshold(0) method is called, but fails otherwise.
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | package net.reznik.gist; | |
| import static io.netty.buffer.Unpooled.copiedBuffer; | |
| import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive; | |
| import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; | |
| import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; | |
| import static io.netty.handler.codec.http.HttpResponseStatus.OK; | |
| import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; | |
| import io.netty.buffer.ByteBuf; | |
| import io.netty.buffer.ByteBufOutputStream; | |
| import io.netty.buffer.Unpooled; | |
| import io.netty.channel.ChannelFuture; | |
| import io.netty.channel.ChannelFutureListener; | |
| import io.netty.channel.ChannelHandlerContext; | |
| import io.netty.channel.SimpleChannelInboundHandler; | |
| import io.netty.handler.codec.http.DefaultFullHttpResponse; | |
| import io.netty.handler.codec.http.FullHttpRequest; | |
| import io.netty.handler.codec.http.FullHttpResponse; | |
| import io.netty.handler.codec.http.HttpContent; | |
| import io.netty.handler.codec.http.HttpHeaders; | |
| import io.netty.handler.codec.http.HttpObject; | |
| import io.netty.handler.codec.http.HttpRequest; | |
| import io.netty.handler.codec.http.HttpResponseStatus; | |
| import io.netty.handler.codec.http.LastHttpContent; | |
| import io.netty.handler.codec.http.multipart.Attribute; | |
| import io.netty.handler.codec.http.multipart.DiskAttribute; | |
| import io.netty.handler.codec.http.multipart.DiskFileUpload; | |
| import io.netty.handler.codec.http.multipart.FileUpload; | |
| import io.netty.handler.codec.http.multipart.HttpDataFactory; | |
| import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder; | |
| import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder.EndOfDataDecoderException; | |
| import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder.ErrorDataDecoderException; | |
| import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder.IncompatibleDataDecoderException; | |
| import io.netty.handler.codec.http.multipart.InterfaceHttpData; | |
| import io.netty.util.CharsetUtil; | |
| import java.io.IOException; | |
| import org.slf4j.Logger; | |
| import org.slf4j.LoggerFactory; | |
| public class MyServerHandler extends SimpleChannelInboundHandler<HttpObject> { | |
| private static final Logger LOGGER = LoggerFactory.getLogger(MyServerHandler.class); | |
| private static final String CONTENT_TYPE_JSON = "application/json"; | |
| /** Factory for creating data objects used in decoding of Multipart POSTs */ | |
| private final HttpDataFactory dataFactory; | |
| /** Decoder used for Multipart POST requests */ | |
| private HttpPostRequestDecoder decoder; | |
| /** The request used for decoding of Multipart POST requests */ | |
| private FullHttpRequest fullHttpRequest; | |
| private String readData; | |
| static { | |
| DiskFileUpload.deleteOnExitTemporaryFile = true; // delete file on exit (normal exit) | |
| DiskFileUpload.baseDirectory = null; // system temp directory | |
| DiskAttribute.deleteOnExitTemporaryFile = true; // delete file on exit (normal exit) | |
| DiskAttribute.baseDirectory = null; // system temp directory | |
| } | |
| MyServerHandler(final HttpDataFactory dataFactory) { | |
| this.dataFactory = dataFactory; | |
| } | |
| @Override | |
| public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { | |
| if (decoder != null) { | |
| decoder.cleanFiles(); | |
| } | |
| } | |
| @Override | |
| protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { | |
| if (msg instanceof FullHttpRequest) { | |
| FullHttpRequest request = fullHttpRequest = (FullHttpRequest) msg; | |
| try { | |
| decoder = new HttpPostRequestDecoder(dataFactory, request); | |
| decoder.setDiscardThreshold(0); | |
| } catch (IncompatibleDataDecoderException e) { | |
| // No body to decode | |
| handleInvalidRequest(ctx, request, BAD_REQUEST, copiedBuffer(e.getMessage(), CharsetUtil.UTF_8)); | |
| return; | |
| } catch (ErrorDataDecoderException e) { | |
| // Bad default charset or other issues such as a missing boundary | |
| handleInvalidRequest(ctx, request, BAD_REQUEST, copiedBuffer(e.getMessage(), CharsetUtil.UTF_8)); | |
| return; | |
| } | |
| } | |
| if (decoder != null) { | |
| if (msg instanceof HttpContent) { | |
| HttpContent chunk = (HttpContent) msg; | |
| try { | |
| decoder.offer(chunk); | |
| } catch (ErrorDataDecoderException e) { | |
| LOGGER.warn("Unable to add chunk to decoder", e); | |
| handleInvalidRequest(ctx, fullHttpRequest, BAD_REQUEST, copiedBuffer(e.getMessage(), CharsetUtil.UTF_8)); | |
| return; | |
| } | |
| // Read data as it becomes available, chunk by chunk. | |
| readChunkByChunk(ctx); | |
| if (chunk instanceof LastHttpContent) { | |
| readChunkByChunk(ctx); | |
| sendResponse(ctx); | |
| resetPostRequestDecoder(); | |
| } | |
| } | |
| } | |
| } | |
| /** | |
| * Reads each HTTP chunk into an {@link InterfaceHttpData} for processing | |
| * @param ctx | |
| */ | |
| private void readChunkByChunk(ChannelHandlerContext ctx) { | |
| try { | |
| while (decoder.hasNext()) { | |
| InterfaceHttpData data = decoder.next(); | |
| if (data != null) { | |
| try { | |
| processChunk(ctx, data); | |
| } finally { | |
| data.release(); | |
| } | |
| } | |
| } | |
| } catch (EndOfDataDecoderException e) { | |
| // No more data to decode, that's fine | |
| } | |
| } | |
| /** | |
| * Processes an individual HTTP Data chunk | |
| * @param ctx | |
| * @param data | |
| */ | |
| private void processChunk(ChannelHandlerContext ctx, InterfaceHttpData data) { | |
| LOGGER.debug("HTTP Data Name: {}, Type: {}", data.getName(), data.getHttpDataType()); | |
| switch(data.getHttpDataType()) { | |
| case Attribute: | |
| Attribute attrib = (Attribute) data; | |
| if (!"json".equals(attrib.getName())) { | |
| LOGGER.debug("Received unknown attribute: {}", attrib.getName()); | |
| handleInvalidRequest(ctx, fullHttpRequest, BAD_REQUEST, copiedBuffer("Unknown Part Name: " + attrib.getName(), CharsetUtil.UTF_8)); | |
| return; | |
| } | |
| try { | |
| readData = attrib.getByteBuf().toString(CharsetUtil.UTF_8); | |
| LOGGER.debug("Content Size: {}, Content: {}", attrib.getByteBuf().readableBytes(), readData); | |
| } catch (IOException e) { | |
| LOGGER.error("Unable to read attribute content", e); | |
| } | |
| break; | |
| case FileUpload: | |
| FileUpload fileUpload = (FileUpload) data; | |
| // handle upload | |
| break; | |
| default: | |
| LOGGER.warn("Received unknown attribute type. Skipping."); | |
| break; | |
| } | |
| } | |
| private void resetPostRequestDecoder() { | |
| fullHttpRequest = null; | |
| readData = null; | |
| // clean previous FileUpload if Any | |
| decoder.destroy(); | |
| decoder = null; | |
| } | |
| private void sendResponse(ChannelHandlerContext ctx) { | |
| // Serialize Response | |
| ByteBuf outputContent = Unpooled.buffer(); | |
| ByteBufOutputStream bbos = new ByteBufOutputStream(outputContent); | |
| try { | |
| bbos.writeBytes(readData); | |
| } catch (IOException e) { | |
| LOGGER.error("Unable to serialize response", e); | |
| } | |
| // Create HTTP Response | |
| FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, outputContent); | |
| response.headers().set(CONTENT_TYPE, CONTENT_TYPE_JSON); | |
| setContentLength(response, outputContent.readableBytes()); | |
| // Send HTTP Response | |
| sendHttpResponse(ctx, fullHttpRequest, response); | |
| } | |
| private void handleInvalidRequest(ChannelHandlerContext ctx, HttpRequest request, HttpResponseStatus responseStatus, ByteBuf errorMessage) { | |
| sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HTTP_1_1, responseStatus, errorMessage)); | |
| return; | |
| } | |
| private void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest request, FullHttpResponse response) { | |
| if (!response.headers().contains(HttpHeaders.Names.CONTENT_LENGTH)) { | |
| setContentLength(response, response.content().readableBytes()); | |
| } | |
| // Send the response and close the connection if necessary. | |
| ChannelFuture f = ctx.channel().writeAndFlush(response); | |
| if (!isKeepAlive(request) || response.getStatus().code() != OK.code()) { | |
| f.addListener(ChannelFutureListener.CLOSE); | |
| } | |
| } | |
| private static void setContentLength(FullHttpResponse response, int length) { | |
| response.headers().add(HttpHeaders.Names.CONTENT_LENGTH, length); | |
| } | |
| @Override | |
| public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { | |
| LOGGER.warn("Exception:", cause); | |
| ctx.close(); | |
| } | |
| } | 
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | package net.reznik.gist; | |
| import static org.testng.Assert.assertEquals; | |
| import static org.testng.Assert.assertNotNull; | |
| import java.io.BufferedWriter; | |
| import java.io.File; | |
| import java.io.FileWriter; | |
| import java.io.IOException; | |
| import io.netty.buffer.ByteBuf; | |
| import io.netty.buffer.ByteBufOutputStream; | |
| import io.netty.buffer.Unpooled; | |
| import io.netty.channel.embedded.EmbeddedChannel; | |
| import io.netty.handler.codec.http.DefaultFullHttpRequest; | |
| import io.netty.handler.codec.http.FullHttpResponse; | |
| import io.netty.handler.codec.http.HttpContent; | |
| import io.netty.handler.codec.http.HttpHeaders; | |
| import io.netty.handler.codec.http.HttpMethod; | |
| import io.netty.handler.codec.http.HttpObjectAggregator; | |
| import io.netty.handler.codec.http.HttpRequest; | |
| import io.netty.handler.codec.http.HttpResponseStatus; | |
| import io.netty.handler.codec.http.HttpVersion; | |
| import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory; | |
| import io.netty.handler.codec.http.multipart.FileUpload; | |
| import io.netty.handler.codec.http.multipart.HttpDataFactory; | |
| import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder; | |
| import io.netty.handler.codec.http.multipart.InterfaceHttpData; | |
| import io.netty.util.CharsetUtil; | |
| import org.testng.annotations.BeforeMethod; | |
| import org.testng.annotations.Test; | |
| @Test(groups = "Unit") | |
| public class MyServerHandlerTest { | |
| private final HttpDataFactory dataFactory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE); | |
| private final String jsonContent = "{key:value, key2:value2}"; | |
| private EmbeddedChannel channel; | |
| private MyServerHandler handler; | |
| @BeforeMethod | |
| private void before() { | |
| handler = new MyServerHandler(dataFactory); | |
| channel = new EmbeddedChannel(new HttpObjectAggregator(1048576), handler); | |
| } | |
| public void testMultipart() throws Exception { | |
| HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/"); | |
| HttpPostRequestEncoder encoder = new HttpPostRequestEncoder(dataFactory, request, true); | |
| request.headers().add(HttpHeaders.Names.CONTENT_TYPE, HttpHeaders.Values.MULTIPART_FORM_DATA); | |
| encoder.addBodyHttpData(createJSONAttribute(request, dataFactory)); | |
| encoder.addBodyHttpData(createFileUpload(request, dataFactory)); | |
| request = encoder.finalizeRequest(); | |
| sendMultipartRequest(request, encoder); | |
| FullHttpResponse response = (FullHttpResponse) channel.readOutbound(); | |
| assertNotNull(response, "HTTP Response"); | |
| assertEquals(response.getStatus(), HttpResponseStatus.OK, "Unexpected response code. Reason: " + response.content().toString(CharsetUtil.UTF_8) + "."); | |
| assertEquals(response.content().toString(CharsetUtil.UTF_8), jsonContent); | |
| } | |
| private void sendMultipartRequest(HttpRequest request, HttpPostRequestEncoder encoder) throws Exception { | |
| channel.writeInbound(request); | |
| if (!channel.isOpen()) { | |
| // Channel was closed early due to a bad request being written, so don't bother to write the chunks | |
| encoder.cleanFiles(); | |
| return; | |
| } | |
| HttpContent content; | |
| while (!encoder.isEndOfInput()) { | |
| content = encoder.readChunk(null); | |
| channel.writeInbound(content); | |
| } | |
| channel.flush(); | |
| encoder.cleanFiles(); | |
| } | |
| private InterfaceHttpData createJSONAttribute(HttpRequest request, HttpDataFactory factory) throws IOException { | |
| ByteBuf content = Unpooled.buffer(); | |
| ByteBufOutputStream bbos = new ByteBufOutputStream(content); | |
| bbos.writeBytes(jsonContent); | |
| return factory.createAttribute(request, "json", content.toString(CharsetUtil.UTF_8)); | |
| } | |
| private InterfaceHttpData createFileUpload(HttpRequest request, HttpDataFactory factory) throws IOException { | |
| File file = File.createTempFile("upload", ".txt"); | |
| file.deleteOnExit(); | |
| BufferedWriter bw = new BufferedWriter(new FileWriter(file)); | |
| bw.write("Example file to be posted"); | |
| bw.close(); | |
| FileUpload fileUpload = factory.createFileUpload(request, "file", file.getName(), "plain/text", "7bit", null, file.length()); | |
| fileUpload.setContent(file); | |
| return fileUpload; | |
| } | |
| } | 
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment
  
            
@breznik Is it OK to license this under the ASL2, so that we can include it in Netty?