Created April 2, 2015 14:45
Diff
ldacost1@shaddam [~/dev/mpn/databricks-trial] (on git::develop) $ git diff develop origin/feature/MPN-854-design-of-validation-strategy
diff --git a/report-core/src/main/scala/mediative/util/wrapper/FileWrapper.scala b/report-core/src/main/scala/mediative/util/wrapper/FileWrapper.scala
new file mode 100644
index 0000000..acbdfd8
--- /dev/null
+++ b/report-core/src/main/scala/mediative/util/wrapper/FileWrapper.scala
@@ -0,0 +1,42 @@
+package mediative.util.wrapper
+
+import java.io
+
+import mediative.util.Logging
+
+/**
+ * Simply labels a Path as a File (as opposed to a Directory).
+ */
+trait FileWrapper {
+ def value: String
+}
+
+object FileWrapper extends Logging {
+ /**
+ * Wraps a file name into a FileWrapper. This will succeed if
+ * * file name does not point to a directory
+ * * file name points to an existing file
+ * * the file pointed at is readable
+ * @param maybeFile
+ * @return
+ */
+ def apply(maybeFile: String): Option[FileWrapper] = {
+
+ // if security manager is not present we can't check for attributes of the file,
+ // specifically for its readability. So we fail in that case:
+ // (cf. https://docs.oracle.com/javase/tutorial/essential/environment/security.html)
+ if (System.getSecurityManager == null) {
+ logger.error("Security Manager not present. File wrapping will fail in all cases.")
+ None
+ } else {
+ try {
+ if (new io.File(maybeFile).isFile) Some(FileWrapperImpl(value = maybeFile))
+ else None
+ } catch {
+ case e: SecurityException => None // in case java.io.File attributes can't be accessed
+ }
+ }
+ }
+ private case class FileWrapperImpl(value: String) extends FileWrapper
+}
+
diff --git a/report-core/src/main/scala/mediative/util/wrapper/String.scala b/report-core/src/main/scala/mediative/util/wrapper/String.scala
deleted file mode 100644
index 59d6404..0000000
--- a/report-core/src/main/scala/mediative/util/wrapper/String.scala
+++ /dev/null
@@ -1,10 +0,0 @@
-package mediative.util.wrapper
-
-trait String extends Wrapper[Predef.String] {
- def value: Predef.String
-}
-
-object String {
- // if I have a Wrapper[String] I want to be able to implicitly extract the value
- implicit def wrapper2String: String => Predef.String = ws => ws.value
-}
diff --git a/report-core/src/main/scala/mediative/util/wrapper/StringWrapper.scala b/report-core/src/main/scala/mediative/util/wrapper/StringWrapper.scala
new file mode 100644
index 0000000..eab3adb
--- /dev/null
+++ b/report-core/src/main/scala/mediative/util/wrapper/StringWrapper.scala
@@ -0,0 +1,10 @@
+package mediative.util.wrapper
+
+trait StringWrapper extends Wrapper[String] {
+ def value: String
+}
+
+object StringWrapper {
+ // if I have a Wrapper[String] I want to be able to implicitly extract the value
+ implicit def wrapper2String(ws: StringWrapper): String = ws.value
+}
diff --git a/report-core/src/test/scala/mediative/util/wrapper/FileWrapperTest.scala b/report-core/src/test/scala/mediative/util/wrapper/FileWrapperTest.scala
new file mode 100644
index 0000000..a19e0a2
--- /dev/null
+++ b/report-core/src/test/scala/mediative/util/wrapper/FileWrapperTest.scala
@@ -0,0 +1,73 @@
+package mediative.util.wrapper
+
+import org.scalacheck.Gen
+import org.scalatest.prop.GeneratorDrivenPropertyChecks
+import org.scalatest.{ Matchers, BeforeAndAfterAll, FreeSpec }
+import java.io
+
+class FileWrapperTest extends FreeSpec with BeforeAndAfterAll with Matchers with GeneratorDrivenPropertyChecks {
+
+ override def beforeAll() {
+
+ }
+
+ override def afterAll(): Unit = {
+
+ }
+
+ "A File should" - {
+ "be recognized when the wrapped String is" - {
+ "an existing file which we have access to" in {
+ val currDir = new io.File(".")
+ withClue(s"Current directory is not such a thing??") { assert(currDir.isDirectory) }
+ assert(currDir.listFiles().filter(_.isFile).forall(f => FileWrapper(f.getAbsolutePath).isDefined))
+ }
+ }
+ "be rejected when the wrapped String" - {
+ "is empty" in {
+ assert(FileWrapper("").isEmpty)
+ }
+ "represents a non-existent file, when" - {
+ "no directory is specified" in {
+ forAll(Gen.alphaStr) { aStr =>
+ val fileName = aStr + ".txt"
+ whenever(!(new io.File(fileName).exists())) {
+ assert(FileWrapper(fileName).isEmpty)
+ }
+ }
+ }
+ "the current directory is specified" in {
+ forAll(Gen.alphaStr) { aStr =>
+ val fileName = s"./${aStr}.txt"
+ whenever(!(new io.File(fileName).exists())) {
+ assert(FileWrapper(fileName).isEmpty)
+ }
+ }
+ }
+ "a random directory is specified" in {
+ forAll(Gen.alphaStr, Gen.alphaStr) { (aDir, aStr) =>
+ val fileName = s"${aDir}${io.File.separator}${aStr}.txt"
+ whenever(!(new io.File(fileName).exists())) {
+ assert(FileWrapper(fileName).isEmpty)
+ }
+ }
+ }
+ }
+ "represent a directory, eg" - {
+ "the current one" in {
+ assert(FileWrapper(".").isEmpty)
+ }
+ "a non-existing directory (but with a directory name)" in {
+ val aDir = s"${io.File.separator}mnt${io.File.separator}"
+ withClue(s"${aDir} seems to be a File") { assert(FileWrapper(aDir).isEmpty) }
+ }
+ "an existing directory" in {
+ val currDir = new io.File(".")
+ withClue(s"Current directory is not such a thing??") { assert(currDir.isDirectory) }
+ assert(currDir.listFiles().filter(_.isDirectory).forall(f => FileWrapper(f.getAbsolutePath).isEmpty))
+ }
+ }
+ }
+ }
+
+}
diff --git a/report-db/src/main/scala/mediative/distributed/schema/design/MockExpected.scala b/report-db/src/main/scala/mediative/distributed/schema/design/MockExpected.scala
new file mode 100644
index 0000000..cbafd06
--- /dev/null
+++ b/report-db/src/main/scala/mediative/distributed/schema/design/MockExpected.scala
@@ -0,0 +1,73 @@
+package mediative.distributed.schema.design
+
+import mediative.distributed.util.RDDable
+import mediative.util.Logging
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
+import scala.util.control.Exception._
+import mediative.distributed.util.json.JsonFile
+
+case class MockExpected(i: Int, s: String, d: Option[Double])
+
+object MockExpected extends Serializable with Logging {
+
+ val expectedSchema: StructType = {
+ StructType(Seq(
+ StructField(name = "i", dataType = IntegerType, nullable = false),
+ StructField(name = "s", dataType = StringType, nullable = false),
+ StructField(name = "d", dataType = DoubleType, nullable = true)))
+
+ }
+
+ implicit object MockExpectedRDDable extends RDDable[JsonFile, MockExpected] {
+ def toRDD(aJsonFile: JsonFile, sc: SparkContext, sqlContext: SQLContext): RDD[MockExpected] = {
+ import sqlContext._ // as always, brings implicits into scope
+
+ {
+ val schemaRDD = aJsonFile.toSchemaRDD(sqlContext)
+ val diff = schemaRDD.schema.fieldNames.toSet -- expectedSchema.fieldNames.toSet
+ if (!diff.isEmpty) {
+ logger.info(s"There are extra fields: ${diff.toString()}")
+ None
+ } else {
+ // assumption: if an optional field NEVER appears we assume it is missing
+ // (as opposed to empty all the time, which could also be reasonable)
+ // NB: from a practical point of view, if a field is not present in the data,
+ // then the empirical schema will not contain it, which screws all later queries
+ val newSchemaRDD =
+ catching(classOf[Exception]).opt { schemaRDD.select(Symbol("d")).count() } match {
+ case None =>
+ logger.info(s"Field 'd' never appears in dataset")
+ schemaRDD.select('i, 's)
+ case Some(_) =>
+ schemaRDD.select('i, 's, 'd)
+ }
+ try {
+ newSchemaRDD.count() // this will throw an Exception if fields not there
+ Some(
+ newSchemaRDD.map {
+ case Row(i: Int, s: String, d: Double) => MockExpected(i, s, Some(d))
+ case Row(i: Int, s: String, _) => MockExpected(i, s, None) // d is Null
+ case Row(i: Int, s: String) => MockExpected(i, s, None) // d was never there
+ })
+ } catch {
+ case e: TreeNodeException[_] =>
+ // ends up here because the 'select' is asking for fields that do not exist
+ logger.info(s"'select' failed: ${e.getMessage}")
+ None
+ }
+ }
+ }.getOrElse(sc.emptyRDD[MockExpected])
+ }
+ }
+
+ def readJSonFromFile(jsonFile: JsonFile,
+ sc: SparkContext,
+ sqlContext: SQLContext)(implicit ops: RDDable[JsonFile, MockExpected]): RDD[MockExpected] = {
+
+ ops.toRDD(jsonFile, sc, sqlContext)
+ }
+
+}
diff --git a/report-db/src/main/scala/mediative/distributed/util/RDDable.scala b/report-db/src/main/scala/mediative/distributed/util/RDDable.scala
new file mode 100644
index 0000000..27297be
--- /dev/null
+++ b/report-db/src/main/scala/mediative/distributed/util/RDDable.scala
@@ -0,0 +1,9 @@
+package mediative.distributed.util
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
+
+trait RDDable[I, O] {
+ def toRDD(input: I, sc: SparkContext, sqlContext: SQLContext): RDD[O]
+}
diff --git a/report-db/src/main/scala/mediative/distributed/util/json/JsonFile.scala b/report-db/src/main/scala/mediative/distributed/util/json/JsonFile.scala
new file mode 100644
index 0000000..1964479
--- /dev/null
+++ b/report-db/src/main/scala/mediative/distributed/util/json/JsonFile.scala
@@ -0,0 +1,49 @@
+package mediative.distributed.util.json
+
+import mediative.util.Logging
+import mediative.util.wrapper.FileWrapper
+import org.apache.spark.sql.{ SchemaRDD, SQLContext }
+
+trait JsonFile {
+ def value: FileWrapper
+
+ /**
+ * Translates a JSon File into a SchemaRDD
+ * @note this should never fail, since the only way to construct this type is through the smart constructor
+ * @param sqlContext
+ * @return
+ */
+ def toSchemaRDD(sqlContext: SQLContext): SchemaRDD =
+ sqlContext.jsonFile(value.value)
+
+}
+
+object JsonFile extends Logging {
+ /* a JsonFile contains a json if I can read it as such and there is no corrupt fields in it */
+ def apply(maybeJsonFile: FileWrapper, sqlContext: SQLContext): Option[JsonFile] = {
+
+ (try {
+ Some(sqlContext.jsonFile(maybeJsonFile.value))
+ } catch {
+ // NB: I would like to get more specific but there is no information
+ // about exact Exception thrown and when (in 1.2.0)
+ // cf, // https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.sql.SQLContext
+ // We have only seen that an Exception is thrown if the content of the file is not a JSon
+ // TODO: make a better exception catcher here
+ case e: Exception =>
+ logger.error(s"Parsing of '${maybeJsonFile.value}' as a JSon failed. (details: ${e.getMessage})")
+ None
+ }).filter(rdd => !rdd.schema.fieldNames.exists(_.startsWith("_corrupt"))).map { _ =>
+ JsonFileImpl(value = maybeJsonFile)
+ }
+ }
+
+ def apply(aPath: String, sqlContext: SQLContext): Option[JsonFile] = {
+ FileWrapper(aPath).map {
+ JsonFile(_, sqlContext).map { identity _ }
+ }.flatten
+ }
+
+ private case class JsonFileImpl(value: FileWrapper) extends JsonFile
+}
+
diff --git a/report-db/src/main/scala/mediative/util/wrapper/DistributedCSVFileName.scala b/report-db/src/main/scala/mediative/util/wrapper/DistributedCSVFileName.scala
index 00a2d52..df03283 100644
--- a/report-db/src/main/scala/mediative/util/wrapper/DistributedCSVFileName.scala
+++ b/report-db/src/main/scala/mediative/util/wrapper/DistributedCSVFileName.scala
@@ -1,9 +1,7 @@
package mediative.util.wrapper
-import mediative.util.wrapper.{ String => StringWrapper }
-
/**
* Simply labels a String as a distributed file name (eg, living in HDFS, DBFS, ...)
*/
-case class DistributedCSVFileName(value: Predef.String) extends StringWrapper
+case class DistributedCSVFileName(value: String) extends StringWrapper
diff --git a/report-db/src/test/scala/mediative/distributed/schema/design/MockExpectedTest.scala b/report-db/src/test/scala/mediative/distributed/schema/design/MockExpectedTest.scala
new file mode 100644
index 0000000..35c1cf7
--- /dev/null
+++ b/report-db/src/test/scala/mediative/distributed/schema/design/MockExpectedTest.scala
@@ -0,0 +1,123 @@
+package mediative.distributed.schema.design
+
+import java.io.File
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.scalatest.{ BeforeAndAfterAll, FreeSpec }
+import mediative.util.{ Base => MediativeUtil }
+import MediativeUtil._
+import mediative.distributed.util.json.JsonFile
+
+class MockExpectedTest extends FreeSpec with BeforeAndAfterAll {
+
+ var sc: SparkContext = _
+ val nonExistentPath = MediativeUtil.String.random(10) + ".json"
+
+ val pathToFileNotJson = "thisIsNotAJson.txt"
+ val pathToFileJsonTooBig = "pathToFileJsonTooBig.json"
+ val pathToFileMockExpectedJson = "pathToFileMockExpectedJson.json"
+ val pathToFileSubsetRequiredJson = "pathToFileSubsetJson.json"
+
+ private def writeOnFile(fileName: String, what: String) = {
+ import java.io._
+ using(new PrintWriter(new File(fileName))) {
+ writer => writer.write(what)
+ }
+
+ }
+ private def writeNeededFiles() = {
+ val jsonLessFieldsThanNeeded = """{"i":1,"s": "potatoe"}"""
+ val jsonMoreFieldsThanNeeded =
+ """{"name":"Yin","lastname":"Lee", "address":{"city":"Columbus","state":"Ohio"}}
+ |{"name":"Luis","address":{"city":"Montreal","state":"Quebec"}} """.stripMargin
+ val jsonFieldsExactlyNeeded =
+ """{"i":1, "s": "luis", "d": 2.0}
+ |{"i":11, "s": "luis1"}""".stripMargin
+
+ writeOnFile(pathToFileNotJson, "Hello Scala")
+ writeOnFile(pathToFileJsonTooBig, jsonMoreFieldsThanNeeded)
+ writeOnFile(pathToFileSubsetRequiredJson, jsonLessFieldsThanNeeded)
+ writeOnFile(pathToFileMockExpectedJson, jsonFieldsExactlyNeeded)
+ }
+
+ private def deleteNeededFiles() = {
+ (new File(pathToFileNotJson)).delete()
+ (new File(pathToFileJsonTooBig)).delete()
+ (new File(pathToFileSubsetRequiredJson)).delete()
+ (new File(pathToFileMockExpectedJson)).delete()
+ }
+
+ override def beforeAll() {
+ assert(!(new File(nonExistentPath)).exists())
+ writeNeededFiles()
+ sc = new SparkContext("local", "test1") // An existing spark context.
+ }
+
+ override def afterAll() {
+ sc.stop()
+ sc = null
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
+ deleteNeededFiles()
+ }
+
+ /*
+From https://issues.ypg.com/browse/MPN-854:
+
+Pipeline:
+ 1. The interaction is started with a String _s_ (ex: _/mnt/myfile.txt_)
+ 2. _s_ is expected to represent a Path (which is not always true/possible); we try to do such a conversion:
+{code}String => Option[FileWrapper]{code}
+ 3. The successful FileWrapper will be converted, on success, into a File containing a Json object
+{code}File => Option[JSonFile]{code}
+ 4. The JSon comes with metadata information (about the data it contains): _EmpiricalMetadata_
+ 5. We have expectations about what that metadata has to be: _ExpectedMetadata_
+ 6. The JSon contained in the JsonFile has to be transformed into element of type _ExpectedMetadata_, which can then be saved and operated upon.
+{code}JsonFile => DataContainer[ExpectedMetadata]{code}
+ */
+
+ "A MockExpected object M" - {
+ lazy val sqlContext = new SQLContext(sc)
+ import MockExpected._ // brings implicits into scope
+
+ // NB: MockExpected.readJSonFromFile implements pipeline: (4), (5), (6)
+ "can't be built from" - {
+ "a non-existent file" in {
+ assert(JsonFile(nonExistentPath, sqlContext).isEmpty)
+ }
+ "a file that does not contain a json" in {
+ assert(JsonFile(pathToFileNotJson, sqlContext).isEmpty)
+ }
+ "a file that contains a json with fields that are not in M" in {
+ val jsonFileOpt = JsonFile(pathToFileJsonTooBig, sqlContext)
+ assert(jsonFileOpt.isDefined)
+ val jsonFile = jsonFileOpt.get
+ assert(MockExpected.readJSonFromFile(jsonFile, sc, sqlContext).count() == 0)
+ }
+ }
+
+ "should be built from" - {
+ "a file that contains a json with the same schema as M" in {
+ val jsonFileOpt = JsonFile(pathToFileMockExpectedJson, sqlContext)
+ assert(jsonFileOpt.isDefined)
+ val jsonFile = jsonFileOpt.get
+ val r = MockExpected.readJSonFromFile(jsonFile, sc, sqlContext)
+ assert(r.count() > 0)
+ // just for feedback...
+ r.collect().foreach(m => info(m.toString))
+ }
+ "a file that contains a json with a schema whose required fields are a subset of M's" in {
+ val jsonFileOpt = JsonFile(pathToFileSubsetRequiredJson, sqlContext)
+ assert(jsonFileOpt.isDefined)
+ val jsonFile = jsonFileOpt.get
+ val r = MockExpected.readJSonFromFile(jsonFile, sc, sqlContext)
+ assert(r.count() > 0)
+ // just for feedback...
+ r.collect().foreach(m => info(m.toString))
+ }
+ }
+ }
+
+}
+
diff --git a/report-db/src/test/scala/mediative/distributed/util/BaseTest.scala b/report-db/src/test/scala/mediative/distributed/util/BaseTest.scala
index 8ec61fd..857adda 100644
--- a/report-db/src/test/scala/mediative/distributed/util/BaseTest.scala
+++ b/report-db/src/test/scala/mediative/distributed/util/BaseTest.scala
@@ -155,10 +155,11 @@ class BaseTest extends FreeSpec with BeforeAndAfterAll {
"remove non-empty directories" in {
val dirName = StringHelper.random(10)
+ val aRandomFileName = randomFileName
withClue(s"Impossible to create non-empty directory '${dirName}'") { assert(HDFS.getFileSystem.mkdirs(new Path(dirName))) }
- val srcFileName = s"${dirName}${File.separator}${randomFileName}"
+ val srcFileName = s"${dirName}${File.separator}${aRandomFileName}"
withClue(s"Impossible to create HDFS file ${srcFileName}") { assert(writeASampleFile(srcFileName)) }
- info(s"Populated directory '${dirName}' with file named '${dirName}' (full path is '${srcFileName}')")
+ info(s"Populated directory '${dirName}' with file named '${aRandomFileName}' (full path is '${srcFileName}')")
withClue(s"HDFS file '${srcFileName}' does not exist after creation ") { assert(HDFS.fileExists(srcFileName)) }
withClue(s"Impossible to remove directory '${dirName}'") { assert(HDFS.rm(dirName)) }
withClue(s"Directory '${dirName}' exists after 'rm'!!!!") { assert(!HDFS.fileExists(dirName)) }
diff --git a/report-db/src/test/scala/mediative/distributed/util/json/JsonFileTest.scala b/report-db/src/test/scala/mediative/distributed/util/json/JsonFileTest.scala
new file mode 100644
index 0000000..b92704d
--- /dev/null
+++ b/report-db/src/test/scala/mediative/distributed/util/json/JsonFileTest.scala
@@ -0,0 +1,105 @@
+package mediative.distributed.util.json
+
+import mediative.util.Base._
+import mediative.util.wrapper.FileWrapper
+import mediative.util.{ Base => MediativeUtil }
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.scalacheck.Gen
+import org.scalatest.prop.GeneratorDrivenPropertyChecks
+import org.scalatest.{ Matchers, BeforeAndAfterAll, FreeSpec }
+import java.io
+
+class JsonFileTest extends FreeSpec with BeforeAndAfterAll with Matchers with GeneratorDrivenPropertyChecks {
+
+ var sc: SparkContext = _
+ val nonExistentPath = MediativeUtil.String.random(10) + ".json"
+
+ val pathToFileNotJson = "thisIsNotAJson.txt"
+ val pathToFileJson = "pathToFileJson.json"
+
+ private def writeOnFile(fileName: String, what: String) = {
+ import java.io._
+ using(new PrintWriter(new io.File(fileName))) {
+ writer => writer.write(what)
+ }
+
+ }
+ private def writeNeededFiles() = {
+ val aJson =
+ """{"i":1, "s": "luis", "d": 2.0}
+ |{"i":11, "s": "luis1"}""".stripMargin
+
+ writeOnFile(pathToFileNotJson, "Hello Scala")
+ writeOnFile(pathToFileJson, aJson)
+ }
+
+ private def deleteNeededFiles() = {
+ (new io.File(pathToFileNotJson)).delete()
+ (new io.File(pathToFileJson)).delete()
+ }
+
+ override def beforeAll() {
+ assert(!(new io.File(nonExistentPath)).exists())
+ writeNeededFiles()
+ sc = new SparkContext("local", "test1") // An existing spark context.
+ }
+
+ override def afterAll() {
+ sc.stop()
+ sc = null
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
+ deleteNeededFiles()
+ }
+
+ "A Json File" - {
+ lazy val sqlContext = new SQLContext(sc)
+
+ "can't be built from" - {
+ "an invalid FileWrapper" in {
+ forAll(Gen.alphaStr) { aStr =>
+ whenever(!(new io.File(aStr)).exists()) {
+ assert(FileWrapper(aStr).isEmpty)
+ // and since I don't have a FileWrapper, I can't create the JSonFile.
+ // So this test is trivially successful
+ }
+ }
+ }
+ "a String denoting an non-existent file" in {
+ forAll(Gen.alphaStr) { aStr =>
+ whenever(!(new io.File(aStr)).exists()) {
+ assert(JsonFile(aStr, sqlContext).isEmpty)
+ }
+ }
+ }
+
+ "a FileWrapper that does not contain a json" in {
+ FileWrapper(pathToFileNotJson).map { aFile =>
+ JsonFile(aFile, sqlContext).map {
+ jsonFile => assert(false)
+ }.getOrElse(assert(true))
+ }.getOrElse(withClue(s"Impossible to build File from ${pathToFileNotJson}") { assert(false) })
+ }
+
+ "a Path (as a String) pointing to a file that does not contain a json" in {
+ assert(JsonFile(pathToFileNotJson, sqlContext).isEmpty)
+ }
+ }
+
+ "should be built from" - {
+ "a FileWrapper that contains a json" in {
+ FileWrapper(pathToFileJson).map { aFile =>
+ JsonFile(aFile, sqlContext).map {
+ jsonFile => assert(true)
+ }.getOrElse(assert(false))
+ }.getOrElse(withClue(s"Impossible to build File from ${pathToFileJson}") { assert(false) })
+ }
+ "a Path (as a String) pointing to a file that contains a json" in {
+ assert(JsonFile(pathToFileJson, sqlContext).isDefined)
+ }
+ }
+ }
+
+}
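
For context (not part of the diff above): a minimal sketch of how these pieces are meant to compose, following the pipeline described in the MPN-854 comment (String => Option[FileWrapper] => Option[JsonFile] => RDD[MockExpected]). The file name is hypothetical, the local Spark setup mirrors the tests, and note that FileWrapper.apply returns None whenever no SecurityManager is installed.

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import mediative.util.wrapper.FileWrapper
import mediative.distributed.util.json.JsonFile
import mediative.distributed.schema.design.MockExpected

object ValidationPipelineSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "pipeline-sketch") // same local setup as the tests above
    val sqlContext = new SQLContext(sc)
    import MockExpected._ // brings the implicit RDDable[JsonFile, MockExpected] into scope

    // (2) String => Option[FileWrapper], then (3) FileWrapper => Option[JsonFile]
    val mocks = for {
      file <- FileWrapper("pathToFileMockExpectedJson.json") // hypothetical path; needs a SecurityManager
      json <- JsonFile(file, sqlContext)
    } yield MockExpected.readJSonFromFile(json, sc, sqlContext) // (4)-(6): JsonFile => RDD[MockExpected]

    // if any step failed, fall back to an empty RDD, mirroring MockExpectedRDDable
    mocks.getOrElse(sc.emptyRDD[MockExpected]).collect().foreach(println)
    sc.stop()
  }
}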