Last active
November 7, 2020 14:34
-
-
Save afsalthaj/c0f432d324391121dd8367dff99bb9e8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.{ IO, Resource } | |
import com.amazonaws.services.s3.AmazonS3 | |
import com.amazonaws.services.s3.model.ListObjectsV2Request | |
import org.apache.commons.lang.StringUtils | |
/**
 * Lists every object under `path` (bucket `path.parentBucket`, prefix `path.prefix`) and returns
 * each object key after passing it through `getFileName`, in listing order.
 *
 * `listObjectsV2` returns at most one page (up to 1000 keys) per call, so this follows the
 * continuation token until the listing is no longer truncated; otherwise large prefixes would be
 * silently cut off.
 *
 * @param client S3 client used for the listing calls
 * @param path   S3 location whose bucket and prefix are listed
 * @return all keys under the prefix, mapped through `getFileName`
 */
def getKeys(client: AmazonS3, path: S3Path): IO[List[String]] = IO {
  // Request is created inside the IO, so each run gets its own mutable request object.
  val request =
    new ListObjectsV2Request().withBucketName(path.parentBucket).withPrefix(path.prefix)

  @scala.annotation.tailrec
  def collectPages(acc: Vector[String]): Vector[String] = {
    val page = client.listObjectsV2(request)
    val keys = acc ++ page.getObjectSummaries.asScala.map(_.getKey)
    if (page.isTruncated) {
      request.setContinuationToken(page.getNextContinuationToken)
      collectPages(keys)
    } else keys
  }

  collectPages(Vector.empty).toList.map(getFileName)
}
/**
 * Lists the files under `source` and builds, for each one, a Resource that downloads it to a
 * temporary local file and pairs it with a target name produced by `dataFileNaming`.
 *
 * The returned IO fails with a RuntimeException when no keys are found under `source`. A failure
 * to generate a target name is converted to a throwable via `toThrowable` and raised when the
 * corresponding per-file Resource is acquired (it runs inside `evalMap`), not when this IO runs.
 *
 * @param s3Client       client used for listing and downloading
 * @param source         S3 location to download from
 * @param dataFileNaming naming strategy for the target data files
 * @param featureDetails feature table details passed to the naming strategy
 * @param runDate        run date passed to the naming strategy
 */
def downloadAllFiles(
  s3Client: AmazonS3,
  source: S3Path,
  dataFileNaming: Naming[TargetFile],
  featureDetails: FeatureTablePath,
  runDate: ZonedDateTime
): IO[Resources[TemporaryLocalFile]] =
  getKeys(s3Client, source)
    .flatTap { fileNames =>
      IO.raiseError(new RuntimeException(s"Zero files in the s3 path ${source.asString}"))
        .whenA(fileNames.isEmpty)
    }
    .map { fileNames =>
      Resources(fileNames.map { fileName =>
        downloadFile(s3Client, source, fileName).evalMap { localFile =>
          IO.fromEither(
            dataFileNaming
              .getName(Naming.Input(featureDetails, runDate, fileName), _.toTargetFileName)
              .leftMap(_.toThrowable("Failed to generate a file name."))
          ).map(target => Data(localFile, target.name, FileType.DataFile))
        }
      })
    }
/**
 * Downloads a single S3 object into a fresh temporary local file.
 *
 * The temporary file is created via `TemporaryLocalFile.createResource`. The S3 object is then
 * fetched and its content written into that file. The S3 object is closed as soon as the copy
 * finishes, rather than being held open until the caller releases the returned Resource.
 *
 * @param amazonS3 client used to fetch the object
 * @param source   S3 location; `source.asString` is passed as the bucket argument of `getObject`
 * @param fileName object key passed to `getObject`; also used to build the temp-file prefix
 * @return a Resource yielding the populated temporary local file
 */
def downloadFile(
  amazonS3: AmazonS3,
  source: S3Path,
  fileName: String
): Resource[IO, TemporaryLocalFile] = {
  val key = fileName
  // NOTE(review): getKeys lists using `path.parentBucket` / `path.prefix` and maps keys through
  // `getFileName`, but here `source.asString` is used as the bucket and the bare file name as the
  // key. Confirm this addresses the same objects that were listed.
  TemporaryLocalFile
    .createResource("S3_DOWNLOADED", Some(Prefix.mk(key)), None)
    .evalMap { localFile =>
      // Scope the S3Object to the copy only: it holds an open HTTP stream from the client's
      // connection pool, so release it right after writing instead of when the caller is done
      // with the local file (which is what an outer Resource.flatMap would do).
      Resource
        .fromAutoCloseable(
          IO(amazonS3.getObject(source.asString, key))
            .withMessageOnError(s"Failed to get ${key} from ${source.asString}")
        )
        .use(s3Obj => localFile.writeFrom(s3Obj))
        .map(_ => localFile)
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment