@rmoff / 04 Apr 2018
This is easy with KSQL :)
Here’s a dummy topic, in JSON:
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic mysql_users
{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}
In KSQL, declare the source stream, specifying its schema (here just a subset of the full schema, for brevity):
ksql> CREATE STREAM source (uid INT, name VARCHAR) WITH (KAFKA_TOPIC='mysql_users', VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
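Declaring only some of the fields works because KSQL reads the declared columns and ignores the rest of each JSON message. The following is a minimal Python model of that projection for one message. It is not KSQL code. It assumes that unquoted column names are upper-cased (consistent with the UID/NAME names in the Avro output further down), that JSON keys are matched case-insensitively, and that a declared field missing from a message becomes null. `DECLARED_COLUMNS` and `project` are hypothetical names for illustration.

```python
import json

# Hypothetical model of: CREATE STREAM source (uid INT, name VARCHAR) ...
# Unquoted identifiers are upper-cased, so the columns become UID and NAME.
DECLARED_COLUMNS = ["UID", "NAME"]


def project(raw_json: str) -> dict:
    """Keep only the declared columns; undeclared fields are ignored.

    Assumption: JSON keys are matched case-insensitively, and a declared
    column absent from the message comes back as None (null).
    """
    record = json.loads(raw_json)
    by_upper = {key.upper(): value for key, value in record.items()}
    return {column: by_upper.get(column) for column in DECLARED_COLUMNS}


if __name__ == "__main__":
    msg = '{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}'
    print(project(msg))  # {'UID': 1, 'NAME': 'Cliff'}
```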
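Now create a derived stream, specifying the target serialisation (Avro) and the target topic (the topic is optional; without it, the topic simply takes the name of the stream). Setting auto.offset.reset to earliest first makes the query also process the messages already on the source topic, not just new ones: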
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> CREATE STREAM target_avro WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='mysql_users_avro') AS SELECT * FROM source;
Message
----------------------------
Stream created and running
----------------------------
ksql>
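To illustrate what the SET 'auto.offset.reset' line changes: the persistent query reads the source topic with a new consumer group that has no committed offsets, so the reset policy decides where it starts. Below is a toy Python model, not Kafka code; it assumes the usual consumer semantics where 'earliest' starts at offset 0 and 'latest' (the Kafka consumer default) starts at the current end of the log. `records_seen` is a hypothetical helper.

```python
from typing import List


def records_seen(log: List[str], reset: str, arriving_later: List[str]) -> List[str]:
    """Records a brand-new consumer group (no committed offsets) ends up reading.

    Simplified model: 'earliest' starts at offset 0; 'latest' starts at the
    current end of the log, so only records appended afterwards are read.
    """
    if reset == "earliest":
        start = 0
    elif reset == "latest":
        start = len(log)
    else:
        # Other policies (e.g. 'none') are out of scope for this sketch.
        raise ValueError(f"unsupported auto.offset.reset value: {reset}")
    return (log + arriving_later)[start:]


if __name__ == "__main__":
    existing = ["uid=1", "uid=2"]
    print(records_seen(existing, "earliest", ["uid=3"]))  # ['uid=1', 'uid=2', 'uid=3']
    print(records_seen(existing, "latest", ["uid=3"]))    # ['uid=3']
```

With the default, the two dummy records already on mysql_users would never reach the Avro topic.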
Check out the resulting Avro topic:
$ kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic mysql_users_avro --from-beginning
{"UID":{"int":1},"NAME":{"string":"Cliff"}}
{"UID":{"int":2},"NAME":{"string":"Nick"}}
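The {"int":1} and {"string":"Cliff"} wrappers are expected. The Avro schema KSQL generates for the derived stream declares each column as optional, i.e. a union of null and the column type. Avro's JSON encoding, which kafka-avro-console-consumer uses for display, writes null as plain null and wraps any other union value in an object keyed by the branch's type name. A small Python sketch reproducing that display format follows, assuming the INT to "int" and VARCHAR to "string" type mapping; `AVRO_TYPE`, `COLUMNS` and `to_avro_json` are hypothetical names.

```python
import json
from typing import Any, Optional

# Assumed mapping from the KSQL column types used above to Avro primitive type names.
AVRO_TYPE = {"INT": "int", "VARCHAR": "string"}

# Column name -> KSQL type, as declared in the source stream.
COLUMNS = {"UID": "INT", "NAME": "VARCHAR"}


def encode_union(avro_type: str, value: Any) -> Optional[dict]:
    """Avro JSON encoding of a value whose schema is the union ["null", avro_type]."""
    # null is written as-is; any other branch is wrapped as {type_name: value}
    return None if value is None else {avro_type: value}


def to_avro_json(row: dict) -> str:
    encoded = {col: encode_union(AVRO_TYPE[ktype], row.get(col)) for col, ktype in COLUMNS.items()}
    # Compact separators to match the console consumer's one-line output
    return json.dumps(encoded, separators=(",", ":"))


if __name__ == "__main__":
    print(to_avro_json({"UID": 1, "NAME": "Cliff"}))
    # {"UID":{"int":1},"NAME":{"string":"Cliff"}}
```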
Because the KSQL statement runs as a continuous query, any new records arriving on the source JSON topic will be automagically converted to Avro and written to the derived topic.
This helped me out of a dead end with a small prototype - well done, thank you!
I wasn't able to apply the same technique "as is" to a JSON topic with a key defined (please note, I'm new to Kafka).
I didn't expect the key field "ITEM_ID" to vanish from the topic, as viewed through kafka-console-consumer (Confluent Platform OSS 5.0.0). On the other hand, in KSQL "ITEM_ID" is still a column of the corresponding stream, but it is always null, while its value is assigned (as expected) to ROWKEY. Oddly enough, "SELECT ROWTIME, ROWKEY, ROWKEY AS ITEM_ID, ..." returns what I had expected.
Is this a mistake on my part, or is it "working as designed"?
This is not a dead end, but any help would be greatly appreciated (and thank you again).