This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
final KStreamBuilder builder = new KStreamBuilder(); | |
// map the json transaction stream to (id, Transaction) key-value pairs | |
final KStream<String, Transaction> trnxStream = builder.stream(stringSerde, jsonSerde, TOPIC_TRNX) | |
.map((key, value) -> { | |
Transaction tx = Transaction.newFromJson(value); | |
return new KeyValue<>(tx.id, tx); | |
}); | |
// aggregate the transaction stream: S1 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
dataByAirportStream.join(regressionsByAirPortTable, | |
(k, v) -> k, | |
DataRegression::new) | |
1. .mapValues(Predictor::predict) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"CreatedAt": 1506570308000, | |
"Text": "RT @gwenshap: This is the best thing since partitioned bread :) https://t.co/1wbv3KwRM6", | |
[…] | |
"User": { | |
"Id": 82564066, | |
"Name": "Robin Moffatt \uD83C\uDF7B\uD83C\uDFC3\uD83E\uDD53", | |
"ScreenName": "rmoff", | |
[…] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
====================================== | |
= _ __ _____ ____ _ = | |
= | |/ // ____|/ __ \| | = | |
= | ' /| (___ | | | | | = | |
= | < \___ \| | | | | = | |
= | . \ ____) | |__| | |____ = | |
= |_|\_\_____/ \___\_\______| = | |
= = | |
= Streaming SQL Engine for Kafka = | |
Copyright 2017 Confluent Inc. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic twitter_json_01|jq '.Text' | |
{ | |
"string": "RT @rickastley: 30 years ago today I said I was Never Gonna Give You Up. I am a man of my word - Rick x https://t.co/VmbMQA6tQB" | |
} | |
{ | |
"string": "RT @mariteg10: @rickastley @Carfestevent Wonderful Rick!!\nDo not forget Chile!!\nWe hope you get back someday!!\nHappy weekend for you!!\n❤…" | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"name": "twitter_source_json_01", | |
"config": { | |
"connector.class": "com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector", | |
"twitter.oauth.accessToken": "xxxx", | |
"twitter.oauth.consumerSecret": "xxxxx", | |
"twitter.oauth.consumerKey": "xxxx", | |
"twitter.oauth.accessTokenSecret": "xxxxx", | |
"kafka.delete.topic": "twitter_deletes_json_01", | |
"value.converter": "org.apache.kafka.connect.json.JsonConverter", |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
cmccabe@aurora:~/confluent-3.2.0/bin> ./kafka-broker-api-versions.sh --bootstrap-server localhost:9092 | |
aurora:9092 (id: 0 rack: null) -> ( | |
Produce(0): 0 to 2 [usable: 2], | |
Fetch(1): 0 to 2 [usable: 2], | |
Offsets(2): 0 [usable: 0], | |
Metadata(3): 0 to 1 [usable: 1], | |
LeaderAndIsr(4): 0 [usable: 0], | |
StopReplica(5): 0 [usable: 0], | |
UpdateMetadata(6): 0 to 2 [usable: 2], | |
ControlledShutdown(7): 1 [usable: 1], |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
cmccabe@aurora:~/confluent-3.2.0/bin> ./kafka-broker-api-versions.sh | |
Missing required argument "[bootstrap-server]" | |
Option Description | |
------ ----------- | |
--bootstrap-server <String: server(s) REQUIRED: The server to connect to. | |
to use for bootstrapping> | |
--command-config <String: command A property file containing configs to | |
config property file> be passed to Admin Client. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static List<KeyValue<SpecificRecord, OutboundMessage>> generateAlerts(AccountEntry accountEntry, | |
CustomerAlertSettings settings) { | |
/* Generates addressed alerts for an AccountEntry, using the alert settings with the following steps: | |
* 1) Settings are for a specific account, drop AccountEntries not for this account | |
* 2) Match each setting with all alerts to generate appropriate messages | |
* 3) Address the generated messages | |
*/ | |
if (settings == null) { | |
return new ArrayList<>(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
KStream<CustomerId, KeyValue<SpecificRecord, OutboundMessage>> addressedMessages = | |
builder.<AccountId, AccountEntry>stream(accountEntryStream) | |
.leftJoin(accountToCustomerIds, (accountEntry, customerIds) -> { | |
if (isNull(customerIds)) { | |
return Collections.<KeyValue<CustomerId, AccountEntry>>emptyList(); | |
} else { | |
return customerIds.getCustomerIds().stream() | |
.map(customerId -> KeyValue.pair(customerId, accountEntry)) | |
.collect(toList()); | |
} |