UnpackedAvroSource passing test
import java.io.File

import org.apache.avro.file.DataFileWriter
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.hadoop.mapred.JobConf
import org.scalacheck.Prop
// Assumes the specs 1.x ScalaCheck integration (which provides `prop must pass`);
// swap these imports for the specs2 equivalents if the project uses specs2.
import org.specs.{ ScalaCheck, Specification }

import com.twitter.scalding.{ Args, Hdfs, Mode }

// MyAvroRecord, MyAvroRecordGenerators and SimpleUnpackedAvroJob are defined
// elsewhere in the project (the Avro-generated record, its ScalaCheck generator,
// and the Scalding job under test).
class AvroUnpackedSourceTest extends Specification with ScalaCheck {
  "Running M/R jobs on Avro data" should {
    "work for unpacked sources" in {
      val prop = Prop.forAll(MyAvroRecordGenerators.myAvroGen) { (record: MyAvroRecord) =>
        // Write the generated record to a temporary Avro file for the job to read.
        val tempFolder = new File(System.getProperty("java.io.tmpdir"))
        val tempInput = new File(tempFolder, "input")
        tempInput.mkdirs
        val tempOutput = new File(tempFolder, "output")
        tempOutput.mkdirs
        val tempInputFile = new File(tempInput, "myrec.avro")
        val datumWriter = new SpecificDatumWriter[MyAvroRecord](classOf[MyAvroRecord])
        val dataFileWriter = new DataFileWriter[MyAvroRecord](datumWriter)
        dataFileWriter.create(record.getSchema(), tempInputFile)
        dataFileWriter.append(record)
        dataFileWriter.close()

        val args = new Args(Map("input" -> List(tempInput.getPath), "output" -> List(tempOutput.getPath)))
        val mode = {
          val conf = new JobConf
          // Set the polling to a lower value to speed up tests:
          conf.set("jobclient.completion.poll.interval", "100")
          conf.set("cascading.flow.job.pollinginterval", "5")
          // Work around for local hadoop race
          conf.set("mapred.local.dir", "/tmp/hadoop/%s/mapred/local".format(java.util.UUID.randomUUID))
          Hdfs(true, conf)
        }

        // Build and run the job, then validate its output against the input record.
        val margs = Mode.putMode(mode, args)
        val job = new SimpleUnpackedAvroJob(margs)
        val flow = job.buildFlow
        flow.complete
        val validationFile = io.Source.fromFile(tempOutput.getPath + "/part-00000").getLines
        val res = validationFile.next.toDouble
        res == record.getPrice * 1.5
      }
      prop must pass
    }
  }
}
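
For reference, a minimal sketch of what SimpleUnpackedAvroJob could look like. It is not part of the gist: the job body, the exact UnpackedAvroSource constructor form, and the 'price field name are assumptions, inferred from the test reading a single Double out of part-00000 and comparing it to record.getPrice * 1.5.

import com.twitter.scalding._
import com.twitter.scalding.avro.UnpackedAvroSource

class SimpleUnpackedAvroJob(args: Args) extends Job(args) {
  // Hypothetical sketch: read MyAvroRecord's fields as an unpacked Cascading
  // tuple, scale the price field by 1.5, and write the result as plain text so
  // the test can parse a Double back out of part-00000.
  UnpackedAvroSource[MyAvroRecord](args("input"))
    .read
    .mapTo('price -> 'scaledPrice) { price: Double => price * 1.5 }
    .write(TextLine(args("output")))
}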