@DennisFederico
Last active June 6, 2024 20:42

Event Deduplication in Kafka Stream processing using ksqlDB (SIMPLIFIED)

Introduction

Event deduplication that emits the very first message and filters out the remaining duplicates within a tumbling or session window, using ksqlDB.

The ID to deduplicate on is the eventId field inside the record payload, so the first step is to re-key the stream by this field.

The deduplication is done by counting how many times each eventId appears in the stream within a window, and only emitting the first event, i.e. the one whose count is 1.

Then we re-key the stream back to the original key, keeping the original schema intact so external consumers can read the deduplicated stream without major code changes.
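
At a glance, the pipeline boils down to the following ksqlDB statements (a condensed sketch of the tumbling-window variant; the full statements, with schema ids, formats and retention settings, are built step by step below):

-- 1. Re-key the input stream by the payload field eventId
CREATE STREAM events_input_rekeyed AS
  SELECT * FROM events_input PARTITION BY `eventId`;

-- 2. Windowed count per eventId, emitting only the first occurrence (COUNT(*) = 1)
--    while remembering the original key and payload of that first event
--    (eventDate and eventTs omitted here for brevity)
CREATE TABLE events_count_table_tumbling AS
  SELECT `eventId`,
         EARLIEST_BY_OFFSET(`eventData`) AS `eventData`,
         EARLIEST_BY_OFFSET(ROWKEY)      AS ORIGINAL_KEY,
         COUNT(*)                        AS count
  FROM events_input_rekeyed
  WINDOW TUMBLING (SIZE 3 MINUTES, RETENTION 2 HOURS, GRACE PERIOD 0 MILLISECOND)
  GROUP BY `eventId`
  HAVING COUNT(*) = 1
  EMIT CHANGES;

-- 3. Wrap the table's topic in a stream and re-key back to the original key
CREATE STREAM deduped_events_from_tumbling
  WITH (KAFKA_TOPIC='events_count_table_tumbling', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO');

CREATE STREAM events_deduped_tumbling AS
  SELECT STRUCT(`customerId` := ORIGINAL_KEY->CUSTOMERID, `countryId` := ORIGINAL_KEY->COUNTRYID) AS ROWKEY,
         ROWKEY    AS `eventId`,
         EVENTDATA AS `eventData`
  FROM deduped_events_from_tumbling
  PARTITION BY STRUCT(`customerId` := ORIGINAL_KEY->CUSTOMERID, `countryId` := ORIGINAL_KEY->COUNTRYID)
  EMIT CHANGES;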

CCloud Environment

Kafka Cluster

+----------------------+-----------------------------------------------------------------+
| ID                   | lkc-o5zmrx                                                      |
| Name                 | fastweb                                                         |
| Type                 | STANDARD                                                        |
| Endpoint             | SASL_SSL://pkc-mxqvx.europe-southwest1.gcp.confluent.cloud:9092 |
| REST Endpoint        | https://pkc-mxqvx.europe-southwest1.gcp.confluent.cloud:443     |
+----------------------+-----------------------------------------------------------------+

@kafka_cluster_id = lkc-o5zmrx
@kafka_endpoint = https://pkc-mxqvx.europe-southwest1.gcp.confluent.cloud
@kafa_api_key = <API_KEY or USERNAME>
@kafka_api_secret = <API_KEY_SECRET or PASSWORD>
  • Kafka Check
GET {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}

Schema Registry Cluster

+-------------------------+-----------------------------------------------------------+
| Cluster                 | lsrc-q5z30p                                               |
| Endpoint URL            | https://psrc-yo2rpj.europe-southwest1.gcp.confluent.cloud |
| Package                 | essentials                                                |
+-------------------------+-----------------------------------------------------------+

@sr_cluster_id = lsrc-q5z30p
@sr_endpoint = https://psrc-yo2rpj.europe-southwest1.gcp.confluent.cloud
@sr_api_key = <API_KEY or USERNAME>
@sr_api_secret = <API_KEY_SECRET or PASSWORD>
  • Schema Registry Check
GET {{sr_endpoint}}/subjects
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}

ksqlDB Cluster

+-------------------------+-----------------------------------------------------------------+
| ID                      | lksqlc-7odgr1                                                   |
| Name                    | fastweb_KSQLDB                                                  |
| Topic Prefix            | pksqlc-w7q70g                                                   |
| Kafka                   | lkc-o5zmrx                                                      |
| Storage                 |                                                             125 |
| Endpoint                | https://pksqlc-w7q70g.europe-southwest1.gcp.confluent.cloud:443 |
| Status                  | PROVISIONED                                                     |
| Detailed Processing Log | true                                                            |
+-------------------------+-----------------------------------------------------------------+

@ksql_cluster_id = lksqlc-7odgr1
@ksql_endpoint = https://pksqlc-w7q70g.europe-southwest1.gcp.confluent.cloud:443
@ksql_api_key = <API_KEY or USERNAME>
@ksql_api_secret = <API_KEY_SECRET or PASSWORD>
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}

{
  "ksql": "list streams;",
  "streamProperties":{}
}

Preparation

Events input and de-duplicated topics

@events_input_topic = events_input
@events_deduped_topic_tumbling = events_deduped_tumbling
@events_deduped_topic_session = events_deduped_session

CREATE Topics

POST {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}

{
    "topic_name": "{{events_input_topic}}",
    "partitions_count": 3
}
POST {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}

{
    "topic_name": "{{events_deduped_topic_tumbling}}",
    "partitions_count": 3
}
POST {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}

{
    "topic_name": "{{events_deduped_topic_session}}",
    "partitions_count": 3
}

DELETE Topics

DELETE {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics/{{events_input_topic}}
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}
DELETE {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics/{{events_deduped_topic_tumbling}}
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}
DELETE {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics/{{events_deduped_topic_session}}
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}

Create Key and Value Schemas

Schema Registry REST API

  • Key Schema
POST {{sr_endpoint}}/subjects/{{events_input_topic}}-key/versions
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
Content-Type: application/vnd.schemaregistry.v1+json

{
    "schema": "{\"type\":\"record\",\"name\":\"eventKey\",\"fields\":[{\"name\":\"customerId\",\"type\":\"string\"},{\"name\":\"countryId\",\"type\":\"string\"}]}",
    "schemaType": "AVRO"
}
# @name latest_schema_key
GET {{sr_endpoint}}/subjects/{{events_input_topic}}-key/versions/latest
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
Content-Type: application/vnd.schemaregistry.v1+json
@schema_key_id = {{latest_schema_key.response.body.$.id}}
GET {{sr_endpoint}}/schemas/ids/{{schema_key_id}}
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
  • Value Schema
POST {{sr_endpoint}}/subjects/{{events_input_topic}}-value/versions
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
Content-Type: application/vnd.schemaregistry.v1+json

{
    "schema": "{\"type\":\"record\",\"name\":\"eventValue\",\"fields\":[{\"name\":\"eventId\",\"type\":\"string\"},{\"name\":\"eventDate\",\"type\":\"string\"},{\"name\":\"eventTs\",\"type\":\"long\"},{\"name\":\"eventData\",\"type\":\"string\"}]}",
    "schemaType": "AVRO"
}
# @name latest_schema_value
GET {{sr_endpoint}}/subjects/{{events_input_topic}}-value/versions/latest
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
Content-Type: application/vnd.schemaregistry.v1+json
@schema_value_id = {{latest_schema_value.response.body.$.id}}
GET {{sr_endpoint}}/schemas/ids/{{schema_value_id}}
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}

Deduplication Pipeline

Step 1. Create the initial ksqlDB data stream - a stream wrapping the input topic

CREATE events_input Stream

Note: Don't forget to capture the schema ids for the key and value schemas.

POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
  
  {
    "ksql": "CREATE STREAM events_input WITH (KAFKA_TOPIC='{{events_input_topic}}', VALUE_FORMAT='AVRO', KEY_FORMAT='AVRO', KEY_SCHEMA_ID={{schema_key_id}}, VALUE_SCHEMA_ID={{schema_value_id}});",
    "streamsProperties": {
      "ksql.streams.auto.offset.reset": "earliest"
    }
  }
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
  
  {
    "ksql": "DESCRIBE events_input;"
  }

DROP events_input Stream

POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
  
  {
    "ksql": "DROP STREAM events_input;",
    "streamsProperties": {
    }
  }
DELETE {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics/{{events_input_topic}}
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}

Step 2. Re-key the stream by eventId

CREATE Re-Keyed Stream

POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "CREATE STREAM events_input_rekeyed WITH (KAFKA_TOPIC='events_input_rekeyed') AS SELECT * FROM events_input PARTITION BY `eventId`;",
  "streamsProperties": {
    "ksql.streams.auto.offset.reset": "earliest"
  }
}

DROP Re-Keyed Stream

POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "DROP STREAM events_input_rekeyed;",
  "streamsProperties": {
  }
}
DELETE {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics/events_input_rekeyed
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_input_rekeyed-key
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_input_rekeyed-key?permanent=true
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_input_rekeyed-value
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_input_rekeyed-value?permanent=true
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}

Step 3. Track DUPLICATE events (State) using Tumbling or Session Windows

Defining a time window for the state table allows the underlying topic to be configured with both delete and compact cleanup policies, preventing it from growing unbounded given that the cardinality of the eventIds might be effectively infinite (a quick config check is shown after the window settings below).

@window_duration = 3 MINUTES
@window_retention = 2 HOURS
@window_grace = 0 MILLISECOND
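
Once the count tables below exist, the cleanup policy of their backing topics can be verified (a hedged check, assuming the Kafka REST v3 topic-config endpoint is available on the cluster):

GET {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics/events_count_table_tumbling/configs/cleanup.policy
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}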

Using Tumbling Window

CREATE Tumbling Window
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "CREATE TABLE events_count_table_tumbling WITH (KAFKA_TOPIC='events_count_table_tumbling', RETENTION_MS=3600000) AS SELECT `eventId`, EARLIEST_BY_OFFSET(`eventDate`) as `eventDate`, EARLIEST_BY_OFFSET(`eventTs`) as `eventTs`, EARLIEST_BY_OFFSET(`eventData`) as `eventData`, EARLIEST_BY_OFFSET(ROWKEY) as ORIGINAL_KEY, COUNT(*) AS count FROM EVENTS_INPUT_REKEYED WINDOW TUMBLING (SIZE {{window_duration}}, RETENTION {{window_retention}}, GRACE PERIOD {{window_grace}}) GROUP BY `eventId` HAVING COUNT(*) = 1 EMIT CHANGES;",
  "streamsProperties": {
    "ksql.streams.auto.offset.reset": "earliest",
    "cache.max.bytes.buffering" : 0
  }
}
DROP Tumbling Window
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "DROP TABLE events_count_table_tumbling;",
  "streamsProperties": {
  }
}
DELETE {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics/events_count_table_tumbling
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_count_table_tumbling-key
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_count_table_tumbling-key?permanent=true
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_count_table_tumbling-value
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_count_table_tumbling-value?permanent=true
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}

Using Session Window

CREATE Session Window
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "CREATE TABLE events_count_table_session WITH (KAFKA_TOPIC='events_count_table_session', RETENTION_MS=3600000)  AS SELECT `eventId`, EARLIEST_BY_OFFSET(`eventDate`) as `eventDate`, EARLIEST_BY_OFFSET(`eventTs`) as `eventTs`, EARLIEST_BY_OFFSET(`eventData`) as `eventData`, EARLIEST_BY_OFFSET(ROWKEY) as ORIGINAL_KEY, COUNT(*) AS count FROM EVENTS_INPUT_REKEYED WINDOW SESSION ({{window_duration}}, RETENTION {{window_retention}}, GRACE PERIOD {{window_grace}}) GROUP BY `eventId` HAVING COUNT(*) = 1  EMIT CHANGES;",
  "streamsProperties": {
    "ksql.streams.auto.offset.reset": "earliest",
    "cache.max.bytes.buffering" : 0
  }
}
DROP Session Window
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "DROP TABLE events_count_table_session;",
  "streamsProperties": {
  }
}
DELETE {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics/events_count_table_session
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_count_table_session-key
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_count_table_session-key?permanent=true
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_count_table_session-value
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_count_table_session-value?permanent=true
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}

Step 4. Stream of Deduplicated Events

Synthetic stream backed by the table's topic; a stream is needed here because the final re-keying (PARTITION BY) cannot be applied directly to the windowed table.

CREATE Deduped Stream from TUMBLING Window

  • Count Events Stream from TUMBLING WINDOW Table using events_count_table_tumbling topic
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "CREATE STREAM deduped_events_from_tumbling WITH (KAFKA_TOPIC='events_count_table_tumbling', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO');",
  "streamsProperties": {  
    "ksql.streams.auto.offset.reset": "earliest"
  }
}
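
To verify that the synthetic stream picked up the table's schema (including ORIGINAL_KEY and the count column), a quick DESCRIBE can be issued, using the same request shape as the earlier checks:

POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "DESCRIBE deduped_events_from_tumbling;"
}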

DROP Deduped Stream from TUMBLING Window

POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "DROP STREAM deduped_events_from_tumbling;",
  "streamsProperties": {  
    "ksql.streams.auto.offset.reset": "earliest"
  }
}

CREATE Deduped Stream from SESSION Window

  • Count Events Stream using events_count_table_session (SESSION WINDOW)
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "CREATE STREAM deduped_events_from_session WITH (KAFKA_TOPIC='events_count_table_session', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO');",
  "streamsProperties": {
    "ksql.streams.auto.offset.reset": "earliest"
  }
}

DROP Deduped Stream from SESSION Window

POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "DROP STREAM deduped_events_from_session;",
  "streamsProperties": {
    "ksql.streams.auto.offset.reset": "earliest"
  }
}

Step 5. Re-key the stream back to the original key

CREATE Unique values filtered with tumbling window

POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "CREATE STREAM events_deduped_tumbling WITH (KAFKA_TOPIC='events_deduped_tumbling', VALUE_FORMAT='AVRO', KEY_FORMAT='AVRO', KEY_SCHEMA_ID={{schema_key_id}}, VALUE_SCHEMA_ID={{schema_value_id}}, RETENTION_MS=604800000) AS SELECT  STRUCT(`customerId` := ORIGINAL_KEY->CUSTOMERID, `countryId` := ORIGINAL_KEY->COUNTRYID) as ROWKEY, ROWKEY as `eventId`, EVENTDATE as `eventDate`, EVENTTS as `eventTs`, EVENTDATA as `eventData` FROM deduped_events_from_tumbling PARTITION BY STRUCT(`customerId` := ORIGINAL_KEY->CUSTOMERID, `countryId` := ORIGINAL_KEY->COUNTRYID) EMIT CHANGES;",
  "streamsProperties": {
    "ksql.streams.auto.offset.reset": "earliest"
  }
}

DROP Unique values filtered with tumbling window

POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "DROP STREAM events_deduped_tumbling;",
  "streamsProperties": {
  }
}

CREATE Unique values filtered with session window

POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "CREATE STREAM events_deduped_session WITH (KAFKA_TOPIC='events_deduped_session', VALUE_FORMAT='AVRO', KEY_FORMAT='AVRO', KEY_SCHEMA_ID={{schema_key_id}}, VALUE_SCHEMA_ID={{schema_value_id}}, RETENTION_MS=604800000) AS SELECT  STRUCT(`customerId` := ORIGINAL_KEY->CUSTOMERID, `countryId` := ORIGINAL_KEY->COUNTRYID) as ROWKEY, ROWKEY as `eventId`, EVENTDATE as `eventDate`, EVENTTS as `eventTs`, EVENTDATA as `eventData` FROM deduped_events_from_tumbling  WHERE ROWKEY IS NOT NULL PARTITION BY STRUCT(`customerId` := ORIGINAL_KEY->CUSTOMERID, `countryId` := ORIGINAL_KEY->COUNTRYID) EMIT CHANGES;",
  "streamsProperties": {
    "ksql.streams.auto.offset.reset": "earliest"
  }
}

DROP Unique values filtered with session window

POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "DROP STREAM events_deduped_session;",
  "streamsProperties": {
  }
}

(Optional) - Keeping track of all the count events

One could keep all the count events in a topic, as a log of every event that was deduplicated, by removing the HAVING COUNT(*) = 1 clause from the CREATE TABLE statements of step 3 and instead adding a filter such as WHERE COUNT = 1 to the CSAS statements of step 5 when re-keying back; a sketch follows.
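
A sketch of that variant for the tumbling window (hedged; schema ids and the session-window counterpart omitted, names as in the steps above):

-- Step 3 variant: count every occurrence, no HAVING filter
CREATE TABLE events_count_table_tumbling WITH (KAFKA_TOPIC='events_count_table_tumbling', RETENTION_MS=3600000) AS
  SELECT `eventId`,
         EARLIEST_BY_OFFSET(`eventDate`) AS `eventDate`,
         EARLIEST_BY_OFFSET(`eventTs`)   AS `eventTs`,
         EARLIEST_BY_OFFSET(`eventData`) AS `eventData`,
         EARLIEST_BY_OFFSET(ROWKEY)      AS ORIGINAL_KEY,
         COUNT(*)                        AS count
  FROM EVENTS_INPUT_REKEYED
  WINDOW TUMBLING (SIZE 3 MINUTES, RETENTION 2 HOURS, GRACE PERIOD 0 MILLISECOND)
  GROUP BY `eventId`
  EMIT CHANGES;

-- Step 5 variant: keep only the first occurrence when re-keying back
CREATE STREAM events_deduped_tumbling WITH (KAFKA_TOPIC='events_deduped_tumbling', VALUE_FORMAT='AVRO', KEY_FORMAT='AVRO', RETENTION_MS=604800000) AS
  SELECT STRUCT(`customerId` := ORIGINAL_KEY->CUSTOMERID, `countryId` := ORIGINAL_KEY->COUNTRYID) AS ROWKEY,
         ROWKEY    AS `eventId`,
         EVENTDATE AS `eventDate`,
         EVENTTS   AS `eventTs`,
         EVENTDATA AS `eventData`
  FROM deduped_events_from_tumbling
  WHERE COUNT = 1
  PARTITION BY STRUCT(`customerId` := ORIGINAL_KEY->CUSTOMERID, `countryId` := ORIGINAL_KEY->COUNTRYID)
  EMIT CHANGES;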


Other Scripts

Check the deduplicated stream

NOTE: Push queries (EMIT CHANGES) may not work with the VS Code REST Client plugin; a curl alternative is shown after the requests below.

POST {{ksql_endpoint}}/query
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
  
  {
    "ksql": "SELECT * FROM events_deduped_tumbling;",
    "streamsProperties": {
      "ksql.streams.auto.offset.reset": "earliest"
    }
  }
POST {{ksql_endpoint}}/query
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
  
  {
    "ksql": "SELECT * FROM events_deduped_session;",
    "streamsProperties": {
      "ksql.streams.auto.offset.reset": "earliest"
    }
  }
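
If the plugin cannot stream the results, the same push query can be issued with curl instead (a hedged alternative; substitute the API key and secret):

curl -X POST "https://pksqlc-w7q70g.europe-southwest1.gcp.confluent.cloud:443/query" \
  -u "<API_KEY or USERNAME>:<API_KEY_SECRET or PASSWORD>" \
  -H "Accept: application/vnd.ksql.v1+json" \
  -H "Content-Type: application/vnd.ksql.v1+json" \
  -d '{"ksql":"SELECT * FROM events_deduped_tumbling EMIT CHANGES LIMIT 5;","streamsProperties":{"ksql.streams.auto.offset.reset":"earliest"}}'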

CLEANUP ksqlDB

POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "DROP STREAM events_deduped_tumbling;",
  "streamsProperties": {
    "ksql.streams.auto.offset.reset": "earliest"
  }
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "DROP STREAM events_deduped_session;",
  "streamsProperties": {
    "ksql.streams.auto.offset.reset": "earliest"
  }
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "DROP STREAM deduped_events_from_session;",
  "streamsProperties": {
    "ksql.streams.auto.offset.reset": "earliest"
  }
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "DROP STREAM deduped_events_from_tumbling;",
  "streamsProperties": {  
    "ksql.streams.auto.offset.reset": "earliest"
  }
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "DROP TABLE events_count_table_session;",
  "streamsProperties": {
  }
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "DROP TABLE events_count_table_tumbling;",
  "streamsProperties": {
  }
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json

{
  "ksql": "DROP STREAM events_input_rekeyed;",
  "streamsProperties": {
  }
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
  
  {
    "ksql": "DROP STREAM events_input;",
    "streamsProperties": {
    }
  }

CLEANUP Kafka

Topics:

  • events_input
  • events_input_rekeyed
  • events_count_table_tumbling
  • events_count_table_session
  • events_deduped_tumbling
  • events_deduped_session
@delete_this_topic = <TOPIC_NAME_HERE>
DELETE {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics/{{delete_this_topic}}
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}

CLEANUP SCHEMAS

  • DELETE SCHEMA REGISTRY SUBJECTS
GET {{sr_endpoint}}/subjects
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
@subject = <SCHEMA_HERE>
DELETE {{sr_endpoint}}/subjects/{{subject}}
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
DELETE {{sr_endpoint}}/subjects/{{subject}}?permanent=true
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}

(MISC) PRINT TOPIC CONTENT FROM KSQLDB

POST {{ksql_endpoint}}/query
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
  
  {
    "ksql": "PRINT events_input_rekeyed FROM BEGINNING LIMIT 3;",
    "streamsProperties": {
      "ksql.streams.auto.offset.reset": "earliest"
    }
  }

Kafka AVRO Console Consumer

Save the following client configuration as ccloud.properties (referenced by --consumer.config in the commands below):

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=pkc-mxqvx.europe-southwest1.gcp.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<API_KEY or USERNAME>' password='<API_KEY_SECRET or PASSWORD>';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for higher availability in Apache Kafka clients prior to 3.0
session.timeout.ms=45000

# Best practice for Kafka producer to prevent data loss
acks=all

kafka-avro-console-consumer \
  --bootstrap-server SASL_SSL://pkc-mxqvx.europe-southwest1.gcp.confluent.cloud:9092 \
  --property schema.registry.url=https://psrc-yo2rpj.europe-southwest1.gcp.confluent.cloud \
  --property schema.registry.basic.auth.credentials.source=USER_INFO \
  --property schema.registry.basic.auth.user.info=<API_KEY or USERNAME>:<API_KEY_SECRET or PASSWORD> \
  --consumer.config ccloud.properties \
  --group consumer.console1 \
  --property print.key=true \
  --from-beginning \
  --topic events_input_rekeyed

kafka-avro-console-consumer \
  --bootstrap-server SASL_SSL://pkc-mxqvx.europe-southwest1.gcp.confluent.cloud:9092 \
  --property schema.registry.url=https://psrc-yo2rpj.europe-southwest1.gcp.confluent.cloud \
  --property schema.registry.basic.auth.credentials.source=USER_INFO \
  --property schema.registry.basic.auth.user.info=<API_KEY or USERNAME>:<API_KEY_SECRET or PASSWORD> \
  --consumer.config ccloud.properties \
  --group consumer.console1 \
  --property print.key=true \
  --from-beginning \
  --topic events_deduped_tumbling

kafka-avro-console-consumer \
  --bootstrap-server SASL_SSL://pkc-mxqvx.europe-southwest1.gcp.confluent.cloud:9092 \
  --property schema.registry.url=https://psrc-yo2rpj.europe-southwest1.gcp.confluent.cloud \
  --property schema.registry.basic.auth.credentials.source=USER_INFO \
  --property schema.registry.basic.auth.user.info=<API_KEY or USERNAME>:<API_KEY_SECRET or PASSWORD> \
  --consumer.config ccloud.properties \
  --group consumer.console1 \
  --property print.key=true \
  --from-beginning \
  --topic events_deduped_session

Inserting "DUPLICATED" message

Note: Refer back to this section after deploying the whole pipeline; change the eventId (123) as needed.

POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
  
  {
    "ksql": "INSERT INTO events_input (ROWKEY, `eventId`, `eventDate`, `eventTs`, `eventData`) VALUES (STRUCT(`customerId`:='dfederico', `countryId`:='Spain'), '123', '{{$localDatetime 'YYYYMMdDD-HH:mm:ss.ms'}}', {{$tiemstamp}}, 'FirstEvent');"
  }
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
  
  {
    "ksql": "INSERT INTO events_input (ROWKEY, `eventId`, `eventDate`, `eventTs`, `eventData`) VALUES (STRUCT(`customerId`:='dfederico', `countryId`:='Spain'), '123', '{{$localDatetime 'YYYYMMdDD-HH:mm:ss.ms'}}', {{$tiemstamp}}, 'SecondEvent');"
  }
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
  
  {
    "ksql": "INSERT INTO events_input (ROWKEY, `eventId`, `eventDate`, `eventTs`, `eventData`) VALUES (STRUCT(`customerId`:='dfederico', `countryId`:='Spain'), '123', '{{$localDatetime 'YYYYMMdDD-HH:mm:ss.ms'}}', {{$tiemstamp}}, 'Thirdevent');"
  }

Note: In the INSERT statements above, backticks are used around the field names because they are case-sensitive.
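
For reference, this is the difference the quoting makes (a hedged illustration; unquoted identifiers are upper-cased by ksqlDB, while backticked ones stay case-sensitive):

SELECT `eventId` FROM events_input EMIT CHANGES LIMIT 1;   -- matches the case-sensitive column eventId
SELECT eventId FROM events_input EMIT CHANGES LIMIT 1;     -- resolved as EVENTID, which does not exist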

QUICK CHECK

POST {{ksql_endpoint}}/query
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
  
  {
    "ksql": "SELECT * FROM events_input LIMIT 3;",
    "streamsProperties": {
      "ksql.streams.auto.offset.reset": "earliest"
    }
  }
POST {{ksql_endpoint}}/query
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
  
  {
    "ksql": "SELECT * FROM events_deduped_tumbling LIMIT 3;",
    "streamsProperties": {
      "ksql.streams.auto.offset.reset": "earliest"
    }
  }
POST {{ksql_endpoint}}/query
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
  
  {
    "ksql": "SELECT * FROM events_deduped_session LIMIT 3;",
    "streamsProperties": {
      "ksql.streams.auto.offset.reset": "earliest"
    }
  }