# List all running instances of this application
$ http://localhost:7070/state/instances
[
{
"host": "localhost",
"port": 7070,
# List all running instances of this application
$ http://localhost:7070/state/instances
[
{
"host": "localhost",
"port": 7070,
# List running app instances that currently manage (parts of) state store "word-count"
$ http://localhost:7070/state/instances/word-count
[
{
"host": "localhost",
"port": 7070,
# Get all key-value records of state store "word-count" across all instances
$ http://localhost:7070/state/keyvalues/word-count/all
[
{
"key": "a",
"value": 147
# Get the latest value for key "jolly" in state store "word-count"
$ http://localhost:7070/state/keyvalue/word-count/jolly
{
"key": "jolly",
"value": 55
}
# Get all key-value records of state store "word-count" that have keys within the range from "hat" to "kafka"
$ http://localhost:7070/state/keyvalues/word-count/range/hat/kafka
[
{
"key": "hat",
"value": 65
# Find the app instance that contains key "jolly" (if it exists) for the state store "word-count"
$ http://localhost:7070/state/instance/word-count/jolly
{
"host": "localhost",
"port": 7070,
"storeNames": [
public static void main(String[] args) throws Exception { | |
Properties streamsConfiguration = new Properties(); | |
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-lambda-example"); | |
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); | |
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); | |
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); | |
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); | |
final Serde<String> stringSerde = Serdes.String(); | |
final Serde<Long> longSerde = Serdes.Long(); |
import java.lang.Long | |
import java.util.Properties | |
import java.util.concurrent.TimeUnit | |
import org.apache.kafka.common.serialization._ | |
import org.apache.kafka.streams._ | |
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable} | |
import scala.collection.JavaConverters.asJavaIterableConverter |
This gist has been moved to https://github.com/miguno/kafka-streams-docker.
# MAC USERS ONLY! | |
# | |
# If you haven't done so, create a VM with 6GB of memory as your Docker Machine | |
# and name it `confluent`. If you already created the `confluent` machine earlier, | |
# ensure that it is running (`status`) and `start` it if need be. | |
$ docker-machine create --driver virtualbox --virtualbox-memory 6000 confluent | |
# Next, configure your terminal window to attach it to your new Docker Machine. | |
$ eval $(docker-machine env confluent) |