// Spark-shell script (relies on the ambient `spark` session).
//
// Steps:
//   1. Read taxonCache.tsv.bz2 and write (taxon id, kingdom) pairs to taxaIdToKingdom.tsv.
//   2. Read interactions.tsv.bz2, explode the pipe-separated source/target taxon id lists
//      into one row per (source id, target id) pair, and write idIteractionsWithNamespace.tsv.
//   3. Replace both ids with their kingdom (when known) and write kingdom2kingdom.tsv plus
//      a de-duplicated kingdom2kingdom.distinct.tsv.
//
// All inputs and outputs are tab-separated; intermediate and final outputs have no header row.
import spark.implicits._

// Taxon id prefixes (the text before the first ':') that are retained by every stage.
val supportedIdPrefixes = Set("GBIF", "ITIS", "WORMS", "INAT_TAXON")

/** True when the text before the first ':' of `taxonId` is one of `supportedIdPrefixes`. */
def hasSupportedPrefix(taxonId: String): Boolean =
  // String.split drops trailing empty strings, so an id made only of ':' yields an empty
  // array; headOption keeps such ids from throwing and simply rejects them.
  taxonId.split(":").headOption.exists(supportedIdPrefixes.contains)

/** Splits a pipe-separated value into its trimmed elements. */
def splitPipeSeparated(value: String): Array[String] =
  value.split("""\|""").map(_.trim)

/**
 * Pairs the elements of `pathNames` and `path` by position and concatenates every `path`
 * element whose `pathNames` counterpart is exactly "kingdom". Returns "" when there is none.
 */
def kingdomOf(pathNames: String, path: String): String =
  splitPipeSeparated(pathNames)
    .zip(splitPipeSeparated(path))
    .collect { case ("kingdom", name) => name }
    .mkString

// ---------------------------------------------------------------------------------------
// 1. taxon id -> kingdom
// ---------------------------------------------------------------------------------------
val taxa = spark.read
  .option("delimiter", """\t""")
  .option("header", "true")
  .csv("taxonCache.tsv.bz2")
taxa.printSchema()

// Same source as `taxa`; reuse the DataFrame instead of reading the file a second time.
val taxonCache = taxa

val taxonIdsPaths = taxonCache
  .select("id", "pathNames", "path")
  .as[(String, String, String)]
  .filter(r => r._1 != null && r._2 != null && r._3 != null)

val taxaIdToKingdom = taxonIdsPaths
  .filter(r => hasSupportedPrefix(r._1))
  .map { case (id, pathNames, path) => (id, kingdomOf(pathNames, path)) }
  .filter(r => r._2.nonEmpty && r._2 != "incertae sedis")

taxaIdToKingdom.write.option("delimiter", """\t""").csv("taxaIdToKingdom.tsv")

// ---------------------------------------------------------------------------------------
// 2. one row per (source taxon id, interaction type, target taxon id, namespace)
// ---------------------------------------------------------------------------------------
val interactions = spark.read
  .option("delimiter", """\t""")
  .option("header", "true")
  .csv("interactions.tsv.bz2")

val interactionIds =
  interactions.select("sourceTaxonIds", "interactionTypeName", "targetTaxonIds", "sourceNamespace")

// NOTE: despite the name, no de-duplication happens here; the name is kept so that later
// shell commands referring to it keep working.
val flatIdsDistinctByDataset = interactionIds
  .as[(String, String, String, String)]
  .filter(r => r._1 != null && r._3 != null)
  .flatMap { case (sourceIds, interactionType, targetIds, namespace) =>
    // The prefix whitelist also rejects the "no:match" placeholder (its prefix is "no"),
    // so no separate check for it is needed. Filtering the source side before expanding
    // the target side avoids building pairs that would be discarded anyway.
    val pairs = for {
      sourceId <- splitPipeSeparated(sourceIds) if hasSupportedPrefix(sourceId)
      targetId <- splitPipeSeparated(targetIds) if hasSupportedPrefix(targetId)
    } yield (sourceId, interactionType, targetId, namespace)
    pairs.toSeq
  }

// The file name (including its spelling) is part of the script's output contract.
flatIdsDistinctByDataset.write.option("delimiter", """\t""").csv("idIteractionsWithNamespace.tsv")

// ---------------------------------------------------------------------------------------
// 3. kingdom -> kingdom interactions
// ---------------------------------------------------------------------------------------
val interactionsIds = spark.read.option("delimiter", """\t""").csv("idIteractionsWithNamespace.tsv")
val taxonIdKingdom = spark.read.option("delimiter", """\t""").csv("taxaIdToKingdom.tsv")

// Collected on the driver; must fit in driver memory.
val idLookup = taxonIdKingdom.as[(String, String)].distinct.collect.toMap

// Ship the lookup table to each executor once instead of serializing it with every task.
val idLookupBroadcast = spark.sparkContext.broadcast(idLookup)

// Ids without a known kingdom become None, which is written as an empty field.
val mappedInteractions = interactionsIds
  .as[(String, String, String, String)]
  .map { case (sourceId, interactionType, targetId, namespace) =>
    val lookup = idLookupBroadcast.value
    (lookup.get(sourceId), interactionType, lookup.get(targetId), namespace)
  }

mappedInteractions.write.option("delimiter", """\t""").csv("kingdom2kingdom.tsv")
mappedInteractions.distinct.write.option("delimiter", """\t""").csv("kingdom2kingdom.distinct.tsv")