Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save dvu4/250b707d8479dcf4fb6ec2e1efe169e6 to your computer and use it in GitHub Desktop.
Save dvu4/250b707d8479dcf4fb6ec2e1efe169e6 to your computer and use it in GitHub Desktop.
Divide the column containing JSON string into separate columns in PySpark

Divide the column containing JSON string into separate columns in PySpark

  • Create the DataFrame
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

# Obtain (or reuse) the Spark session used throughout this example
spark = (
    SparkSession.builder
    .appName("CreateTableFromImage")
    .getOrCreate()
)

# Table layout: two nullable string columns, declared in display order
schema = StructType(
    [
        StructField(column_name, StringType(), True)
        for column_name in ("pag_code", "additional_info")
    ]
)

# Sample rows (transcribed from an image): each tuple is
# (pag_code, additional_info), where additional_info is a JSON document
# stored as a plain string. The set of JSON keys varies from row to row
# (e.g. some rows carry rxNumber, others receivedDatetime or
# stockIdempotenceCode).
data = [
    ("1000008611B", '{"serviceTimestamp":"2023-07-27 14:40:32.0","rxNumber":"٩íJ٦۰J","stockPreviousOutstanding":"0","stockPreviousOnShelf":"0"}'),
    ("10000091223", '{"serviceTimestamp":"2023-07-20 19:14:43.0","rxNumber":"äß۶dJ","stockPreviousOutstanding":"0","stockPreviousOnShelf":"0"}'),
    ("1000008463", '{"receivedDatetime":"2023-07-26 15:10:52.162","serviceTimestamp":"2023-07-26 15:10:52.162","stockPreviousOutstanding":"0","stockPreviousOnShelf":"0"}'),
    ("10000091735", '{"serviceTimestamp":"2023-07-07 18:29:28.0","rxNumber":"ñ۱íJ٦۰J","stockPreviousOutstanding":"0","stockPreviousOnShelf":"0"}'),
    ("10000086294", '{"serviceTimestamp":"2023-07-21 14:35:26.0","rxNumber":"Ø۷ÛÇÛJ","stockPreviousOutstanding":"0","stockPreviousOnShelf":"0"}'),
    ("10000092389", '{"receivedDatetime":"2023-07-26 15:00:55.505","serviceTimestamp":"2023-07-26 15:00:55.505","stockPreviousOutstanding":"0","stockPreviousOnShelf":"180.0"}'),
    ("10000089106", '{"receivedDatetime":"2023-07-22 13:55:26.156","serviceTimestamp":"2023-07-22 13:55:26.156","stockPreviousOutstanding":"0","stockPreviousOnShelf":"18.0"}'),
    ("10000085132", '{"receivedDatetime":"2023-07-14 14:35:21.533","serviceTimestamp":"2023-07-14 14:35:21.533","stockPreviousOutstanding":"0","stockPreviousOnShelf":"18.0"}'),
    ("10000094870", '{"serviceTimestamp":"2023-07-27 18:13:10.0","rxNumber":"ØéÓÇÛJ","stockPreviousOutstanding":"0","stockPreviousOnShelf":"0"}'),
    ("10000090677", '{"stockIdempotenceCode":"162e8f3a-ef85-4e5d-85b0-003840b40d09","serviceTimestamp":"2023-07-03 01:50:03.1337","stockPreviousOutstanding":"0","stockPreviousOnShelf":"0"}'),
    ("10000086218", '{"serviceTimestamp":"2023-07-13 17:42:07.0","rxNumber":"Ý۲é۶ÛJ","stockPreviousOutstanding":"0","stockPreviousOnShelf":"0"}'),
    ("10000072977", '{"receivedDatetime":"2023-07-26 16:46:50.102","serviceTimestamp":"2023-07-26 16:46:50.102","rxNumber":"عµé۶ÛJ","stockPreviousOutstanding":"0","stockPreviousOnShelf":"0"}'),
    ("10000092489", '{"serviceTimestamp":"2023-07-12 19:24:20.0","rxNumber":"ø۵hý٦J","stockPreviousOutstanding":"0","stockPreviousOnShelf":"0"}'),
    ("10000090636", '{"serviceTimestamp":"2023-07-23 21:19:41.672614","rxNumber":"٠èÌfíJ","stockPreviousOutstanding":"0","stockPreviousOnShelf":"6","stockIdempotenceCode":"0f28167d-d5eb-4b23-9c5d-9ff27d221d10"}')
]

# Build the DataFrame from the sample rows using the two-column string schema
df = spark.createDataFrame(data, schema)

# Render the DataFrame.
# NOTE(review): .display() looks like a Databricks notebook helper rather than
# part of the open-source PySpark DataFrame API — confirm the target runtime
# (df.show() is the portable equivalent).
df.display()
  • Divide the JSON string column into multiple columns
# Fields to extract from the JSON document held in `additional_info`.
# Keys present in the JSON but not listed here (e.g. rxNumber,
# receivedDatetime in the sample rows) are not extracted; listed fields that a
# row does not contain (e.g. estimatedDeliveryDate) come back as null.
additional_info_schema = StructType([
    StructField("serviceTimestamp", StringType(), True),
    StructField("stockPreviousOutstanding", StringType(), True),
    StructField("stockPreviousOnShelf", StringType(), True),
    StructField("stockIdempotenceCode", StringType(), True),
    StructField("estimatedDeliveryDate", StringType(), True)
])

# Columns carried through unchanged next to the extracted JSON fields.
key_cols = ["pag_code"]

# `additional_info` is already a JSON string (StringType), so it is parsed
# directly with from_json. Wrapping it in to_json first is only valid for
# struct/map/array columns and fails analysis on a string column.
df_parsed = df.withColumn(
    "parsed_additional_info",
    F.from_json(F.col("additional_info"), additional_info_schema),
)
df_parsed.display()


# Promote every field of the parsed struct to a top-level column. The list is
# derived from the schema so the two cannot drift apart; the resulting column
# order is the schema's declaration order.
df_final = df_parsed.select(
    *key_cols,
    *[
        F.col(f"parsed_additional_info.{field_name}").alias(field_name)
        for field_name in additional_info_schema.fieldNames()
    ],
)

# Show the result
df_final.display()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment