// Databricks notebook source
import org.graphframes.GraphFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}

sc.setCheckpointDir("/dbfs/cp")
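
// GraphFrames' connectedComponents (run below) requires a Spark checkpoint
// directory; without sc.setCheckpointDir above, run() would fail.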
// COMMAND ----------
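
// Read the change-hash CSV. Each row appears to record that a row's hash
// changed from _beforeDataHash to _rowHash at a given changeSequence, with
// the kind of change in operation (schema inferred from the column names).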
val data =
  spark.read.option("header", "true").csv("dbfs:/FileStore/sampleHashData.csv")

// Vertices: every hash (before and after) becomes a vertex id, carrying the
// changeSequence at which it was seen.
val v1 = data
  .withColumnRenamed("_beforeDataHash", "id")
  .withColumn("changeSequence", col("changeSequence").cast("decimal(38,0)"))
  .select("id", "changeSequence")
val v2 = data
  .withColumnRenamed("_rowHash", "id")
  .withColumn("changeSequence", col("changeSequence").cast("decimal(38,0)"))
  .select("id", "changeSequence")
val v = v1.union(v2)

// Edges: each change is a directed edge from the old hash to the new hash.
val e = data
  .withColumnRenamed("_beforeDataHash", "src")
  .withColumnRenamed("_rowHash", "dst")
  .select("src", "dst", "operation")

val g = GraphFrame(v.dropDuplicates(), e.dropDuplicates())
// COMMAND ----------

data.printSchema()

// COMMAND ----------
// val v = sqlContext.createDataFrame(List(
//   ("a", 10),
//   ("b", 20),
//   ("c", 30),
//   ("x", 10),
//   ("h", 80),
//   ("y", 20),
//   ("z", 30),
//   ("d", 40),
//   ("e", 50),
//   ("f", 60),
//   ("g", 70)
// )).toDF("id", "changeSequence")
// val e = sqlContext.createDataFrame(List(
//   ("a", "b", "UPDATE"),
//   ("b", "c", "UPDATE"),
//   ("g", "h", "UPDATE"),
//   ("x", "y", "UPDATE"),
//   ("y", "z", "UPDATE"),
//   ("c", "d", "UPDATE"),
//   ("d", "e", "UPDATE"),
//   ("e", "f", "UPDATE"),
//   ("f", "g", "UPDATE")
// )).toDF("src", "dst", "operation")
// val g = GraphFrame(v, e)
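// If used in place of the CSV graph above, this toy data forms two update
// chains, a -> ... -> h and x -> y -> z, i.e. two connected components with
// (x_min, x_max) = (10, 80) and (10, 30) respectively.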
// COMMAND ----------
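
// Run connected components: each chain of hash updates collapses into one
// component. Collect the (id, changeSequence) structs per component, then
// take array_min/array_max over the changeSequence field to find the first
// and last change in each chain.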
val df1 = g.connectedComponents
  .run()
  .withColumn("col1", struct(col("id"), col("changeSequence")))
  .groupBy("component")
  .agg(collect_list(col("col1")).as("col2"))
  .select("col2")
  .withColumn("x_max", expr("array_max(col2.changeSequence)"))
  .withColumn("x_min", expr("array_min(col2.changeSequence)"))
// COMMAND ----------

display(df1)

// COMMAND ----------
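
// Use Spark SQL's higher-order filter() to pick the struct(s) whose
// changeSequence equals the component minimum (chain start) and maximum
// (chain end), then explode to one row per hash.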
df1.createOrReplaceTempView("tmpTable")
val df2 = spark
  .sql("""select filter(col2, a -> a.changeSequence = x_min) as startHash,
         |       filter(col2, a -> a.changeSequence = x_max) as endHash
         |from tmpTable""".stripMargin)
  .withColumn("startids", explode(col("startHash.id")))
  // one row per (start, end) hash pair; endids feeds the df4 join below
  .withColumn("endids", explode(col("endHash.id")))
// COMMAND ----------
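
// Join back to the source rows: df3 attaches the record whose
// _beforeDataHash opens each chain; df4 then attaches the record whose
// _rowHash closes it, pairing each chain's first and last state.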
val df3 = df2
  .as("joineddf1")
  .join(data.as("maindf1"), $"startids" === $"_beforeDataHash")
val df4 =
  df3.as("joineddf2").join(data.as("maindf2"), $"endids" === $"maindf2._rowHash")

// COMMAND ----------

display(df3)

// COMMAND ----------

df4.show(false)
// COMMAND ----------