val s3Paths = "s3://yourbucket/path/to/file1.txt,s3://yourbucket/path/to/directory"
val pageLength = 100
val key = "YOURKEY"
val secret = "YOUR_SECRET"

import com.amazonaws.services.s3._, model._
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.model.ObjectListing
import scala.collection.JavaConverters._
import scala.io.Source
import java.io.InputStream
import org.apache.spark.rdd.RDD

// A def (not a val): a fresh client is constructed wherever it is referenced,
// so the non-serializable AmazonS3Client is never captured by a Spark closure.
def s3 = new AmazonS3Client(new BasicAWSCredentials(key, secret))

// Accumulates the lines of every matched S3 object.
var inputLinesRDD_raw: RDD[String] = sc.emptyRDD[String]

s3Paths.split(",").foreach { s3Path =>
  // Split "s3://bucket/prefix" into bucket and key prefix.
  val regex = """(?i)^s3://([^/]+)/(.*)""".r
  val bucket = regex.findFirstMatchIn(s3Path).map(_ group 1).getOrElse(null)
  val prefix = regex.findFirstMatchIn(s3Path).map(_ group 2).getOrElse(null)
  println("Processing s3 resource: bucket '%s', prefix '%s'".format(bucket, prefix))

  // Page through the object listing, pageLength keys at a time.
  @transient val request = new ListObjectsRequest()
  request.setBucketName(bucket)
  request.setPrefix(prefix)
  request.setMaxKeys(pageLength)
  @transient var listing = s3.listObjects(request)

  var proceed = true
  while (proceed) {
    if (listing.getObjectSummaries.isEmpty) {
      proceed = false
    } else {
      @transient val s3FileKeys = listing.getObjectSummaries.asScala.map(_.getKey).toList
      // Read every object of this page in parallel, one line per RDD element.
      val inputLines = sc.parallelize(s3FileKeys).flatMap { key =>
        Source.fromInputStream(s3.getObject(bucket, key).getObjectContent: InputStream).getLines
      }
      inputLinesRDD_raw = inputLinesRDD_raw.union(inputLines)
      listing = s3.listNextBatchOfObjects(listing)
    }
  }
}

// TODO do something with inputLinesRDD_raw
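A minimal, hypothetical follow-up to the TODO above (these actions are placeholders, not part of the gist):

// Cache so the S3 objects are not re-read by every subsequent action
inputLinesRDD_raw.cache()

// For example: count the lines and peek at the first few
println(s"Total lines read from S3: ${inputLinesRDD_raw.count()}")
inputLinesRDD_raw.take(5).foreach(println)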
I have the same error in Spark 2.0
Same here. @snowindy, are you on a funny version of the AWS SDK?
FYI @wojtekk and @dineshjlabs and anyone else who has this problem: I managed to fix this by wrapping AmazonS3Client in a new class like so (I'm using Groovy for this project):
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.AmazonS3Client
class SerializableAmazonS3Client extends AmazonS3Client implements Serializable {
    public SerializableAmazonS3Client(BasicAWSCredentials credentials) {
        super(credentials)
        // I'm also setting my region here because it's convenient
        super.setRegion(Regions.EU_WEST_1)
    }
}
I haven't got time to test it, but I guess the Scala would just be:
class SerializableAmazonS3Client(creds: BasicAWSCredentials) extends AmazonS3Client(creds) with Serializable
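For anyone wiring the wrapper back into the gist, a minimal, untested sketch (reusing the key and secret values defined at the top; names otherwise as in the thread):

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.AmazonS3Client

// Serializable wrapper, mirroring the Groovy version above
class SerializableAmazonS3Client(creds: BasicAWSCredentials)
  extends AmazonS3Client(creds) with Serializable

// Because the wrapper is Serializable, a plain val can be captured by the
// flatMap closure without the Task-not-serializable error
val s3 = new SerializableAmazonS3Client(new BasicAWSCredentials(key, secret))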
@dineshjlabs @wojtekk And FYI for anyone who drops by: I noticed that I had mistakenly changed

def s3 = new AmazonS3Client(new BasicAWSCredentials(key, secret))

to

val s3 = new AmazonS3Client(new BasicAWSCredentials(key, secret))

At least this is what causes the serialization exception for me, presumably because with a def each executor builds its own client when the closure calls s3, so no AmazonS3Client instance needs to be serialized, whereas a val is created once on the driver, gets captured by the closure, and AmazonS3Client is not Serializable.
Is there a Python alternative for this?
@snowindy @hamishdickson If my input data is ORC and I am only interested in fetching, say, 20 of the 120 columns present in the files (columnar data access), how does this work?
Would love to see the Python port.
Below is the code I used to UNLOAD data from Redshift to S3 and then read S3 into Spark. You could refer to lines 62~72:
https://github.com/skinheadbob/trinity/blob/master/aws/rsutil.py
The idea is the same: parallelize(s3_keys).flatMap(lambda s3_key: read_lines(s3_key)). Hope it helps.
This worked for me. Thanks @hamishdickson!
I am getting org.apache.spark.SparkException: Task not serializable and java.io.NotSerializableException: com.amazonaws.services.s3.AmazonS3Client