Event deduplication with ksqlDB: emit the very first message and filter out the remaining duplicates within a tumbling window or a session window.
The deduplication ID is the eventId field inside the record payload, so the first step is to re-key the stream by this field.
Deduplication then works by counting how many times each eventId appears within a window and emitting only the event whose count is 1, i.e. the first occurrence.
Finally, we re-key the stream back to the original key, keeping the original schema intact so external consumers can read the deduplicated stream without major code changes.
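The flow above can be modeled in a few lines of Python. This is a minimal in-memory sketch, not how ksqlDB executes the queries: it assumes records arrive in timestamp order, ignores the grace period and state retention, and represents each Kafka record as a hypothetical (key, value, rowtime_ms) tuple.

```python
from collections import defaultdict

WINDOW_MS = 3 * 60 * 1000  # same size as @window_duration = 3 MINUTES


def dedupe_tumbling(records, window_ms=WINDOW_MS):
    """records: iterable of (key, value, rowtime_ms) in arrival order.
    Returns the (key, value) pairs that survive deduplication."""
    counts = defaultdict(int)  # (eventId, window_start) -> COUNT(*)
    emitted = []
    for key, value, rowtime in records:
        # Re-keying by eventId: the aggregation is grouped on this field.
        window_start = rowtime - rowtime % window_ms
        group = (value["eventId"], window_start)
        counts[group] += 1
        # HAVING COUNT(*) = 1: only the first occurrence passes.
        if counts[group] == 1:
            # Re-key back: the original key and value are emitted unchanged.
            emitted.append((key, value))
    return emitted


key = {"customerId": "dfederico", "countryId": "Spain"}
records = [
    (key, {"eventId": "123", "eventData": "FirstEvent"}, 1_000),
    (key, {"eventId": "123", "eventData": "SecondEvent"}, 2_000),
    (key, {"eventId": "456", "eventData": "OtherEvent"}, 3_000),
    (key, {"eventId": "123", "eventData": "LateDuplicate"}, 200_000),
]
for k, v in dedupe_tumbling(records):
    print(v["eventData"])
```

The last copy of eventId 123 is emitted again because it falls into the next 3-minute window: deduplication only holds within a window.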
+----------------------+-----------------------------------------------------------------+
| ID | lkc-o5zmrx |
| Name | fastweb |
| Type | STANDARD |
| Endpoint | SASL_SSL://pkc-mxqvx.europe-southwest1.gcp.confluent.cloud:9092 |
| REST Endpoint | https://pkc-mxqvx.europe-southwest1.gcp.confluent.cloud:443 |
+----------------------+-----------------------------------------------------------------+
@kafka_cluster_id = lkc-o5zmrx
@kafka_endpoint = https://pkc-mxqvx.europe-southwest1.gcp.confluent.cloud
@kafa_api_key = <API_KEY or USERNAME>
@kafka_api_secret = <API_KEY_SECRET or PASSWORD>
- Kafka Check
GET {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}
+-------------------------+-----------------------------------------------------------+
| Cluster | lsrc-q5z30p |
| Endpoint URL | https://psrc-yo2rpj.europe-southwest1.gcp.confluent.cloud |
| Package | essentials |
+-------------------------+-----------------------------------------------------------+
@sr_cluster_id = lsrc-q5z30p
@sr_endpoint = https://psrc-yo2rpj.europe-southwest1.gcp.confluent.cloud
@sr_api_key = <API_KEY or USERNAME>
@sr_api_secret = <API_KEY_SECRET or PASSWORD>
- Schema Registry Check
GET {{sr_endpoint}}/subjects
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
+-------------------------+-----------------------------------------------------------------+
| ID | lksqlc-7odgr1 |
| Name | fastweb_KSQLDB |
| Topic Prefix | pksqlc-w7q70g |
| Kafka | lkc-o5zmrx |
| Storage | 125 |
| Endpoint | https://pksqlc-w7q70g.europe-southwest1.gcp.confluent.cloud:443 |
| Status | PROVISIONED |
| Detailed Processing Log | true |
+-------------------------+-----------------------------------------------------------------+
@ksql_cluster_id = lksqlc-7odgr1
@ksql_endpoint = https://pksqlc-w7q70g.europe-southwest1.gcp.confluent.cloud:443
@ksql_api_key = <API_KEY or USERNAME>
@ksql_api_secret = <API_KEY_SECRET or PASSWORD>
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
{
"ksql": "list streams;",
"streamsProperties": {}
}
@events_input_topic = events_input
@events_deduped_topic_tumbling = events_deduped_tumbling
@events_deduped_topic_session = events_deduped_session
POST {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}
{
"topic_name": "{{events_input_topic}}",
"partitions_count": 3
}
POST {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}
{
"topic_name": "{{events_deduped_topic_tumbling}}",
"partitions_count": 3
}
POST {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}
{
"topic_name": "{{events_deduped_topic_session}}",
"partitions_count": 3
}
DELETE {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics/{{events_input_topic}}
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}
DELETE {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics/{{events_deduped_topic_tumbling}}
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}
DELETE {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics/{{events_deduped_topic_session}}
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}
- Key Schema
POST {{sr_endpoint}}/subjects/{{events_input_topic}}-key/versions
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
Content-Type: application/vnd.schemaregistry.v1+json
{
"schema": "{\"type\":\"record\",\"name\":\"eventKey\",\"fields\":[{\"name\":\"customerId\",\"type\":\"string\"},{\"name\":\"countryId\",\"type\":\"string\"}]}",
"schemaType": "AVRO"
}
# @name latest_schema_key
GET {{sr_endpoint}}/subjects/{{events_input_topic}}-key/versions/latest
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
Content-Type: application/vnd.schemaregistry.v1+json
@schema_key_id = {{latest_schema_key.response.body.id}}
GET {{sr_endpoint}}/schemas/ids/{{schema_key_id}}
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
- Value Schema
POST {{sr_endpoint}}/subjects/{{events_input_topic}}-value/versions
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
Content-Type: application/vnd.schemaregistry.v1+json
{
"schema": "{\"type\":\"record\",\"name\":\"eventValue\",\"fields\":[{\"name\":\"eventId\",\"type\":\"string\"},{\"name\":\"eventDate\",\"type\":\"string\"},{\"name\":\"eventTs\",\"type\":\"long\"},{\"name\":\"eventData\",\"type\":\"string\"}]}",
"schemaType": "AVRO"
}
# @name latest_schema_value
GET {{sr_endpoint}}/subjects/{{events_input_topic}}-value/versions/latest
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
Content-Type: application/vnd.schemaregistry.v1+json
@schema_value_id = {{latest_schema_value.response.body.id}}
GET {{sr_endpoint}}/schemas/ids/{{schema_value_id}}
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
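Note: run the two "latest" GET requests above (latest_schema_key and latest_schema_value) before continuing; they capture schema_key_id and schema_value_id, which the CREATE STREAM statements below reference.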
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "CREATE STREAM events_input WITH (KAFKA_TOPIC='{{events_input_topic}}', VALUE_FORMAT='AVRO', KEY_FORMAT='AVRO', KEY_SCHEMA_ID={{schema_key_id}}, VALUE_SCHEMA_ID={{schema_value_id}});",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "DESCRIBE events_input;"
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "DROP STREAM events_input;",
"streamsProperties": {
}
}
DELETE {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics/{{events_input_topic}}
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "CREATE STREAM events_input_rekeyed WITH (KAFKA_TOPIC='events_input_rekeyed') AS SELECT * FROM events_input PARTITION BY `eventId`;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}
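Re-keying matters because Kafka assigns records to partitions by hashing the key, and GROUP BY eventId needs every copy of an event on the same partition so that a single task owns its count. The sketch below illustrates the co-location property with zlib.crc32 as a stand-in hash; Kafka's default partitioner actually uses murmur2 over the serialized key bytes, so real partition numbers will differ. The two events with different original keys are hypothetical.

```python
import zlib

PARTITIONS = 3  # same as "partitions_count": 3 used when creating the topics


def partition_for(key: str, partitions: int = PARTITIONS) -> int:
    """Deterministic key -> partition mapping.

    zlib.crc32 is only a stand-in: Kafka's default partitioner uses murmur2
    on the serialized key bytes, so real partition numbers differ.
    """
    return zlib.crc32(key.encode("utf-8")) % partitions


# Hypothetical duplicates of eventId "123" produced under two original keys.
events = [
    {"customerId": "dfederico", "countryId": "Spain", "eventId": "123"},
    {"customerId": "dfederico", "countryId": "Italy", "eventId": "123"},
]

# Keyed by the original (customerId, countryId), the copies may be split up...
by_original_key = [partition_for(f"{e['customerId']}|{e['countryId']}") for e in events]
# ...keyed by eventId they always land on one partition, so one task counts them.
by_event_id = [partition_for(e["eventId"]) for e in events]
print(by_original_key, by_event_id)
```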
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "DROP STREAM events_input_rekeyed;",
"streamsProperties": {
}
}
DELETE {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics/events_input_rekeyed
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_input_rekeyed-key
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_input_rekeyed-key?permanent=true
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_input_rekeyed-value
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_input_rekeyed-value?permanent=true
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
Defining a time window on the state table lets ksqlDB create its underlying topic with a cleanup policy of both compact and delete, so the topic does not grow unbounded even though the cardinality of eventIds may be unbounded.
@window_duration = 3 MINUTES
@window_retention = 2 HOURS
@window_grace = 0 MILLISECOND
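To see how these three settings interact, here is a simplified model of a tumbling-window count store: a record falls into the window starting at rowtime - rowtime % size, it is dropped if its window already ended (plus grace) relative to the highest timestamp seen so far, and windows older than the retention are purged, which is what keeps the state bounded. This is an assumption-laden sketch, not the Kafka Streams store implementation; the WindowStore class is hypothetical.

```python
MINUTE_MS = 60_000
WINDOW_SIZE_MS = 3 * MINUTE_MS      # @window_duration = 3 MINUTES
RETENTION_MS = 120 * MINUTE_MS      # @window_retention = 2 HOURS
GRACE_MS = 0                        # @window_grace = 0 MILLISECOND


class WindowStore:
    """Hypothetical per-eventId counter store keyed by tumbling window."""

    def __init__(self):
        self.counts = {}        # (eventId, window_start) -> count
        self.stream_time = 0    # highest rowtime observed so far

    def add(self, event_id, rowtime):
        """Returns the new count, or None if the record arrived too late."""
        self.stream_time = max(self.stream_time, rowtime)
        window_start = rowtime - rowtime % WINDOW_SIZE_MS
        window_end = window_start + WINDOW_SIZE_MS
        # With GRACE 0, a record whose window has already ended is discarded.
        if window_end <= self.stream_time - GRACE_MS:
            return None
        key = (event_id, window_start)
        self.counts[key] = self.counts.get(key, 0) + 1
        self._purge()
        return self.counts[key]

    def _purge(self):
        # Drop windows that fell out of the retention period; this bounds the
        # state even if the number of distinct eventIds is unbounded.
        horizon = self.stream_time - RETENTION_MS
        self.counts = {k: v for k, v in self.counts.items() if k[1] >= horizon}


store = WindowStore()
print(store.add("123", 1_000))          # 1 -> first occurrence, emitted
print(store.add("123", 2_000))          # 2 -> duplicate, filtered
print(store.add("999", 4 * MINUTE_MS))  # 1 -> moves stream time to minute 4
print(store.add("123", 5_000))          # None -> its window [0, 3 min) closed
```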
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "CREATE TABLE events_count_table_tumbling WITH (KAFKA_TOPIC='events_count_table_tumbling', RETENTION_MS=3600000) AS SELECT `eventId`, EARLIEST_BY_OFFSET(`eventDate`) as `eventDate`, EARLIEST_BY_OFFSET(`eventTs`) as `eventTs`, EARLIEST_BY_OFFSET(`eventData`) as `eventData`, EARLIEST_BY_OFFSET(ROWKEY) as ORIGINAL_KEY, COUNT(*) AS count FROM EVENTS_INPUT_REKEYED WINDOW TUMBLING (SIZE {{window_duration}}, RETENTION {{window_retention}}, GRACE PERIOD {{window_grace}}) GROUP BY `eventId` HAVING COUNT(*) = 1 EMIT CHANGES;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest",
"cache.max.bytes.buffering" : 0
}
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "DROP TABLE events_count_table_tumbling;",
"streamsProperties": {
}
}
DELETE {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics/events_count_table_tumbling
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_count_table_tumbling-key
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_count_table_tumbling-key?permanent=true
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_count_table_tumbling-value
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_count_table_tumbling-value?permanent=true
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "CREATE TABLE events_count_table_session WITH (KAFKA_TOPIC='events_count_table_session', RETENTION_MS=3600000) AS SELECT `eventId`, EARLIEST_BY_OFFSET(`eventDate`) as `eventDate`, EARLIEST_BY_OFFSET(`eventTs`) as `eventTs`, EARLIEST_BY_OFFSET(`eventData`) as `eventData`, EARLIEST_BY_OFFSET(ROWKEY) as ORIGINAL_KEY, COUNT(*) AS count FROM EVENTS_INPUT_REKEYED WINDOW SESSION ({{window_duration}}, RETENTION {{window_retention}}, GRACE PERIOD {{window_grace}}) GROUP BY `eventId` HAVING COUNT(*) = 1 EMIT CHANGES;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest",
"cache.max.bytes.buffering" : 0
}
}
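Unlike the tumbling variant, a session window has no fixed boundaries: the 3-minute duration acts as an inactivity gap, and each duplicate arriving within the gap extends the session, so the event keeps being suppressed for as long as duplicates arrive less than 3 minutes apart. The sketch below models only that extension behaviour for in-order records; it is a simplified illustration, not ksqlDB's session store, and it omits grace and retention.

```python
GAP_MS = 3 * 60_000  # @window_duration, used here as the session inactivity gap


def dedupe_session(records, gap_ms=GAP_MS):
    """records: (eventId, rowtime_ms) in timestamp order.
    Returns the records a COUNT(*) = 1 filter over session windows lets through."""
    sessions = {}  # eventId -> [session_start, session_end, count]
    emitted = []
    for event_id, rowtime in records:
        current = sessions.get(event_id)
        if current is not None and rowtime - current[1] <= gap_ms:
            # Within the inactivity gap: extend the session, count grows past 1.
            current[1] = rowtime
            current[2] += 1
        else:
            # No open session (or the gap elapsed): a new session starts at 1.
            sessions[event_id] = [rowtime, rowtime, 1]
            emitted.append((event_id, rowtime))
    return emitted


minute = 60_000
records = [("123", 0), ("123", 2 * minute), ("123", 4 * minute), ("123", 8 * minute)]
print(dedupe_session(records))  # [('123', 0), ('123', 480000)]
```

With the same input, 3-minute tumbling windows would emit three records (at minutes 0, 4 and 8), because the copy at minute 4 starts a new window even though it arrived only 2 minutes after the previous duplicate.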
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "DROP TABLE events_count_table_session;",
"streamsProperties": {
}
}
DELETE {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics/events_count_table_session
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_count_table_session-key
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_count_table_session-key?permanent=true
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_count_table_session-value
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
DELETE {{sr_endpoint}}/subjects/events_count_table_session-value?permanent=true
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
Synthetic stream backed by the table topic, required for stream-stream joins and for re-keying with PARTITION BY, which ksqlDB supports only on streams.
- Count Events Stream from the TUMBLING WINDOW table, using the events_count_table_tumbling topic
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "CREATE STREAM deduped_events_from_tumbling WITH (KAFKA_TOPIC='events_count_table_tumbling', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO');",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "DROP STREAM deduped_events_from_tumbling;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}
- Count Events Stream from the SESSION WINDOW table, using the events_count_table_session topic
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "CREATE STREAM deduped_events_from_session WITH (KAFKA_TOPIC='events_count_table_session', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO');",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "DROP STREAM deduped_events_from_session;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "CREATE STREAM events_deduped_tumbling WITH (KAFKA_TOPIC='events_deduped_tumbling', VALUE_FORMAT='AVRO', KEY_FORMAT='AVRO', KEY_SCHEMA_ID={{schema_key_id}}, VALUE_SCHEMA_ID={{schema_value_id}}, RETENTION_MS=604800000) AS SELECT STRUCT(`customerId` := ORIGINAL_KEY->CUSTOMERID, `countryId` := ORIGINAL_KEY->COUNTRYID) as ROWKEY, ROWKEY as `eventId`, EVENTDATE as `eventDate`, EVENTTS as `eventTs`, EVENTDATA as `eventData` FROM deduped_events_from_tumbling PARTITION BY STRUCT(`customerId` := ORIGINAL_KEY->CUSTOMERID, `countryId` := ORIGINAL_KEY->COUNTRYID) EMIT CHANGES;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}
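Re-keying back relies on ORIGINAL_KEY, the first original key captured by EARLIEST_BY_OFFSET(ROWKEY) in the table. Conceptually, the CSAS turns each row of the synthetic stream back into a record with the original key struct and the original value layout. The sketch below models that mapping with plain dicts; the row layout (upper-case column names, ORIGINAL_KEY with upper-case fields) is an assumption based on how the statement references them, and the sample row is made up.

```python
def rekey_back(row):
    """Map a row of deduped_events_from_tumbling (keyed by eventId) to a record
    keyed by the original (customerId, countryId) struct.

    `row` is a hypothetical dict: {"ROWKEY": eventId, "EVENTDATE": ..., ...,
    "ORIGINAL_KEY": {"CUSTOMERID": ..., "COUNTRYID": ...}}.
    """
    original = row["ORIGINAL_KEY"]
    key = {"customerId": original["CUSTOMERID"], "countryId": original["COUNTRYID"]}
    # Same field names as the eventValue schema, so existing consumers work unchanged.
    # Helper columns such as COUNT and ORIGINAL_KEY are not carried over.
    value = {
        "eventId": row["ROWKEY"],
        "eventDate": row["EVENTDATE"],
        "eventTs": row["EVENTTS"],
        "eventData": row["EVENTDATA"],
    }
    return key, value


row = {
    "ROWKEY": "123",
    "EVENTDATE": "20240101-10:00:00.000",
    "EVENTTS": 1704103200,
    "EVENTDATA": "FirstEvent",
    "ORIGINAL_KEY": {"CUSTOMERID": "dfederico", "COUNTRYID": "Spain"},
    "COUNT": 1,
}
key, value = rekey_back(row)
print(key)  # {'customerId': 'dfederico', 'countryId': 'Spain'}
print(value)
```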
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "DROP STREAM events_deduped_tumbling;",
"streamsProperties": {
}
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "CREATE STREAM events_deduped_session WITH (KAFKA_TOPIC='events_deduped_session', VALUE_FORMAT='AVRO', KEY_FORMAT='AVRO', KEY_SCHEMA_ID={{schema_key_id}}, VALUE_SCHEMA_ID={{schema_value_id}}, RETENTION_MS=604800000) AS SELECT STRUCT(`customerId` := ORIGINAL_KEY->CUSTOMERID, `countryId` := ORIGINAL_KEY->COUNTRYID) as ROWKEY, ROWKEY as `eventId`, EVENTDATE as `eventDate`, EVENTTS as `eventTs`, EVENTDATA as `eventData` FROM deduped_events_from_session WHERE ROWKEY IS NOT NULL PARTITION BY STRUCT(`customerId` := ORIGINAL_KEY->CUSTOMERID, `countryId` := ORIGINAL_KEY->COUNTRYID) EMIT CHANGES;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "DROP STREAM events_deduped_session;",
"streamsProperties": {
}
}
One could keep all the count events in a topic, as a log of every event that went through deduplication, by removing the HAVING COUNT(*) = 1 clause from the CREATE TABLE statements of step 3 and adding a filter such as WHERE COUNT = 1 to the CSAS statements of step 5 when re-keying back.
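The two variants produce the same deduplicated records; they only differ in where the count = 1 filter sits and in whether the intermediate topic also keeps the higher counts. A minimal sketch of that idea, assuming a single window and hypothetical in-memory lists instead of real topics:

```python
from collections import Counter


def count_log(event_ids):
    """Variant without HAVING: every update (eventId, running count) is kept."""
    seen = Counter()
    log = []
    for event_id in event_ids:
        seen[event_id] += 1
        log.append((event_id, seen[event_id]))
    return log


def filter_in_table(event_ids):
    """Original variant: HAVING COUNT(*) = 1 drops updates before they reach the topic."""
    return [entry for entry in count_log(event_ids) if entry[1] == 1]


def filter_downstream(log):
    """Alternative: the full log is stored, WHERE COUNT = 1 is applied when re-keying."""
    return [entry for entry in log if entry[1] == 1]


ids = ["123", "123", "456", "123", "456"]
full_log = count_log(ids)
print(full_log)  # all five updates, duplicates included
print(filter_downstream(full_log) == filter_in_table(ids))  # True
```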
NOTE: PUSH QUERIES MAY NOT WORK WITH THE VSCODE REST PLUGIN (EMIT CHANGES)
POST {{ksql_endpoint}}/query
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "SELECT * FROM events_deduped_tumbling;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}
POST {{ksql_endpoint}}/query
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "SELECT * FROM events_deduped_session;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "DROP STREAM events_deduped_tumbling;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "DROP STREAM events_deduped_session;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "DROP STREAM deduped_events_from_session;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "DROP STREAM deduped_events_from_tumbling;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "DROP TABLE events_count_table_session;",
"streamsProperties": {
}
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "DROP TABLE events_count_table_tumbling;",
"streamsProperties": {
}
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "DROP STREAM events_input_rekeyed;",
"streamsProperties": {
}
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "DROP STREAM events_input;",
"streamsProperties": {
}
}
Topics:
- events_input
- events_input_rekeyed
- events_count_table_tumbling
- events_count_table_session
- events_deduped_tumbling
- events_deduped_session
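With the default TopicNameStrategy, Schema Registry subjects are named after the topic with a -key or -value suffix, and a subject must be soft-deleted before it can be deleted with ?permanent=true. That is why each topic above maps to four subject DELETE requests. A small helper that only generates those request lines as text (it does not call any API), assuming the same {{sr_endpoint}} variable as this file; the Authorization header lines are left out:

```python
TOPICS = [
    "events_input",
    "events_input_rekeyed",
    "events_count_table_tumbling",
    "events_count_table_session",
    "events_deduped_tumbling",
    "events_deduped_session",
]


def subjects_for(topic):
    # Default TopicNameStrategy: one subject for the key, one for the value.
    return [f"{topic}-key", f"{topic}-value"]


def cleanup_requests(topics):
    """Yield REST Client request lines: soft delete first, then permanent delete."""
    for topic in topics:
        for subject in subjects_for(topic):
            # {{{{ ... }}}} in an f-string renders as the literal {{ ... }}.
            yield f"DELETE {{{{sr_endpoint}}}}/subjects/{subject}"
            yield f"DELETE {{{{sr_endpoint}}}}/subjects/{subject}?permanent=true"


for line in cleanup_requests(TOPICS):
    print(line)
```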
@delete_this_topic = <TOPIC_NAME_HERE>
DELETE {{kafka_endpoint}}/kafka/v3/clusters/{{kafka_cluster_id}}/topics/{{delete_this_topic}}
Authorization: Basic {{kafa_api_key}}:{{kafka_api_secret}}
- DELETE SCHEMA REGISTRY SUBJECTS
GET {{sr_endpoint}}/subjects
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
@subject = <SCHEMA_HERE>
DELETE {{sr_endpoint}}/subjects/{{subject}}
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
DELETE {{sr_endpoint}}/subjects/{{subject}}?permanent=true
Authorization: Basic {{sr_api_key}}:{{sr_api_secret}}
POST {{ksql_endpoint}}/query
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "PRINT events_input_rekeyed FROM BEGINNING LIMIT 3;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=pkc-mxqvx.europe-southwest1.gcp.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<API_KEY or USERNAME>' password='<API_KEY_SECRET or PASSWORD>';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips
# Best practice for higher availability in Apache Kafka clients prior to 3.0
session.timeout.ms=45000
# Best practice for Kafka producer to prevent data loss
acks=all
kafka-avro-console-consumer \
--bootstrap-server SASL_SSL://pkc-mxqvx.europe-southwest1.gcp.confluent.cloud:9092 \
--property schema.registry.url=https://psrc-yo2rpj.europe-southwest1.gcp.confluent.cloud \
--property schema.registry.basic.auth.credentials.source=USER_INFO \
--property schema.registry.basic.auth.user.info=<API_KEY or USERNAME>:<API_KEY_SECRET or PASSWORD> \
--consumer.config ccloud.properties \
--group consumer.console1 \
--property print.key=true \
--from-beginning \
--topic events_input_rekeyed
kafka-avro-console-consumer \
--bootstrap-server SASL_SSL://pkc-mxqvx.europe-southwest1.gcp.confluent.cloud:9092 \
--property schema.registry.url=https://psrc-yo2rpj.europe-southwest1.gcp.confluent.cloud \
--property schema.registry.basic.auth.credentials.source=USER_INFO \
--property schema.registry.basic.auth.user.info=<API_KEY or USERNAME>:<API_KEY_SECRET or PASSWORD> \
--consumer.config ccloud.properties \
--group consumer.console1 \
--property print.key=true \
--from-beginning \
--topic events_deduped_tumbling
kafka-avro-console-consumer \
--bootstrap-server SASL_SSL://pkc-mxqvx.europe-southwest1.gcp.confluent.cloud:9092 \
--property schema.registry.url=https://psrc-yo2rpj.europe-southwest1.gcp.confluent.cloud \
--property schema.registry.basic.auth.credentials.source=USER_INFO \
--property schema.registry.basic.auth.user.info=<API_KEY or USERNAME>:<API_KEY_SECRET or PASSWORD> \
--consumer.config ccloud.properties \
--group consumer.console1 \
--property print.key=true \
--from-beginning \
--topic events_deduped_session
Note: come back to this section after deploying the whole pipeline, and change the eventId (123) as needed.
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "INSERT INTO events_input (ROWKEY, `eventId`, `eventDate`, `eventTs`, `eventData`) VALUES (STRUCT(`customerId`:='dfederico', `countryId`:='Spain'), '123', '{{$localDatetime 'YYYYMMDD-HH:mm:ss.SSS'}}', {{$timestamp}}, 'FirstEvent');"
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "INSERT INTO events_input (ROWKEY, `eventId`, `eventDate`, `eventTs`, `eventData`) VALUES (STRUCT(`customerId`:='dfederico', `countryId`:='Spain'), '123', '{{$localDatetime 'YYYYMMDD-HH:mm:ss.SSS'}}', {{$timestamp}}, 'SecondEvent');"
}
POST {{ksql_endpoint}}/ksql
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "INSERT INTO events_input (ROWKEY, `eventId`, `eventDate`, `eventTs`, `eventData`) VALUES (STRUCT(`customerId`:='dfederico', `countryId`:='Spain'), '123', '{{$localDatetime 'YYYYMMDD-HH:mm:ss.SSS'}}', {{$timestamp}}, 'ThirdEvent');"
}
Note: the INSERT statements above wrap the field names in backticks because the fields are case-sensitive.
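ksqlDB converts unquoted identifiers to upper case and preserves the case of back-quoted ones, so eventId written without backticks resolves to EVENTID, which does not match the camelCase field in the Avro schema. The helper below is a hypothetical model of that rule (it ignores escaped backticks inside identifiers), handy for checking which spelling a statement needs:

```python
def resolve_identifier(token: str) -> str:
    """Model of ksqlDB identifier handling: a `quoted` identifier keeps its
    case, an unquoted one is converted to upper case."""
    if len(token) >= 2 and token.startswith("`") and token.endswith("`"):
        return token[1:-1]
    return token.upper()


schema_fields = {"eventId", "eventDate", "eventTs", "eventData"}
for token in ["eventId", "`eventId`", "`eventTs`", "eventData"]:
    name = resolve_identifier(token)
    print(f"{token:12} -> {name:10} found={name in schema_fields}")
```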
POST {{ksql_endpoint}}/query
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "SELECT * FROM events_input LIMIT 3;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}
POST {{ksql_endpoint}}/query
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "SELECT * FROM events_deduped_tumbling LIMIT 3;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}
POST {{ksql_endpoint}}/query
Authorization: Basic {{ksql_api_key}}:{{ksql_api_secret}}
Accept: application/vnd.ksql.v1+json
{
"ksql": "SELECT * FROM events_deduped_session LIMIT 3;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}