Created October 12, 2016 08:58
Spark REPL Test
name := "spark-validation" | |
version := "1.0" | |
scalaVersion := "2.10.5" | |
libraryDependencies ++= Seq( | |
"org.apache.spark" % "spark-core_2.10" % "1.6.1" exclude ("org.apache.hadoop","hadoop-yarn-server-web-proxy"), | |
"org.apache.spark" % "spark-sql_2.10" % "1.6.1" exclude ("org.apache.hadoop","hadoop-yarn-server-web-proxy"), | |
"org.apache.hadoop" % "hadoop-common" % "2.6.0" exclude ("org.apache.hadoop","hadoop-yarn-server-web-proxy"), | |
"org.apache.spark" % "spark-sql_2.10" % "1.6.1" exclude ("org.apache.hadoop","hadoop-yarn-server-web-proxy"), | |
"org.apache.spark" % "spark-hive_2.10" % "1.6.1" exclude ("org.apache.hadoop","hadoop-yarn-sver-web-proxy"), | |
"org.apache.spark" % "spark-yarn_2.10" % "1.6.1" exclude ("org.apache.hadoop","hadoop-yarn-server-web-proxy"), | |
"org.apache.spark" % "spark-repl_2.10" % "1.6.1" exclude ("org.apache.hadoop","hadoop-yarn-server-web-proxy") exclude("org.scala-lang","scala-library"), | |
"org.gbif.validator" % "validator-core" % "0.1-SNAPSHOT" exclude("org.slf4j","slf4j-log4j12") exclude("org.slf4j","log4j-over-slf4j") exclude("com.typesafe.akka","akka-actor_2.11") exclude("org.scala-lang","scala-library"), | |
"org.gbif" % "gbif-api" % "0.48-SNAPSHOT", | |
"org.gbif.registry" % "registry-ws-client" % "2.60-SNAPSHOT", | |
"org.gbif" % "dwc-api" % "1.17-SNAPSHOT", | |
"com.sun.jersey" % "jersey-servlet" % "1.19" | |
) |
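The GBIF dependencies above are SNAPSHOT artifacts, so they will not resolve from Maven Central alone. A minimal sketch of the extra resolver this build likely needs; the repository name and URL are assumptions, not part of the original gist:

// Assumption: GBIF snapshot artifacts are served from the public GBIF repository;
// adjust the URL (or drop this setting) if your environment resolves them differently.
resolvers += "gbif-repository" at "https://repository.gbif.org/content/groups/gbif"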
/* SimpleApp.scala */
import org.apache.spark.repl.SparkIMain
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

import scala.tools.nsc.GenericRunnerSettings

object DataValidation {

  def main(args: Array[String]) {
    val t0 = System.currentTimeMillis()

    // Master URL and Spark home point at the author's local standalone installation; adjust for your system
    val conf = new SparkConf().setAppName("Simple Application").setMaster("spark://fmendez.gbif.org:7077")
      .setSparkHome("/Users/fmendez/Downloads/spark-1.6.1-bin-hadoop2.6/")
    val sc = new SparkContext(conf)
    val sqlContext: SQLContext = new SQLContext(sc)

    // Embedded Spark REPL interpreter running on the application's own classpath
    val interpreter = {
      val settings = new GenericRunnerSettings(println _)
      settings.usejavacp.value = true
      new SparkIMain(settings)
    }

    // Validation script compiled and run by the interpreter at runtime;
    // it uses the sqlContext bound into the interpreter below
    val methodRef =
      """
      import org.apache.spark.rdd.RDD
      import org.apache.spark.sql.types.{StringType, StructField, StructType}
      import org.apache.spark.sql.{Row, SQLContext}
      import org.gbif.dwc.terms.Term
      import org.gbif.occurrence.validation.evaluator.OccurrenceEvaluatorFactory
      import org.gbif.occurrence.validation.util.TempTermsUtils

      import scala.collection.JavaConverters._

      val dataFile = "/Users/fmendez/dev/git/gbif/gbif-data-validator/validator-core/src/test/resources/0008759-160822134323880.csvar"

      val data = sqlContext.sparkContext.textFile(dataFile, 20).cache()
      val header = data.first()
      val rawHeader = header.split("\t")
      val terms: Array[Term] = TempTermsUtils.buildTermMapping(header.split("\t"))

      // This creates a schema from the header
      val schema = StructType(rawHeader.map(fieldName => StructField(fieldName, StringType, true)))

      // This ensures that each row has the same number of columns as reported in the header;
      // RDD[Row] is the data type expected by sqlContext.createDataFrame
      val rowData: RDD[Row] = data.zipWithIndex().filter({ case (_, idx) => idx != 0 })
        .map(line => Row.fromSeq(line._1.split("\t").padTo(rawHeader.length, "")))
      val ds = sqlContext.createDataFrame(rowData, schema)

      // Creates the view
      ds.registerTempTable("occ")

      // Runs a SQL statement
      sqlContext.sql("select count(distinct occurrenceid) from occ").collect()

      // This is a bit of duplication: runs all the processing
      val results = data.zipWithIndex().filter({ case (line, idx) => idx != 0 })
        .map({ case (line, idx) => (idx, terms.zip(line.split("\t")).toMap) })
        .mapPartitions(partition => {
          val occEvaluator = new OccurrenceEvaluatorFactory("http://api.gbif.org/v1/").create(rawHeader)
          val newPartition = partition.map({ case (idx, record) =>
            occEvaluator.process(idx, record.asJava)
          }).toList
          // consumes the iterator, thus calls readMatchingFromDB
          newPartition.iterator
        }).collect()
      """

    interpreter.initializeSynchronous()
    // Expose the driver's SQLContext to the interpreted script under the name "sqlContext"
    interpreter.bind("sqlContext", "org.apache.spark.sql.SQLContext", sqlContext)
    val resultFlag = interpreter.interpret(methodRef)

    val t1 = System.currentTimeMillis()
    println("Elapsed time: " + (t1 - t0) / 1000 + "s")
  }
}
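The interpret call above returns a result code that the gist discards. A minimal sketch of how resultFlag could be inspected, assuming SparkIMain.interpret returns the standard Scala 2.10 REPL result type (scala.tools.nsc.interpreter.Results.Result); the println messages are illustrative only:

import scala.tools.nsc.interpreter.Results

// Hypothetical follow-up: act on the outcome of interpret() instead of ignoring it
resultFlag match {
  case Results.Success    => println("Validation script ran successfully")
  case Results.Error      => println("Validation script failed to compile or run")
  case Results.Incomplete => println("Validation script source was incomplete")
}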