Created
December 13, 2017 18:46
-
-
Save vikas-tikoo-zefr/152100accce437adb258e5c6af031866 to your computer and use it in GitHub Desktop.
Add namespace to connect schema
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.kafka.common.config.ConfigDef; | |
import org.apache.kafka.connect.connector.ConnectRecord; | |
import org.apache.kafka.connect.data.Field; | |
import org.apache.kafka.connect.data.Schema; | |
import org.apache.kafka.connect.data.SchemaBuilder; | |
import org.apache.kafka.connect.data.Struct; | |
import org.apache.kafka.connect.transforms.Transformation; | |
import org.apache.kafka.connect.transforms.util.SchemaUtil; | |
import org.apache.kafka.connect.transforms.util.SimpleConfig; | |
import java.util.Date; | |
import java.util.Map; | |
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; | |
public class Namespacefy<R extends ConnectRecord<R>> implements Transformation<R> { | |
public static final String OVERVIEW_DOC = | |
"Lift record's name into the specified namespace." | |
+ "<p/>" | |
+ "This is mainly useful for jdbc source connectors, where namespace is null by default." | |
+ "It allows easier integration with commons-avro definitions of these records."; | |
public static final ConfigDef CONFIG_DEF = new ConfigDef() | |
.define(ConfigName.NAMESPACE, | |
ConfigDef.Type.STRING, | |
"com.zefr.avro.message.connect", | |
ConfigDef.Importance.HIGH, | |
"Fully qualified namespace for the record."); | |
private interface ConfigName { | |
String NAMESPACE = "record.namespace"; | |
} | |
private static final String PURPOSE = "add namespace to record"; | |
private String recordNamespace; | |
@Override | |
public void configure(Map<String, ?> props) { | |
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); | |
recordNamespace = config.getString(ConfigName.NAMESPACE); | |
} | |
@Override | |
public R apply(R record) { | |
final Struct value = requireStruct(record.value(), PURPOSE); | |
Schema updatedSchema = makeUpdatedSchema(record.valueSchema(), recordNamespace); | |
final Struct updatedValue = new Struct(updatedSchema); | |
for (Field field: updatedValue.schema().fields()) { | |
updatedValue.put(field.name(), value.get(field)); | |
} | |
return record.newRecord( | |
record.topic(), | |
record.kafkaPartition(), | |
record.keySchema(), | |
record.key(), | |
updatedSchema, | |
updatedValue, | |
record.timestamp() | |
); | |
} | |
private Schema makeUpdatedSchema(Schema schema, String namespace) { | |
final SchemaBuilder builder = SchemaBuilder.struct(); | |
builder.name(namespace+"."+schema.name()); | |
builder.version(schema.version()); | |
builder.doc(schema.doc()); | |
Map<String, String> params = schema.parameters(); | |
if (params != null) { | |
builder.parameters(params); | |
} | |
for (Field field : schema.fields()) { | |
builder.field(field.name(), field.schema()); | |
} | |
return builder.build(); | |
} | |
@Override | |
public void close() {} | |
@Override | |
public ConfigDef config() { | |
return CONFIG_DEF; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment