// Load the taxon cache as a tab-delimited file whose first row is a header.
val taxa = spark.read
  .option("delimiter", """\t""")
  .option("header", "true")
  .csv("taxonCache.tsv.bz2")

// Show the column layout of the loaded table.
taxa.printSchema()
import spark.implicits._
// Tab-delimited taxon cache with a header row.
val taxonCache = spark.read
  .option("delimiter", """\t""")
  .option("header", "true")
  .csv("taxonCache.tsv.bz2")

// (id, pathNames, path) triples, keeping only rows where all three values are present.
val taxonIdsPaths = taxonCache
  .select("id", "pathNames", "path")
  .as[(String, String, String)]
  .filter(row => row._1 != null && row._2 != null && row._3 != null)

// Pairs each taxon id with its kingdom: the pipe-separated `pathNames` and `path`
// values are split, trimmed and aligned position by position, and the `path`
// entries whose `pathNames` entry is "kingdom" are concatenated.
// Rows are kept only when a kingdom was found, the id prefix (text before the
// first ':') is one of the listed schemes, and the kingdom is not "incertae sedis".
val taxaIdToKingdom = {
  val acceptedPrefixes = List("GBIF", "ITIS", "WORMS", "INAT_TAXON")

  taxonIdsPaths
    .map { row =>
      val labels = row._2.split("\\|").map(_.trim)
      val names = row._3.split("\\|").map(_.trim)
      val kingdom = labels
        .zip(names)
        .collect { case (label, name) if label == "kingdom" => name }
        .mkString
      (row._1, kingdom)
    }
    .filter { row =>
      row._2.nonEmpty &&
        acceptedPrefixes.contains(row._1.split(":").head) &&
        row._2 != "incertae sedis"
    }
}

// Persist the id -> kingdom pairs as tab-delimited text (no header).
taxaIdToKingdom.write
  .option("delimiter", """\t""")
  .csv("taxaIdToKingdom.tsv")
  
// Tab-delimited interaction records with a header row.
val interactions = spark.read
  .option("delimiter", """\t""")
  .option("header", "true")
  .csv("interactions.tsv.bz2")

// Only the columns needed to relate source ids to target ids.
val interactionIds = interactions.select(
  "sourceTaxonIds",
  "interactionTypeName",
  "targetTaxonIds",
  "sourceNamespace"
)

// One row per (source id, interaction type, target id, namespace) combination.
// The pipe-separated source and target id lists are split and trimmed, "no:match"
// entries are dropped, and every remaining source id is paired with every remaining
// target id. Rows survive only when both ids carry one of the listed prefixes
// (text before the first ':').
val flatIdsDistinctByDataset = {
  val acceptedPrefixes = List("GBIF", "ITIS", "WORMS", "INAT_TAXON")

  interactionIds
    .as[(String, String, String, String)]
    .filter(row => row._1 != null && row._3 != null)
    .flatMap { row =>
      val sourceIds = row._1.split("""\|""").map(_.trim).filter(_ != "no:match")
      val targetIds = row._3.split("""\|""").map(_.trim).filter(_ != "no:match")
      sourceIds.flatMap { sourceId =>
        targetIds.map(targetId => (sourceId, row._2, targetId, row._4))
      }
    }
    .filter { row =>
      acceptedPrefixes.contains(row._1.split(":").head) &&
        acceptedPrefixes.contains(row._3.split(":").head)
    }
}

// Persist the expanded id pairs as tab-delimited text (no header).
flatIdsDistinctByDataset.write
  .option("delimiter", """\t""")
  .csv("idIteractionsWithNamespace.tsv")
  
// Re-read the two intermediate tab-delimited outputs (written without a header row).
val interactionsIds = spark.read
  .option("delimiter", """\t""")
  .csv("idIteractionsWithNamespace.tsv")
val taxonIdKingdom = spark.read
  .option("delimiter", """\t""")
  .csv("taxaIdToKingdom.tsv")

// Driver-side lookup from taxon id to kingdom, built from the distinct (id, kingdom) pairs.
val idLookup = taxonIdKingdom.as[(String, String)].distinct.collect.toMap

// Replace the source and target taxon ids of each interaction with their kingdoms.
// Ids missing from the lookup yield None (result of Map.get).
val mappedInteractions = {
  // Ship the lookup table to executors as a broadcast variable instead of capturing the
  // whole map inside the task closure; the map is collected from the full taxon table
  // and can be large. Keeping the handle block-local means the closure captures only it.
  val kingdomById = spark.sparkContext.broadcast(idLookup)

  interactionsIds
    .as[(String, String, String, String)]
    .map { row =>
      val lookup = kingdomById.value
      (lookup.get(row._1), row._2, lookup.get(row._3), row._4)
    }
}

// Write every mapped interaction, then the de-duplicated set, as tab-delimited text.
mappedInteractions.write
  .option("delimiter", """\t""")
  .csv("kingdom2kingdom.tsv")
mappedInteractions.distinct.write
  .option("delimiter", """\t""")
  .csv("kingdom2kingdom.distinct.tsv")