Created
August 4, 2022 15:20
-
-
Save DennisFederico/2736e2ad506992411b44a1175015817d to your computer and use it in GitHub Desktop.
Explode JSON Array of Structs using ksqldb
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
CREATE TYPE BALANCE_TYPE AS STRUCT<account_id STRING, balance DOUBLE>; | |
CREATE STREAM CUSTOMER_STATS ( | |
CUSTOMER_ID STRING KEY, | |
CUSTOMER_NAME STRING, | |
BALANCES ARRAY<BALANCE_TYPE> | |
) WITH ( | |
KAFKA_TOPIC='CUSTOMER_STATS', | |
PARTITIONS=1, | |
VALUE_FORMAT='JSON_SR'); | |
INSERT INTO CUSTOMER_STATS (CUSTOMER_ID, CUSTOMER_NAME, BALANCES) | |
VALUES ( | |
'DF1234', 'DENNIS', ARRAY[ | |
STRUCT(ACCOUNT_ID:='ACC123', BALANCE:=123.45), | |
STRUCT(ACCOUNT_ID:='ACC456', BALANCE:=567.86), | |
STRUCT(ACCOUNT_ID:='ACC996', BALANCE:=996.00) | |
] | |
); | |
SELECT CUSTOMER_NAME, EXPLODE(BALANCES) AS BALANCE FROM CUSTOMER_STATS EMIT CHANGES; | |
SELECT CUSTOMER_NAME, EXPLODE(BALANCES)->ACCOUNT_ID, EXPLODE(BALANCES)->BALANCE FROM CUSTOMER_STATS EMIT CHANGES; | |
CREATE TABLE CUSTOMER_TOTAL_BALANCE WITH (KAFKA_TOPIC='CUSTOMER_TOTAL_BALANCES', VALUE_FORMAT='AVRO') AS | |
SELECT CUSTOMER_ID, | |
LATEST_BY_OFFSET(CUSTOMER_NAME) AS CUSTOMER_NAME, | |
SUM(BALANCE) AS TOTAL_BALANCE | |
FROM CUSTOMER_BALANCES | |
GROUP BY CUSTOMER_ID EMIT CHANGES; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment