Dynamic JSON Formatting in PySpark using the `schema_of_json` function.
Created April 8, 2024 13:43 (gist dineshdharme/553b01316e1df6609ef3f5c9280a037e)
https://stackoverflow.com/questions/78290764/flatten-dynamic-json-payload-string-using-pyspark/
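PySpark has a handy function, `schema_of_json`, which infers the schema of a JSON string and returns it as a DDL-formatted string. Passing that schema to `from_json` then parses the whole column with it. The input to `schema_of_json` must be a literal JSON string rather than a column, which is why the approach below samples a single row.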
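To make the inference step concrete without a Spark cluster, here is a small pure-Python sketch that approximates what `schema_of_json` produces for a single payload. The helper `infer_ddl` is hypothetical and written only for illustration; it is not a PySpark API. It maps JSON types to Spark-style type names and sorts struct fields alphabetically, which matches the field order visible in the `parsed_struct` output further down. The exact string Spark returns varies between versions, and mapping a JSON `null` to `STRING` is an assumption of this sketch.

```python
import json

def infer_ddl(value):
    """Approximate a Spark SQL type name for a parsed JSON value (illustration only)."""
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return "BOOLEAN"
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, float):
        return "DOUBLE"
    if isinstance(value, str) or value is None:
        # Assumption of this sketch: a JSON null is typed as STRING
        return "STRING"
    if isinstance(value, list):
        # Simplification: use the first element's type (Spark merges all elements)
        return f"ARRAY<{infer_ddl(value[0]) if value else 'STRING'}>"
    if isinstance(value, dict):
        # Struct fields are listed in alphabetical order
        fields = ", ".join(f"{key}: {infer_ddl(value[key])}" for key in sorted(value))
        return f"STRUCT<{fields}>"
    raise TypeError(f"unsupported JSON value: {value!r}")

print(infer_ddl(json.loads('{"id": 1, "name": "Alice"}')))
# STRUCT<id: BIGINT, name: STRING>
print(infer_ddl(json.loads('{"id": 2, "tags": ["spark", "python"], "active": true}')))
# STRUCT<active: BOOLEAN, id: BIGINT, tags: ARRAY<STRING>>
```

The alphabetical ordering explains why, for example, the second payload is displayed as `{true, 2, [spark, python]}` rather than in the order the keys appear in the JSON text.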
The approach for handling dynamic JSON payloads is as follows:
- Take the `json_payload` of the first row of the DataFrame.
- Infer a schema from that payload with `schema_of_json` and parse the whole column with `from_json`.
- If a row fits the schema, its parsed struct contains no `null` values. A `null` means the row was not parsed correctly (for example, its payload lacks one of the schema's fields). To detect this, cast the parsed struct back to a string and use `.contains("null")` to check whether the text `null` appears in it.
- This splits the data into two DataFrames: one with the correctly parsed rows and one with the incorrectly parsed rows.
- Repeat the process on the incorrectly parsed DataFrame until no rows are left.
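The loop described in these steps can be simulated in plain Python to check the control flow before running it on Spark. This is a sketch under simplifying assumptions, not the PySpark script: a "schema" is modelled as the nested field names of the reference payload (types are ignored, and arrays are treated as leaf values), the hypothetical helper `parse_with_schema` stands in for `from_json` (fields missing from a row become `None`, extra fields are dropped), and a row counts as correctly parsed when no field of the result is `None`. The `if not correct` guard is an addition of this sketch.

```python
import json

def field_schema(value):
    """Model a schema as nested field names; leaf values map to None."""
    if isinstance(value, dict):
        return {key: field_schema(val) for key, val in value.items()}
    return None

def parse_with_schema(value, schema):
    """Rough stand-in for from_json: keep schema fields only, missing ones become None."""
    if schema is None:
        return value
    if not isinstance(value, dict):
        return None
    return {key: parse_with_schema(value[key], sub) if key in value else None
            for key, sub in schema.items()}

def has_null(parsed):
    """True if any field anywhere in the parsed structure is None."""
    if parsed is None:
        return True
    if isinstance(parsed, dict):
        return any(has_null(val) for val in parsed.values())
    return False

def group_by_schema(payloads):
    """Repeatedly use the first remaining payload as the schema reference."""
    groups = []
    remaining = list(payloads)
    while remaining:
        schema = field_schema(json.loads(remaining[0]))
        correct, incorrect = [], []
        for payload in remaining:
            parsed = parse_with_schema(json.loads(payload), schema)
            (incorrect if has_null(parsed) else correct).append(payload)
        if not correct:  # guard: no progress possible, avoid looping forever
            break
        groups.append(correct)
        remaining = incorrect
    return groups

payloads = [
    '{"id": 1, "name": "Alice"}',
    '{"id": 2, "tags": ["spark", "python"], "active": true}',
    '{"id": 3, "details": {"age": 30, "location": "New York"}}',
    '{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}',
]
for group in group_by_schema(payloads):
    print([json.loads(p)["id"] for p in group])  # prints [1], [2], [3], [4] on separate lines
```

With the sample payloads every row has its own shape, so each pass peels off exactly one row, the same grouping that the Spark output below shows.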
Here is a sample script with custom data:
```python
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, from_json, schema_of_json
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder \
    .appName("JsonPayloadDataFrame") \
    .getOrCreate()

schema = StructType([
    StructField("json_payload", StringType(), True)
])

data = [
    Row(json_payload='{"id": 1, "name": "Alice"}'),
    Row(json_payload='{"id": 2, "tags": ["spark", "python"], "active": true}'),
    Row(json_payload='{"id": 3, "details": {"age": 30, "location": "New York"}}'),
    Row(json_payload='{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}')
]

df = spark.createDataFrame(data, schema=schema)
df.printSchema()
df.show(n=30, truncate=False)

wanted_df_list = []  # one DataFrame of correctly parsed rows per inferred schema
df_looped = df       # rows that have not been parsed correctly yet

# DataFrame.isEmpty() is available from Spark 3.3 onwards
while not df_looped.isEmpty():
    print("USING SCHEMA FROM ONE ROW")
    # Use the payload of one remaining row as the schema reference
    current_string_repr = df_looped.select("json_payload").first()["json_payload"]
    print(f"{current_string_repr=}")

    # Infer the schema from that payload and parse every remaining row with it
    df_parsed = df_looped.withColumn(
        "parsed_struct",
        from_json(col("json_payload"), schema_of_json(current_string_repr)),
    )
    df_parsed.show(n=30, truncate=False)
    df_parsed.printSchema()

    # Render the struct as text so that missing fields appear as "null"
    df_null_check = df_parsed.withColumn("null_present_str", col("parsed_struct").cast(StringType()))
    df_null_check.show(n=30, truncate=False)
    df_null_check.printSchema()

    df_null_present = df_null_check.withColumn("null_present_bool", col("null_present_str").contains("null"))
    df_null_present.show(n=30, truncate=False)
    df_null_present.printSchema()

    # Split into rows that fit the schema and rows that need another pass
    df_partial_correct = df_null_present.filter(~col("null_present_bool"))
    df_partial_incorrect = df_null_present.filter(col("null_present_bool"))

    df_partial_correct.cache().show(n=30, truncate=False)
    df_partial_correct.printSchema()
    df_partial_incorrect.cache().show(n=30, truncate=False)
    df_partial_incorrect.printSchema()

    if df_partial_correct.isEmpty():
        # Guard: if not even the reference row counts as parsed (e.g. it contains
        # the text "null"), the loop would repeat forever, so stop here.
        print("No row matched the inferred schema; stopping.")
        break

    wanted_df_list.append(df_partial_correct)
    df_looped = df_partial_incorrect

for df_ele in wanted_df_list:
    df_ele.show(n=30, truncate=False)
```
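One thing to be aware of with the `.cast(StringType())` plus `.contains("null")` check: it looks for the text `null` anywhere in the rendered struct, so a correctly parsed row whose value merely contains those letters (for example a hypothetical `"status": "nullable"`) is flagged as incorrect. If that row is the schema reference, the loop stops making progress. The pure-Python sketch below contrasts the text check with a structural check that only looks for genuinely missing values; it models the parsed struct as a dict, and `render_struct` is a hypothetical helper that only approximates Spark's struct-to-string cast. Neither check can tell a missing field from one that is explicitly `null` in the payload. In PySpark, a comparable structural test could likely be built from `col("parsed_struct").getField(name).isNull()` over the top-level names in `df_parsed.schema["parsed_struct"].dataType.fieldNames()`; that is an untested suggestion.

```python
def render_struct(value):
    """Roughly mimic Spark's struct-to-string cast, e.g. {2, null}."""
    if value is None:
        return "null"
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, dict):
        return "{" + ", ".join(render_struct(v) for v in value.values()) + "}"
    if isinstance(value, list):
        return "[" + ", ".join(render_struct(v) for v in value) + "]"
    return str(value)

def text_check(parsed):
    """The check used in the script: search the rendered text for 'null'."""
    return "null" in render_struct(parsed)

def structural_check(parsed):
    """Flag only fields whose value is actually missing (None)."""
    if parsed is None:
        return True
    if isinstance(parsed, dict):
        return any(structural_check(v) for v in parsed.values())
    return False

ok_row = {"id": 5, "status": "nullable"}  # parsed fine, but the text contains "null"
bad_row = {"id": 2, "name": None}         # field missing from the payload

print(text_check(ok_row), structural_check(ok_row))    # True False
print(text_check(bad_row), structural_check(bad_row))  # True True
```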
Final Output (one DataFrame per schema, printed by the last `for` loop):

```
+--------------------------+-------------+----------------+-----------------+
|json_payload              |parsed_struct|null_present_str|null_present_bool|
+--------------------------+-------------+----------------+-----------------+
|{"id": 1, "name": "Alice"}|{1, Alice}   |{1, Alice}      |false            |
+--------------------------+-------------+----------------+-----------------+
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|json_payload                                          |parsed_struct             |null_present_str          |null_present_bool|
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true}|{true, 2, [spark, python]}|{true, 2, [spark, python]}|false            |
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
+---------------------------------------------------------+-------------------+-------------------+-----------------+
|json_payload                                             |parsed_struct      |null_present_str   |null_present_bool|
+---------------------------------------------------------+-------------------+-------------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}}|{{30, New York}, 3}|{{30, New York}, 3}|false            |
+---------------------------------------------------------+-------------------+-------------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|json_payload                                                                                                 |parsed_struct                                   |null_present_str                                |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|false            |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
```
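In this sample every group ends up with a single row, which hides one more case worth checking: a row that has all of the reference row's fields plus some extra ones parses without any `null`, so it stays in the reference row's group while `from_json` drops the extra fields. The sketch below demonstrates this with plain Python dicts standing in for the parsed struct; `parse_like_from_json` and the `candidate` row are hypothetical, and the key-set comparison at the end is only one possible extra test, shown as plain logic rather than as Spark column expressions.

```python
import json

def parse_like_from_json(payload, field_names):
    """Keep only the schema's top-level fields; missing ones become None."""
    obj = json.loads(payload)
    return {name: obj.get(name) for name in field_names}

reference = '{"id": 1, "name": "Alice"}'
schema_fields = sorted(json.loads(reference))  # ['id', 'name']

candidate = '{"id": 7, "name": "Bob", "email": "bob@example.com"}'  # hypothetical row
parsed = parse_like_from_json(candidate, schema_fields)

has_null = any(v is None for v in parsed.values())
print(parsed, has_null)  # {'id': 7, 'name': 'Bob'} False  -> kept, 'email' is lost

# Possible extra test: reject payloads that carry keys outside the schema
extra_keys = set(json.loads(candidate)) - set(schema_fields)
print(extra_keys)  # {'email'}
```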
| Full Output : | |
| USING SCHEMA FROM ONE ROW | |
| current_string_repr='{"id": 1, "name": "Alice"}' | |
| +-------------------------------------------------------------------------------------------------------------+-------------+ | |
| |json_payload |parsed_struct| | |
| +-------------------------------------------------------------------------------------------------------------+-------------+ | |
| |{"id": 1, "name": "Alice"} |{1, Alice} | | |
| |{"id": 2, "tags": ["spark", "python"], "active": true} |{2, NULL} | | |
| |{"id": 3, "details": {"age": 30, "location": "New York"}} |{3, NULL} | | |
| |{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{4, NULL} | | |
| +-------------------------------------------------------------------------------------------------------------+-------------+ | |
| +-------------------------------------------------------------------------------------------------------------+-------------+----------------+ | |
| |json_payload |parsed_struct|null_present_str| | |
| +-------------------------------------------------------------------------------------------------------------+-------------+----------------+ | |
| |{"id": 1, "name": "Alice"} |{1, Alice} |{1, Alice} | | |
| |{"id": 2, "tags": ["spark", "python"], "active": true} |{2, NULL} |{2, null} | | |
| |{"id": 3, "details": {"age": 30, "location": "New York"}} |{3, NULL} |{3, null} | | |
| |{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{4, NULL} |{4, null} | | |
| +-------------------------------------------------------------------------------------------------------------+-------------+----------------+ | |
| +-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+ | |
| |json_payload |parsed_struct|null_present_str|null_present_bool| | |
| +-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+ | |
| |{"id": 1, "name": "Alice"} |{1, Alice} |{1, Alice} |false | | |
| |{"id": 2, "tags": ["spark", "python"], "active": true} |{2, NULL} |{2, null} |true | | |
| |{"id": 3, "details": {"age": 30, "location": "New York"}} |{3, NULL} |{3, null} |true | | |
| |{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{4, NULL} |{4, null} |true | | |
| +-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+ | |
| +--------------------------+-------------+----------------+-----------------+ | |
| |json_payload |parsed_struct|null_present_str|null_present_bool| | |
| +--------------------------+-------------+----------------+-----------------+ | |
| |{"id": 1, "name": "Alice"}|{1, Alice} |{1, Alice} |false | | |
| +--------------------------+-------------+----------------+-----------------+ | |
| +-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+ | |
| |json_payload |parsed_struct|null_present_str|null_present_bool| | |
| +-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+ | |
| |{"id": 2, "tags": ["spark", "python"], "active": true} |{2, NULL} |{2, null} |true | | |
| |{"id": 3, "details": {"age": 30, "location": "New York"}} |{3, NULL} |{3, null} |true | | |
| |{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{4, NULL} |{4, null} |true | | |
| +-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+ | |
| USING SCHEMA FROM ONE ROW | |
| current_string_repr='{"id": 2, "tags": ["spark", "python"], "active": true}' | |
| +-------------------------------------------------------------------------------------------------------------+--------------------------+----------------+-----------------+ | |
| |json_payload |parsed_struct |null_present_str|null_present_bool| | |
| +-------------------------------------------------------------------------------------------------------------+--------------------------+----------------+-----------------+ | |
| |{"id": 2, "tags": ["spark", "python"], "active": true} |{true, 2, [spark, python]}|{2, null} |true | | |
| |{"id": 3, "details": {"age": 30, "location": "New York"}} |{NULL, 3, NULL} |{3, null} |true | | |
| |{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4, NULL} |{4, null} |true | | |
| +-------------------------------------------------------------------------------------------------------------+--------------------------+----------------+-----------------+ | |
| +-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
| |json_payload |parsed_struct |null_present_str |null_present_bool| | |
| +-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
| |{"id": 2, "tags": ["spark", "python"], "active": true} |{true, 2, [spark, python]}|{true, 2, [spark, python]}|true | | |
| |{"id": 3, "details": {"age": 30, "location": "New York"}} |{NULL, 3, NULL} |{null, 3, null} |true | | |
| |{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4, NULL} |{null, 4, null} |true | | |
| +-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
| +-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
| |json_payload |parsed_struct |null_present_str |null_present_bool| | |
| +-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
| |{"id": 2, "tags": ["spark", "python"], "active": true} |{true, 2, [spark, python]}|{true, 2, [spark, python]}|false | | |
| |{"id": 3, "details": {"age": 30, "location": "New York"}} |{NULL, 3, NULL} |{null, 3, null} |true | | |
| |{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4, NULL} |{null, 4, null} |true | | |
| +-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
| +------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
| |json_payload |parsed_struct |null_present_str |null_present_bool| | |
| +------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
| |{"id": 2, "tags": ["spark", "python"], "active": true}|{true, 2, [spark, python]}|{true, 2, [spark, python]}|false | | |
| +------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
| +-------------------------------------------------------------------------------------------------------------+---------------+----------------+-----------------+ | |
| |json_payload |parsed_struct |null_present_str|null_present_bool| | |
| +-------------------------------------------------------------------------------------------------------------+---------------+----------------+-----------------+ | |
| |{"id": 3, "details": {"age": 30, "location": "New York"}} |{NULL, 3, NULL}|{null, 3, null} |true | | |
| |{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4, NULL}|{null, 4, null} |true | | |
| +-------------------------------------------------------------------------------------------------------------+---------------+----------------+-----------------+ | |
| USING SCHEMA FROM ONE ROW | |
| current_string_repr='{"id": 3, "details": {"age": 30, "location": "New York"}}' | |
| +-------------------------------------------------------------------------------------------------------------+-------------------+----------------+-----------------+ | |
| |json_payload |parsed_struct |null_present_str|null_present_bool| | |
| +-------------------------------------------------------------------------------------------------------------+-------------------+----------------+-----------------+ | |
| |{"id": 3, "details": {"age": 30, "location": "New York"}} |{{30, New York}, 3}|{null, 3, null} |true | | |
| |{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4} |{null, 4, null} |true | | |
| +-------------------------------------------------------------------------------------------------------------+-------------------+----------------+-----------------+ | |
| +-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+ | |
| |json_payload |parsed_struct |null_present_str |null_present_bool| | |
| +-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+ | |
| |{"id": 3, "details": {"age": 30, "location": "New York"}} |{{30, New York}, 3}|{{30, New York}, 3}|true | | |
| |{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4} |{null, 4} |true | | |
| +-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+ | |
| +-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+ | |
| |json_payload |parsed_struct |null_present_str |null_present_bool| | |
| +-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+ | |
| |{"id": 3, "details": {"age": 30, "location": "New York"}} |{{30, New York}, 3}|{{30, New York}, 3}|false | | |
| |{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4} |{null, 4} |true | | |
| +-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+ | |
| +---------------------------------------------------------+-------------------+-------------------+-----------------+ | |
| |json_payload |parsed_struct |null_present_str |null_present_bool| | |
| +---------------------------------------------------------+-------------------+-------------------+-----------------+ | |
| |{"id": 3, "details": {"age": 30, "location": "New York"}}|{{30, New York}, 3}|{{30, New York}, 3}|false | | |
| +---------------------------------------------------------+-------------------+-------------------+-----------------+ | |
| +-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+ | |
| |json_payload |parsed_struct|null_present_str|null_present_bool| | |
| +-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+ | |
| |{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4} |{null, 4} |true | | |
| +-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+ | |
| USING SCHEMA FROM ONE ROW | |
| current_string_repr='{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}' | |
| +-------------------------------------------------------------------------------------------------------------+------------------------------------------------+----------------+-----------------+ | |
| |json_payload |parsed_struct |null_present_str|null_present_bool| | |
| +-------------------------------------------------------------------------------------------------------------+------------------------------------------------+----------------+-----------------+ | |
| |{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{null, 4} |true | | |
| +-------------------------------------------------------------------------------------------------------------+------------------------------------------------+----------------+-----------------+ | |
| +-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
| |json_payload |parsed_struct |null_present_str |null_present_bool| | |
| +-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
| |{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|true | | |
| +-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
| +-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
| |json_payload |parsed_struct |null_present_str |null_present_bool| | |
| +-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
| |{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|false | | |
| +-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
| +-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
| |json_payload |parsed_struct |null_present_str |null_present_bool| | |
| +-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
| |{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|false | | |
| +-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
| +------------+-------------+----------------+-----------------+ | |
| |json_payload|parsed_struct|null_present_str|null_present_bool| | |
| +------------+-------------+----------------+-----------------+ | |
| +------------+-------------+----------------+-----------------+ | |
| +--------------------------+-------------+----------------+-----------------+ | |
| |json_payload |parsed_struct|null_present_str|null_present_bool| | |
| +--------------------------+-------------+----------------+-----------------+ | |
| |{"id": 1, "name": "Alice"}|{1, Alice} |{1, Alice} |false | | |
| +--------------------------+-------------+----------------+-----------------+ | |
| +------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
| |json_payload |parsed_struct |null_present_str |null_present_bool| | |
| +------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
| |{"id": 2, "tags": ["spark", "python"], "active": true}|{true, 2, [spark, python]}|{true, 2, [spark, python]}|false | | |
| +------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
| +---------------------------------------------------------+-------------------+-------------------+-----------------+ | |
| |json_payload |parsed_struct |null_present_str |null_present_bool| | |
| +---------------------------------------------------------+-------------------+-------------------+-----------------+ | |
| |{"id": 3, "details": {"age": 30, "location": "New York"}}|{{30, New York}, 3}|{{30, New York}, 3}|false | | |
| +---------------------------------------------------------+-------------------+-------------------+-----------------+ | |
| +-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
| |json_payload |parsed_struct |null_present_str |null_present_bool| | |
| +-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
| |{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|false | | |
| +-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |