Last active
January 4, 2018 19:31
-
-
Save vikas-tikoo-zefr/1fc04df70b1738159bf9cb50c5a31086 to your computer and use it in GitHub Desktop.
Convert the record's specified Connect timestamp field to an optional INT64 (long) value.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.zefr.connect.transforms; | |
import org.apache.kafka.common.config.ConfigDef; | |
import org.apache.kafka.connect.connector.ConnectRecord; | |
import org.apache.kafka.connect.data.Field; | |
import org.apache.kafka.connect.data.Schema; | |
import org.apache.kafka.connect.data.SchemaBuilder; | |
import org.apache.kafka.connect.data.Struct; | |
import org.apache.kafka.connect.transforms.Transformation; | |
import org.apache.kafka.connect.transforms.util.SchemaUtil; | |
import org.apache.kafka.connect.transforms.util.SimpleConfig; | |
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; | |
import java.util.Date; | |
import java.util.Map; | |
/** | |
Usage: | |
"transforms":"ConvertKConnectTimestampField", | |
"transforms.ConvertKConnectTimestampField.type":"com.zefr.connect.transforms.ConvertKConnectTimestampField", | |
"transforms.ConvertKConnectTimestampField.timestamp.field.name":"<FIELD_NAME>" | |
*/ | |
public class ConvertKConnectTimestampField<R extends ConnectRecord<R>> implements Transformation<R> { | |
public static final String OVERVIEW_DOC = | |
"Update the record's specified field with the connect timestamp time to an optional INT64/long type." | |
+ "<p/>" | |
+ "This is mainly useful for jdbc source connectors timestamp type."; | |
public static final ConfigDef CONFIG_DEF = new ConfigDef() | |
.define(ConfigName.FIELD_NAME, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, | |
"Field name with the connect timestamp format."); | |
private interface ConfigName { | |
String FIELD_NAME = "timestamp.field.name"; | |
} | |
private static final String PURPOSE = "connect timestamp field type replacement"; | |
private String fieldName; | |
@Override | |
public void configure(Map<String, ?> props) { | |
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); | |
fieldName = config.getString(ConfigName.FIELD_NAME); | |
} | |
@Override | |
public R apply(R record) { | |
final Struct value = requireStruct(record.value(), PURPOSE); | |
Schema updatedSchema = makeUpdatedSchema(record.valueSchema()); | |
final Struct updatedValue = new Struct(updatedSchema); | |
for (Field field: updatedValue.schema().fields()) { | |
if (field.name().equals(fieldName)) { | |
updatedValue.put(field.name(), ((Date)value.get(field)).getTime()); | |
} else { | |
updatedValue.put(field.name(), value.get(field)); | |
} | |
} | |
return record.newRecord( | |
record.topic(), | |
record.kafkaPartition(), | |
record.keySchema(), | |
record.key(), | |
updatedSchema, | |
updatedValue, | |
record.timestamp() | |
); | |
} | |
private Schema makeUpdatedSchema(Schema schema) { | |
final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); | |
for (Field field : schema.fields()) { | |
if (field.name().equals(fieldName)) { | |
builder.field(field.name(), Schema.OPTIONAL_INT64_SCHEMA); | |
} | |
else { | |
builder.field(field.name(), field.schema()); | |
} | |
} | |
return builder.build(); | |
} | |
@Override | |
public void close() { | |
// timestampFormat = null; | |
} | |
@Override | |
public ConfigDef config() { | |
return CONFIG_DEF; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment