# List all running instances of this application
$ http://localhost:7070/state/instances[
{
"host": "localhost",
"port": 7070,# List all running instances of this application
$ http://localhost:7070/state/instances[
{
"host": "localhost",
"port": 7070,# List running app instances that currently manage (parts of) state store "word-count"
$ http://localhost:7070/state/instances/word-count[
{
"host": "localhost",
"port": 7070,# Get all key-value records of state store "word-count" across all instances
$ http://localhost:7070/state/keyvalues/word-count/all [
{
"key": "a",
"value": 147# Get the latest value for key "jolly" in state store "word-count"
$ http://localhost:7070/state/keyvalue/word-count/jolly{
"key": "jolly",
"value": 55
}# Get all key-value records of state store "word-count" that have keys within the range from "hat" to "kafka"
$ http://localhost:7070/state/keyvalues/word-count/range/hat/kafka[
{
"key": "hat",
"value": 65# Find the app instance that contains key "jolly" (if it exists) for the state store "word-count"
$ http://localhost:7070/state/instance/word-count/jolly{
"host": "localhost",
"port": 7070,
"storeNames": [| public static void main(String[] args) throws Exception { | |
| Properties streamsConfiguration = new Properties(); | |
| streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-lambda-example"); | |
| streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); | |
| streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); | |
| streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); | |
| streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); | |
| final Serde<String> stringSerde = Serdes.String(); | |
| final Serde<Long> longSerde = Serdes.Long(); |
| import java.lang.Long | |
| import java.util.Properties | |
| import java.util.concurrent.TimeUnit | |
| import org.apache.kafka.common.serialization._ | |
| import org.apache.kafka.streams._ | |
| import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable} | |
| import scala.collection.JavaConverters.asJavaIterableConverter |
This gist has been moved to https://github.com/miguno/kafka-streams-docker.
| # MAC USERS ONLY! | |
| # | |
| # If you haven't done so, create a VM with 6GB of memory as your Docker Machine | |
| # and name it `confluent`. If you already created the `confluent` machine earlier, | |
| # ensure that it is running (`status`) and `start` it if need be. | |
| $ docker-machine create --driver virtualbox --virtualbox-memory 6000 confluent | |
| # Next, configure your terminal window to attach it to your new Docker Machine. | |
| $ eval $(docker-machine env confluent) |