Last active
March 27, 2022 16:44
-
-
Save thanoojgithub/48e2add331c91a8c6cfc2a5fe16314db to your computer and use it in GitHub Desktop.
Sample code 2 - Implementing SCD Type 2 Data model using PySpark
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import SparkSession | |
from pyspark.sql.functions import * | |
spark = SparkSession \ | |
.builder \ | |
.master('local') \ | |
.appName('pyspark-test-run') \ | |
.getOrCreate() | |
spark.sparkContext.setLogLevel("ERROR") | |
temporaryGcsBucket = "temporarygcsbucket1" | |
spark.conf.set('temporaryGcsBucket', temporaryGcsBucket) | |
df1 = spark.read.format('bigquery').option('table', 'cancellationData1.cancellationData').load() | |
df2 = spark.read.format("csv").options(header='True', inferSchema='True', delimiter=',') \ | |
.load("gs://bucket_27122021/data/cancellationData/Cancellation_Details_Jan012022.csv") | |
# For now, converting csv to parquet format, writing to GCP bucket, to make expected input to start transformation. | |
df2.write.mode("overwrite").parquet("gs://bucket_27122021/data/cancellationData/Jan2022/parquet") | |
# actual transformation will start from here. | |
# reading airLine data as parquet format from GCP bucket and dropping unwanted columns | |
df2_1 = spark.read.parquet("gs://bucket_27122021/data/cancellationData/Jan2022/parquet") \ | |
.drop('CreatedTimestamp', 'UpdatedTimestamp') | |
df2_2 = df2_1.filter(col("CancellationCode").isNotNull() & col("CancellationDesc").isNotNull()) | |
ccDF2_3 = df2_2.withColumn("StartDate", to_timestamp(lit("2022-01-31 23:59:59"), 'yyyy-MM-dd HH:mm:ss')) \ | |
.withColumn("EndDate", to_timestamp(lit("9999-12-31 23:59:59"), 'yyyy-MM-dd HH:mm:ss')) | |
df1.show(truncate=False) | |
ccDF2_3.show(truncate=False) | |
ccDF1_1 = df1.filter(to_timestamp(col('EndDate')) < to_timestamp(lit("9999-12-31 23:59:59"), 'yyyy-MM-dd HH:mm:ss')) | |
ccDF1_1.show(truncate=False) | |
ccDF1_2 = df1.filter(to_timestamp(col('EndDate')) == to_timestamp(lit("9999-12-31 23:59:59"), 'yyyy-MM-dd HH:mm:ss')) \ | |
.withColumn("EndDate", | |
when(col('CancellationCode').isin( | |
ccDF2_3.select(col('CancellationCode')).rdd.flatMap(lambda x: x).collect()), | |
to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss")).otherwise(col('EndDate'))) | |
ccDF1_2.printSchema() | |
ccDF1_2.show(truncate=False) | |
ccDF1_3 = ccDF1_1.unionByName(ccDF1_2).unionByName(ccDF2_3).orderBy(col('CancellationCode'), col('EndDate')) | |
ccDF1_3.printSchema() | |
ccDF1_3.show(truncate=False) | |
ccDF1_3.write.format('bigquery') \ | |
.option('table', 'cancellationData1.cancellationData') \ | |
.partitionBy('CancellationCode') \ | |
.mode("overwrite") \ | |
.save() | |
+----------------+------------------------------+-------------------+-----------------------+ | |
|CancellationCode|CancellationDesc |StartDate |EndDate | | |
+----------------+------------------------------+-------------------+-----------------------+ | |
|A |Due to heavy rainfall |2021-12-31 23:59:59|2022-01-31 08:21:12.648| | |
|B |Due to heavy snowfall |2021-12-31 23:59:59|2022-01-31 08:21:12.648| | |
|C |Due to some technical issue |2021-12-31 23:59:59|2022-01-31 08:21:12.648| | |
|A |Due to heavy rainfall |2022-01-31 23:59:59|9999-12-31 23:59:59 | | |
|B |Due to heavy near by tornadoes|2022-01-31 23:59:59|9999-12-31 23:59:59 | | |
|C |Due to some technical issue |2022-01-31 23:59:59|9999-12-31 23:59:59 | | |
|D |Due to bad radio signals |2022-01-31 23:59:59|9999-12-31 23:59:59 | | |
|E |Due to some technical glitches|2022-01-31 23:59:59|9999-12-31 23:59:59 | | |
+----------------+------------------------------+-------------------+-----------------------+ | |
+----------------+------------------------------+-------------------+-------------------+ | |
|CancellationCode|CancellationDesc |StartDate |EndDate | | |
+----------------+------------------------------+-------------------+-------------------+ | |
|A |Due to heavy rainfall |2022-01-31 23:59:59|9999-12-31 23:59:59| | |
|C |Due to some technical issue |2022-01-31 23:59:59|9999-12-31 23:59:59| | |
|B |Due to heavy near by tornadoes|2022-01-31 23:59:59|9999-12-31 23:59:59| | |
|E |Due to some technical glitches|2022-01-31 23:59:59|9999-12-31 23:59:59| | |
|D |Due to bad radio signals |2022-01-31 23:59:59|9999-12-31 23:59:59| | |
+----------------+------------------------------+-------------------+-------------------+ | |
+----------------+---------------------------+-------------------+-----------------------+ | |
|CancellationCode|CancellationDesc |StartDate |EndDate | | |
+----------------+---------------------------+-------------------+-----------------------+ | |
|A |Due to heavy rainfall |2021-12-31 23:59:59|2022-01-31 08:21:12.648| | |
|B |Due to heavy snowfall |2021-12-31 23:59:59|2022-01-31 08:21:12.648| | |
|C |Due to some technical issue|2021-12-31 23:59:59|2022-01-31 08:21:12.648| | |
+----------------+---------------------------+-------------------+-----------------------+ | |
root | |
|-- CancellationCode: string (nullable = true) | |
|-- CancellationDesc: string (nullable = true) | |
|-- StartDate: timestamp (nullable = false) | |
|-- EndDate: timestamp (nullable = true) | |
+----------------+------------------------------+-------------------+-----------------------+ | |
|CancellationCode|CancellationDesc |StartDate |EndDate | | |
+----------------+------------------------------+-------------------+-----------------------+ | |
|A |Due to heavy rainfall |2022-01-31 23:59:59|2022-01-31 08:56:50.429| | |
|B |Due to heavy near by tornadoes|2022-01-31 23:59:59|2022-01-31 08:56:50.429| | |
|C |Due to some technical issue |2022-01-31 23:59:59|2022-01-31 08:56:50.429| | |
|D |Due to bad radio signals |2022-01-31 23:59:59|2022-01-31 08:56:50.429| | |
|E |Due to some technical glitches|2022-01-31 23:59:59|2022-01-31 08:56:50.429| | |
+----------------+------------------------------+-------------------+-----------------------+ | |
root | |
|-- CancellationCode: string (nullable = true) | |
|-- CancellationDesc: string (nullable = true) | |
|-- StartDate: timestamp (nullable = true) | |
|-- EndDate: timestamp (nullable = true) | |
+----------------+------------------------------+-------------------+-----------------------+ | |
|CancellationCode|CancellationDesc |StartDate |EndDate | | |
+----------------+------------------------------+-------------------+-----------------------+ | |
|A |Due to heavy rainfall |2021-12-31 23:59:59|2022-01-31 08:21:12.648| | |
|A |Due to heavy rainfall |2022-01-31 23:59:59|2022-01-31 08:56:51.105| | |
|A |Due to heavy rainfall |2022-01-31 23:59:59|9999-12-31 23:59:59 | | |
|B |Due to heavy snowfall |2021-12-31 23:59:59|2022-01-31 08:21:12.648| | |
|B |Due to heavy near by tornadoes|2022-01-31 23:59:59|2022-01-31 08:56:51.105| | |
|B |Due to heavy near by tornadoes|2022-01-31 23:59:59|9999-12-31 23:59:59 | | |
|C |Due to some technical issue |2021-12-31 23:59:59|2022-01-31 08:21:12.648| | |
|C |Due to some technical issue |2022-01-31 23:59:59|2022-01-31 08:56:51.105| | |
|C |Due to some technical issue |2022-01-31 23:59:59|9999-12-31 23:59:59 | | |
|D |Due to bad radio signals |2022-01-31 23:59:59|2022-01-31 08:56:51.105| | |
|D |Due to bad radio signals |2022-01-31 23:59:59|9999-12-31 23:59:59 | | |
|E |Due to some technical glitches|2022-01-31 23:59:59|2022-01-31 08:56:51.105| | |
|E |Due to some technical glitches|2022-01-31 23:59:59|9999-12-31 23:59:59 | | |
+----------------+------------------------------+-------------------+-----------------------+ | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment