import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueMapper;
import java.util.Arrays;

KStream<CustomerId, KeyValue<SpecificRecord, OutboundMessage>> addressedMessages =
    builder.<AccountId, AccountEntry>stream(accountEntryStream)
        .leftJoin(accountToCustomerIds, (accountEntry, customerIds) -> {
            if (isNull(customerIds)) {
                return Collections.<KeyValue<CustomerId, AccountEntry>>emptyList();
            } else {
                return customerIds.getCustomerIds().stream()
                    .map(customerId -> KeyValue.pair(customerId, accountEntry))
                    .collect(toList());
            }

public static List<KeyValue<SpecificRecord, OutboundMessage>> generateAlerts(AccountEntry accountEntry,
        CustomerAlertSettings settings) {
    /* Generates addressed alerts for an AccountEntry, using the alert settings with the following steps:
     * 1) Settings are for a specific account, drop AccountEntries not for this account
     * 2) Match each setting with all alerts to generate appropriate messages
     * 3) Address the generated messages
     */
    if (settings == null) {
        return new ArrayList<>();

CREATE STREAM sensor_events_json (sensor_id VARCHAR, temperature INTEGER, ...)
  WITH (KAFKA_TOPIC='events-topic', VALUE_FORMAT='JSON');
CREATE STREAM sensor_events_avro WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM sensor_events_json;

CREATE STREAM pageviews (viewtime BIGINT, user_id VARCHAR, ...) WITH (KAFKA_TOPIC='pageviews-topic', VALUE_FORMAT='AVRO');
CREATE TABLE users (user_id VARCHAR, registertime BIGINT, ...) WITH (KAFKA_TOPIC='users-topic', KEY='user_id', VALUE_FORMAT='JSON');
CREATE STREAM pageviews_enriched AS
  SELECT pv.viewtime, pv.user_id AS userid, ... FROM pageviews pv
  LEFT JOIN users ON users.user_id = pv.user_id;

CREATE STREAM pageviews WITH (KAFKA_TOPIC='pageviews_topic', VALUE_FORMAT='AVRO');

ksql> DESCRIBE EXTENDED ip_sum;
Type : TABLE
Key field : CLICKSTREAM.IP
Timestamp field : Not set - using <ROWTIME>
Key format : STRING
Value format : JSON
Kafka output topic : IP_SUM (partitions: 4, replication: 1)
Field | Type
-------------------------------------

ksql> EXPLAIN ctas_ip_sum;
Type : QUERY
SQL : CREATE TABLE IP_SUM as SELECT ip, SUM(bytes)/1024 as kbytes FROM CLICKSTREAM WINDOW SESSION (300 SECONDS) GROUP BY ip;
Local runtime statistics
------------------------
messages-per-sec: 104.38 total-messages: 14238 last-message: 12/14/17 4:30:42 PM GMT
failed-messages: 0 last-failed: n/a
(Statistics of the local KSQL Server interaction with the Kafka topic IP_SUM)

SELECT user_id, page, action FROM clickstream c
  LEFT JOIN users u ON c.user_id = u.user_id
  WHERE u.level = 'Platinum';