Skip to content

Instantly share code, notes, and snippets.

//Inside the Kafka Streams thread (extending the example above) push the enriched orders into a topic of their own.
enrichedOrders.through(“enriched-orders-topic”);
//Now build a table from that topic, which will be pushed into the “enriched-orders-store”
KTable enrichedOrdersTable = builder.table(..., “enriched-orders-topic”, “enriched-orders-store”);
//Inside our request-response thread (e.g. the webserver’s thread)
ReadOnlyKeyValueStore ordersStore = streams.store(“enriched-orders-store”,...);
ordersStore.get(orderId);
orders.join(customers, Tuple::new) //join customers and orders
.filter((k, tuple) → tuple.customer.level().equals(PLATINUM) //filter platinum customers
&& tuple.order.state().equals(CONFIRMED)) //only consider confirmed orders
.peek((k, tuple) → emailer.sendMail(tuple)); //send email for each cust/order tuple
//Execute query in a sidecar
ksql> CREATE STREAM orders (ORDERID string, ORDERTIME bigint...) WITH (kafka_topic='orders', value_format='JSON');
ksql> CREATE STREAM platinum_emails as select * from orders, customers where client_level == ‘PLATINUM’ and state == ‘CONFIRMED’;
//In Node.js service
var nodemailer = require('nodemailer');
var kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.Client(),
consumer = new Consumer(client, [ { topic: 'platinum_emails', partition: 0 } ] );
consumer.on('message', function (orderConsumerTuple) {
sendMail(orderConsumerTuple);
# Submit an order. Immediately retrieving it will block until validation completes.
$ curl -X POST ... --data {"id":"1"...} http://server:8081/orders/
$ curl -X GET http://server:8081/orders/validated/1?timeout=500