@ldacosta
Created April 2, 2015 14:45
Diff
$ git diff develop origin/feature/MPN-854-design-of-validation-strategy
diff --git a/report-core/src/main/scala/mediative/util/wrapper/FileWrapper.scala b/report-core/src/main/scala/mediative/util/wrapper/FileWrapper.scala
new file mode 100644
index 0000000..acbdfd8
--- /dev/null
+++ b/report-core/src/main/scala/mediative/util/wrapper/FileWrapper.scala
@@ -0,0 +1,42 @@
+package mediative.util.wrapper
+
+import java.io
+
+import mediative.util.Logging
+
+/**
+ * Simply labels a Path as a File (as opposed to a Directory).
+ */
+trait FileWrapper {
+ def value: String
+}
+
+object FileWrapper extends Logging {
+ /**
+ * Wraps a file name into a FileWrapper. This will succeed if
+ * * file name does not point to a directory
+ * * file name points to an existing file
+ * * the file pointed at is readable
+ * @param maybeFile
+ * @return
+ */
+ def apply(maybeFile: String): Option[FileWrapper] = {
+
+ // if security manager is not present we can't check for attributes of the file,
+ // specifically for its readability. So we fail in that case:
+ // (cf. https://docs.oracle.com/javase/tutorial/essential/environment/security.html)
+ if (System.getSecurityManager == null) {
+ logger.error("Security Manager not present. File wrapping will fail in all cases.")
+ None
+ } else {
+ try {
+ if (new io.File(maybeFile).isFile) Some(FileWrapperImpl(value = maybeFile))
+ else None
+ } catch {
+ case e: SecurityException => None // in case java.io.File attributes can't be accessed
+ }
+ }
+ }
+ private case class FileWrapperImpl(value: String) extends FileWrapper
+}
+
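
A minimal usage sketch of the FileWrapper smart constructor introduced above. The path is hypothetical, and per the constructor's contract a SecurityManager must be installed for wrapping to succeed:

import mediative.util.wrapper.FileWrapper

// Hypothetical path; FileWrapper only yields Some(...) for an existing, readable, non-directory file.
val maybeReport: Option[FileWrapper] = FileWrapper("/mnt/reports/latest.csv")

maybeReport match {
  case Some(f) => println(s"ready to process '${f.value}'")
  case None    => println("not an existing, readable file (or no SecurityManager installed)")
}
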
diff --git a/report-core/src/main/scala/mediative/util/wrapper/String.scala b/report-core/src/main/scala/mediative/util/wrapper/String.scala
deleted file mode 100644
index 59d6404..0000000
--- a/report-core/src/main/scala/mediative/util/wrapper/String.scala
+++ /dev/null
@@ -1,10 +0,0 @@
-package mediative.util.wrapper
-
-trait String extends Wrapper[Predef.String] {
- def value: Predef.String
-}
-
-object String {
- // if I have a Wrapper[String] I want to be able to implicitly extract the value
- implicit def wrapper2String: String => Predef.String = ws => ws.value
-}
diff --git a/report-core/src/main/scala/mediative/util/wrapper/StringWrapper.scala b/report-core/src/main/scala/mediative/util/wrapper/StringWrapper.scala
new file mode 100644
index 0000000..eab3adb
--- /dev/null
+++ b/report-core/src/main/scala/mediative/util/wrapper/StringWrapper.scala
@@ -0,0 +1,10 @@
+package mediative.util.wrapper
+
+trait StringWrapper extends Wrapper[String] {
+ def value: String
+}
+
+object StringWrapper {
+ // if I have a Wrapper[String] I want to be able to implicitly extract the value
+ implicit def wrapper2String(ws: StringWrapper): String = ws.value
+}
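
A sketch of how the implicit conversion above is meant to be used, assuming Wrapper[String] only requires the value member; Label is a hypothetical wrapper defined just for illustration:

import mediative.util.wrapper.StringWrapper

// Hypothetical concrete wrapper.
case class Label(value: String) extends StringWrapper

def shout(s: String): String = s.toUpperCase

val greeting = Label("hello")
// wrapper2String lives in StringWrapper's companion object, so it sits in implicit
// scope and lets a StringWrapper be passed wherever a plain String is expected.
println(shout(greeting)) // HELLO
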
diff --git a/report-core/src/test/scala/mediative/util/wrapper/FileWrapperTest.scala b/report-core/src/test/scala/mediative/util/wrapper/FileWrapperTest.scala
new file mode 100644
index 0000000..a19e0a2
--- /dev/null
+++ b/report-core/src/test/scala/mediative/util/wrapper/FileWrapperTest.scala
@@ -0,0 +1,73 @@
+package mediative.util.wrapper
+
+import org.scalacheck.Gen
+import org.scalatest.prop.GeneratorDrivenPropertyChecks
+import org.scalatest.{ Matchers, BeforeAndAfterAll, FreeSpec }
+import java.io
+
+class FileWrapperTest extends FreeSpec with BeforeAndAfterAll with Matchers with GeneratorDrivenPropertyChecks {
+
+ override def beforeAll() {
+
+ }
+
+ override def afterAll(): Unit = {
+
+ }
+
+ "A File should" - {
+ "be recognized when the wrapped String is" - {
+ "an existing file which we have access to" in {
+ val currDir = new io.File(".")
+ withClue(s"Current directory is not such a thing??") { assert(currDir.isDirectory) }
+ assert(currDir.listFiles().filter(_.isFile).forall(f => FileWrapper(f.getAbsolutePath).isDefined))
+ }
+ }
+ "be rejected when the wrapped String" - {
+ "is empty" in {
+ assert(FileWrapper("").isEmpty)
+ }
+ "represents a non-existent file, when" - {
+ "no directory is specified" in {
+ forAll(Gen.alphaStr) { aStr =>
+ val fileName = aStr + ".txt"
+ whenever(!(new io.File(fileName).exists())) {
+ assert(FileWrapper(fileName).isEmpty)
+ }
+ }
+ }
+ "the current directory is specified" in {
+ forAll(Gen.alphaStr) { aStr =>
+ val fileName = s"./${aStr}.txt"
+ whenever(!(new io.File(fileName).exists())) {
+ assert(FileWrapper(fileName).isEmpty)
+ }
+ }
+ }
+ "a random directory is specified" in {
+ forAll(Gen.alphaStr, Gen.alphaStr) { (aDir, aStr) =>
+ val fileName = s"${aDir}${io.File.separator}${aStr}.txt"
+ whenever(!(new io.File(fileName).exists())) {
+ assert(FileWrapper(fileName).isEmpty)
+ }
+ }
+ }
+ }
+ "represent a directory, eg" - {
+ "the current one" in {
+ assert(FileWrapper(".").isEmpty)
+ }
+ "a non-existing directory (but with a directory name)" in {
+ val aDir = s"${io.File.separator}mnt${io.File.separator}"
+ withClue(s"${aDir} seems to be a File") { assert(FileWrapper(aDir).isEmpty) }
+ }
+ "an existing directory" in {
+ val currDir = new io.File(".")
+ withClue(s"Current directory is not such a thing??") { assert(currDir.isDirectory) }
+ assert(currDir.listFiles().filter(_.isDirectory).forall(f => FileWrapper(f.getAbsolutePath).isEmpty))
+ }
+ }
+ }
+ }
+
+}
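
The test above drives ScalaCheck generators through ScalaTest; a minimal, self-contained sketch of the forAll/whenever guard pattern it uses:

import org.scalacheck.Gen
import org.scalatest.FreeSpec
import org.scalatest.prop.GeneratorDrivenPropertyChecks

class WheneverSketch extends FreeSpec with GeneratorDrivenPropertyChecks {
  "whenever" - {
    "only asserts on generated values that satisfy the guard" in {
      forAll(Gen.alphaStr) { s =>
        whenever(s.nonEmpty) {        // skip the empty string the generator may produce
          assert(s.forall(_.isLetter)) // alphaStr yields only letters
        }
      }
    }
  }
}
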
diff --git a/report-db/src/main/scala/mediative/distributed/schema/design/MockExpected.scala b/report-db/src/main/scala/mediative/distributed/schema/design/MockExpected.scala
new file mode 100644
index 0000000..cbafd06
--- /dev/null
+++ b/report-db/src/main/scala/mediative/distributed/schema/design/MockExpected.scala
@@ -0,0 +1,73 @@
+package mediative.distributed.schema.design
+
+import mediative.distributed.util.RDDable
+import mediative.util.Logging
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
+import scala.util.control.Exception._
+import mediative.distributed.util.json.JsonFile
+
+case class MockExpected(i: Int, s: String, d: Option[Double])
+
+object MockExpected extends Serializable with Logging {
+
+ val expectedSchema: StructType = {
+ StructType(Seq(
+ StructField(name = "i", dataType = IntegerType, nullable = false),
+ StructField(name = "s", dataType = StringType, nullable = false),
+ StructField(name = "d", dataType = DoubleType, nullable = true)))
+
+ }
+
+ implicit object MockExpectedRDDable extends RDDable[JsonFile, MockExpected] {
+ def toRDD(aJsonFile: JsonFile, sc: SparkContext, sqlContext: SQLContext): RDD[MockExpected] = {
+ import sqlContext._ // as always, brings implicits into scope
+
+ {
+ val schemaRDD = aJsonFile.toSchemaRDD(sqlContext)
+ val diff = schemaRDD.schema.fieldNames.toSet -- expectedSchema.fieldNames.toSet
+ if (!diff.isEmpty) {
+ logger.info(s"There are extra fields: ${diff.toString()}")
+ None
+ } else {
+ // assumption: if an optional field NEVER appears we assume it is missing
+ // (as opposed to empty all the time, which could also be reasonable)
+ // NB: from a practical point of view, if a field is not present in the data,
+ // then the empirical schema will not contain it, which screws all later queries
+ val newSchemaRDD =
+ catching(classOf[Exception]).opt { schemaRDD.select(Symbol("d")).count() } match {
+ case None =>
+ logger.info(s"Field 'd' never appears in dataset")
+ schemaRDD.select('i, 's)
+ case Some(_) =>
+ schemaRDD.select('i, 's, 'd)
+ }
+ try {
+ newSchemaRDD.count() // this will throw an Exception if fields not there
+ Some(
+ newSchemaRDD.map {
+ case Row(i: Int, s: String, d: Double) => MockExpected(i, s, Some(d))
+ case Row(i: Int, s: String, _) => MockExpected(i, s, None) // d is Null
+ case Row(i: Int, s: String) => MockExpected(i, s, None) // d was never there
+ })
+ } catch {
+ case e: TreeNodeException[_] =>
+ // ends up here because the 'select' is asking for fields that do not exist
+ logger.info(s"'select' failed: ${e.getMessage}")
+ None
+ }
+ }
+ }.getOrElse(sc.emptyRDD[MockExpected])
+ }
+ }
+
+ def readJSonFromFile(jsonFile: JsonFile,
+ sc: SparkContext,
+ sqlContext: SQLContext)(implicit ops: RDDable[JsonFile, MockExpected]): RDD[MockExpected] = {
+
+ ops.toRDD(jsonFile, sc, sqlContext)
+ }
+
+}
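
The toRDD implementation above probes for the optional column 'd' by running a select inside scala.util.control.Exception.catching(...).opt; a tiny, Spark-free illustration of that combinator:

import scala.util.control.Exception._

// catching(...).opt runs a block and turns a matching exception into None instead of a throw.
def parseIntOpt(s: String): Option[Int] =
  catching(classOf[NumberFormatException]).opt { s.toInt }

println(parseIntOpt("42"))  // Some(42)
println(parseIntOpt("n/a")) // None
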
diff --git a/report-db/src/main/scala/mediative/distributed/util/RDDable.scala b/report-db/src/main/scala/mediative/distributed/util/RDDable.scala
new file mode 100644
index 0000000..27297be
--- /dev/null
+++ b/report-db/src/main/scala/mediative/distributed/util/RDDable.scala
@@ -0,0 +1,9 @@
+package mediative.distributed.util
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
+
+trait RDDable[I, O] {
+ def toRDD(input: I, sc: SparkContext, sqlContext: SQLContext): RDD[O]
+}
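
RDDable is used as a type class; a sketch of what providing and consuming an instance looks like, with CsvLines as a purely hypothetical input type:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import mediative.distributed.util.RDDable

// Hypothetical input type.
case class CsvLines(lines: Seq[String])

object CsvLines {
  // Placing the instance in the companion object puts it in implicit scope automatically.
  implicit object CsvLinesRDDable extends RDDable[CsvLines, String] {
    def toRDD(input: CsvLines, sc: SparkContext, sqlContext: SQLContext): RDD[String] =
      sc.parallelize(input.lines)
  }
}

// A generic consumer; the compiler supplies the instance above.
def load[I, O](input: I, sc: SparkContext, sqlContext: SQLContext)(implicit ev: RDDable[I, O]): RDD[O] =
  ev.toRDD(input, sc, sqlContext)
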
diff --git a/report-db/src/main/scala/mediative/distributed/util/json/JsonFile.scala b/report-db/src/main/scala/mediative/distributed/util/json/JsonFile.scala
new file mode 100644
index 0000000..1964479
--- /dev/null
+++ b/report-db/src/main/scala/mediative/distributed/util/json/JsonFile.scala
@@ -0,0 +1,49 @@
+package mediative.distributed.util.json
+
+import mediative.util.Logging
+import mediative.util.wrapper.FileWrapper
+import org.apache.spark.sql.{ SchemaRDD, SQLContext }
+
+trait JsonFile {
+ def value: FileWrapper
+
+ /**
+ * Translates a JSon File into a SchemaRDD
+ * @note this should never fail, since the only way to construct this type is through the smart constructor
+ * @param sqlContext
+ * @return
+ */
+ def toSchemaRDD(sqlContext: SQLContext): SchemaRDD =
+ sqlContext.jsonFile(value.value)
+
+}
+
+object JsonFile extends Logging {
+ /* a JsonFile contains a json if I can read it as such and there are no corrupt fields in it */
+ def apply(maybeJsonFile: FileWrapper, sqlContext: SQLContext): Option[JsonFile] = {
+
+ (try {
+ Some(sqlContext.jsonFile(maybeJsonFile.value))
+ } catch {
+ // NB: I would like to get more specific but there is no information
+ // about exact Exception thrown and when (in 1.2.0)
+ // cf, // https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.sql.SQLContext
+ // We have only seen that an Exception is thrown if the content of the file is not a JSon
+ // TODO: make a better exception catcher here
+ case e: Exception =>
+ logger.error(s"Parsing of '${maybeJsonFile.value}' as a JSon failed. (details: ${e.getMessage})")
+ None
+ }).filter(rdd => !rdd.schema.fieldNames.exists(_.startsWith("_corrupt"))).map { _ =>
+ JsonFileImpl(value = maybeJsonFile)
+ }
+ }
+
+ def apply(aPath: String, sqlContext: SQLContext): Option[JsonFile] = {
+ FileWrapper(aPath).map {
+ JsonFile(_, sqlContext).map { identity _ }
+ }.flatten
+ }
+
+ private case class JsonFileImpl(value: FileWrapper) extends JsonFile
+}
+
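
A usage sketch of the JsonFile smart constructors above, assuming a local SparkContext is acceptable; the path is hypothetical:

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql.SQLContext
import mediative.distributed.util.json.JsonFile

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("json-file-sketch"))
val sqlContext = new SQLContext(sc)

// The String overload chains FileWrapper validation and the JSON parse for us.
JsonFile("/mnt/events/sample.json", sqlContext) match {
  case Some(jsonFile) => println(s"valid JSON file; schema: ${jsonFile.toSchemaRDD(sqlContext).schema}")
  case None           => println("missing, unreadable, or not parsable as JSON")
}

sc.stop()
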
diff --git a/report-db/src/main/scala/mediative/util/wrapper/DistributedCSVFileName.scala b/report-db/src/main/scala/mediative/util/wrapper/DistributedCSVFileName.scala
index 00a2d52..df03283 100644
--- a/report-db/src/main/scala/mediative/util/wrapper/DistributedCSVFileName.scala
+++ b/report-db/src/main/scala/mediative/util/wrapper/DistributedCSVFileName.scala
@@ -1,9 +1,7 @@
package mediative.util.wrapper
-import mediative.util.wrapper.{ String => StringWrapper }
-
/**
* Simply labels a String as a distributed file name (eg, living in HDFS, DBFS, ...)
*/
-case class DistributedCSVFileName(value: Predef.String) extends StringWrapper
+case class DistributedCSVFileName(value: String) extends StringWrapper
diff --git a/report-db/src/test/scala/mediative/distributed/schema/design/MockExpectedTest.scala b/report-db/src/test/scala/mediative/distributed/schema/design/MockExpectedTest.scala
new file mode 100644
index 0000000..35c1cf7
--- /dev/null
+++ b/report-db/src/test/scala/mediative/distributed/schema/design/MockExpectedTest.scala
@@ -0,0 +1,123 @@
+package mediative.distributed.schema.design
+
+import java.io.File
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.scalatest.{ BeforeAndAfterAll, FreeSpec }
+import mediative.util.{ Base => MediativeUtil }
+import MediativeUtil._
+import mediative.distributed.util.json.JsonFile
+
+class MockExpectedTest extends FreeSpec with BeforeAndAfterAll {
+
+ var sc: SparkContext = _
+ val nonExistentPath = MediativeUtil.String.random(10) + ".json"
+
+ val pathToFileNotJson = "thisIsNotAJson.txt"
+ val pathToFileJsonTooBig = "pathToFileJsonTooBig.json"
+ val pathToFileMockExpectedJson = "pathToFileMockExpectedJson.json"
+ val pathToFileSubsetRequiredJson = "pathToFileSubsetJson.json"
+
+ private def writeOnFile(fileName: String, what: String) = {
+ import java.io._
+ using(new PrintWriter(new File(fileName))) {
+ writer => writer.write(what)
+ }
+
+ }
+ private def writeNeededFiles() = {
+ val jsonLessFieldsThanNeeded = """{"i":1,"s": "potatoe"}"""
+ val jsonMoreFieldsThanNeeded =
+ """{"name":"Yin","lastname":"Lee", "address":{"city":"Columbus","state":"Ohio"}}
+ |{"name":"Luis","address":{"city":"Montreal","state":"Quebec"}} """.stripMargin
+ val jsonFieldsExactlyNeeded =
+ """{"i":1, "s": "luis", "d": 2.0}
+ |{"i":11, "s": "luis1"}""".stripMargin
+
+ writeOnFile(pathToFileNotJson, "Hello Scala")
+ writeOnFile(pathToFileJsonTooBig, jsonMoreFieldsThanNeeded)
+ writeOnFile(pathToFileSubsetRequiredJson, jsonLessFieldsThanNeeded)
+ writeOnFile(pathToFileMockExpectedJson, jsonFieldsExactlyNeeded)
+ }
+
+ private def deleteNeededFiles() = {
+ (new File(pathToFileNotJson)).delete()
+ (new File(pathToFileJsonTooBig)).delete()
+ (new File(pathToFileSubsetRequiredJson)).delete()
+ (new File(pathToFileMockExpectedJson)).delete()
+ }
+
+ override def beforeAll() {
+ assert(!(new File(nonExistentPath)).exists())
+ writeNeededFiles()
+ sc = new SparkContext("local", "test1") // An existing spark context.
+ }
+
+ override def afterAll() {
+ sc.stop()
+ sc = null
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
+ deleteNeededFiles()
+ }
+
+ /*
+From https://issues.ypg.com/browse/MPN-854:
+
+Pipeline:
+ 1. The interaction is started with a String _s_ (ex: _/mnt/myfile.txt_)
+ 2. _s_ is expected to represent a Path (which is not always true/possible); we try to do such a conversion:
+{code}String => Option[FileWrapper]{code}
+ 3. The successful FileWrapper will be converted, on success, into a File containing a Json object
+{code}File => Option[JSonFile]{code}
+ 4. The JSon comes with metadata information (about the data it contains): _EmpiricalMetadata_
+ 5. We have expectations about what that metadata has to be: _ExpectedMetadata_
+ 6. The JSon contained in the JsonFile has to be transformed into element of type _ExpectedMetadata_, which can then be saved and operated upon.
+{code}JsonFile => DataContainer[ExpectedMetadata]{code}
+ */
+
+ "A MockExpected object M" - {
+ lazy val sqlContext = new SQLContext(sc)
+ import MockExpected._ // brings implicits into scope
+
+ // NB: MockExpected.readJSonFromFile implements pipeline: (4), (5), (6)
+ "can't be built from" - {
+ "a non-existent file" in {
+ assert(JsonFile(nonExistentPath, sqlContext).isEmpty)
+ }
+ "a file that does not contain a json" in {
+ assert(JsonFile(pathToFileNotJson, sqlContext).isEmpty)
+ }
+ "a file that contains a json with fields that are not in M" in {
+ val jsonFileOpt = JsonFile(pathToFileJsonTooBig, sqlContext)
+ assert(jsonFileOpt.isDefined)
+ val jsonFile = jsonFileOpt.get
+ assert(MockExpected.readJSonFromFile(jsonFile, sc, sqlContext).count() == 0)
+ }
+ }
+
+ "should be built from" - {
+ "a file that contains a json with the same schema as M" in {
+ val jsonFileOpt = JsonFile(pathToFileMockExpectedJson, sqlContext)
+ assert(jsonFileOpt.isDefined)
+ val jsonFile = jsonFileOpt.get
+ val r = MockExpected.readJSonFromFile(jsonFile, sc, sqlContext)
+ assert(r.count() > 0)
+ // just for feedback...
+ r.collect().foreach(m => info(m.toString))
+ }
+ "a file that contains a json with a schema whose required fields are a subset of M's" in {
+ val jsonFileOpt = JsonFile(pathToFileSubsetRequiredJson, sqlContext)
+ assert(jsonFileOpt.isDefined)
+ val jsonFile = jsonFileOpt.get
+ val r = MockExpected.readJSonFromFile(jsonFile, sc, sqlContext)
+ assert(r.count() > 0)
+ // just for feedback...
+ r.collect().foreach(m => info(m.toString))
+ }
+ }
+ }
+
+}
+
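
Tying the pieces together, a sketch of the MPN-854 pipeline quoted in the test above, from a raw path String to an RDD of MockExpected (the MockExpectedRDDable instance is resolved from the companion object):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import mediative.distributed.schema.design.MockExpected
import mediative.distributed.util.json.JsonFile

def pipeline(path: String, sc: SparkContext, sqlContext: SQLContext): Option[RDD[MockExpected]] =
  JsonFile(path, sqlContext)                                                  // steps 1-3: String => Option[JsonFile]
    .map(jsonFile => MockExpected.readJSonFromFile(jsonFile, sc, sqlContext)) // steps 4-6: JsonFile => RDD[MockExpected]
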
diff --git a/report-db/src/test/scala/mediative/distributed/util/BaseTest.scala b/report-db/src/test/scala/mediative/distributed/util/BaseTest.scala
index 8ec61fd..857adda 100644
--- a/report-db/src/test/scala/mediative/distributed/util/BaseTest.scala
+++ b/report-db/src/test/scala/mediative/distributed/util/BaseTest.scala
@@ -155,10 +155,11 @@ class BaseTest extends FreeSpec with BeforeAndAfterAll {
"remove non-empty directories" in {
val dirName = StringHelper.random(10)
+ val aRandomFileName = randomFileName
withClue(s"Impossible to create non-empty directory '${dirName}'") { assert(HDFS.getFileSystem.mkdirs(new Path(dirName))) }
- val srcFileName = s"${dirName}${File.separator}${randomFileName}"
+ val srcFileName = s"${dirName}${File.separator}${aRandomFileName}"
withClue(s"Impossible to create HDFS file ${srcFileName}") { assert(writeASampleFile(srcFileName)) }
- info(s"Populated directory '${dirName}' with file named '${dirName}' (full path is '${srcFileName}')")
+ info(s"Populated directory '${dirName}' with file named '${aRandomFileName}' (full path is '${srcFileName}')")
withClue(s"HDFS file '${srcFileName}' does not exist after creation ") { assert(HDFS.fileExists(srcFileName)) }
withClue(s"Impossible to remove directory '${dirName}'") { assert(HDFS.rm(dirName)) }
withClue(s"Directory '${dirName}' exists after 'rm'!!!!") { assert(!HDFS.fileExists(dirName)) }
diff --git a/report-db/src/test/scala/mediative/distributed/util/json/JsonFileTest.scala b/report-db/src/test/scala/mediative/distributed/util/json/JsonFileTest.scala
new file mode 100644
index 0000000..b92704d
--- /dev/null
+++ b/report-db/src/test/scala/mediative/distributed/util/json/JsonFileTest.scala
@@ -0,0 +1,105 @@
+package mediative.distributed.util.json
+
+import mediative.util.Base._
+import mediative.util.wrapper.FileWrapper
+import mediative.util.{ Base => MediativeUtil }
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.scalacheck.Gen
+import org.scalatest.prop.GeneratorDrivenPropertyChecks
+import org.scalatest.{ Matchers, BeforeAndAfterAll, FreeSpec }
+import java.io
+
+class JsonFileTest extends FreeSpec with BeforeAndAfterAll with Matchers with GeneratorDrivenPropertyChecks {
+
+ var sc: SparkContext = _
+ val nonExistentPath = MediativeUtil.String.random(10) + ".json"
+
+ val pathToFileNotJson = "thisIsNotAJson.txt"
+ val pathToFileJson = "pathToFileJson.json"
+
+ private def writeOnFile(fileName: String, what: String) = {
+ import java.io._
+ using(new PrintWriter(new io.File(fileName))) {
+ writer => writer.write(what)
+ }
+
+ }
+ private def writeNeededFiles() = {
+ val aJson =
+ """{"i":1, "s": "luis", "d": 2.0}
+ |{"i":11, "s": "luis1"}""".stripMargin
+
+ writeOnFile(pathToFileNotJson, "Hello Scala")
+ writeOnFile(pathToFileJson, aJson)
+ }
+
+ private def deleteNeededFiles() = {
+ (new io.File(pathToFileNotJson)).delete()
+ (new io.File(pathToFileJson)).delete()
+ }
+
+ override def beforeAll() {
+ assert(!(new io.File(nonExistentPath)).exists())
+ writeNeededFiles()
+ sc = new SparkContext("local", "test1") // An existing spark context.
+ }
+
+ override def afterAll() {
+ sc.stop()
+ sc = null
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
+ deleteNeededFiles()
+ }
+
+ "A Json File" - {
+ lazy val sqlContext = new SQLContext(sc)
+
+ "can't be built from" - {
+ "an invalid FileWrapper" in {
+ forAll(Gen.alphaStr) { aStr =>
+ whenever(!(new io.File(aStr)).exists()) {
+ assert(FileWrapper(aStr).isEmpty)
+ // and since I don't have a FileWrapper, I can't create the JSonFile.
+ // So this test is trivially successful
+ }
+ }
+ }
+ "a String denoting an non-existent file" in {
+ forAll(Gen.alphaStr) { aStr =>
+ whenever(!(new io.File(aStr)).exists()) {
+ assert(JsonFile(aStr, sqlContext).isEmpty)
+ }
+ }
+ }
+
+ "a FileWrapper that does not contain a json" in {
+ FileWrapper(pathToFileNotJson).map { aFile =>
+ JsonFile(aFile, sqlContext).map {
+ jsonFile => assert(false)
+ }.getOrElse(assert(true))
+ }.getOrElse(withClue(s"Impossible to build File from ${pathToFileNotJson}") { assert(false) })
+ }
+
+ "a Path (as a String) pointing to a file that does not contain a json" in {
+ assert(JsonFile(pathToFileNotJson, sqlContext).isEmpty)
+ }
+ }
+
+ "should be built from" - {
+ "a FileWrapper that contains a json" in {
+ FileWrapper(pathToFileJson).map { aFile =>
+ JsonFile(aFile, sqlContext).map {
+ jsonFile => assert(true)
+ }.getOrElse(assert(false))
+ }.getOrElse(withClue(s"Impossible to build File from ${pathToFileJson}") { assert(false) })
+ }
+ "a Path (as a String) pointing to a file that contains a json" in {
+ assert(JsonFile(pathToFileJson, sqlContext).isDefined)
+ }
+ }
+ }
+
+}
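
The tests call a using(...) helper from mediative.util.Base, whose definition is not part of this diff; presumably it is a loan-pattern wrapper along these lines (a sketch, not the actual implementation):

import java.io.{ File, PrintWriter }

// Sketch of a typical loan-pattern helper: run the body, then close the resource.
def using[A <: AutoCloseable, B](resource: A)(body: A => B): B =
  try body(resource) finally resource.close()

// Example, mirroring how the tests write fixture files:
using(new PrintWriter(new File("/tmp/example.txt"))) { writer =>
  writer.write("hello")
}
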