Skip to content

Instantly share code, notes, and snippets.

@atamborrino
Last active August 29, 2015 14:14
Show Gist options
  • Save atamborrino/7f3934d7d9a1aa1b4ca2 to your computer and use it in GitHub Desktop.
Save atamborrino/7f3934d7d9a1aa1b4ca2 to your computer and use it in GitHub Desktop.
/**
* Streamed upload of an akka stream to S3
* @param bucket the bucket name
* @param key the key of file
* @param source a source of array of bytes
* @return a successful future of the uploaded number of chunks (or a failure)
*/
def uploadStream(bucket: String, key: String, source: Source[Array[Byte]], parallelism: Int = 1)(implicit fm: FlowMaterializer): Future[Int] = {
import scala.collection.JavaConversions._
import client.executionContext
def makeUploader(uploadId: String) = {
MFGFlow
.zipWithIndex[Array[Byte]]
.via(
MFGFlow.mapAsyncUnorderedWithBoundedParallelism(parallelism) { case (bytes, partNumber) => // //[(Int,Array[Byte]),UploadPartResult]
val uploadRequest = new UploadPartRequest()
.withBucketName(bucket)
.withKey(key)
.withPartNumber(partNumber + 1)
.withUploadId(uploadId)
.withInputStream(new ByteArrayInputStream(bytes))
.withPartSize(bytes.length)
client.uploadPart(uploadRequest)
})
}
client.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucket, key)) flatMap { initResponse =>
val uploadId = initResponse.getUploadId
val etagsFut: Future[Vector[PartETag]] =
source
.via(MFGFlow.rechunkArray[Byte](5 * 1024 * 1024))
.via(makeUploader(uploadId))
.runWith(Sink.fold(Vector.empty[PartETag])(_ :+ _.getPartETag))
etagsFut.flatMap { etags =>
val compRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId, etags.toBuffer[PartETag])
client.completeMultipartUpload(compRequest) map { _ =>
etags.length
} recoverWith { case e: Exception =>
client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId))
Future.failed(e)
}
}
}
}
@atamborrino
Copy link
Author

Current impl not working...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment