I've been working with Apache Kafka for over 7 years. I inevitably find myself doing the same set of activities while I'm developing or working with someone else's system. Here's a set of Kafka productivity hacks for doing a few things way faster than you're probably doing them now. 🔥
- Show me all my Kafka topics and their partitions, replicas, and consumers
- Show me the contents of a topic
- Create a Kafka topic
- Produce messages to a Kafka topic
- Validate the schema of messages before producing to a topic
- Do all of this at a distance
Most of these tricks rely on having KSQL, and the easiest way to get that is with Docker. KSQL lets you work with the data in Kafka in a client/server fashion, and it won't touch or disrupt your existing Kafka setup. Launch this Docker Compose file with docker-compose up. Adjust KSQL_BOOTSTRAP_SERVERS to point to your Kafka installation if it's not locally available on port 9092.
---
version: '2'
services:
  ksql-server:
    image: confluentinc/cp-ksql-server:5.3.0
    hostname: ksql-server
    container_name: ksql-server
    # host networking exposes port 8088 directly, so no ports mapping is needed
    network_mode: host
    environment:
      KSQL_BOOTSTRAP_SERVERS: "127.0.0.1:9092"
      KSQL_HOST_NAME: ksql-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
  ksql-cli:
    image: confluentinc/cp-ksql-cli:5.3.0
    container_name: ksql-cli
    network_mode: host
    depends_on:
      - ksql-server
    entrypoint: /bin/sh
    tty: true
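Once both containers are up (docker-compose up -d runs them in the background), attach to the CLI container and point it at the server. A minimal sketch, using the container names from the Compose file above:

$ docker exec -it ksql-cli ksql http://localhost:8088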
Now to the fun part.
What's in the cluster? No kafka-topics.sh plus a long string of flags needed. No UI needed. It can even approximate how much outbound activity there is on each topic via its consumer count.
ksql> show topics;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-----------------------------------------------------------------------------------------
auctions | false | 3 | 1 | 2 | 1
bids | false | 12 | 1 | 3 | 3
people | false | 6 | 1 | 3 | 1
-----------------------------------------------------------------------------------------
What's in this topic? Forget kafka-console-consumer.sh. print can find out. It will also guess the serialization format of your data. This is handy if you don't know what the data in the topic is or who produced it.
ksql> print 'bids' from beginning;
Format:JSON
{"ROWTIME":1562876975855,"ROWKEY":"null","item":"glow stick","price_usd":1,"bid_time":"8:13"}
{"ROWTIME":1562876977966,"ROWKEY":"null","item":"magnet","price_usd":6,"bid_time":"8:17"}
{"ROWTIME":1562876971480,"ROWKEY":"null","item":"tweezers","price_usd":4,"bid_time":"8:05"}
{"ROWTIME":1562876969350,"ROWKEY":"null","item":"sponge","price_usd":3,"bid_time":"8:11"}
{"ROWTIME":1562876967096,"ROWKEY":"null","item":"spoon","price_usd":2,"bid_time":"8:07"}
{"ROWTIME":1562876973673,"ROWKEY":"null","item":"key chain","price_usd":5,"bid_time":"8:09"}
...
In this case, it auto-discovered that I have JSON in the topic. ROWKEY and ROWTIME are KSQL's analogs for the key and timestamp of each record.
No more fishing around on the broker's machine for kafka-topics.sh to make a topic. Make one right at the prompt. You can specify partitions and the replication factor as usual (see the replicas sketch after this example).
ksql> CREATE STREAM cheese_shipments (shipmentId INT, cheese VARCHAR, shipmentTimestamp VARCHAR)
> WITH (kafka_topic='cheese_shipments', partitions=3, key = 'shipmentId', value_format='json');
Message
----------------
Stream created
----------------
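The statement above leaves the replication factor at the broker default. The WITH clause also accepts a replicas property if you want to pin it down; a sketch, assuming a cluster with at least two brokers (cheese_returns is a made-up topic for illustration):

ksql> CREATE STREAM cheese_returns (shipmentId INT, reason VARCHAR)
> WITH (kafka_topic='cheese_returns', partitions=3, replicas=2, key='shipmentId', value_format='json');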
This creates a Kafka topic and registers a stream over it, which is basically a schema for data validation. Check it out:
ksql> show streams;
Stream Name | Kafka Topic | Format
----------------------------------------------
CHEESE_SHIPMENTS | cheese_shipments | JSON
----------------------------------------------
It's a stream, but really it's just a topic. There's our cheese_shipments topic:
ksql> show topics;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
----------------------------------------------------------------------------------------------
auctions | false | 3 | 1 | 2 | 1
bids | false | 12 | 1 | 3 | 3
cheese_shipments | true | 3 | 1 | 0 | 0
people | false | 6 | 1 | 3 | 1
----------------------------------------------------------------------------------------------
If the topic is active, you can get some runtime metrics about what it's doing. I don't have any here though, so it's blank at the bottom.
ksql> describe extended cheese_shipments;
Name : CHEESE_SHIPMENTS
Type : STREAM
Key field : SHIPMENTID
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : JSON
Kafka topic : cheese_shipments (partitions: 3, replication: 1)
Field | Type
-----------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
SHIPMENTID | INTEGER
CHEESE | VARCHAR(STRING)
SHIPMENTTIMESTAMP | VARCHAR(STRING)
-----------------------------------------------
Local runtime statistics
------------------------
(Statistics of the local KSQL server interaction with the Kafka topic cheese_shipments)
Gotta get some data into a topic. I could either write a program that uses the producer API (ugh) or figure out how to use kafkacat (I need to relearn that every time). How about insert into instead?
ksql> insert into cheese_shipments (shipmentId, cheese, shipmentTimestamp) values (42, 'provolone', 'june 5th 2019');
ksql> insert into cheese_shipments (shipmentId, cheese, shipmentTimestamp) values (45, 'chedar', 'june 8th 2019');
ksql> insert into cheese_shipments (shipmentId, cheese, shipmentTimestamp) values (47, 'swiss', 'june 8th 2019');
ksql> insert into cheese_shipments (shipmentId, cheese, shipmentTimestamp) values (51, 'cooper', 'june 11th 2019');
It's definitely in the topic. Check it out. Note that the topic is keyed by shipment ID and there are 3 partitions, so the order seen here doesn't match the order I inserted in.
ksql> print 'cheese_shipments' from beginning;
Format:JSON
{"ROWTIME":1562882124666,"ROWKEY":"45","SHIPMENTID":45,"CHEESE":"chedar","SHIPMENTTIMESTAMP":"june 8th 2019"}
{"ROWTIME":1562882127795,"ROWKEY":"45","SHIPMENTID":47,"CHEESE":"swiss","SHIPMENTTIMESTAMP":"june 8th 2019"}
{"ROWTIME":1562882078365,"ROWKEY":"42","SHIPMENTID":42,"CHEESE":"provolone","SHIPMENTTIMESTAMP":"june 5th 2019"}
{"ROWTIME":1562882163295,"ROWKEY":"51","SHIPMENTID":51,"CHEESE":"cooper","SHIPMENTTIMESTAMP":"june 11th 2019"}
I often wish I had better client-side validation of my messages before I put them on a topic. insert into does that for free because we placed a schema over the topic. 🎉
ksql> insert into cheese_shipments (shipmentId, cheese, shipmentTimestamp) values ('bad shipment id', 'american', 'june 12th 2019');
Expected type INT32 for field SHIPMENTID but got bad shipment id(STRING)
Bonus: all of this works over KSQL's REST API. So if the command line doesn't work for your situation, you can take all of this to the programming language you're using. It's one less container to run, too, so you can reasonably grab the tools you need with a single docker run command.
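For instance, here's show topics issued straight over HTTP with curl, assuming the server from the Compose file above is listening on localhost:8088:

$ curl -s -X POST http://localhost:8088/ksql \
    -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
    -d '{"ksql": "SHOW TOPICS;", "streamsProperties": {}}'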