Created
October 23, 2020 22:40
-
-
Save blagerweij/ad1dbb7ee2fff8bcffd372815ad310eb to your computer and use it in GitHub Desktop.
OutputStream which wraps S3Client, with support for streaming large files directly to S3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.ByteArrayInputStream; | |
import java.io.OutputStream; | |
import java.util.ArrayList; | |
import java.util.List; | |
import com.amazonaws.services.s3.AmazonS3; | |
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; | |
import com.amazonaws.services.s3.model.CannedAccessControlList; | |
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; | |
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; | |
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; | |
import com.amazonaws.services.s3.model.ObjectMetadata; | |
import com.amazonaws.services.s3.model.PartETag; | |
import com.amazonaws.services.s3.model.PutObjectRequest; | |
import com.amazonaws.services.s3.model.UploadPartRequest; | |
import com.amazonaws.services.s3.model.UploadPartResult; | |
public class S3OutputStream extends OutputStream { | |
/** Default chunk size is 10MB */ | |
protected static final int BUFFER_SIZE = 10000000; | |
/** The bucket-name on Amazon S3 */ | |
private final String bucket; | |
/** The path (key) name within the bucket */ | |
private final String path; | |
/** The temporary buffer used for storing the chunks */ | |
private final byte[] buf; | |
/** The position in the buffer */ | |
private int position; | |
/** Amazon S3 client. TODO: support KMS */ | |
private final AmazonS3 s3Client; | |
/** The unique id for this upload */ | |
private String uploadId; | |
/** Collection of the etags for the parts that have been uploaded */ | |
private final List<PartETag> etags; | |
/** indicates whether the stream is still open / valid */ | |
private boolean open; | |
/** | |
* Creates a new S3 OutputStream | |
* @param s3Client the AmazonS3 client | |
* @param bucket name of the bucket | |
* @param path path within the bucket | |
*/ | |
public S3OutputStream(AmazonS3 s3Client, String bucket, String path) { | |
this.s3Client = s3Client; | |
this.bucket = bucket; | |
this.path = path; | |
this.buf = new byte[BUFFER_SIZE]; | |
this.position = 0; | |
this.etags = new ArrayList<>(); | |
this.open = true; | |
} | |
/** | |
* Write an array to the S3 output stream. | |
* | |
* @param b the byte-array to append | |
*/ | |
@Override | |
public void write(byte[] b) { | |
write(b,0,b.length); | |
} | |
/** | |
* Writes an array to the S3 Output Stream | |
* | |
* @param byteArray the array to write | |
* @param o the offset into the array | |
* @param l the number of bytes to write | |
*/ | |
@Override | |
public void write(final byte[] byteArray, final int o, final int l) { | |
this.assertOpen(); | |
int ofs = o, len = l; | |
int size; | |
while (len > (size = this.buf.length - position)) { | |
System.arraycopy(byteArray, ofs, this.buf, this.position, size); | |
this.position += size; | |
flushBufferAndRewind(); | |
ofs += size; | |
len -= size; | |
} | |
System.arraycopy(byteArray, ofs, this.buf, this.position, len); | |
this.position += len; | |
} | |
/** | |
* Flushes the buffer by uploading a part to S3. | |
*/ | |
@Override | |
public synchronized void flush() { | |
this.assertOpen(); | |
} | |
protected void flushBufferAndRewind() { | |
if (uploadId == null) { | |
final InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(this.bucket, this.path) | |
.withCannedACL(CannedAccessControlList.BucketOwnerFullControl); | |
InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(request); | |
this.uploadId = initResponse.getUploadId(); | |
} | |
uploadPart(); | |
this.position = 0; | |
} | |
protected void uploadPart() { | |
UploadPartResult uploadResult = this.s3Client.uploadPart(new UploadPartRequest() | |
.withBucketName(this.bucket) | |
.withKey(this.path) | |
.withUploadId(this.uploadId) | |
.withInputStream(new ByteArrayInputStream(buf,0,this.position)) | |
.withPartNumber(this.etags.size() + 1) | |
.withPartSize(this.position)); | |
this.etags.add(uploadResult.getPartETag()); | |
} | |
@Override | |
public void close() { | |
if (this.open) { | |
this.open = false; | |
if (this.uploadId != null) { | |
if (this.position > 0) { | |
uploadPart(); | |
} | |
this.s3Client.completeMultipartUpload(new CompleteMultipartUploadRequest(bucket, path, uploadId, etags)); | |
} | |
else { | |
final ObjectMetadata metadata = new ObjectMetadata(); | |
metadata.setContentLength(this.position); | |
final PutObjectRequest request = new PutObjectRequest(this.bucket, this.path, new ByteArrayInputStream(this.buf, 0, this.position), metadata) | |
.withCannedAcl(CannedAccessControlList.BucketOwnerFullControl); | |
this.s3Client.putObject(request); | |
} | |
} | |
} | |
public void cancel() { | |
this.open = false; | |
if (this.uploadId != null) { | |
this.s3Client.abortMultipartUpload(new AbortMultipartUploadRequest(this.bucket, this.path, this.uploadId)); | |
} | |
} | |
@Override | |
public void write(int b) { | |
this.assertOpen(); | |
if (position >= this.buf.length) { | |
flushBufferAndRewind(); | |
} | |
this.buf[position++] = (byte)b; | |
} | |
private void assertOpen() { | |
if (!this.open) { | |
throw new IllegalStateException("Closed"); | |
} | |
} | |
} |
Unable to use this class.
We created an input stream from a FileInputStream and an output stream using this class.
But when I ran the code, the file was not uploaded to S3. Please find below the code snippet used:
import static org.jsoup.Jsoup.parse;
_```
// Snippet from the question: copies a local zip file into an S3OutputStream.
// NOTE(review): neither inputStream nor outputStream is ever closed in main().
// S3OutputStream sends its final (or only) chunk in close(), so for a file smaller than its
// 10 MB buffer nothing is uploaded by this code.
public class LocalToS3 {
static TransferManager tm;
static AmazonS3 s3Client;
static String bucketName;
static String filePath;
// main() reads the S3 settings ("bucket", "dest_path") from the JSON file named by args[0].
static S3Utility s3Utility = new S3Utility();public static void main(String[] args) throws Exception { HashMap<String, Object> s3Creds = JSONUtils.jsonToMap(FileUtils.readFileToString(new File(args[0]), Charset.defaultCharset())); s3Client = s3Utility.configureS3Client(s3Creds); bucketName = s3Creds.get("bucket").toString(); filePath = s3Creds.get("dest_path").toString(); InputStream inputStream = new FileInputStream(new File("C:\\workarea\\Test\\S3Utility\\S3Utility\\wikipathways-20210310-gpml-Anopheles_gambiae.zip")); OutputStream outputStream = new S3OutputStream(s3Client, bucketName, "data/unstructured/rsa/Anopheles_gambiae.zip"); IOUtils.copy(inputStream, outputStream); LocalToS3 localToS3 = new LocalToS3(); }
}
Please help me understand how to use this.
Your code is fine. You just need to close the output stream: the data is only sent to S3 when close() is called.
I've updated this gist for anyone using v2 of the AWS Java SDK: https://gist.github.com/jcputney/b5daeb86a1c0696859da2a0c3b466327
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Unable to use this class.
We created an input stream from a FileInputStream and an output stream using this class.
But when I ran the code, the file was not uploaded to S3.
Please find below the code snippet used:
import static org.jsoup.Jsoup.parse;
_```
// Excerpt of the LocalToS3 class from the question: only its static fields are shown here.
public class LocalToS3 {
static TransferManager tm;
static AmazonS3 s3Client;
static String bucketName;
static String filePath;
static S3Utility s3Utility = new S3Utility();
}