I’ve since written a full blog post on this particular Single Message Transform. You can find it at https://rmoff.net/2020/12/17/twelve-days-of-smt-day-8-timestampconverter/
"_comment": "Use SMT to cast op_ts and current_ts to timestamp datatype (TimestampConverter is Kafka >=0.11 / Confluent Platform >=3.3). Format from https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html", | |
"transforms": "convert_op_ts,convert_current_ts", | |
"transforms.convert_op_ts.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", | |
"transforms.convert_op_ts.target.type": "Timestamp", | |
"transforms.convert_op_ts.field": "current_ts", | |
"transforms.convert_op_ts.format": "yyyy-MM-dd HH:mm:ss.SSSSSS", | |
"transforms.convert_current_ts.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", | |
"transforms.convert_current_ts.target.type": "Timestamp", | |
"transforms.convert_current_ts.field": "op_ts", | |
"transforms.convert_current_ts.format": "yyyy-MM-dd HH:mm:ss.SSSSSS" |
Very helpful indeed!!
I am struggling with one thing concerning my Elasticsearch connector. The "transforms.convert_op_ts.target.type" actually works with "string" and "unix", but not with the "Date" and "Timestamp" types. Here is my sink config:
'{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"schema.ignore": "true",
"connection.url": "http://localhost:9200",
"type.name": "kafka-connect",
"topics": "test-elasticsearch-sink",
"key.ignore": "true",
"transforms": "InsertMessageTime,ConvertTimeValue",
"transforms.InsertMessageTime.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertMessageTime.timestamp.field":"creationDate",
"transforms.ConvertTimeValue.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.ConvertTimeValue.target.type":"Date",
"transforms.ConvertTimeValue.field":"creationDate",
"transforms.ConvertTimeValue.format":"yyyy-MM-dd HH:mm:ss.SSSSSS"
}
}'
I am looking to integrate it with Grafana and as you know the date field should be of "Date" type. I am getting the following error:
Caused by: org.apache.kafka.connect.errors.DataException: Java class class java.util.Date does not have corresponding schema type.
at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:604)
at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:668)
at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithoutEnvelope(JsonConverter.java:574)
at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:324)
at io.confluent.connect.elasticsearch.DataConverter.getPayload(DataConverter.java:181)
at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:163)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.tryWriteRecord(ElasticsearchWriter.java:283)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:268)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:162)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
... 10 more
Any idea on how to address this issue?
Thanks for your help
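For what it's worth, that exception is thrown because org.apache.kafka.connect.json.JsonConverter running with "value.converter.schemas.enable": "false" has no schema describing the java.util.Date that "target.type": "Date" produces, so it cannot serialise it. One workaround (a sketch reusing the creationDate field from the config above, not the only possible fix) is to keep the target type as "string" and emit an ISO-8601 pattern that Elasticsearch's dynamic date detection recognises:

"transforms.ConvertTimeValue.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.ConvertTimeValue.field": "creationDate",
"transforms.ConvertTimeValue.target.type": "string",
"transforms.ConvertTimeValue.format": "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"

Alternatively, with a schema-aware converter (Avro, or JSON with schemas.enable=true) and "schema.ignore": "false", the connector should be able to map a Connect Timestamp field to an Elasticsearch date type itself.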
Working great with JdbcSinkConnector.
Thank you.
Hello, I'm generating data using an Avro schema, and this is the field I'm using to generate timestamps:
{
"name": "Timestamp",
"type": "long",
"logicalType": "timestamp-millis"
}
but in Kafka I get values like this (Unix epochs): 6028130266367963000, and I tried to use TimestampConverter to convert it to a readable format using this configuration:
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSSSSS"
"transforms.TimestampConverter.target.type": "string"
"transforms.TimestampConverter.field ": "Timestamp"
but then I get this as a result: 3414461-02-18 00:36:47.000234. Can anyone please advise on what to do?
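One thing worth double-checking: in an Avro schema the logicalType usually has to be nested inside the type definition; written as a sibling of "type": "long" at the field level it is generally ignored, and the field is produced as a plain long. A sketch of the nested form, reusing the Timestamp field name from the question above:

{
  "name": "Timestamp",
  "type": {
    "type": "long",
    "logicalType": "timestamp-millis"
  }
}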
I can only achieve 3 digits of subsecond precision using this method. "yyyy-MM-dd HH:mm:ss.SSSSSS" is truncated to "yyyy-MM-dd HH:mm:ss.SSS"
> I'm generating data using an Avro schema ... but then I get this as a result: 3414461-02-18 00:36:47.000234.
Were you able to correct this? I am hitting a wall this time.
The issue is related to micro/nanoseconds, which don't exist in the SimpleDateFormat class.
The simplest solution could be to divide the value by 1,000 or 1,000,000 before it reaches the connector, depending on whether you have microseconds or nanoseconds.
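A minimal sketch of that idea on the producer side, assuming the raw value is an epoch in nanoseconds (the class and method names here are made up for illustration):

import java.time.Instant;

public class EpochScaler {
    // TimestampConverter and the timestamp-millis logical type expect epoch milliseconds.
    static long nanosToMillis(long epochNanos) {
        return epochNanos / 1_000_000L;
    }

    // If the source value is in microseconds, divide by 1_000 instead.
    static long microsToMillis(long epochMicros) {
        return epochMicros / 1_000L;
    }

    public static void main(String[] args) {
        long rawNanos = 1_608_200_130_123_456_789L; // example epoch-nanoseconds value
        long millis = nanosToMillis(rawNanos);
        System.out.println(millis + " -> " + Instant.ofEpochMilli(millis));
        // prints: 1608200130123 -> 2020-12-17T10:15:30.123Z
    }
}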
I converted a timestamp successfully, but the time zone of the date isn't correct. The time zone is always UTC, even though my server isn't in that time zone. My server is set up for America/Sao_Paulo, but the date created by the Kafka Connect transform is in UTC.
"transforms.ConvertTimeValue.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.ConvertTimeValue.target.type":"string", "transforms.ConvertTimeValue.field":"dataHoraChegadaElasticUTC", "transforms.ConvertTimeValue.format":"yyyy/MM/dd HH:mm:ss",
You can use the "db.timezone" parameter in the connector configuration; just set your time zone and you will get the correct date in your target database.
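For a JDBC sink that would look roughly like this (a sketch; the connection URL and topic are placeholders):

"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://localhost:5432/demo",
"topics": "my_topic",
"db.timezone": "America/Sao_Paulo"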
I'm using an Elasticsearch sink connector. I couldn't find this "db.timezone" parameter in the docs. Actually, I think db.timezone is exclusive to the JDBC connectors.
Sorry, I didn't notice the question was about the Elasticsearch sink connector.
It works like a charm! Thank you a lot.
Does this only work when we have "value.converter": "io.confluent.connect.avro.AvroConverter" and "value.converter.schemas.enable": "false"?