How to convert a CRUD status-change log table into a table with date and entity columns?
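# The goal, illustrated in plain Python (an editor's sketch, not part of the
# original gist): the input is a CRUD status-change log with one event row per
# (timestamp, parent, child, status); the desired output has one row per
# calendar day on which a (parent, child) pair existed. For example, a pair
# created on 2023-07-01 and deleted on 2023-07-05 should yield the four days
# 2023-07-01 .. 2023-07-04.
from datetime import date, timedelta

created, deleted = date(2023, 7, 1), date(2023, 7, 5)
alive_days = [created + timedelta(days=i) for i in range((deleted - created).days)]
print(alive_days)  # 2023-07-01, 2023-07-02, 2023-07-03, 2023-07-04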
# The code below refers to the following Stackoverflow answer: https://stackoverflow.com/a/76818322/5863693
!pip install -q pyspark==3.3.0 spark-nlp==5.0.0
import sparknlp
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext

spark = SparkSession.builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

json_data_deleted = '''
[
  {
    "timestamp": "2023-07-01T12:00:00Z",
    "parent": "p1",
    "child": "c1",
    "status": "new"
  },
  {
    "timestamp": "2023-07-01T13:00:00Z",
    "parent": "p1",
    "child": "c1",
    "status": "existing"
  },
  {
    "timestamp": "2023-07-05T13:00:00Z",
    "parent": "p1",
    "child": "c1",
    "status": "deleted"
  }
]
'''

# Create an RDD from the JSON data
sc = SparkContext.getOrCreate()
rdd = sc.parallelize([json_data_deleted])

# Read the JSON data into a DataFrame
df_deleted = spark.read.json(rdd)

# Show the DataFrame content
df_deleted.show()

df_deleted.createOrReplaceTempView("df_deleted")  # correct
sql_results = spark.sql("""
WITH transformed_input AS (
    SELECT
        CAST(timestamp AS DATE) AS dt,
        parent,
        child,
        status,
        transform(
            sequence(
                0,
                datediff(
                    CAST(MAX(timestamp) OVER (PARTITION BY parent, child) AS DATE),
                    CAST(timestamp AS DATE)
                ) - 1
            ),
            sid -> date_add(CAST(timestamp AS DATE), sid)
        ) AS dates
    FROM df_deleted
)
SELECT
    explode_outer(dates) AS date,
    parent,
    child
FROM transformed_input
WHERE status NOT IN ('existing', 'deleted')
""")
sql_results.show(truncate=False)
# the result above is correct
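# Why this case works (editor's note, reasoning from the data above): the
# partition-wide MAX(timestamp) is the deletion date, so for the surviving
# 'new' row datediff('2023-07-05', '2023-07-01') - 1 = 3, sequence(0, 3)
# yields offsets 0..3, and date_add maps them to 2023-07-01 .. 2023-07-04.
# The WHERE clause then drops the 'existing' and 'deleted' rows themselves.
spark.sql("SELECT sequence(0, datediff('2023-07-05', '2023-07-01') - 1) AS offsets").show()
# +------------+
# |offsets     |
# +------------+
# |[0, 1, 2, 3]|
# +------------+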
# ------------------------------
json_data_only_new_children = '''
[
  {
    "timestamp": "2023-07-01T08:00:00Z",
    "parent": "p1",
    "child": "c1",
    "status": "new"
  },
  {
    "timestamp": "2023-07-09T08:00:00Z",
    "parent": "p2",
    "child": "c1",
    "status": "new"
  }
]
'''

# Create an RDD from the JSON data
sc = SparkContext.getOrCreate()
rdd = sc.parallelize([json_data_only_new_children])

# Read the JSON data into a DataFrame
df_only_new_children = spark.read.json(rdd)

# Show the DataFrame content
df_only_new_children.show()

df_only_new_children.createOrReplaceTempView("df_only_new_children")  # not correct
sql_results = spark.sql("""
WITH transformed_input AS (
    SELECT
        CAST(timestamp AS DATE) AS dt,
        parent,
        child,
        status,
        transform(
            sequence(
                0,
                datediff(
                    CAST(MAX(timestamp) OVER (PARTITION BY parent, child) AS DATE),
                    CAST(timestamp AS DATE)
                ) - 1
            ),
            sid -> date_add(CAST(timestamp AS DATE), sid)
        ) AS dates
    FROM df_only_new_children
)
SELECT
    explode_outer(dates) AS date,
    parent,
    child
FROM transformed_input
WHERE status NOT IN ('existing', 'deleted')
""")
sql_results.show(truncate=False)
# the result above is not correct
# +----------+------+-----+
# |date      |parent|child|
# +----------+------+-----+
# |2023-07-01|p1    |c1   |
# |2023-07-02|p2    |c1   |
# |2023-07-03|p2    |c1   |
# |2023-07-03|p2    |c1   |
# .
# .
# .
# |2023-08-08|p2    |c1   |  (today's date)
# +----------+------+-----+
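# Editor's note on why this case cannot come out right: for a child whose
# last status is 'new', the desired rows run from the 'new' date through
# today, but the query above never references current_date, so its
# sequence() upper bound (a datediff against the partition's own
# MAX(timestamp), minus one) can never reach today. A plain-Python sketch of
# the desired rows, assuming "today" is 2023-08-08 as in the output above:
from datetime import date, timedelta

def days_through(start, end):
    # every calendar day from start to end, inclusive
    return [start + timedelta(days=i) for i in range((end - start).days + 1)]

today = date(2023, 8, 8)
print(days_through(date(2023, 7, 1), today))  # desired rows for (p1, c1)
print(days_through(date(2023, 7, 9), today))  # desired rows for (p2, c1)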
# ------------------------------
json_data_multiple_combinations = '''
[
  {
    "timestamp": "2023-07-01T08:00:00Z",
    "parent": "p1",
    "child": "c1",
    "status": "new"
  },
  {
    "timestamp": "2023-07-02T08:00:00Z",
    "parent": "p2",
    "child": "c1",
    "status": "deleted"
  },
  {
    "timestamp": "2023-07-04T08:00:00Z",
    "parent": "p1",
    "child": "c1",
    "status": "new"
  },
  {
    "timestamp": "2023-07-05T08:00:00Z",
    "parent": "p2",
    "child": "c1",
    "status": "deleted"
  }
]
'''

# Create an RDD from the JSON data
sc = SparkContext.getOrCreate()
rdd = sc.parallelize([json_data_multiple_combinations])

# Read the JSON data into a DataFrame
df_json_data_multiple_combinations = spark.read.json(rdd)

# Show the DataFrame content
# df_json_data_multiple_combinations.show()
df_json_data_multiple_combinations.createOrReplaceTempView("df_json_data_multiple_combinations")
sql_results = spark.sql("""
WITH transformed_input AS (
    SELECT
        CAST(timestamp AS DATE) AS dt,
        parent,
        child,
        status,
        transform(
            sequence(
                0,  -- the CASE below was added based on the gist code
                CASE WHEN first(status) OVER (PARTITION BY parent, child ORDER BY timestamp DESC) == 'existing'
                         THEN datediff(CAST(MAX(timestamp) OVER (PARTITION BY parent, child) AS DATE), CAST(timestamp AS DATE))
                     WHEN first(status) OVER (PARTITION BY parent, child ORDER BY timestamp DESC) == 'new'
                         THEN 0
                     ELSE datediff(
                         CAST(MAX(timestamp) OVER (PARTITION BY parent, child) AS DATE),
                         CAST(timestamp AS DATE)
                     ) - 1
                END
            ),
            sid -> date_add(CAST(timestamp AS DATE), sid)
        ) AS dates
    FROM df_json_data_multiple_combinations
)
SELECT DISTINCT  -- to remove duplicate entries in the dataset
    explode_outer(dates) AS date,
    parent,
    child
FROM transformed_input
WHERE status NOT IN ('existing', 'deleted')
""")
sql_results.show(truncate=False)
# the result above is not correct
# expected:
# | date       | parent | child |
# | ---------- | ------ | ----- |
# | 2023-07-01 | p1     | c1    |
# | 2023-07-02 | p1     | c1    |
# | 2023-07-04 | p1     | c1    |
# | 2023-07-05 | p1     | c1    |
# received:
# +----------+------+-----+
# |date      |parent|child|
# +----------+------+-----+
# |2023-07-04|p1    |c1   |
# |2023-07-01|p1    |c1   |
# +----------+------+-----+
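# Editor's diagnostic (assuming Spark window semantics): in the query above,
# first(status) OVER (... ORDER BY timestamp DESC) returns the partition's
# latest status, which is 'new' for both (p1, c1) rows, so the CASE yields 0,
# sequence(0, 0) = [0], and each 'new' row maps only to its own date, which
# explains the received rows 2023-07-01 and 2023-07-04.
#
# One hedged reading of the expected table: each 'new' event for a child opens
# an interval that the next 'deleted' event for the same child closes
# (inclusive), even when the deletion is logged under a different parent. This
# is inferred from the expected rows above, not a confirmed spec:
from datetime import date, timedelta

events = [  # (date, parent, child, status) from json_data_multiple_combinations
    (date(2023, 7, 1), "p1", "c1", "new"),
    (date(2023, 7, 2), "p2", "c1", "deleted"),
    (date(2023, 7, 4), "p1", "c1", "new"),
    (date(2023, 7, 5), "p2", "c1", "deleted"),
]
rows, opened = [], None
for dt, parent, child, status in events:
    if status == "new":
        opened = (dt, parent, child)
    elif status == "deleted" and opened:
        start, p, c = opened
        rows += [(start + timedelta(days=i), p, c)
                 for i in range((dt - start).days + 1)]
        opened = None
for r in rows:
    print(r)  # 07-01, 07-02, 07-04, 07-05 under (p1, c1), matching "expected"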
# ------------------------------
json_data_only_deleted = '''
[
  {
    "timestamp": "2023-07-11T12:00:00Z",
    "parent": "p1",
    "child": "c1",
    "status": "deleted"
  }
]
'''

# Create an RDD from the JSON data
sc = SparkContext.getOrCreate()
rdd = sc.parallelize([json_data_only_deleted])

# Read the JSON data into a DataFrame
df_only_deleted = spark.read.json(rdd)
df_only_deleted.createOrReplaceTempView("df_only_deleted")
sql_results = spark.sql("""
WITH input AS (
    SELECT
        CAST(timestamp AS DATE) AS dt,
        parent,
        child,
        status,
        CAST(MAX(timestamp) OVER (PARTITION BY parent, child) AS DATE) AS max_ts,
        CASE WHEN LEAD(CAST(timestamp AS DATE), 1) OVER (PARTITION BY parent, child ORDER BY timestamp DESC) IS NULL
                 THEN CAST(timestamp AS DATE)
             ELSE LEAD(CAST(timestamp AS DATE), 1) OVER (PARTITION BY parent, child ORDER BY timestamp DESC)
        END AS min_ts,
        row_number() OVER (PARTITION BY parent, child ORDER BY timestamp DESC) AS row_number
    FROM df_only_deleted
),
transformed_input AS (
    SELECT
        dt,
        parent,
        child,
        max_ts,
        min_ts,
        row_number,
        CASE WHEN dt == max_ts AND (status == 'existing' OR status == 'new') THEN datediff(current_date, min_ts)
             WHEN dt == max_ts AND status == 'deleted' THEN datediff(date_sub(max_ts, 1), min_ts)
        END AS new_date_diff,
        transform(
            sequence(
                0,
                CASE WHEN dt == max_ts AND (status == 'existing' OR status == 'new')
                         THEN datediff(current_date, min_ts)
                     WHEN dt == max_ts AND status == 'deleted'
                         THEN datediff(date_sub(max_ts, 1), min_ts)
                END
            ),
            sid -> date_add(min_ts, sid)
        ) AS dates
    FROM input
    WHERE row_number = 1
)
SELECT DISTINCT
    explode_outer(dates) AS date,
    parent,
    child
FROM transformed_input
""")
sql_results.show(1000, truncate=False)
# the result above is not correct
# expected:
# | date       | parent | child |
# | ---------- | ------ | ----- |
# | 2023-07-08 | p1     | c1    |
# | 2023-07-09 | p1     | c1    |
# | 2023-07-10 | p1     | c1    |
# | 2023-07-11 | p1     | c1    |
# received:
# +----------+------+-----+
# |date      |parent|child|
# +----------+------+-----+
# |2023-07-11|p1    |c1   |
# |2023-07-10|p1    |c1   |
# +----------+------+-----+
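# Editor's diagnostic for the received rows above (relying on Spark's
# documented sequence() behavior): with a single 'deleted' row the LEAD is
# NULL, so min_ts = dt = max_ts = 2023-07-11, the CASE yields
# datediff(date_sub(max_ts, 1), min_ts) = -1, and sequence(0, -1) defaults to
# a step of -1, producing [0, -1], i.e. 2023-07-11 and 2023-07-10 after
# date_add.
spark.sql("SELECT sequence(0, -1) AS offsets").show()  # [0, -1]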
psrinuhp commented on Aug 8, 2023:
The above SQL doesn't work with the following input:
[
  {
    "timestamp": "2023-07-01T12:00:00Z",
    "parent": "p1",
    "child": "c1",
    "status": "new"
  },
  {
    "timestamp": "2023-07-01T13:00:00Z",
    "parent": "p1",
    "child": "c1",
    "status": "existing"
  },
  {
    "timestamp": "2023-07-05T13:00:00Z",
    "parent": "p1",
    "child": "c1",
    "status": "existing"
  }
]
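Editor's note: with this input the last status is 'existing', so (p1, c1) should presumably keep appearing through today. A plain-Python sketch of the rows this input would be expected to produce, assuming the "through today" semantics used elsewhere in this gist:

from datetime import date, timedelta

start, today = date(2023, 7, 1), date(2023, 8, 8)  # "today" when this was posted
print([start + timedelta(days=i) for i in range((today - start).days + 1)])
# 2023-07-01 .. 2023-08-08, one row per day for (p1, c1)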
val mdata = """
[
  {
    "timestamp": "2023-08-01T12:00:00Z",
    "parent": "p1",
    "child": "c1",
    "status": "new"
  },
  {
    "timestamp": "2023-08-01T13:00:00Z",
    "parent": "p1",
    "child": "c1",
    "status": "existing"
  },
  {
    "timestamp": "2023-08-05T13:00:00Z",
    "parent": "p1",
    "child": "c1",
    "status": "existing"
  },
  {
    "timestamp": "2023-08-01T12:00:00Z",
    "parent": "p2",
    "child": "c2",
    "status": "new"
  },
  {
    "timestamp": "2023-08-01T13:00:00Z",
    "parent": "p2",
    "child": "c2",
    "status": "existing"
  },
  {
    "timestamp": "2023-08-05T13:00:00Z",
    "parent": "p2",
    "child": "c2",
    "status": "deleted"
  },
  {
    "timestamp": "2023-08-01T08:00:00Z",
    "parent": "p3",
    "child": "c3",
    "status": "new"
  },
  {
    "timestamp": "2023-08-01T13:00:00Z",
    "parent": "p3",
    "child": "c3",
    "status": "existing"
  },
  {
    "timestamp": "2023-08-01T14:00:00Z",
    "parent": "p3",
    "child": "c3",
    "status": "deleted"
  },
  {
    "timestamp": "2023-08-01T15:00:00Z",
    "parent": "p3",
    "child": "c3",
    "status": "new"
  },
  {
    "timestamp": "2023-08-01T08:00:00Z",
    "parent": "p4",
    "child": "c4",
    "status": "new"
  },
  {
    "timestamp": "2023-08-08T08:00:00Z",
    "parent": "p5",
    "child": "c1",
    "status": "new"
  },
  {
    "timestamp": "2023-08-01T12:00:00Z",
    "parent": "p6",
    "child": "c6",
    "status": "new"
  },
  {
    "timestamp": "2023-08-01T13:00:00Z",
    "parent": "p6",
    "child": "c6",
    "status": "existing"
  },
  {
    "timestamp": "2023-08-05T13:00:00Z",
    "parent": "p6",
    "child": "c6",
    "status": "existing"
  }
]"""
spark.read.json(Seq(mdata).toDS).createOrReplaceTempView("curd_data")
spark.table("curd_data").show(false)
spark.sql("""WITH input AS (
    SELECT
        CAST(timestamp AS DATE) AS dt,
        parent,
        child,
        status,
        CAST(MAX(timestamp) OVER (PARTITION BY parent, child) AS DATE) AS max_ts,
        CASE WHEN LEAD(CAST(timestamp AS DATE), 1) OVER (PARTITION BY parent, child ORDER BY timestamp DESC) IS NULL
                 THEN CAST(timestamp AS DATE)
             ELSE LEAD(CAST(timestamp AS DATE), 1) OVER (PARTITION BY parent, child ORDER BY timestamp DESC)
        END AS min_ts,
        row_number() OVER (PARTITION BY parent, child ORDER BY timestamp DESC) AS row_number
    FROM curd_data
),
transformed_input AS (
    SELECT
        dt,
        parent,
        child,
        max_ts,
        min_ts,
        row_number,
        CASE WHEN dt == max_ts AND (status == 'existing' OR status == 'new') THEN datediff(current_date, min_ts)
             WHEN dt == max_ts AND status == 'deleted' THEN datediff(date_sub(max_ts, 1), min_ts)
        END AS new_date_diff,
        transform(
            sequence(
                0,
                CASE WHEN dt == max_ts AND (status == 'existing' OR status == 'new')
                         THEN datediff(current_date, min_ts)
                     WHEN dt == max_ts AND status == 'deleted'
                         THEN datediff(date_sub(max_ts, 1), min_ts)
                END
            ),
            sid -> date_add(min_ts, sid)
        ) AS dates
    FROM input
    WHERE row_number = 1
)
SELECT DISTINCT
    explode_outer(dates) AS date,
    parent,
    child
FROM transformed_input
""").show(100, false)
+-----+------+--------+--------------------+
|child|parent|status |timestamp |
+-----+------+--------+--------------------+
|c1 |p1 |new |2023-08-01T12:00:00Z|
|c1 |p1 |existing|2023-08-01T13:00:00Z|
|c1 |p1 |existing|2023-08-05T13:00:00Z|
|c2 |p2 |new |2023-08-01T12:00:00Z|
|c2 |p2 |existing|2023-08-01T13:00:00Z|
|c2 |p2 |deleted |2023-08-05T13:00:00Z|
|c3 |p3 |new |2023-08-01T08:00:00Z|
|c3 |p3 |existing|2023-08-01T13:00:00Z|
|c3 |p3 |deleted |2023-08-01T14:00:00Z|
|c3 |p3 |new |2023-08-01T15:00:00Z|
|c4 |p4 |new |2023-08-01T08:00:00Z|
|c1 |p5 |new |2023-08-08T08:00:00Z|
|c6 |p6 |new |2023-08-01T12:00:00Z|
|c6 |p6 |existing|2023-08-01T13:00:00Z|
|c6 |p6 |existing|2023-08-05T13:00:00Z|
+-----+------+--------+--------------------+
+----------+------+-----+
|date |parent|child|
+----------+------+-----+
|2023-08-01|p1 |c1 |
|2023-08-02|p1 |c1 |
|2023-08-03|p1 |c1 |
|2023-08-04|p1 |c1 |
|2023-08-05|p1 |c1 |
|2023-08-06|p1 |c1 |
|2023-08-07|p1 |c1 |
|2023-08-08|p1 |c1 |
|2023-08-01|p2 |c2 |
|2023-08-02|p2 |c2 |
|2023-08-03|p2 |c2 |
|2023-08-04|p2 |c2 |
|2023-08-01|p3 |c3 |
|2023-08-02|p3 |c3 |
|2023-08-03|p3 |c3 |
|2023-08-04|p3 |c3 |
|2023-08-05|p3 |c3 |
|2023-08-06|p3 |c3 |
|2023-08-07|p3 |c3 |
|2023-08-08|p3 |c3 |
|2023-08-01|p4 |c4 |
|2023-08-02|p4 |c4 |
|2023-08-03|p4 |c4 |
|2023-08-04|p4 |c4 |
|2023-08-05|p4 |c4 |
|2023-08-06|p4 |c4 |
|2023-08-07|p4 |c4 |
|2023-08-08|p4 |c4 |
|2023-08-08|p5 |c1 |
|2023-08-01|p6 |c6 |
|2023-08-02|p6 |c6 |
|2023-08-03|p6 |c6 |
|2023-08-04|p6 |c6 |
|2023-08-05|p6 |c6 |
|2023-08-06|p6 |c6 |
|2023-08-07|p6 |c6 |
|2023-08-08|p6 |c6 |
+----------+------+-----+
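Editor's follow-up (an observation on the output above, not a verified spec): this version keeps only the newest event per (parent, child) via row_number = 1, derives the window start from the previous event's date via LEAD, and runs through current_date when the last status is 'new' or 'existing' but only through the day before the last event when it is 'deleted'. A quick plain-Python check of the (p2, c2) partition (previous event 2023-08-01, deleted 2023-08-05):

from datetime import date, timedelta

min_ts, max_ts = date(2023, 8, 1), date(2023, 8, 5)
print([min_ts + timedelta(days=i) for i in range((max_ts - min_ts).days)])
# 2023-08-01 .. 2023-08-04, matching the (p2, c2) rows above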