@miguno
miguno / WordCount.java
Last active June 27, 2018 17:01
WordCount application in Java 7, using Kafka's Streams API (Kafka version 0.11.0.0)
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueMapper;
import java.util.Arrays;
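The preview above stops at the imports. The `ValueMapper` and `KeyValueMapper` imports suggest anonymous classes, since Java 7 has no lambdas. The word-count data flow they would implement is roughly: split each line into words (`flatMapValues`), re-key by word (`groupBy`), and `count`. The following is an in-memory sketch of that flow in plain Java. It is not Kafka Streams code. Splitting on non-word characters is an assumption about what the gist does.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the per-record logic in a Kafka Streams WordCount:
// flatMapValues (split a line into words) -> groupBy (re-key by word) -> count.
// This is NOT the Kafka Streams API; it only mirrors the data flow on an in-memory list.
final class WordCountSketch {

    // Split on runs of non-word characters (assumed splitting rule).
    static List<String> splitWords(String line) {
        return Arrays.asList(line.toLowerCase().split("\\W+"));
    }

    // Count occurrences of each word across all lines; TreeMap keeps keys sorted.
    static Map<String, Long> count(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : splitWords(line)) {
                if (word.isEmpty()) {
                    continue; // split() yields "" when a line starts with a delimiter
                }
                Long previous = counts.get(word);
                counts.put(word, previous == null ? 1L : previous + 1L);
            }
        }
        return counts;
    }
}
```

In the streaming version, the count is a continuously updated `KTable<String, Long>` rather than a final map.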
KStream<CustomerId, KeyValue<SpecificRecord, OutboundMessage>> addressedMessages =
    builder.<AccountId, AccountEntry>stream(accountEntryStream)
        .leftJoin(accountToCustomerIds, (accountEntry, customerIds) -> {
            if (isNull(customerIds)) {
                return Collections.<KeyValue<CustomerId, AccountEntry>>emptyList();
            } else {
                return customerIds.getCustomerIds().stream()
                    .map(customerId -> KeyValue.pair(customerId, accountEntry))
                    .collect(toList());
            }
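The joiner above turns one account entry into a list of `(customerId, accountEntry)` pairs, one per customer linked to the account. When the left join finds no match, it returns an empty list. Presumably the list is flattened afterwards (for example with `flatMap`), but the preview cuts off before that step. This is a minimal in-memory sketch of the fan-out. `Pair`, `fanOut`, and the `Map` standing in for the `accountToCustomerIds` table are hypothetical illustrations, not the gist's types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical, in-memory model of the fan-out performed by the leftJoin joiner:
// one account entry becomes one (customerId, entry) pair per linked customer.
final class FanOutSketch {

    // Minimal stand-in for Kafka's KeyValue pair (illustrative only).
    static final class Pair<K, V> {
        final K key;
        final V value;

        Pair(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    // accountToCustomerIds plays the role of the table side of the join (hypothetical).
    static <E> List<Pair<String, E>> fanOut(String accountId, E entry,
                                            Map<String, List<String>> accountToCustomerIds) {
        List<String> customerIds = accountToCustomerIds.get(accountId);
        if (customerIds == null) {
            // Left join without a match: emit nothing instead of a pair with a null key.
            return Collections.emptyList();
        }
        List<Pair<String, E>> pairs = new ArrayList<>();
        for (String customerId : customerIds) {
            pairs.add(new Pair<>(customerId, entry)); // re-key the entry by customer
        }
        return pairs;
    }
}
```

Returning an empty list for unmatched entries keeps null keys out of the re-keyed stream.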
public static List<KeyValue<SpecificRecord, OutboundMessage>> generateAlerts(AccountEntry accountEntry,
                                                                             CustomerAlertSettings settings) {
    /* Generates addressed alerts for an AccountEntry, using the alert settings with the following steps:
     * 1) Settings are for a specific account, drop AccountEntries not for this account
     * 2) Match each setting with all alerts to generate appropriate messages
     * 3) Address the generated messages
     */
    if (settings == null) {
        return new ArrayList<>();
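The comment in `generateAlerts` lists three steps. This is a minimal sketch of those steps under a deliberately simplified, hypothetical model. `AccountEntry` has an account ID and an amount. `AlertSettings` covers one account, holds one delivery address, and holds a list of amount thresholds, each threshold acting as one configured alert. None of these types or the threshold rule come from the gist, and messages are plain strings instead of `KeyValue<SpecificRecord, OutboundMessage>`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the three steps described in generateAlerts' comment.
// AccountEntry, AlertSettings, and the threshold rule are illustrative assumptions,
// not the gist's actual types.
final class AlertSketch {

    static final class AccountEntry {
        final String accountId;
        final long amountCents;

        AccountEntry(String accountId, long amountCents) {
            this.accountId = accountId;
            this.amountCents = amountCents;
        }
    }

    static final class AlertSettings {
        final String accountId;           // settings apply to exactly one account
        final String address;             // where alerts are delivered (hypothetical)
        final List<Long> thresholdsCents; // each threshold is one configured alert

        AlertSettings(String accountId, String address, List<Long> thresholdsCents) {
            this.accountId = accountId;
            this.address = address;
            this.thresholdsCents = thresholdsCents;
        }
    }

    static List<String> generateAlerts(AccountEntry entry, AlertSettings settings) {
        List<String> messages = new ArrayList<>();
        if (settings == null) {
            return messages; // no settings, no alerts
        }
        // 1) Drop entries that are not for the account these settings cover.
        if (!settings.accountId.equals(entry.accountId)) {
            return messages;
        }
        // 2) Match the entry against every configured alert.
        for (long threshold : settings.thresholdsCents) {
            if (Math.abs(entry.amountCents) >= threshold) {
                // 3) Address the generated message.
                messages.add(settings.address + ": amount " + entry.amountCents
                        + " reached threshold " + threshold);
            }
        }
        return messages;
    }
}
```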
miguno / json-to-avro.sql
Last active December 17, 2022 03:20
Using ksqlDB to convert data (i.e., events/messages) in a Kafka topic from JSON to Avro format.
CREATE STREAM sensor_events_json (sensor_id VARCHAR, temperature INTEGER, ...)
WITH (KAFKA_TOPIC='events-topic', VALUE_FORMAT='JSON');
CREATE STREAM sensor_events_avro WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM sensor_events_json;
miguno / join.sql
Last active August 16, 2018 13:57
Using KSQL to join data sources with different formats, e.g. a stream with Avro data and a table with JSON data.
CREATE STREAM pageviews (viewtime BIGINT, user_id VARCHAR, ...) WITH (KAFKA_TOPIC='pageviews-topic', VALUE_FORMAT='AVRO');
CREATE TABLE users (user_id VARCHAR, registertime BIGINT, ...) WITH (KAFKA_TOPIC='users-topic', KEY='user_id', VALUE_FORMAT='JSON');
CREATE STREAM pageviews_enriched AS
SELECT pv.viewtime, pv.user_id AS userid, ... FROM pageviews pv
LEFT JOIN users ON users.user_id = pv.user_id;
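In a stream-table `LEFT JOIN`, each new pageview is joined with the `users` table as it stands when the pageview arrives. Pageviews with no matching `user_id` keep `NULL` user columns. A later update to the table does not re-emit earlier pageviews. The following in-memory sketch illustrates those semantics. It is not how KSQL is implemented. The `Event` type and the `"viewtime,user_id,registertime"` output format are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory sketch of stream-table LEFT JOIN semantics (illustration, not KSQL internals):
// each stream row joins the table's state at the moment the row arrives.
final class StreamTableJoinSketch {

    // Hypothetical event: either a users-table update or a pageviews-stream row.
    static final class Event {
        final boolean isTableUpdate;
        final String userId;
        final long value; // registertime for table updates, viewtime for pageviews

        Event(boolean isTableUpdate, String userId, long value) {
            this.isTableUpdate = isTableUpdate;
            this.userId = userId;
            this.value = value;
        }
    }

    // Returns one line per pageview: "viewtime,user_id,registertime" ("null" if unmatched).
    static List<String> leftJoin(List<Event> eventsInArrivalOrder) {
        Map<String, Long> users = new HashMap<>(); // current table state, keyed by user_id
        List<String> out = new ArrayList<>();
        for (Event e : eventsInArrivalOrder) {
            if (e.isTableUpdate) {
                users.put(e.userId, e.value); // upsert: the latest value per key wins
            } else {
                Long registertime = users.get(e.userId); // null when no user matches
                out.add(e.value + "," + e.userId + "," + registertime);
            }
        }
        return out;
    }
}
```

Only stream-side rows produce output here. The table side just updates the lookup state, which matches how a stream-table join behaves.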
miguno / avro.sql
Last active October 18, 2022 19:08
Creating STREAMs and TABLEs in KSQL to read and write Avro data in Kafka.
CREATE STREAM pageviews WITH (KAFKA_TOPIC='pageviews_topic', VALUE_FORMAT='AVRO');
miguno / describe.txt
Last active December 20, 2017 22:32
DESCRIBE EXTENDED example output
ksql> DESCRIBE EXTENDED ip_sum;
Type : TABLE
Key field : CLICKSTREAM.IP
Timestamp field : Not set - using <ROWTIME>
Key format : STRING
Value format : JSON
Kafka output topic : IP_SUM (partitions: 4, replication: 1)
Field | Type
-------------------------------------
miguno / explain.txt
Last active December 20, 2017 22:32
EXPLAIN example output
ksql> EXPLAIN ctas_ip_sum;
Type : QUERY
SQL : CREATE TABLE IP_SUM as SELECT ip, SUM(bytes)/1024 as kbytes FROM CLICKSTREAM WINDOW SESSION (300 SECONDS) GROUP BY ip;
Local runtime statistics
------------------------
messages-per-sec: 104.38 total-messages: 14238 last-message: 12/14/17 4:30:42 PM GMT
failed-messages: 0 last-failed: n/a
(Statistics of the local KSQL Server interaction with the Kafka topic IP_SUM)
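The `IP_SUM` query groups clickstream events per `ip` into session windows with a 300-second inactivity gap and reports `SUM(bytes)/1024` per session. The sketch below computes the final value of each session for a single `ip` in plain Java. It makes three assumptions. First, events are already sorted by timestamp. Second, a gap of exactly 300 s still counts as the same session. Third, `bytes` is an integer column, so the division truncates. The KSQL table itself is updated continuously as events arrive; this batch sketch only yields each session's end result.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of session windowing for one key (one ip), mirroring
// WINDOW SESSION (300 SECONDS) ... SUM(bytes)/1024 from the EXPLAIN output.
// Assumes timestamps are sorted and that a gap of exactly 300 s stays in the session.
final class SessionWindowSketch {

    static final long GAP_MS = 300_000L; // 300-second inactivity gap

    // timestampsMs[i] and bytes[i] describe the i-th event for this ip.
    // Returns SUM(bytes)/1024 per session, using integer division.
    static List<Long> kbytesPerSession(long[] timestampsMs, long[] bytes) {
        List<Long> result = new ArrayList<>();
        if (timestampsMs.length == 0) {
            return result;
        }
        long sum = bytes[0];
        for (int i = 1; i < timestampsMs.length; i++) {
            if (timestampsMs[i] - timestampsMs[i - 1] > GAP_MS) {
                result.add(sum / 1024); // inactivity gap exceeded: close the current session
                sum = 0;
            }
            sum += bytes[i];
        }
        result.add(sum / 1024); // close the last session
        return result;
    }
}
```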
miguno / example.sql
Created January 31, 2018 17:26
KSQL example for SELECT
SELECT c.user_id, c.page, c.action FROM clickstream c
LEFT JOIN users u ON c.user_id = u.user_id
WHERE u.level = 'Platinum';
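The `WHERE` clause tests a column from the right-hand side of the `LEFT JOIN`. For clicks whose `user_id` has no match in `users`, `u.level` is `NULL`, and `NULL = 'Platinum'` is not true, so those rows are filtered out. The query therefore returns the same rows as an inner join with the same filter. The following in-memory sketch shows that effect. The method name and the `Map`-based `users` table are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustration of LEFT JOIN followed by WHERE u.level = 'Platinum':
// unmatched clicks get a null level and are then removed by the filter.
final class LeftJoinFilterSketch {

    // clickUserIds: user_id of each click in order; userLevels: users table (user_id -> level).
    static List<String> platinumClicks(List<String> clickUserIds, Map<String, String> userLevels) {
        List<String> out = new ArrayList<>();
        for (String userId : clickUserIds) {
            String level = userLevels.get(userId); // null models an unmatched LEFT JOIN row
            // "Platinum".equals(null) is false, like SQL's NULL comparison failing the filter.
            if ("Platinum".equals(level)) {
                out.add(userId);
            }
        }
        return out;
    }
}
```

To keep unmatched clicks, the condition would have to allow for `NULL`, for example `WHERE u.level = 'Platinum' OR u.level IS NULL`.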

Live Coding a KSQL Application

Introduction