Skip to content

Instantly share code, notes, and snippets.

@DennisFederico
Created August 4, 2022 15:20
Show Gist options
  • Save DennisFederico/2736e2ad506992411b44a1175015817d to your computer and use it in GitHub Desktop.
Save DennisFederico/2736e2ad506992411b44a1175015817d to your computer and use it in GitHub Desktop.
Explode JSON Array of Structs using ksqldb
CREATE TYPE BALANCE_TYPE AS STRUCT<account_id STRING, balance DOUBLE>;
CREATE STREAM CUSTOMER_STATS (
CUSTOMER_ID STRING KEY,
CUSTOMER_NAME STRING,
BALANCES ARRAY<BALANCE_TYPE>
) WITH (
KAFKA_TOPIC='CUSTOMER_STATS',
PARTITIONS=1,
VALUE_FORMAT='JSON_SR');
INSERT INTO CUSTOMER_STATS (CUSTOMER_ID, CUSTOMER_NAME, BALANCES)
VALUES (
'DF1234', 'DENNIS', ARRAY[
STRUCT(ACCOUNT_ID:='ACC123', BALANCE:=123.45),
STRUCT(ACCOUNT_ID:='ACC456', BALANCE:=567.86),
STRUCT(ACCOUNT_ID:='ACC996', BALANCE:=996.00)
]
);
SELECT CUSTOMER_NAME, EXPLODE(BALANCES) AS BALANCE FROM CUSTOMER_STATS EMIT CHANGES;
SELECT CUSTOMER_NAME, EXPLODE(BALANCES)->ACCOUNT_ID, EXPLODE(BALANCES)->BALANCE FROM CUSTOMER_STATS EMIT CHANGES;
CREATE TABLE CUSTOMER_TOTAL_BALANCE WITH (KAFKA_TOPIC='CUSTOMER_TOTAL_BALANCES', VALUE_FORMAT='AVRO') AS
SELECT CUSTOMER_ID,
LATEST_BY_OFFSET(CUSTOMER_NAME) AS CUSTOMER_NAME,
SUM(BALANCE) AS TOTAL_BALANCE
FROM CUSTOMER_BALANCES
GROUP BY CUSTOMER_ID EMIT CHANGES;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment