Created
August 8, 2023 13:19
-
-
Save yossisp/7ce90216e61a548198489450a99052fa to your computer and use it in GitHub Desktop.
How to convert a CRUD status change log table into a table with date and entity columns?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
!pip install -q pyspark==3.3.0 spark-nlp==5.0.0 | |
import pandas as pd | |
from pyspark.sql import SparkSession | |
from pyspark import SparkContext | |
spark = SparkSession.builder \ | |
.appName("Python Spark SQL basic example") \ | |
.getOrCreate() | |
# Sample CRUD status log for a single (parent, child) pair: one "new" event
# followed by two "existing" events and no "deleted" event.
json_data_no_deleted = '''
[
{
"timestamp": "2023-07-01T12:00:00Z",
"parent": "p1",
"child": "c1",
"status": "new"
},
{
"timestamp": "2023-07-01T13:00:00Z",
"parent": "p1",
"child": "c1",
"status": "existing"
},
{
"timestamp": "2023-07-05T13:00:00Z",
"parent": "p1",
"child": "c1",
"status": "existing"
}
]
'''
# Create an RDD from the JSON data: the whole document is passed as one
# string element.
sc = SparkContext.getOrCreate()
rdd = sc.parallelize([json_data_no_deleted])
# Read the JSON data into a DataFrame
df_no_deleted = spark.read.json(rdd)
# Show the DataFrame content
df_no_deleted.show()
# Register the DataFrame as a temp view so the SQL below can reference it by name.
df_no_deleted.createOrReplaceTempView("df_no_deleted")

# Expand the status change log into one row per (date, parent, child) for every
# day on which the pair was active.
#
# Why the previous version returned nothing for this input: it only paired
# 'new' with 'deleted' events, so a pair that was never deleted got a NULL
# end_date; SEQUENCE(..., NULL, ...) is NULL and EXPLODE emits no rows for a
# NULL array. It also paired events via FLOOR((ROW_NUMBER() - 1) / 2), which
# mis-pairs as soon as 'new' events repeat or a 'deleted' row has no preceding
# 'new'.
#
# NOTE(review): a period with no 'deleted' event is closed at the last event
# observed for that period (e.g. the latest 'existing' row). This is an
# assumption about the intended result - confirm, or swap the fallback for
# CURRENT_DATE() if open periods should run up to today.
sql_results = spark.sql("""
WITH
-- Every 'new' event opens the next active period of its (parent, child) pair;
-- all later events up to the following 'new' share that period_id.
events AS (
    SELECT
        parent,
        child,
        status,
        timestamp,
        SUM(CASE WHEN status = 'new' THEN 1 ELSE 0 END) OVER (
            PARTITION BY parent, child
            ORDER BY timestamp
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS period_id
    FROM
        df_no_deleted
),
-- One row per active period. period_id = 0 holds events seen before any
-- 'new' event for the pair; they belong to no period and are dropped.
matched_periods AS (
    SELECT
        parent,
        child,
        MIN(CASE WHEN status = 'new' THEN timestamp END) AS start_date,
        COALESCE(
            MIN(CASE WHEN status = 'deleted' THEN timestamp END),
            MAX(timestamp)
        ) AS end_date
    FROM
        events
    WHERE
        period_id > 0
    GROUP BY
        parent,
        child,
        period_id
),
-- One row per calendar day of each period, both endpoints included.
final_table AS (
    SELECT
        EXPLODE(SEQUENCE(TO_DATE(start_date), TO_DATE(end_date), INTERVAL 1 DAY)) AS date,
        parent,
        child
    FROM
        matched_periods
)
SELECT
    date,
    parent,
    child
FROM
    final_table
ORDER BY
    parent,
    child,
    date
""")
# For the sample log (new on 2023-07-01, last seen on 2023-07-05) this is
# expected to list p1/c1 once per day from 2023-07-01 through 2023-07-05.
sql_results.show(truncate=False)
# ------
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment