This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import org.apache.kafka.common.serialization.Serdes; | |
| import org.apache.kafka.streams.KafkaStreams; | |
| import org.apache.kafka.streams.StreamsConfig; | |
| import org.apache.kafka.streams.kstream.KStream; | |
| import org.apache.kafka.streams.kstream.KStreamBuilder; | |
| import org.apache.kafka.streams.kstream.KTable; | |
| import org.apache.kafka.streams.kstream.KeyValueMapper; | |
| import org.apache.kafka.streams.kstream.ValueMapper; | |
| import java.util.Arrays; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| KStream<CustomerId, KeyValue<SpecificRecord, OutboundMessage>> addressedMessages = | |
| builder.<AccountId, AccountEntry>stream(accountEntryStream) | |
| .leftJoin(accountToCustomerIds, (accountEntry, customerIds) -> { | |
| if (isNull(customerIds)) { | |
| return Collections.<KeyValue<CustomerId, AccountEntry>>emptyList(); | |
| } else { | |
| return customerIds.getCustomerIds().stream() | |
| .map(customerId -> KeyValue.pair(customerId, accountEntry)) | |
| .collect(toList()); | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| public static List<KeyValue<SpecificRecord, OutboundMessage>> generateAlerts(AccountEntry accountEntry, | |
| CustomerAlertSettings settings) { | |
| /* Generates addressed alerts for an AccountEntry, using the alert settings with the following steps: | |
| * 1) Settings are for a specific account, drop AccountEntries not for this account | |
| * 2) Match each setting with all alerts to generate appropriate messages | |
| * 3) Address the generated messages | |
| */ | |
| if (settings == null) { | |
| return new ArrayList<>(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| CREATE STREAM sensor_events_json (sensor_id VARCHAR, temperature INTEGER, ...) | |
| WITH (KAFKA_TOPIC='events-topic', VALUE_FORMAT='JSON'); | |
| CREATE STREAM sensor_events_avro WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM sensor_events_json; | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| CREATE STREAM pageviews (viewtime BIGINT, user_id VARCHAR, ...) WITH (KAFKA_TOPIC='pageviews-topic', VALUE_FORMAT='AVRO'); | |
| CREATE TABLE users (user_id VARCHAR, registertime BIGINT, ...) WITH (KAFKA_TOPIC='users-topic', KEY='user_id', VALUE_FORMAT='JSON'); | |
| CREATE STREAM pageviews_enriched AS | |
| SELECT pv.viewtime, pv.userid AS userid, ... FROM pageviews pv | |
| LEFT JOIN users ON users.user_id = pv.user_id; | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| CREATE STREAM pageviews WITH (KAFKA_TOPIC='pageviews_topic', VALUE_FORMAT='AVRO'); | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ksql> DESCRIBE EXTENDED ip_sum; | |
| Type : TABLE | |
| Key field : CLICKSTREAM.IP | |
| Timestamp field : Not set - using <ROWTIME> | |
| Key format : STRING | |
| Value format : JSON | |
| Kafka output topic : IP_SUM (partitions: 4, replication: 1) | |
| Field | Type | |
| ------------------------------------- |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ksql> EXPLAIN ctas_ip_sum; | |
| Type : QUERY | |
| SQL : CREATE TABLE IP_SUM as SELECT ip, SUM(bytes)/1024 as kbytes FROM CLICKSTREAM WINDOW SESSION (300 SECONDS) GROUP BY ip; | |
| Local runtime statistics | |
| ------------------------ | |
| messages-per-sec: 104.38 total-messages: 14238 last-message: 12/14/17 4:30:42 PM GMT | |
| failed-messages: 0 last-failed: n/a | |
| (Statistics of the local KSQL Server interaction with the Kafka topic IP_SUM) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| SELECT user_id, page, action FROM clickstream c | |
| LEFT JOIN users u ON c.user_id = u.user_id | |
| WHERE u.level = 'Platinum'; |