- Reference: http://kafka.apache.org/documentation.html#connect_transforms
- Good presentation on SMT: https://kafka-summit.org/sessions/single-message-transformations-not-transformations-youre-looking/
- KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect
- Inbound topic:
soe-ORDERS
- Resulting topic:
ORDERS
Config:
"transforms":"dropPrefix",
"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex":"soe-(.*)",
"transforms.dropPrefix.replacement":"$1"
How does the regex work? See http://rubular.com/r/3mrKOMdow6
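As a rough illustration of the rename above, here is a minimal Python sketch of the matching logic. It is not the connector's code: RegexRouter is a Java class, and the function name `regex_router` is hypothetical. The sketch assumes the regex must match the whole topic name and that a non-matching topic passes through unchanged; it also translates the Java-style `$1` group reference into Python's `\g<1>` form.

```python
import re


def regex_router(topic, regex, replacement):
    """Approximate the rename: apply only when the regex matches the whole topic."""
    # Java writes group references as $1; Python's re module uses \g<1>.
    py_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    match = re.fullmatch(regex, topic)
    if match is None:
        # Assumption: a topic that does not match keeps its original name.
        return topic
    return match.expand(py_replacement)


print(regex_router("soe-ORDERS", "soe-(.*)", "$1"))  # ORDERS
print(regex_router("CUSTOMERS", "soe-(.*)", "$1"))   # CUSTOMERS
```

The capture group `(.*)` holds everything after the `soe-` prefix, and the replacement `$1` keeps only that captured part.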
@toritsejuFO Hello, have you solved this problem? How do I use RegexRouter to route a topic to an existing index?
I am running into the same problem. I tried the config below, but when I fired a curl command the index name had not been changed, and I got an exception.
"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex":"kafka_(.*)",
"transforms.dropPrefix.replacement":"$1",
"transforms.routeTS.type":"org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.routeTS.topic.format":"${timestamp}",
"transforms.routeTS.timestamp.format":"YYYYMM"
I get the exception below:
{"root_cause":[{"type":"invalid_index_name_exception","reason":"Invalid index name [kafka_v1_TableName], must be lowercase"
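Two things stand out in that error: the index name still carries the `kafka_` prefix, which suggests the dropPrefix rename was not applied to this record, and the name contains uppercase letters. As a rough Python illustration (not the connector's own code), stripping the prefix with the same regex would still leave uppercase characters, so the rename alone cannot produce a lowercase index name:

```python
import re

topic = "kafka_v1_TableName"  # index name reported in the error above

# Same pattern as transforms.dropPrefix.regex, matched against the whole name.
match = re.fullmatch(r"kafka_(.*)", topic)
stripped = match.group(1)

print(stripped)            # v1_TableName
print(stripped.islower())  # False: uppercase letters remain after the rename
print(stripped.lower())    # v1_tablename
```

The final `lower()` call only shows what a valid lowercase name would look like; how to lowercase the name inside the connector is outside what this sketch covers.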
Please, how do I use the suggested RegexRouter to map a topic name in Confluent Kafka to a different index name in Elasticsearch during sinking? According to the docs, the topic.index.map is deprecated, and I don't seem to understand how RegexRouter works.
What I'm trying to do is have Kafka sink messages to Elasticsearch from an existing topic to an index in Elasticsearch once I load the ES sink connector, but with a different name than the topic name. This index doesn't exist and hence will be created.