This is a quick test of a modified version of the Bloodhound Spark script, to check that it runs on the GBIF Cloudera cluster (CDH 5.16.2).
From the gateway, copy the download zip out of HDFS to local disk (skipping the HTTP download for speed), unzip it (15-20 mins), and put the extracted files back into HDFS:
hdfs dfs -getmerge /occurrence-download/prod-downloads/0002504-181003121212138.zip /mnt/auto/misc/bloodhound/data.zip
unzip /mnt/auto/misc/bloodhound/data.zip -d /mnt/auto/misc/bloodhound/data
hdfs dfs -rm /tmp/verbatim.txt
hdfs dfs -rm /tmp/occurrence.txt
hdfs dfs -put /mnt/auto/misc/bloodhound/data/verbatim.txt /tmp/verbatim.txt
hdfs dfs -put /mnt/auto/misc/bloodhound/data/occurrence.txt /tmp/occurrence.txt
Note: There are options GBIF could explore to avoid the above, such as moving this processing into the download Oozie job.
Launch a Spark session on the cluster (using the shell here, but this would be scripted):
spark2-shell --master yarn --num-executors 20 --driver-memory 4g --executor-memory 12g
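If this were scripted rather than run interactively, the same resources could be requested through spark2-submit; a rough sketch, where the class and jar names are placeholders:
spark2-submit --master yarn --num-executors 20 --driver-memory 4g --executor-memory 12g --class org.example.BloodhoundExport bloodhound-assembly.jar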
Run the process. Here I skip the MySQL load as a first test; that load could instead be avoided by producing a file to subsequently load into MySQL (a sketch of this is included below, next to the commented-out JDBC write). Script runs in X mins.
import sys.process._
import org.apache.spark.sql.Column
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val verbatimTerms = List(
"gbifID",
"occurrenceID",
"dateIdentified",
"decimalLatitude",
"decimalLongitude",
"country",
"eventDate",
"year",
"family",
"identifiedBy",
"institutionCode",
"collectionCode",
"catalogNumber",
"recordedBy",
"scientificName",
"typeStatus"
)
//load a big, verbatim tsv file from a DwC-A download
val df1 = spark.
read.
format("csv").
option("header", "true").
option("mode", "DROPMALFORMED").
option("delimiter", "\t").
option("quote", "\"").
option("escape", "\"").
option("treatEmptyValuesAsNulls", "true").
option("ignoreLeadingWhiteSpace", "true").
load("/tmp//verbatim.txt").
select(verbatimTerms.map(col): _*).
filter(coalesce($"identifiedBy",$"recordedBy").isNotNull).
where(!$"scientificName".contains("BOLD:")).
where(!$"scientificName".contains("BOLD-")).
where(!$"scientificName".contains("BIOUG"))
//optionally save the DataFrame to disk so we don't have to do the above again
df1.write.mode("overwrite").parquet("/tmp/tmp_verbatim")
//reload the saved DataFrame; later runs can skip the steps above and start from here
val df1 = spark.
read.
parquet("/tmp/tmp_verbatim")
val processedTerms = List(
"gbifID",
"datasetKey",
"countryCode",
"dateIdentified",
"eventDate"
)
val df2 = spark.
read.
format("csv").
option("header", "true").
option("mode", "DROPMALFORMED").
option("delimiter", "\t").
option("quote", "\"").
option("escape", "\"").
option("treatEmptyValuesAsNulls", "true").
option("ignoreLeadingWhiteSpace", "true").
load("/tmp/occurrence.txt").
select(processedTerms.map(col): _*).
filter(coalesce($"countryCode",$"dateIdentified",$"eventDate").isNotNull)
//optionally save the DataFrame to disk so we don't have to do the above again
df2.withColumnRenamed("dateIdentified","dateIdentified_processed").
withColumnRenamed("eventDate", "eventDate_processed").
write.mode("overwrite").parquet("/tmp/tmp_processed")
//reload the saved DataFrame; later runs can skip the steps above and start from here
val df2 = spark.
read.
parquet("/tmp/tmp_processed").
withColumn("eventDate_processed", to_timestamp($"eventDate_processed")).
withColumn("dateIdentified_processed", to_timestamp($"dateIdentified_processed"))
val occurrences = df1.join(df2, Seq("gbifID"), "leftouter").orderBy($"gbifID").distinct
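// Optional sanity check, not in the original script: a count forces the lazy join to
// execute, confirming the two DataFrames line up before anything is written downstream.
println(s"Joined occurrence rows: ${occurrences.count}")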
//set some properties for a MySQL connection
val prop = new java.util.Properties
prop.setProperty("driver", "com.mysql.cj.jdbc.Driver")
prop.setProperty("user", "root")
prop.setProperty("password", "")
val url = "jdbc:mysql://localhost:3306/bloodhound?serverTimezone=UTC&useSSL=false"
// Best to drop indices then recreate later
// ALTER TABLE `occurrences` DROP KEY `typeStatus_idx`, DROP KEY `index_occurrences_on_datasetKey`;
// SKIP DB writing
//write occurrences data to the database
// occurrences.write.mode("append").jdbc(url, "occurrences", prop)
// Recreate indices
// ALTER TABLE `occurrences` ADD KEY `typeStatus_idx` (`typeStatus`(256)), ADD KEY `index_occurrences_on_datasetKey` (`datasetKey`);
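// Alternative sketch (not part of the original script): rather than writing over JDBC,
// dump the joined occurrences to CSV on HDFS and bulk-load that file into MySQL
// afterwards with LOAD DATA INFILE. The output path below is a placeholder.
// occurrences.write.
//   mode("overwrite").
//   option("header", "true").
//   option("quote", "\"").
//   option("escape", "\"").
//   csv("/tmp/occurrences-csv")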
//aggregate recordedBy
val recordedByGroups = occurrences.
filter($"recordedBy".isNotNull).
groupBy($"recordedBy" as "agents").
agg(collect_set($"gbifID") as "gbifIDs_recordedBy")
//aggregate identifiedBy
val identifiedByGroups = occurrences.
filter($"identifiedBy".isNotNull).
groupBy($"identifiedBy" as "agents").
agg(collect_set($"gbifID") as "gbifIDs_identifiedBy")
//union identifiedBy and recordedBy entries (the JSON round-trip aligns the two differently-shaped DataFrames onto one schema before the union)
val unioned = spark.
read.
json(recordedByGroups.toJSON.union(identifiedByGroups.toJSON))
//helper to render the gbifID arrays as bracketed, comma-separated strings for CSV output
def stringify(c: Column) = concat(lit("["), concat_ws(",", c), lit("]"))
//write aggregated agents to csv files for the Populate Agents script, /bin/populate_agents.rb
unioned.select("agents", "gbifIDs_recordedBy", "gbifIDs_identifiedBy").
withColumn("gbifIDs_recordedBy", stringify($"gbifIDs_recordedBy")).
withColumn("gbifIDs_identifiedBy", stringify($"gbifIDs_identifiedBy")).
write.
mode("overwrite").
option("header", "true").
option("quote", "\"").
option("escape", "\"").
csv("agents-unioned-csv")
// Best to drop indices then recreate later, after all jobs are complete
// ALTER TABLE `occurrence_determiners` DROP KEY `agent_idx`, DROP KEY `occurrence_idx`;
// ALTER TABLE `occurrence_recorders` DROP KEY `agent_idx`, DROP KEY `occurrence_idx`;
// Recreate indices
// ALTER TABLE `occurrence_determiners` ADD KEY `agent_idx` (`agent_id`), ADD KEY `occurrence_idx` (`occurrence_id`);
// ALTER TABLE `occurrence_recorders` ADD KEY `agent_idx` (`agent_id`), ADD KEY `occurrence_idx` (`occurrence_id`);
//aggregate families (Taxa)
val familyGroups = occurrences.
filter($"family".isNotNull).
groupBy($"family").
agg(collect_set($"gbifID") as "gbifIDs_family")
//write aggregated Families to csv files for the Populate Taxa script, /bin/populate_taxa.rb
familyGroups.select("family", "gbifIDs_family").
withColumn("gbifIDs_family", stringify($"gbifIDs_family")).
write.
mode("overwrite").
option("header", "true").
option("quote", "\"").
option("escape", "\"").
csv("/tmp/family.csv")
Get the output from HDFS and check that it has content:
hdfs dfs -getmerge /tmp/family.csv /tmp/family.csv
head /tmp/family.csv