
@jcputney
Forked from blagerweij/S3OutputStream.java
Last active September 24, 2025 14:04
An OutputStream that wraps the AWS Java SDK v2 S3Client, with support for streaming large files directly to S3 via multipart upload
import java.io.ByteArrayInputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
public class S3OutputStream extends OutputStream {
/**
* Default chunk size is 10 MB. Each full buffer is uploaded as one multipart part;
* S3 requires every part except the last to be at least 5 MB.
*/
protected static final int BUFFER_SIZE = 10_000_000;
/**
* The bucket-name on Amazon S3
*/
private final String bucket;
/**
* The path (key) name within the bucket
*/
private final String path;
/**
* The temporary buffer used for storing the chunks
*/
private final byte[] buf;
private final S3Client s3Client;
/**
* Collection of the etags for the parts that have been uploaded
*/
private final List<String> etags;
/**
* The position in the buffer
*/
private int position;
/**
* The unique id for this upload
*/
private String uploadId;
/**
* Indicates whether the stream is still open / valid
*/
private boolean open;
/**
* Creates a new S3 OutputStream
*
* @param s3Client the S3 client (AWS SDK v2)
* @param bucket name of the bucket
* @param path path within the bucket
*/
public S3OutputStream(S3Client s3Client, String bucket, String path) {
this.s3Client = s3Client;
this.bucket = bucket;
this.path = path;
buf = new byte[BUFFER_SIZE];
position = 0;
etags = new ArrayList<>();
open = true;
}
/**
* Marks the stream as closed and aborts any in-progress multipart upload,
* so S3 does not retain orphaned parts.
*/
public void cancel() {
open = false;
if (uploadId != null) {
s3Client.abortMultipartUpload(AbortMultipartUploadRequest.builder()
.bucket(bucket)
.key(path)
.uploadId(uploadId)
.build());
}
}
@Override
public void write(int b) {
assertOpen();
if (position >= buf.length) {
flushBufferAndRewind();
}
buf[position++] = (byte) b;
}
/**
* Writes an array to the S3 output stream.
*
* @param b the byte-array to append
*/
@Override
public void write(byte[] b) {
write(b, 0, b.length);
}
/**
* Writes a slice of an array to the S3 output stream.
*
* @param byteArray the array to write
* @param offset the offset into the array
* @param length the number of bytes to write
*/
@Override
public void write(byte[] byteArray, int offset, int length) {
assertOpen();
int ofs = offset;
int len = length;
int size;
while (len > (size = buf.length - position)) {
System.arraycopy(byteArray, ofs, buf, position, size);
position += size;
flushBufferAndRewind();
ofs += size;
len -= size;
}
System.arraycopy(byteArray, ofs, buf, position, len);
position += len;
}
/**
* Intentionally a no-op apart from the open-check: a part is uploaded only when
* the buffer fills, because S3 rejects multipart parts smaller than 5 MB
* (except the last one).
*/
@Override
public synchronized void flush() {
assertOpen();
}
@Override
public void close() {
if (open) {
open = false;
if (uploadId != null) {
if (position > 0) {
uploadPart();
}
CompletedPart[] completedParts = new CompletedPart[etags.size()];
for (int i = 0; i < etags.size(); i++) {
completedParts[i] = CompletedPart.builder()
.eTag(etags.get(i))
.partNumber(i + 1)
.build();
}
CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder()
.parts(completedParts)
.build();
CompleteMultipartUploadRequest completeMultipartUploadRequest = CompleteMultipartUploadRequest.builder()
.bucket(bucket)
.key(path)
.uploadId(uploadId)
.multipartUpload(completedMultipartUpload)
.build();
s3Client.completeMultipartUpload(completeMultipartUploadRequest);
} else {
PutObjectRequest putRequest = PutObjectRequest.builder()
.bucket(bucket)
.key(path)
.contentLength((long) position)
.build();
RequestBody requestBody = RequestBody.fromInputStream(new ByteArrayInputStream(buf, 0, position),
position);
s3Client.putObject(putRequest, requestBody);
}
}
}
private void assertOpen() {
if (!open) {
throw new IllegalStateException("Closed");
}
}
protected void flushBufferAndRewind() {
if (uploadId == null) {
CreateMultipartUploadRequest uploadRequest = CreateMultipartUploadRequest.builder()
.bucket(bucket)
.key(path)
.build();
CreateMultipartUploadResponse multipartUpload = s3Client.createMultipartUpload(uploadRequest);
uploadId = multipartUpload.uploadId();
}
uploadPart();
position = 0;
}
protected void uploadPart() {
UploadPartRequest uploadRequest = UploadPartRequest.builder()
.bucket(bucket)
.key(path)
.uploadId(uploadId)
.partNumber(etags.size() + 1)
.contentLength((long) position)
.build();
RequestBody requestBody = RequestBody.fromInputStream(new ByteArrayInputStream(buf, 0, position),
position);
UploadPartResponse uploadPartResponse = s3Client.uploadPart(uploadRequest, requestBody);
etags.add(uploadPartResponse.eTag());
}
}
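
A minimal usage sketch (the region, bucket name, and key are placeholders; it assumes the bucket already exists and AWS credentials are configured):

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

public class S3OutputStreamExample {
  public static void main(String[] args) {
    S3Client s3Client = S3Client.builder().region(Region.US_EAST_1).build();
    // try-with-resources: close() uploads the final part and completes the upload
    try (S3OutputStream out = new S3OutputStream(s3Client, "my-bucket", "path/to/object.bin")) {
      byte[] chunk = new byte[1 << 20]; // 1 MB of zeros as stand-in data
      for (int i = 0; i < 50; i++) {
        out.write(chunk); // crossing the 10 MB buffer triggers multipart parts
      }
    }
  }
}

Anything written before the buffer first fills is held in memory; if the stream is closed before 10 MB has been written, the data goes up as a single PutObject instead of a multipart upload.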
@hamzahanafi11

Great work, thanks for this implementation.
I tested it in my project and it is working perfectly.

@headF1rst

Hi, thanks for sharing this implementation! I'm curious—was there a particular reason you chose to implement your own OutputStream with the synchronous S3Client instead of using the AWS S3 Transfer Manager, which offers features like parallel uploads and automatic retries? I'd love to hear what advantages or considerations led you to this approach.
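For reference, a minimal Transfer Manager sketch (assuming the software.amazon.awssdk:s3-transfer-manager module; bucket, key, and file path below are placeholders):

import java.nio.file.Paths;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;

public class TransferManagerExample {
  public static void main(String[] args) {
    // S3TransferManager is AutoCloseable, so try-with-resources cleans it up
    try (S3TransferManager transferManager = S3TransferManager.create()) {
      UploadFileRequest request = UploadFileRequest.builder()
          .putObjectRequest(b -> b.bucket("my-bucket").key("path/to/object.bin"))
          .source(Paths.get("/tmp/local-file.bin"))
          .build();
      // completionFuture() exposes the async result; join() blocks until the upload finishes
      transferManager.uploadFile(request).completionFuture().join();
    }
  }
}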

@hamzahanafi11

Hi @headF1rst, I just needed a straightforward way to stream data into S3 without pulling in the Transfer Manager abstractions. Implementing OutputStream allowed me to plug it directly into existing code that expects a java.io.OutputStream.
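For example, a stock java.io decorator chain works unchanged on top of it. A hypothetical sketch (s3Client, the bucket, and the key are placeholders; it assumes the code runs inside a method that may throw IOException):

import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Gzip-compress a CSV report while streaming it straight to S3.
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
    new GZIPOutputStream(new S3OutputStream(s3Client, "my-bucket", "reports/report.csv.gz")),
    StandardCharsets.UTF_8))) {
  writer.write("id,value\n");
  writer.write("1,42\n");
}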

@jcputney (Author)

@headF1rst similar need here. I had an abstraction layer for writing bytes through an OutputStream and didn't want to rework that code to work directly with the S3 API; I just wanted to keep passing the stream.
