@saswata-dutta
Created July 21, 2022 17:51
spark edge validation
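// Written for spark-shell; outside the shell a SparkSession named `spark` must be in scope.
import org.apache.spark.sql.functions.{col, count, when}
import spark.implicits._

// (oid, ip) pairs: an oid can appear with several IPs, and rows may repeat.
val oid_info = Seq((1, 1), (1, 2), (1, 1), (2, 1), (2, 3), (3, 4), (3, 3), (3, 4)).toDF("oid", "ip")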
"""
+---+---+
|oid| ip|
+---+---+
|  1|  1|
|  1|  2|
|  1|  1|
|  2|  1|
|  2|  3|
|  3|  4|
|  3|  3|
|  3|  4|
+---+---+
"""
// Directed edges between oids; each pair is listed in both directions.
val edges = Seq((1, 2), (2, 1), (3, 2), (2, 3), (1, 3), (3, 1)).toDF("u", "v")
"""
+---+---+
|  u|  v|
+---+---+
|  1|  2|
|  2|  1|
|  3|  2|
|  2|  3|
|  1|  3|
|  3|  1|
+---+---+
"""
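// Attach the IPs of both endpoints to every edge: one output row per (u_ip, v_ip) combination.
// Duplicate rows in oid_info multiply the row count here but do not change the final result.
val fat = edges.join(oid_info.selectExpr("oid as u_oid", "ip as u_ip"), col("u") === col("u_oid")).join(oid_info.selectExpr("oid as v_oid", "ip as v_ip"), col("v") === col("v_oid"))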
"""
+---+---+-----+----+-----+----+
|  u|  v|u_oid|u_ip|v_oid|v_ip|
+---+---+-----+----+-----+----+
|  1|  3|    1|   1|    3|   4|
|  1|  3|    1|   1|    3|   3|
|  1|  3|    1|   1|    3|   4|
|  1|  2|    1|   1|    2|   3|
|  1|  2|    1|   1|    2|   1|
|  1|  3|    1|   2|    3|   4|
|  1|  3|    1|   2|    3|   3|
|  1|  3|    1|   2|    3|   4|
|  1|  2|    1|   2|    2|   3|
|  1|  2|    1|   2|    2|   1|
|  1|  3|    1|   1|    3|   4|
|  1|  3|    1|   1|    3|   3|
|  1|  3|    1|   1|    3|   4|
|  1|  2|    1|   1|    2|   3|
|  1|  2|    1|   1|    2|   1|
|  2|  3|    2|   1|    3|   4|
|  2|  3|    2|   1|    3|   3|
|  2|  3|    2|   1|    3|   4|
|  2|  1|    2|   1|    1|   1|
|  2|  1|    2|   1|    1|   2|
+---+---+-----+----+-----+----+
only showing top 20 rows
"""
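// For each edge, count the IP combinations where both endpoints have the same IP.
// count() ignores the nulls that when() yields for non-matching rows, so a count of 0
// means the two endpoints share no IP at all.
fat.groupBy("u", "v").agg(count(when(col("u_ip") === col("v_ip"), 1)).as("count")).filter(col("count") < 1).show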
"""
+---+---+-----+
|  u|  v|count|
+---+---+-----+
|  3|  1|    0|
|  1|  3|    0|
+---+---+-----+
"""
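// A possible alternative, sketched under assumptions rather than taken from the original gist:
// the same "no shared IP" check written as an anti join. It reuses `edges` and `oid_info`
// from above; the names `info`, `sharing` and `invalid` are hypothetical.
// De-duplicating oid_info first avoids the row blow-up seen in `fat`.
val info = oid_info.distinct()

// Edges whose endpoints have at least one IP in common: joining on ("v", "ip") keeps a row
// only when v was seen with the same ip that was attached to u.
val sharing = edges.join(info.selectExpr("oid as u", "ip"), "u").join(info.selectExpr("oid as v", "ip"), Seq("v", "ip")).select("u", "v").distinct()

// Edges with no IP in common are those that do not appear in `sharing`.
val invalid = edges.join(sharing, Seq("u", "v"), "left_anti")
invalid.show

// One behavioral difference to be aware of: an edge whose endpoint has no rows in oid_info
// is dropped by the inner joins used for `fat`, but is reported as invalid by this version.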