Skip to content

Instantly share code, notes, and snippets.

@slorber
Created October 28, 2013 15:54
Show Gist options
  • Select an option

  • Save slorber/7199403 to your computer and use it in GitHub Desktop.

Select an option

Save slorber/7199403 to your computer and use it in GitHub Desktop.
Not an "ideal" solution, because one thread feeds the data while another thread consumes what the first one fed. But it works fine for me and greatly reduces the memory footprint.
package com.xxx.xxx.http;
import com.ning.http.client.RequestBuilder;
import com.ning.http.client.providers.grizzly.FeedableBodyGenerator;
import com.ning.http.client.providers.grizzly.FeedableBodyGenerator.Feeder;
import com.ning.http.client.providers.grizzly.FeedableBodyGenerator.SimpleFeeder;
import com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProviderConfig.Property;
import com.ning.http.multipart.MultipartEncodingUtil;
import com.ning.http.multipart.MultipartRequestEntity;
import com.ning.http.multipart.Part;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.memory.HeapBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
/**
* <p>
* A utility class that makes it easy to send multipart requests through a FeedableBodyGenerator.
* Using a BodyGenerator reduces the memory footprint when the multipart request is big.
* </p>
*
* <p>
* In the case of a file upload multipart request, the whole file won't be loaded into memory but
* be sent chunk by chunk to the BodyGenerator.
* </p>
*
* <p>
* Note that to guarantee a low memory footprint, Grizzly must be configured so that it doesn't allow an unlimited number of bytes to be "pending".
* OK: <code>clientTransport.getAsyncQueueIO().getWriter().setMaxPendingBytesPerConnection(100000);</code>
* NOK: <code>clientTransport.getAsyncQueueIO().getWriter().setMaxPendingBytesPerConnection(-2);</code>
* This can be customized through {@link Property#TRANSPORT_CUSTOMIZER}
* </p>
*
* <p>
* The {@link #bufferSize} represents the size of the multiple data chunks {@link Buffer} that will be sent to the {@link FeedableBodyGenerator}.
* </p>
*
* <p>
* One must provide the {@link #multipartBoundary} bytes to use. This boundary must be the same as the boundary bytes set in the Content-Type
* header of the request. Note that a boundary can be generated with {@link MultipartRequestEntity#generateMultipartBoundary()}.
* The easiest way to set the same {@link #multipartBoundary} in both the header and body is probably to use the
* {@link #create(RequestBuilder, int)} method which will prepare the request and set the body generator for you.
* </p>
*
* <p>
* Here you will find a basic usage of this class:
* <code>
* RequestBuilder requestBuilder = new RequestBuilder("POST").setUrl("http://myUrl/multipartUploadEndpoint");
*
* MultipartBodyGeneratorFeeder bodyFeeder = MultipartBodyGeneratorFeeder.create(requestBuilder,100_000);
* bodyFeeder.addBodyPart(new StringPart("param1", "x"))
* .addBodyPart(new StringPart("param2", "y"))
* .addBodyPart(new StringPart("param3", "z"))
* .addBodyPart(new FilePart("file", inputStream));
*
* ListenableFuture<Response> asyncRes = asyncHttpClient
* .prepareRequest(requestBuilder.build())
* .execute(new AsyncCompletionHandlerBase());
*
* Response uploadResponse = asyncRes.get();
* </code>
* </p>
*
* @author Lorber Sebastien <i>([email protected])</i>
*/
public class MultipartBodyGeneratorFeeder extends SimpleFeeder {

    private static final Logger LOGGER = LoggerFactory.getLogger(MultipartBodyGeneratorFeeder.class);

    /** Zero-length buffer fed together with the "last" flag to terminate the body. */
    private static final Buffer EMPTY_BUFFER = HeapBuffer.wrap(new byte[0]);

    /** Parts to send, in the order they were added. */
    private final List<Part> parts = new ArrayList<>();

    private final FeedableBodyGenerator feedableBodyGenerator;

    /** Capacity of the {@link BufferedOutputStream} placed in front of the feeding stream. */
    private final int bufferSize;

    /** Private copy of the boundary bytes written between the parts. */
    private final byte[] multipartBoundary;

