Skip to content

Instantly share code, notes, and snippets.

@saswata-dutta
Created August 16, 2020 18:41
Show Gist options
  • Save saswata-dutta/4918f03d1d763275006d622b1ebc685f to your computer and use it in GitHub Desktop.
Save saswata-dutta/4918f03d1d763275006d622b1ebc685f to your computer and use it in GitHub Desktop.
import org.apache.spark.sql.types._
// Schema for the subject CSV extracts: three string columns, each declared non-nullable.
val subjectSchema = StructType(
  Seq("correspondence_id", "subject", "thread_subject")
    .map(name => StructField(name, StringType, nullable = false))
)

// Read both subject extracts as CSV using the explicit schema above.
val internal_subjects = spark.read.schema(subjectSchema).csv("internal")
val external_subjects = spark.read.schema(subjectSchema).csv("external")

// Count the distinct thread subjects in each extract (external first, then internal).
external_subjects.select("thread_subject").distinct().count()
internal_subjects.select("thread_subject").distinct().count()
// Schema for the ground-truth CSV: four string columns, each declared non-nullable.
val truthSchema = StructType(
  Seq("correspondence_id", "orig_sys_ref", "from_type", "match_type")
    .map(name => StructField(name, StringType, nullable = false))
)

// Read the ground-truth rows as CSV using the explicit schema above.
val ground_truths = spark.read.schema(truthSchema).csv("ground_truths")
// Attach the ground-truth columns to each subject extract via an inner join on
// correspondence_id, so only rows with a ground-truth entry are kept.
val internal = internal_subjects.join(ground_truths, Seq("correspondence_id"), "inner")
val external = external_subjects.join(ground_truths, Seq("correspondence_id"), "inner")

// Column expression: does thread_subject contain orig_sys_ref as a substring?
val ref_in_subject = col("thread_subject").contains(col("orig_sys_ref"))
val external_result = external.withColumn("present_in_subject", ref_in_subject)
val internal_result = internal.withColumn("present_in_subject", ref_in_subject)

// Rows where the reference appears in the subject but match_type is not SINGLE_MATCH.
val internal_non_single = internal_result.where(col("present_in_subject")).where("match_type != 'SINGLE_MATCH'")
val external_non_single = external_result.where(col("present_in_subject")).where("match_type != 'SINGLE_MATCH'")

// Totals for each side, then the internal breakdown per match_type.
internal_non_single.count()
external_non_single.count()
internal_non_single.groupBy("match_type").count().show()
// Write both annotated datasets as CSV with a header row, coalesced to one partition,
// replacing any existing output at the target path. Internal is written before external.
Seq("internal_result" -> internal_result, "external_result" -> external_result).foreach {
  case (path, df) =>
    df.coalesce(1).write.mode("overwrite").option("header", "true").csv(path)
}
// Export only the (orig_sys_ref, thread_subject) pairs as header-less CSV, coalesced to
// one partition each.
// mode("overwrite") is set explicitly: without it Spark's default save mode raises an
// error when the target path already exists, so re-running the script would fail here.
internal_result.select("orig_sys_ref", "thread_subject").coalesce(1).write.mode("overwrite").option("header", false).csv("internal_subjects")
external_result.select("orig_sys_ref", "thread_subject").coalesce(1).write.mode("overwrite").option("header", false).csv("external_subjects")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment