I want to write a data writer that is generic for the Warehouse API's data
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import scala.util.{Success, Try}

/**
 * I want to write a data writer that is generic for the Warehouse API's data.
 * However, this won't compile, as d.toDF needs some implicits that can only be found if
 * (1) T is a case class (that is why T <: Product) and
 * (2) T is defined in the proper place (see https://issues.scala-lang.org/browse/SI-6649 or http://stackoverflow.com/questions/33704831/value-todf-is-not-a-member-of-org-apache-spark-rdd-rdd)
 *
 * I think step (1) is OK, but the typetags (step (2)) are not working.
 */
case class GenericDataWriter[T <: Product](name: String, sqlC: SQLContext, stage: Stage, fmt: Format)
  extends Serializable with Logging with DataWriter[T] {

  import sqlC.implicits._

  def write(d: RDD[T]): Try[RDD[T]] = {
    val fName = s"${name}_${stage}.parquet"
    val allLocations = locations.getOrElse((Program, stage, fmt), Set.empty[WarehousePath])
    if (allLocations.isEmpty) {
      fail(s"No locations specified to save '$name'")
    } else {
      fmt match {
        case Parquet =>
          // map over a Seq, not the Set itself: a Set[Boolean] would collapse
          // duplicate results and break the success count below
          val allResults = allLocations.toSeq.map {
            case l @ Local(dir) =>
              d.toDF().write.parquet(l.getFullFileName(fName))
              true
            case s @ S3(bucketName) =>
              // to write on S3 we took the convoluted path to
              // (1) write a local file
              val fileName = s"/tmp/$fName"
              d.toDF().write.parquet(fileName)
              // (2) copy it into S3's bucket
              val dstFileName = s.getFullFileName(fName)
              sparkutils.S3.writeFileToS3(bucketName, localFileName = fileName, dstFileName) match {
                case None =>
                  logger.info(s"Writing to S3's bucket '${bucketName}', file '${dstFileName}' worked like a charm")
                  true
                case Some(errMsg) =>
                  logger.error(s"Writing to S3's bucket '${bucketName}' failed miserably ($errMsg)")
                  false
              }
          }
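          // An alternative worth considering (not in the original gist): if the
          // cluster's Hadoop configuration has the S3A filesystem and credentials
          // set up, Spark can write Parquet straight to the bucket and skip the
          // local copy, e.g. d.toDF().write.parquet(s"s3a://$bucketName/$fName").
          // The two-step approach above is kept since it routes credentials
          // through sparkutils.S3.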
          // let's report the final results
          allResults.count(_ == true) match {
            case 0 =>
              fail(s"No saving of '$name' was possible")
            case n if n < allResults.size =>
              logger.warn(s"'$name' was saved for some configurations, but not for others")
              Success(d)
            case _ =>
              Success(d)
          }
        case f => fail(s"Save '$name' data on format $f has not been implemented")
      }
    }
  }
}
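A minimal sketch of the fix I would try, following the Stack Overflow answer linked in the header comment (assuming Spark 1.x, where sqlC.implicits._ provides rddToDataFrameHolder; the Stage/Format/DataWriter machinery is elided): add a TypeTag context bound on T so the implicit conversion behind d.toDF() can be materialized, and define the concrete case class at the top level rather than inside a method (the SI-6649 restriction).

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import scala.reflect.runtime.universe.TypeTag

// Top-level case class: defining it inside a method is what trips SI-6649.
case class Reading(sensor: String, value: Double) // hypothetical example type

// The `: TypeTag` context bound is the missing piece: rddToDataFrameHolder in
// sqlC.implicits._ needs an implicit TypeTag[T] to derive the DataFrame schema.
case class SketchWriter[T <: Product : TypeTag](name: String, sqlC: SQLContext) {
  import sqlC.implicits._

  def write(d: RDD[T]): Unit =
    d.toDF().write.parquet(s"/tmp/$name.parquet") // now compiles
}

// usage: SketchWriter[Reading]("readings", sqlContext).write(readingsRdd)

If that holds, the same context bound on GenericDataWriter[T <: Product : TypeTag] should let the original write method compile unchanged.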
Luis, I'll take a better look later, but one thing is that case classes automatically get Serializable, you don't need to specify it.
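For reference, a quick check (my snippet, not from the gist) showing the compiler already treats a case class as Serializable with no explicit extends clause:

case class Point(x: Int, y: Int) // no "extends Serializable" anywhere

// compiles only because case classes automatically mix in Product with Serializable
val ev = implicitly[Point <:< Serializable]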