    /**
     * Creates a feeder and registers it on the given generator.
     *
     * @param feedableBodyGenerator the generator to feed; must not be null
     * @param multipartBoundary the boundary bytes; must not be null or empty and must match the
     *        boundary announced in the request's Content-Type header
     * @param bufferSize size of the chunks sent to the generator; must be &gt;= 1
     * @throws IllegalArgumentException if any argument is invalid
     */
    public MultipartBodyGeneratorFeeder(FeedableBodyGenerator feedableBodyGenerator,
                                        byte[] multipartBoundary,
                                        int bufferSize) {
        super(feedableBodyGenerator);
        if (feedableBodyGenerator == null) {
            throw new IllegalArgumentException("The feedableBodyGenerator is required");
        }
        if (multipartBoundary == null || multipartBoundary.length == 0) {
            throw new IllegalArgumentException("The multipartBoundary is required and must also be set as a ContentType header of your request");
        }
        if (bufferSize < 1) {
            throw new IllegalArgumentException("The bufferSize can't be < 1");
        }
        this.feedableBodyGenerator = feedableBodyGenerator;
        // Defensive copy: later changes to the caller's array must not alter the boundary we write.
        this.multipartBoundary = multipartBoundary.clone();
        this.bufferSize = bufferSize;
        // Register last, so the generator never gets a reference to a partially initialised feeder.
        this.feedableBodyGenerator.setFeeder(this);
    }

    /**
     * Starts feeding the {@link FeedableBodyGenerator} with all the parts added so far.
     * The feeding runs in the calling thread.
     *
     * @throws IllegalStateException if the parts can't be written to the generator
     */
    @Override
    public void flush() {
        LOGGER.debug("Will start the feeding of the FeedableBodyGenerator with feeder {}", this);
        startFeeding();
    }

    /**
     * Static factory method to create a MultipartBodyGeneratorFeeder for a given request.
     * This sets the multipart Content-Type header and the feedableBodyGenerator on the
     * {@link RequestBuilder}, using the same multipart boundary in the request header and body.
     *
     * @param requestBuilder the builder of the request to send; must not be null
     * @param bufferSize size of the chunks sent to the generator; must be &gt;= 1
     * @return a feeder bound to the body generator installed on the builder
     * @throws IllegalArgumentException if an argument is invalid; the builder is left untouched
     */
    public static MultipartBodyGeneratorFeeder create(RequestBuilder requestBuilder, int bufferSize) {
        // Validate everything up front so that an invalid call does not leave the builder half-configured.
        if (requestBuilder == null) {
            throw new IllegalArgumentException("The requestBuilder is required");
        }
        if (bufferSize < 1) {
            throw new IllegalArgumentException("The bufferSize can't be < 1");
        }
        FeedableBodyGenerator feedableBodyGenerator = new FeedableBodyGenerator();
        // We generate a common multipart boundary that will be used in the request header and the multipart body
        byte[] boundary = MultipartRequestEntity.generateMultipartBoundary();
        String boundaryString = MultipartEncodingUtil.getAsciiString(boundary);
        String contentTypeHeader = "multipart/form-data; boundary=" + boundaryString;
        // setHeader rather than addHeader: a request must carry a single Content-Type, and it has
        // to announce the boundary that is actually used in the body.
        requestBuilder.setHeader("Content-Type", contentTypeHeader);
        requestBuilder.setBody(feedableBodyGenerator);
        return new MultipartBodyGeneratorFeeder(feedableBodyGenerator, boundary, bufferSize);
    }

    ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////

    /**
     * Adds a multipart {@link Part} to this feeder. Parts are sent in insertion order.
     *
     * @param part the part to add; must not be null
     * @return this feeder, to allow call chaining
     * @throws IllegalArgumentException if the part is null
     */
    public MultipartBodyGeneratorFeeder addBodyPart(Part part) {
        if (part == null) {
            throw new IllegalArgumentException("The part is required");
        }
        parts.add(part);
        return this;
    }

    /**
     * Feeds the {@link FeedableBodyGenerator}.
     *
     * This is where the real feeding happens, and this may block the calling thread for some time
     * according to the amount of data to be sent.
     *
     * @throws IllegalStateException if writing the parts or closing the stream fails
     */
    private void startFeeding() {
        Part[] partsArray = parts.toArray(new Part[parts.size()]);
        // NOTE: try-with-resources closes the stream on the failure path as well, so the "last"
        // signal is also sent when sendParts throws.
        try (OutputStream outputStream = createFeedingOutputStream()) {
            Part.sendParts(outputStream, partsArray, multipartBoundary);
        } catch (Exception e) {
            throw new IllegalStateException("Unable to feed the FeedableBodyGenerator", e);
        }
    }

    /**
     * Creates an OutputStream that feeds the {@link FeedableBodyGenerator} when it receives data.
     * The stream is buffered with {@link #bufferSize} bytes.
     *
     * @return a buffered stream wrapping a new {@link FeedBodyGeneratorOutputStream}
     */
    protected OutputStream createFeedingOutputStream() {
        return new BufferedOutputStream(new FeedBodyGeneratorOutputStream(), bufferSize);
    }

    /**
     * This is an {@link OutputStream} implementation that transforms every single call to one of the write methods
     * into a {@link Buffer} that is sent to the {@link Feeder#feed(org.glassfish.grizzly.Buffer, boolean)}
     * method of the enclosing feeder.
     *
     * As every call to a write method creates a new Buffer (even when you write a single byte), it is strongly
     * recommended to wrap this {@link OutputStream} in a {@link BufferedOutputStream} to guarantee that the bytes
     * are sent in chunks to the {@link FeedableBodyGenerator}.
     *
     * Closing the stream sends the "last" signal exactly once; further calls to close are no-ops and
     * further writes fail with an {@link IOException}.
     */
    public class FeedBodyGeneratorOutputStream extends OutputStream {

        /** Set once the "last" signal has been requested; guards against double close and late writes. */
        private boolean closed;

        @Override
        public void write(int b) throws IOException {
            // Not efficient: this is why it's better to always use a BufferedOutputStream wrapper with this class
            write(new byte[]{(byte) b});
        }

        @Override
        public void write(byte b[]) throws IOException {
            write(b, 0, b.length);
        }

        /**
         * Copies the given range into a fresh buffer and feeds it as a non-final chunk.
         *
         * @throws IOException if the stream is already closed or the feeding fails
         */
        @Override
        public void write(byte b[], int off, int len) throws IOException {
            checkParamConsistency(b, off, len);
            if (closed) {
                throw new IOException("Stream closed");
            }
            if (len == 0) {
                return;
            }
            // Copy the range: the caller (e.g. a BufferedOutputStream) is free to reuse its array
            // as soon as this method returns.
            byte[] bytesToWrite = new byte[len];
            System.arraycopy(b, off, bytesToWrite, 0, len);
            feed(HeapBuffer.wrap(bytesToWrite), false);
        }

        /**
         * Sends the "last" signal the first time it is called; later calls have no effect.
         */
        @Override
        public void close() throws IOException {
            if (closed) {
                return;
            }
            closed = true;
            sendIsLastSignal();
        }

        /**
         * Feeds an empty buffer flagged as the last one.
         */
        protected void sendIsLastSignal() throws IOException {
            feed(EMPTY_BUFFER, true);
        }

        /**
         * Same argument checks as {@link OutputStream#write(byte[], int, int)}.
         * Comparing against {@code b.length - off} keeps the check free of int overflow.
         */
        private void checkParamConsistency(byte b[], int off, int len) {
            if (b == null) {
                throw new NullPointerException();
            }
            if (off < 0 || len < 0 || len > b.length - off) {
                throw new IndexOutOfBoundsException();
            }
        }
    }
}
// Usage example: sends a string parameter and a file as a multipart POST request.
String serviceUrl = "http://xxx.ws";
String param = "xxx";
int bufferSize = 10_000; // not sure this size really has a big impact, just make sure it's not very low... default is fine
File file = new File("/tmp/xxx.pdf");
RequestBuilder requestBuilder = new RequestBuilder("POST").setUrl(serviceUrl);
// create() sets the multipart Content-Type header and the body generator on the builder.
MultipartBodyGeneratorFeeder feeder = MultipartBodyGeneratorFeeder.create(requestBuilder, bufferSize);
feeder.addBodyPart(new StringPart("param", param));
feeder.addBodyPart(new FilePart("file", new FilePartSource(file)));
Request request = requestBuilder.build();
asyncHttpClient.executeRequest(request);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment