Skip to content

Instantly share code, notes, and snippets.

@vikas-tikoo-zefr
Last active January 4, 2018 19:31
Show Gist options
  • Save vikas-tikoo-zefr/1fc04df70b1738159bf9cb50c5a31086 to your computer and use it in GitHub Desktop.
Save vikas-tikoo-zefr/1fc04df70b1738159bf9cb50c5a31086 to your computer and use it in GitHub Desktop.
Converts the record's specified Kafka Connect timestamp field to an optional INT64/long value.
package com.zefr.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
import java.util.Date;
import java.util.Map;
/**
Usage:
"transforms":"ConvertKConnectTimestampField",
"transforms.ConvertKConnectTimestampField.type":"com.zefr.connect.transforms.ConvertKConnectTimestampField",
"transforms.ConvertKConnectTimestampField.timestamp.field.name":"<FIELD_NAME>"
*/
/**
 * Single Message Transform (SMT) that rewrites one field of a Struct-valued record from the
 * Connect {@code Timestamp} logical type (materialized as {@link java.util.Date}) to a plain
 * optional INT64 (epoch milliseconds).
 *
 * <p>Mainly useful for JDBC source connectors that emit timestamp columns as the Connect
 * timestamp logical type when downstream consumers want a raw long.
 *
 * <p>Null handling: tombstone records (null value) are passed through untouched, and a null
 * timestamp value maps to a null INT64 (the target schema is optional).
 *
 * <p>Thread-safety: like Kafka's built-in transforms, an instance is expected to be used by a
 * single task thread; the schema cache below is not synchronized.
 */
public class ConvertKConnectTimestampField<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final String OVERVIEW_DOC =
            "Update the record's specified field with the connect timestamp time to an optional INT64/long type."
                    + "<p/>"
                    + "This is mainly useful for jdbc source connectors timestamp type.";

    private interface ConfigName {
        String FIELD_NAME = "timestamp.field.name";
    }

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(ConfigName.FIELD_NAME, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                    "Field name with the connect timestamp format.");

    private static final String PURPOSE = "connect timestamp field type replacement";

    /** Name of the field to convert; set in {@link #configure(Map)}. */
    private String fieldName;

    // apply() runs once per record, but the upstream value schema rarely changes.
    // Cache the last converted schema so we only rebuild when the input schema differs.
    private Schema cachedInputSchema;
    private Schema cachedUpdatedSchema;

    @Override
    public void configure(Map<String, ?> props) {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
        fieldName = config.getString(ConfigName.FIELD_NAME);
    }

    /**
     * Copies the record's Struct value into a new Struct whose schema declares the configured
     * field as optional INT64, converting the Date value to epoch milliseconds.
     *
     * @param record the record to transform; must carry a Struct value (or null for tombstones)
     * @return a new record with the converted value, or {@code record} unchanged for tombstones
     */
    @Override
    public R apply(R record) {
        // Pass tombstones (delete markers) through untouched instead of failing in requireStruct.
        if (record.value() == null) {
            return record;
        }
        final Struct value = requireStruct(record.value(), PURPOSE);
        final Schema updatedSchema = getOrBuildSchema(value.schema());
        final Struct updatedValue = new Struct(updatedSchema);
        // Iterate the VALUE's own schema fields: passing a Field from the new schema into
        // value.get(...) would rely on index alignment between two different schemas.
        for (Field field : value.schema().fields()) {
            if (field.name().equals(fieldName)) {
                final Object raw = value.get(field);
                // Target schema is optional INT64, so propagate nulls rather than NPE.
                updatedValue.put(field.name(), raw == null ? null : ((Date) raw).getTime());
            } else {
                updatedValue.put(field.name(), value.get(field));
            }
        }
        return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                record.keySchema(),
                record.key(),
                updatedSchema,
                updatedValue,
                record.timestamp()
        );
    }

    /** Returns the cached converted schema, rebuilding it only when the input schema changes. */
    private Schema getOrBuildSchema(Schema inputSchema) {
        if (cachedUpdatedSchema == null || cachedInputSchema != inputSchema) {
            cachedInputSchema = inputSchema;
            cachedUpdatedSchema = makeUpdatedSchema(inputSchema);
        }
        return cachedUpdatedSchema;
    }

    /**
     * Builds a copy of {@code schema} where the configured field is retyped to optional INT64;
     * all other fields keep their original schemas and relative order.
     */
    private Schema makeUpdatedSchema(Schema schema) {
        final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
        for (Field field : schema.fields()) {
            if (field.name().equals(fieldName)) {
                builder.field(field.name(), Schema.OPTIONAL_INT64_SCHEMA);
            } else {
                builder.field(field.name(), field.schema());
            }
        }
        return builder.build();
    }

    @Override
    public void close() {
        // Drop cached schemas so a reused instance does not leak references.
        cachedInputSchema = null;
        cachedUpdatedSchema = null;
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment