I’ve since written a full blog on this particular Single Message Transform. It can be found at See also https://rmoff.net/2020/12/17/twelve-days-of-smt-day-8-timestampconverter/
-
-
Save rmoff/179ed4067b9f042344cf597286ac1840 to your computer and use it in GitHub Desktop.
"_comment": "Use SMT to cast op_ts and current_ts to timestamp datatype (TimestampConverter is Kafka >=0.11 / Confluent Platform >=3.3). Format from https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html", | |
"transforms": "convert_op_ts,convert_current_ts", | |
"transforms.convert_op_ts.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", | |
"transforms.convert_op_ts.target.type": "Timestamp", | |
"transforms.convert_op_ts.field": "current_ts", | |
"transforms.convert_op_ts.format": "yyyy-MM-dd HH:mm:ss.SSSSSS", | |
"transforms.convert_current_ts.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", | |
"transforms.convert_current_ts.target.type": "Timestamp", | |
"transforms.convert_current_ts.field": "op_ts", | |
"transforms.convert_current_ts.format": "yyyy-MM-dd HH:mm:ss.SSSSSS" |
I converted a timestamp with success, but the time zone of date isn't correct. The time zone is always UTC, even my server isn't in this time zone. My server is setup for America/Sao_Paulo and the date created by Kafka Connect Transform is in UTC timezone.
"transforms.ConvertTimeValue.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.ConvertTimeValue.target.type":"string", "transforms.ConvertTimeValue.field":"dataHoraChegadaElasticUTC", "transforms.ConvertTimeValue.format":"yyyy/MM/dd HH:mm:ss",
U can use the "db.timezone" parameter in the connector configuration, just set your timezone and U will have the correct date in your target database.
U can use the "db.timezone" parameter in the connector configuration, just set your timezone and U will have the correct date in your target database.
I'm using a sink elasticsearch connector. I could'n find this parameter "db.timezone" in the docs. Actually, I thing this parameter db.timezone is exclusive for JDBC connectors.
Sorry I didn't notice the question is about elasticsearch sink connector.
It works like charm! Thank you a lot.
The issue related to micro/nano seconds which doesn't exist in SimpleDateFormat class .
The simplest solution could be dividing the value into 1000 or 1000000 before connector, depending either U have microseconds or nanoseconds.