import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
# Bootstrap the Spark session for this job (reuses an existing one if present).
session_builder = SparkSession.builder.appName("CreateTableFromImage")
spark = session_builder.getOrCreate()
# Table schema: the record key plus a raw JSON payload, both nullable strings.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("pag_code", "additional_info")
])
# Rows transcribed from the source image: (pag_code, additional_info) pairs.
# additional_info is a raw JSON string whose field set varies per row:
# serviceTimestamp is always present; rxNumber, receivedDatetime and
# stockIdempotenceCode appear only on some rows.
# NOTE(review): several rxNumber values contain non-ASCII characters —
# presumably an artifact of the image transcription; verify against the source.
data = [
("1000008611B", '{"serviceTimestamp":"2023-07-27 14:40:32.0","rxNumber":"٩íJ٦۰J","stockPreviousOutstanding":"0","stockPreviousOnShelf":"0"}'),
("10000091223", '{"serviceTimestamp":"2023-07-20 19:14:43.0","rxNumber":"äß۶dJ","stockPreviousOutstanding":"0","stockPreviousOnShelf":"0"}'),
("1000008463", '{"receivedDatetime":"2023-07-26 15:10:52.162","serviceTimestamp":"2023-07-26 15:10:52.162","stockPreviousOutstanding":"0","stockPreviousOnShelf":"0"}'),
("10000091735", '{"serviceTimestamp":"2023-07-07 18:29:28.0","rxNumber":"ñ۱íJ٦۰J","stockPreviousOutstanding":"0","stockPreviousOnShelf":"0"}'),
("10000086294", '{"serviceTimestamp":"2023-07-21 14:35:26.0","rxNumber":"Ø۷ÛÇÛJ","stockPreviousOutstanding":"0","stockPreviousOnShelf":"0"}'),
("10000092389", '{"receivedDatetime":"2023-07-26 15:00:55.505","serviceTimestamp":"2023-07-26 15:00:55.505","stockPreviousOutstanding":"0","stockPreviousOnShelf":"180.0"}'),
("10000089106", '{"receivedDatetime":"2023-07-22 13:55:26.156","serviceTimestamp":"2023-07-22 13:55:26.156","stockPreviousOutstanding":"0","stockPreviousOnShelf":"18.0"}'),
("10000085132", '{"receivedDatetime":"2023-07-14 14:35:21.533","serviceTimestamp":"2023-07-14 14:35:21.533","stockPreviousOutstanding":"0","stockPreviousOnShelf":"18.0"}'),
("10000094870", '{"serviceTimestamp":"2023-07-27 18:13:10.0","rxNumber":"ØéÓÇÛJ","stockPreviousOutstanding":"0","stockPreviousOnShelf":"0"}'),
("10000090677", '{"stockIdempotenceCode":"162e8f3a-ef85-4e5d-85b0-003840b40d09","serviceTimestamp":"2023-07-03 01:50:03.1337","stockPreviousOutstanding":"0","stockPreviousOnShelf":"0"}'),
("10000086218", '{"serviceTimestamp":"2023-07-13 17:42:07.0","rxNumber":"Ý۲é۶ÛJ","stockPreviousOutstanding":"0","stockPreviousOnShelf":"0"}'),
("10000072977", '{"receivedDatetime":"2023-07-26 16:46:50.102","serviceTimestamp":"2023-07-26 16:46:50.102","rxNumber":"عµé۶ÛJ","stockPreviousOutstanding":"0","stockPreviousOnShelf":"0"}'),
("10000092489", '{"serviceTimestamp":"2023-07-12 19:24:20.0","rxNumber":"ø۵hý٦J","stockPreviousOutstanding":"0","stockPreviousOnShelf":"0"}'),
("10000090636", '{"serviceTimestamp":"2023-07-23 21:19:41.672614","rxNumber":"٠èÌfíJ","stockPreviousOutstanding":"0","stockPreviousOnShelf":"6","stockIdempotenceCode":"0f28167d-d5eb-4b23-9c5d-9ff27d221d10"}')
]
# Materialize the transcribed rows into a DataFrame under the declared schema.
df = spark.createDataFrame(data=data, schema=schema)
# Render the raw table.
# NOTE(review): display() is Databricks-specific; use df.show() outside Databricks.
df.display()
# Fields to extract from the additional_info JSON payload; all parsed as
# nullable strings. Fields absent from a given row simply parse to null.
# NOTE(review): estimatedDeliveryDate never appears in the sample data, while
# rxNumber/receivedDatetime (which do appear) are not listed — confirm intent.
_json_field_names = (
    "serviceTimestamp",
    "stockPreviousOutstanding",
    "stockPreviousOnShelf",
    "stockIdempotenceCode",
    "estimatedDeliveryDate",
)
additional_info_schema = StructType(
    [StructField(name, StringType(), True) for name in _json_field_names]
)
# Parse the JSON string held in additional_info into a struct column.
# BUG FIX: the original wrapped the column in F.to_json() before parsing,
# but additional_info is already a JSON *string* (StringType). to_json()
# accepts only struct/map/array columns, so that call fails; from_json()
# is now applied to the string column directly.
df_parsed = df.withColumn(
    "parsed_additional_info",
    F.from_json(F.col("additional_info"), additional_info_schema),
)
df_parsed.display()
# Flatten the parsed struct into top-level columns for the final table.
# BUG FIX: the original referenced `key_cols`, which is never defined
# anywhere in the file (NameError at runtime). It is defined here as the
# key column(s) to carry through alongside the extracted JSON fields.
key_cols = [F.col("pag_code")]
df_final = df_parsed.select(
    *key_cols,
    F.col("parsed_additional_info.serviceTimestamp").alias("serviceTimestamp"),
    F.col("parsed_additional_info.stockPreviousOutstanding").alias("stockPreviousOutstanding"),
    F.col("parsed_additional_info.stockPreviousOnShelf").alias("stockPreviousOnShelf"),
    F.col("parsed_additional_info.stockIdempotenceCode").alias("stockIdempotenceCode"),
    F.col("parsed_additional_info.estimatedDeliveryDate").alias("estimatedDeliveryDate"),
)
# Show the result
df_final.display()