Last active
December 24, 2015 09:59
-
-
Save mauricio/6781029 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import com.amazonaws.services.s3.AmazonS3Client | |
| import com.amazonaws.auth.BasicAWSCredentials | |
| import com.amazonaws.services.s3.model.{GetObjectRequest, S3ObjectSummary, ListObjectsRequest} | |
| import scala.collection.JavaConversions._ | |
| import java.util.concurrent.Executors | |
| import java.io.File | |
| import java.util.concurrent.atomic.AtomicInteger | |
| object Downloader { | |
| val credential = new BasicAWSCredentials("key", "secret" ) | |
| val client = new AmazonS3Client(credential) | |
| val counter = new AtomicInteger() | |
| def main(args : Array[String]) { | |
| val bucket = "bucket" | |
| val executorService = Executors.newFixedThreadPool(5) | |
| var objects = client.listObjects(bucket) | |
| var continue = true | |
| while ( continue ) { | |
| println(s"received ${objects.getObjectSummaries.size()} objects to download - ${objects.getNextMarker}") | |
| for( item <- objects.getObjectSummaries ) { | |
| executorService.submit(new Downloader(item, bucket, client)) | |
| } | |
| if ( objects.getNextMarker != null ) { | |
| val listRequest = new ListObjectsRequest() | |
| .withBucketName(bucket) | |
| .withMarker(objects.getNextMarker) | |
| objects = client.listObjects(listRequest) | |
| } else { | |
| continue = false | |
| } | |
| } | |
| executorService.shutdown() | |
| while ( !executorService.isTerminated ) { | |
| println(s"sleeping for 5 - ${counter.get()} items processed") | |
| Thread.sleep(5000) | |
| } | |
| } | |
| } | |
| class Downloader( summary : S3ObjectSummary, bucket : String, client : AmazonS3Client ) extends Runnable { | |
| import Downloader.counter | |
| def run() { | |
| try { | |
| val file = new File(s"downloads/${summary.getKey}") | |
| if ( !file.exists() ) { | |
| val request = new GetObjectRequest(bucket, summary.getKey) | |
| client.getObject(request, file) | |
| } | |
| val value = counter.incrementAndGet() | |
| if ( (value % 1000) == 0 ) { | |
| println(s"processed ${value} files") | |
| } | |
| } catch { | |
| case e : Exception => { | |
| println(s"Failed to download file ${summary.getKey}") | |
| e.printStackTrace() | |
| } | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment