Skip to content

Instantly share code, notes, and snippets.

final KStreamBuilder builder = new KStreamBuilder();
// Consume the raw JSON transaction topic and re-key each record by the
// transaction's own id, yielding a (transaction id, Transaction) stream.
final KStream<String, Transaction> trnxStream =
        builder.stream(stringSerde, jsonSerde, TOPIC_TRNX)
               .map((topicKey, rawJson) -> {
                   final Transaction transaction = Transaction.newFromJson(rawJson);
                   return KeyValue.pair(transaction.id, transaction);
               });
// aggregate the transaction stream: S1
// NOTE(review): fixed a stray "1. " markdown list marker that had been
// fused into the .mapValues line by the copy/paste, breaking the syntax.
dataByAirportStream.join(regressionsByAirPortTable,
        (k, v) -> k,               // join key mapper: use the stream record's key as the table lookup key
        DataRegression::new)       // combine each data record with its matching regression model
    .mapValues(Predictor::predict) // score every joined (data, regression) pair
{
"CreatedAt": 1506570308000,
"Text": "RT @gwenshap: This is the best thing since partitioned bread :) https://t.co/1wbv3KwRM6",
[…]
"User": {
"Id": 82564066,
"Name": "Robin Moffatt \uD83C\uDF7B\uD83C\uDFC3\uD83E\uDD53",
"ScreenName": "rmoff",
[…]
======================================
= _ __ _____ ____ _ =
= | |/ // ____|/ __ \| | =
= | ' /| (___ | | | | | =
= | < \___ \| | | | | =
= | . \ ____) | |__| | |____ =
= |_|\_\_____/ \___\_\______| =
= =
= Streaming SQL Engine for Kafka =
Copyright 2017 Confluent Inc.
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic twitter_json_01|jq '.Text'
{
"string": "RT @rickastley: 30 years ago today I said I was Never Gonna Give You Up. I am a man of my word - Rick x https://t.co/VmbMQA6tQB"
}
{
"string": "RT @mariteg10: @rickastley @Carfestevent Wonderful Rick!!\nDo not forget Chile!!\nWe hope you get back someday!!\nHappy weekend for you!!\n❤…"
}
{
 "name": "twitter_source_json_01",
 "config": {
   "connector.class": "com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector",
   "twitter.oauth.accessToken": "xxxx",
   "twitter.oauth.consumerSecret": "xxxxx",
   "twitter.oauth.consumerKey": "xxxx",
   "twitter.oauth.accessTokenSecret": "xxxxx",
   "kafka.delete.topic": "twitter_deletes_json_01",
   "value.converter": "org.apache.kafka.connect.json.JsonConverter",
@manjuapu
manjuapu / kafka-broker-api-versions
Last active July 18, 2017 16:37
API versions of all the nodes in the cluster
cmccabe@aurora:~/confluent-3.2.0/bin> ./kafka-broker-api-versions.sh --bootstrap-server localhost:9092
aurora:9092 (id: 0 rack: null) -> (
Produce(0): 0 to 2 [usable: 2],
Fetch(1): 0 to 2 [usable: 2],
Offsets(2): 0 [usable: 0],
Metadata(3): 0 to 1 [usable: 1],
LeaderAndIsr(4): 0 [usable: 0],
StopReplica(5): 0 [usable: 0],
UpdateMetadata(6): 0 to 2 [usable: 2],
ControlledShutdown(7): 1 [usable: 1],
cmccabe@aurora:~/confluent-3.2.0/bin> ./kafka-broker-api-versions.sh
Missing required argument "[bootstrap-server]"
Option Description
------ -----------
--bootstrap-server <String: server(s) REQUIRED: The server to connect to.
to use for bootstrapping>
--command-config <String: command A property file containing configs to
config property file> be passed to Admin Client.
public static List<KeyValue<SpecificRecord, OutboundMessage>> generateAlerts(AccountEntry accountEntry,
CustomerAlertSettings settings) {
/* Generates addressed alerts for an AccountEntry, using the alert settings with the following steps:
* 1) Settings are for a specific account, drop AccountEntries not for this account
* 2) Match each setting with all alerts to generate appropriate messages
* 3) Address the generated messages
*/
if (settings == null) {
return new ArrayList<>();
KStream<CustomerId, KeyValue<SpecificRecord, OutboundMessage>> addressedMessages =
builder.<AccountId, AccountEntry>stream(accountEntryStream)
.leftJoin(accountToCustomerIds, (accountEntry, customerIds) -> {
if (isNull(customerIds)) {
return Collections.<KeyValue<CustomerId, AccountEntry>>emptyList();
} else {
return customerIds.getCustomerIds().stream()
.map(customerId -> KeyValue.pair(customerId, accountEntry))
.collect(toList());
}