@rmoff / 04 Apr 2018
This is easy with KSQL :)
Here’s a dummy topic, in JSON:
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic mysql_users
{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}
In KSQL declare the source stream, specifying the schema (here a subset of the full schema, just for brevity):
ksql> CREATE STREAM source (uid INT, name VARCHAR) WITH (KAFKA_TOPIC='mysql_users', VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
Now create a derived stream, specifying the target serialisation (Avro) and the target topic (the topic is optional; without it, the topic just takes the name of the stream):
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> CREATE STREAM target_avro WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='mysql_users_avro') AS SELECT * FROM source;
Message
----------------------------
Stream created and running
----------------------------
ksql>
Check out the resulting Avro topic:
$ kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic mysql_users_avro --from-beginning
{"UID":{"int":1},"NAME":{"string":"Cliff"}}
{"UID":{"int":2},"NAME":{"string":"Nick"}}
Because the CREATE STREAM ... AS SELECT statement runs as a continuous query, any new records arriving on the source JSON topic will be automagically converted to Avro on the derived topic.
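If you want to confirm from inside the KSQL CLI that the conversion is still running, something like the following should work. This is only a sketch: it assumes the stream and topic names used above and a KSQL release from around the time of this post, and the output is omitted because it varies by version.

```sql
-- List the running queries; the CREATE STREAM ... AS SELECT above should appear here.
SHOW QUERIES;

-- Show the derived stream's columns, serialisation format and backing topic.
DESCRIBE EXTENDED target_avro;

-- Dump the contents of the derived topic from the start (Ctrl-C to stop).
PRINT 'mysql_users_avro' FROM BEGINNING;
```

PRINT reads the topic directly, so it is a quick alternative to running kafka-avro-console-consumer in a separate shell.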
Mistake of mine, solved.
Any field declared as "key" in the source topic is shown as ROWKEY in KSQL, and the original name is lost. (Why?)
So, by writing "create stream ( item_id string, item_name string ... ) ..." I ended up declaring a new "item_id" column, which was obviously always null.
Coming from decades on relational systems, I assumed that referencing a non-existent column would be flagged as an error.
It is not. Lesson learned ;)
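For anyone hitting the same thing, here is a minimal sketch of one way around it. It assumes a KSQL version from the time of this thread, where every stream exposes the message key through the implicit ROWKEY column. The stream name, topic name and columns are hypothetical, loosely based on the statement quoted above.

```sql
-- Hypothetical names. item_id lives in the message key, not in the JSON value,
-- so it is left out of the declared value schema.
CREATE STREAM items (item_name VARCHAR)
  WITH (KAFKA_TOPIC='items', VALUE_FORMAT='JSON');

-- Read the key through ROWKEY and give it back its original name.
SELECT ROWKEY AS item_id, item_name FROM items;
```

Declaring item_id in the column list would not raise an error either. KSQL would look for a field of that name in the JSON value and return null when it is absent, which is the behaviour described above.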