@rmoff / 04 Apr 2018
This is easy with KSQL :)
Here’s a dummy topic, in JSON:
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic mysql_users
{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}
In KSQL declare the source stream, specifying the schema (here a subset of the full schema, just for brevity):
ksql> CREATE STREAM source (uid INT, name VARCHAR) WITH (KAFKA_TOPIC='mysql_users', VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
Now create a derived stream, specifying the target serialisation (Avro) and the target topic (this is optional; without it will just take the name of the stream):
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> CREATE STREAM target_avro WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='mysql_users_avro') AS SELECT * FROM source;
Message
----------------------------
Stream created and running
----------------------------
ksql>
Check out the resulting Avro topic:
$ kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic mysql_users_avro --from-beginning
{"UID":{"int":1},"NAME":{"string":"Cliff"}}
{"UID":{"int":2},"NAME":{"string":"Nick"}}
Because KSQL is a continuous query, any new records arriving on the source JSON topic will be automagically converted to Avro on the derived topic.
Very nice Robin. And since Avro is being sent to topic
target_avro
, it will automatically register with the Schema Registry, correct? I see you are specifying the schema registry in your consumer, so I assume so.