import java.io.ByteArrayInputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
public class S3OutputStream extends OutputStream {

  /**
   * Default chunk size is 10 MB. Every part except the last must be at least
   * 5 MiB (the S3 minimum part size), so this comfortably clears that floor.
   */
  protected static final int BUFFER_SIZE = 10_000_000;

  /**
   * The name of the bucket on Amazon S3
   */
  private final String bucket;

  /**
   * The path (key) of the object within the bucket
   */
  private final String path;

  /**
   * The temporary buffer used for storing the chunks
   */
  private final byte[] buf;

  private final S3Client s3Client;

  /**
   * Collection of the ETags for the parts that have been uploaded
   */
  private final List<String> etags;

  /**
   * The current write position in the buffer
   */
  private int position;

  /**
   * The unique id for this upload
   */
  private String uploadId;

  /**
   * Indicates whether the stream is still open / valid
   */
  private boolean open;
  /**
   * Creates a new S3 output stream.
   *
   * @param s3Client the S3 client to upload with
   * @param bucket   name of the bucket
   * @param path     path (key) within the bucket
   */
  public S3OutputStream(S3Client s3Client, String bucket, String path) {
    this.s3Client = s3Client;
    this.bucket = bucket;
    this.path = path;
    buf = new byte[BUFFER_SIZE];
    position = 0;
    etags = new ArrayList<>();
    open = true;
  }
  /**
   * Cancels the upload: marks the stream as closed and, if a multipart upload
   * has been started, aborts it so S3 discards any parts already uploaded.
   */
  public void cancel() {
    open = false;
    if (uploadId != null) {
      s3Client.abortMultipartUpload(AbortMultipartUploadRequest.builder()
          .bucket(bucket)
          .key(path)
          .uploadId(uploadId)
          .build());
    }
  }
  /**
   * Writes a single byte, first flushing the buffer as a part if it is full.
   *
   * @param b the byte to append
   */
  @Override
  public void write(int b) {
    assertOpen();
    if (position >= buf.length) {
      flushBufferAndRewind();
    }
    buf[position++] = (byte) b;
  }

  /**
   * Writes an array to the S3 output stream.
   *
   * @param b the byte array to append
   */
  @Override
  public void write(byte[] b) {
    write(b, 0, b.length);
  }
  /**
   * Writes a slice of an array to the S3 output stream.
   *
   * @param byteArray the array to write
   * @param o         the offset into the array
   * @param l         the number of bytes to write
   */
  @Override
  public void write(byte[] byteArray, int o, int l) {
    assertOpen();
    int ofs = o;
    int len = l;
    int size;
    // Fill the buffer and upload a part each time it becomes full,
    // then copy whatever remains into the (now rewound) buffer.
    while (len > (size = buf.length - position)) {
      System.arraycopy(byteArray, ofs, buf, position, size);
      position += size;
      flushBufferAndRewind();
      ofs += size;
      len -= size;
    }
    System.arraycopy(byteArray, ofs, buf, position, len);
    position += len;
  }
  /**
   * Deliberately a no-op: S3 requires every part except the last to be at
   * least 5 MiB, so data is only uploaded when the buffer fills or the
   * stream is closed.
   */
  @Override
  public synchronized void flush() {
    assertOpen();
  }
  @Override
  public void close() {
    if (open) {
      open = false;
      if (uploadId != null) {
        // A multipart upload is in progress: upload the final (possibly
        // short) part, then complete the upload with the collected ETags.
        if (position > 0) {
          uploadPart();
        }
        CompletedPart[] completedParts = new CompletedPart[etags.size()];
        for (int i = 0; i < etags.size(); i++) {
          completedParts[i] = CompletedPart.builder()
              .eTag(etags.get(i))
              .partNumber(i + 1)
              .build();
        }
        CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder()
            .parts(completedParts)
            .build();
        CompleteMultipartUploadRequest completeMultipartUploadRequest = CompleteMultipartUploadRequest.builder()
            .bucket(bucket)
            .key(path)
            .uploadId(uploadId)
            .multipartUpload(completedMultipartUpload)
            .build();
        s3Client.completeMultipartUpload(completeMultipartUploadRequest);
      } else {
        // The data never exceeded a single buffer, so a plain PutObject
        // is simpler and cheaper than a multipart upload.
        PutObjectRequest putRequest = PutObjectRequest.builder()
            .bucket(bucket)
            .key(path)
            .contentLength((long) position)
            .build();
        RequestBody requestBody = RequestBody.fromInputStream(new ByteArrayInputStream(buf, 0, position),
            position);
        s3Client.putObject(putRequest, requestBody);
      }
    }
  }
  private void assertOpen() {
    if (!open) {
      throw new IllegalStateException("Closed");
    }
  }
  protected void flushBufferAndRewind() {
    // Lazily start the multipart upload the first time the buffer fills.
    if (uploadId == null) {
      CreateMultipartUploadRequest uploadRequest = CreateMultipartUploadRequest.builder()
          .bucket(bucket)
          .key(path)
          .build();
      CreateMultipartUploadResponse multipartUpload = s3Client.createMultipartUpload(uploadRequest);
      uploadId = multipartUpload.uploadId();
    }
    uploadPart();
    position = 0;
  }
  protected void uploadPart() {
    // Part numbers are 1-based; the ETag returned for each part is needed
    // later to complete the multipart upload.
    UploadPartRequest uploadRequest = UploadPartRequest.builder()
        .bucket(bucket)
        .key(path)
        .uploadId(uploadId)
        .partNumber(etags.size() + 1)
        .contentLength((long) position)
        .build();
    RequestBody requestBody = RequestBody.fromInputStream(new ByteArrayInputStream(buf, 0, position),
        position);
    UploadPartResponse uploadPartResponse = s3Client.uploadPart(uploadRequest, requestBody);
    etags.add(uploadPartResponse.eTag());
  }
}
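For anyone wiring this up, here is a minimal usage sketch; the region, bucket name, and key are placeholders. Since S3OutputStream is a java.io.OutputStream, it works with try-with-resources, and close() either completes the multipart upload or issues a single PutObject for small payloads:

import java.nio.charset.StandardCharsets;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

public class S3OutputStreamExample {
  public static void main(String[] args) {
    // Placeholder region, bucket, and key; substitute your own.
    S3Client s3 = S3Client.builder().region(Region.US_EAST_1).build();
    try (S3OutputStream out = new S3OutputStream(s3, "my-bucket", "path/to/object.txt")) {
      out.write("hello from S3OutputStream".getBytes(StandardCharsets.UTF_8));
    }
  }
}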
Hi, thanks for sharing this implementation! I'm curious—was there a particular reason you chose to implement your own OutputStream with the synchronous S3Client instead of using the AWS S3 Transfer Manager, which offers features like parallel uploads and automatic retries? I'd love to hear what advantages or considerations led you to this approach.
Hi @headF1rst, I just needed a straightforward way to stream data into S3 without pulling in the Transfer Manager abstractions. Implementing OutputStream let me plug it directly into existing code that expects a java.io.OutputStream.
@headF1rst Similar need here. I had an abstraction layer for writing bytes through an OutputStream and didn't want to rework that code against the S3 API directly; I just wanted to keep passing the stream. A Transfer Manager sketch for comparison is below.
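For readers weighing the trade-off discussed above, a rough sketch of the Transfer Manager route might look like the following. This assumes the software.amazon.awssdk:s3-transfer-manager dependency, and the bucket, key, and file names are placeholders; note that this variant uploads from a file on disk rather than an arbitrary stream, which is exactly the gap the class above fills:

import java.nio.file.Paths;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;

public class TransferManagerExample {
  public static void main(String[] args) {
    // S3TransferManager handles part sizing, parallelism, and retries itself.
    try (S3TransferManager transferManager = S3TransferManager.create()) {
      UploadFileRequest request = UploadFileRequest.builder()
          .putObjectRequest(b -> b.bucket("my-bucket").key("path/to/object.txt"))
          .source(Paths.get("local-file.txt")) // requires the data on disk
          .build();
      transferManager.uploadFile(request).completionFuture().join();
    }
  }
}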
Great work, thanks for this implementation.
I tested it in my project and it is working perfectly.