Skip to content

Instantly share code, notes, and snippets.

@ldacosta
Created July 21, 2015 17:58
Show Gist options
  • Save ldacosta/5616c1da57bcc6dd60aa to your computer and use it in GitHub Desktop.
Save ldacosta/5616c1da57bcc6dd60aa to your computer and use it in GitHub Desktop.
private def templateForATest(dir: String, sql: SQLContext) = {
  /*
   * Template for a Parquet round-trip check.
   *
   * Steps:
   *   - build a single CleanData record;
   *   - write it to a Parquet file;
   *   - read the file back through DataFrameReader.toRDD;
   *   - log how many records came back.
   *
   * Nothing is asserted; the outcome is only reported through `info`.
   *
   * @param dir intended output directory.
   *            NOTE(review): currently unused. The file is always written to
   *            /tmp/afile.parquet, and the dir-based path is kept commented out below.
   *            Confirm which location is intended.
   * @param sql SQL context used both to build the DataFrame and to read the Parquet file back.
   * @throws RuntimeException via sys.error, if toRDD reports a Failure.
   */

  // A single fully-populated record. Timestamp.valueOf expects "yyyy-[m]m-[d]d hh:mm:ss".
  val good = CleanData(
    reportDate = Timestamp.valueOf("2015-08-01 19:20:21"),
    date = Timestamp.valueOf("2015-08-01 19:20:21"),
    impressions = 10,
    clicks = 20,
    totalConversions = 33,
    impressionsConvergenceProbability = Some(0.99),
    clicksConvergenceProbability = Some(0.99),
    totalConversionsConvergenceProbability = Some(0.99))

  // `sc` comes from the enclosing scope (presumably the suite's SparkContext -- confirm).
  val rdd: RDD[CleanData] = sc.parallelize(List(good))

  // Hard-coded output location; the commented-out alternative was s"${dir}/afile.parquet".
  val fileNameAsString = "/tmp/afile.parquet"
  // Presumably clears output from a previous run (HDFS.rm is project code -- confirm semantics).
  HDFS.rm(fileNameAsString)
  info(s"Generating Parquet file here: ${fileNameAsString}. Should contain ${rdd.count()} elements")

  // Write the record out as Parquet ...
  sql.createDataFrame(rdd).saveAsParquetFile(fileNameAsString)

  // ... and read it back, converting the DataFrame into a typed RDD.
  val dfP = DataFrameParquet(sql.parquetFile(fileNameAsString))
  val wrappedRDD = DataFrameReader.toRDD[CleanData](dfP.unwrap) match {
    case Success(anRDD) => anRDD
    case Failure(fs) => sys.error(s"FAILURE: ${fs.shows}")
  }

  // Count once, so the read-back RDD is not scanned twice (isEmpty() followed by count()).
  val readBack = wrappedRDD.count()
  if (readBack == 0L) info("EEEEEEEMPTY!!") else info(s"This puppy has ${readBack} elements")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment