Skip to content

Instantly share code, notes, and snippets.

@dasl-
Created March 21, 2017 21:44
Show Gist options
  • Save dasl-/fb2af1f865f8395dab9e2b204ebbd29a to your computer and use it in GitHub Desktop.
Save dasl-/fb2af1f865f8395dab9e2b204ebbd29a to your computer and use it in GitHub Desktop.
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java
index def91ed..9b7040a 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java
@@ -28,6 +28,7 @@
import io.debezium.relational.TableId;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.KafkaDatabaseHistory;
+import io.debezium.relational.topic.ByTableTopicMapper;
/**
* The configuration properties.
@@ -614,6 +615,15 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
+ "The default is 'true'. This is independent of how the connector internally records database history.")
.withDefault(true);
+ public static final Field TOPIC_MAPPERS = Field.create("topic.mappers")
+ .withDisplayName("Topic mappers")
+ .withType(Type.LIST)
+ .withWidth(Width.LONG)
+ .withImportance(Importance.LOW)
+ .withDescription("A comma-separated list of the names of TopicMapper classes that determine how change events for tables are mapped into topics. "
+ + "Built-in options include '" + ByTableTopicMapper.class.getName() + "' (the default) and '"
+ + io.debezium.relational.topic.ByLogicalTableTopicMapper.class.getName() + "'.");
+
public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode")
.withDisplayName("Snapshot mode")
.withEnum(SnapshotMode.class, SnapshotMode.INITIAL)
@@ -701,7 +711,7 @@ public static final Field MASK_COLUMN(int length) {
TABLE_WHITELIST, TABLE_BLACKLIST, TABLES_IGNORE_BUILTIN,
DATABASE_WHITELIST, DATABASE_BLACKLIST,
COLUMN_BLACKLIST, SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING,
- GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES,
+ TOPIC_MAPPERS, GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES,
GTID_SOURCE_FILTER_DML_EVENTS,
TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE,
SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD,
@@ -726,7 +736,7 @@ protected static ConfigDef configDef() {
KafkaDatabaseHistory.TOPIC, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS,
KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, DATABASE_HISTORY);
Field.group(config, "Events", INCLUDE_SCHEMA_CHANGES, TABLES_IGNORE_BUILTIN, DATABASE_WHITELIST, TABLE_WHITELIST,
- COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST,
+ COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST, TOPIC_MAPPERS,
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, GTID_SOURCE_FILTER_DML_EVENTS);
Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, MAX_QUEUE_SIZE, MAX_BATCH_SIZE, POLL_INTERVAL_MS,
SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE);
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
index 9482e24..2214692 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
@@ -35,10 +35,7 @@
private volatile ChainedReader readers;
/**
- * Create an instance of the log reader that uses Kafka to store database schema history and the
- * {@link TopicSelector#defaultSelector(String) default topic selector} of "{@code <serverName>.<databaseName>.<tableName>}"
- * for
- * data and "{@code <serverName>}" for metadata.
+ * Create an instance of the log reader that uses Kafka to store database schema history and DDL changes.
*/
public MySqlConnectorTask() {
}
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java
index 990bf1e..2de526f 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java
@@ -12,6 +12,7 @@
import java.util.concurrent.Callable;
import java.util.function.Predicate;
+import io.debezium.relational.topic.TopicMappers;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
@@ -89,6 +90,16 @@ public MySqlSchema(Configuration config, String serverName, Predicate<String> gt
this.ddlChanges = new DdlChanges(this.ddlParser.terminator());
this.ddlParser.addListener(ddlChanges);
+ // Set up the topic mapper(s) from the comma-separated 'topic.mappers' list ...
+ final String topicMapperAliasesString = config.getString(MySqlConnectorConfig.TOPIC_MAPPERS);
+ final String[] topicMapperAliases;
+ if (topicMapperAliasesString == null || topicMapperAliasesString.trim().isEmpty()) {
+ topicMapperAliases = null;
+ } else {
+ topicMapperAliases = topicMapperAliasesString.trim().split("\\s*,\\s*"); // drop blanks around commas, e.g. "a, b"
+ }
+ TopicMappers topicMappers = new TopicMappers(config, topicMapperAliases);
+
// Use MySQL-specific converters and schemas for values ...
String timePrecisionModeStr = config.getString(MySqlConnectorConfig.TIME_PRECISION_MODE);
TemporalPrecisionMode timePrecisionMode = TemporalPrecisionMode.parse(timePrecisionModeStr);
@@ -97,7 +108,7 @@ public MySqlSchema(Configuration config, String serverName, Predicate<String> gt
DecimalHandlingMode decimalHandlingMode = DecimalHandlingMode.parse(decimalHandlingModeStr);
DecimalMode decimalMode = decimalHandlingMode.asDecimalMode();
MySqlValueConverters valueConverters = new MySqlValueConverters(decimalMode, adaptiveTimePrecision);
- this.schemaBuilder = new TableSchemaBuilder(valueConverters, schemaNameValidator::validate);
+ this.schemaBuilder = new TableSchemaBuilder(topicMappers, valueConverters, schemaNameValidator::validate);
// Set up the server name and schema prefix ...
if (serverName != null) serverName = serverName.trim();
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java
index 9ea86f5..33013d5 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java
@@ -29,7 +29,6 @@
private final SourceInfo source;
private final MySqlSchema dbSchema;
- private final TopicSelector topicSelector;
private final RecordMakers recordProcessor;
private final Predicate<String> gtidSourceFilter;
private final Clock clock = Clock.system();
@@ -37,9 +36,6 @@
public MySqlTaskContext(Configuration config) {
super(config);
- // Set up the topic selector ...
- this.topicSelector = TopicSelector.defaultSelector(serverName());
-
// Set up the source information ...
this.source = new SourceInfo();
this.source.setServerName(serverName());
@@ -54,17 +50,13 @@ public MySqlTaskContext(Configuration config) {
this.dbSchema = new MySqlSchema(config, serverName(), this.gtidSourceFilter);
// Set up the record processor ...
- this.recordProcessor = new RecordMakers(dbSchema, source, topicSelector);
+ this.recordProcessor = new RecordMakers(dbSchema, source, serverName());
}
public String connectorName() {
return config.getString("name");
}
- public TopicSelector topicSelector() {
- return topicSelector;
- }
-
public SourceInfo source() {
return source;
}
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java
index 0f5da85..e73eac5 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java
@@ -37,7 +37,7 @@
private final Logger logger = LoggerFactory.getLogger(getClass());
private final MySqlSchema schema;
private final SourceInfo source;
- private final TopicSelector topicSelector;
+ private final String ddlChangeTopicName;
private final Map<Long, Converter> convertersByTableNumber = new HashMap<>();
private final Map<TableId, Long> tableNumbersByTableId = new HashMap<>();
private final Schema schemaChangeKeySchema;
@@ -49,12 +49,12 @@
*
* @param schema the schema information about the MySQL server databases; may not be null
* @param source the connector's source information; may not be null
- * @param topicSelector the selector for topic names; may not be null
+ * @param ddlChangeTopicName the name of the topic to which DDL changes are written; may not be null
*/
- public RecordMakers(MySqlSchema schema, SourceInfo source, TopicSelector topicSelector) {
+ public RecordMakers(MySqlSchema schema, SourceInfo source, String ddlChangeTopicName) {
this.schema = schema;
this.source = source;
- this.topicSelector = topicSelector;
+ this.ddlChangeTopicName = ddlChangeTopicName;
this.schemaChangeKeySchema = SchemaBuilder.struct()
.name(schemaNameValidator.validate("io.debezium.connector.mysql.SchemaChangeKey"))
.field(Fields.DATABASE_NAME, Schema.STRING_SCHEMA)
@@ -117,12 +117,12 @@ public RecordsForTable forTable(long tableNumber, BitSet includedColumns, Blocki
* @return the number of records produced; will be 0 or more
*/
public int schemaChanges(String databaseName, String ddlStatements, BlockingConsumer<SourceRecord> consumer) {
- String topicName = topicSelector.getPrimaryTopic();
Integer partition = 0;
Struct key = schemaChangeRecordKey(databaseName);
Struct value = schemaChangeRecordValue(databaseName, ddlStatements);
SourceRecord record = new SourceRecord(source.partition(), source.offset(),
- topicName, partition, schemaChangeKeySchema, key, schemaChangeValueSchema, value);
+ ddlChangeTopicName, partition,
+ schemaChangeKeySchema, key, schemaChangeValueSchema, value);
try {
consumer.accept(record);
return 1;
@@ -173,7 +173,7 @@ public boolean assign(long tableNumber, TableId id) {
TableSchema tableSchema = schema.schemaFor(id);
if (tableSchema == null) return false;
- String topicName = topicSelector.getTopic(id);
+ String topicName = tableSchema.getEnvelopeSchemaName();
Envelope envelope = Envelope.defineSchema()
.withName(schemaNameValidator.validate(topicName + ".Envelope"))
.withRecord(schema.schemaFor(id).valueSchema())
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java
deleted file mode 100644
index 16574d2..0000000
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.debezium.connector.mysql;
-
-import io.debezium.annotation.ThreadSafe;
-import io.debezium.relational.TableId;
-
-/**
- * A function that determines the name of topics for data and metadata.
- *
- * @author Randall Hauch
- */
-@ThreadSafe
-public interface TopicSelector {
- /**
- * Get the default topic selector logic, which uses a '.' delimiter character when needed.
- *
- * @param prefix the name of the prefix to be used for all topics; may not be null and must not terminate in the
- * {@code delimiter}
- * @return the topic selector; never null
- */
- static TopicSelector defaultSelector(String prefix) {
- return defaultSelector(prefix,".");
- }
-
- /**
- * Get the default topic selector logic, which uses the supplied delimiter character when needed.
- *
- * @param prefix the name of the prefix to be used for all topics; may not be null and must not terminate in the
- * {@code delimiter}
- * @param delimiter the string delineating the server, database, and table names; may not be null
- * @return the topic selector; never null
- */
- static TopicSelector defaultSelector(String prefix, String delimiter) {
- return new TopicSelector() {
- /**
- * Get the name of the topic for the given server, database, and table names. This method returns
- * "{@code <serverName>}".
- *
- * @return the topic name; never null
- */
- @Override
- public String getPrimaryTopic() {
- return prefix;
- }
-
- /**
- * Get the name of the topic for the given server name. This method returns
- * "{@code <prefix>.<databaseName>.<tableName>}".
- *
- * @param databaseName the name of the database; may not be null
- * @param tableName the name of the table; may not be null
- * @return the topic name; never null
- */
- @Override
- public String getTopic(String databaseName, String tableName) {
- return String.join(delimiter, prefix, databaseName, tableName);
- }
-
- };
- }
-
- /**
- * Get the name of the topic for the given server name.
- *
- * @param tableId the identifier of the table; may not be null
- * @return the topic name; never null
- */
- default String getTopic(TableId tableId) {
- return getTopic(tableId.catalog(),tableId.table());
- }
-
- /**
- * Get the name of the topic for the given server name.
- *
- * @param databaseName the name of the database; may not be null
- * @param tableName the name of the table; may not be null
- * @return the topic name; never null
- */
- String getTopic(String databaseName, String tableName);
-
- /**
- * Get the name of the primary topic.
- *
- * @return the topic name; never null
- */
- String getPrimaryTopic();
-}
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java
index 1ffd20a..6c5cc72 100644
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java
@@ -31,7 +31,6 @@ public void shouldCreateTaskFromConfiguration() throws Exception {
assertThat(context.logger()).isNotNull();
assertThat(context.makeRecord()).isNotNull();
assertThat(context.source()).isNotNull();
- assertThat(context.topicSelector()).isNotNull();
assertThat(context.hostname()).isEqualTo(hostname);
assertThat(context.port()).isEqualTo(port);
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java
index a5188b6..07e2db8 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java
@@ -27,6 +27,7 @@
import io.debezium.relational.TableSchema;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables;
+import io.debezium.relational.topic.TopicMappers;
import io.debezium.util.AvroValidator;
/**
@@ -63,7 +64,7 @@ protected PostgresSchema(PostgresConnectorConfig config) {
PostgresValueConverter valueConverter = new PostgresValueConverter(config.adaptiveTimePrecision(), ZoneOffset.UTC);
this.schemaNameValidator = AvroValidator.create(LOGGER)::validate;
- this.schemaBuilder = new TableSchemaBuilder(valueConverter, this.schemaNameValidator);
+ this.schemaBuilder = new TableSchemaBuilder(new TopicMappers(null, null), valueConverter, this.schemaNameValidator);
// Set up the server name and schema prefix ...
String serverName = config.serverName();
diff --git a/debezium-core/src/main/java/io/debezium/config/Field.java b/debezium-core/src/main/java/io/debezium/config/Field.java
index 00031d5..ad82065 100644
--- a/debezium-core/src/main/java/io/debezium/config/Field.java
+++ b/debezium-core/src/main/java/io/debezium/config/Field.java
@@ -1020,7 +1020,7 @@ public static int isListOfRegex(Configuration config, Field field, ValidationOut
int errors = 0;
if (value != null) {
try {
- Strings.listOfRegex(value.toString(), Pattern.CASE_INSENSITIVE);
+ Strings.listOfRegex(value, Pattern.CASE_INSENSITIVE);
} catch (PatternSyntaxException e) {
problems.accept(field, value, "A comma-separated list of valid regular expressions is expected, but " + e.getMessage());
++errors;
@@ -1029,6 +1029,25 @@ public static int isListOfRegex(Configuration config, Field field, ValidationOut
return errors;
}
+ /**
+ * Validate that the field's value, when it is set, compiles as a regular expression.
+ *
+ * @return the number of problems reported to {@code problems}; either 0 or 1
+ */
+ public static int isRegex(Configuration config, Field field, ValidationOutput problems) {
+ String value = config.getString(field);
+ int errors = 0;
+ if (value != null) {
+ try {
+ Pattern.compile(value, Pattern.CASE_INSENSITIVE);
+ } catch (PatternSyntaxException e) {
+ problems.accept(field, value, "A valid regular expression is expected, but " + e.getMessage());
+ ++errors;
+ }
+ }
+ return errors;
+ }
+
public static int isClassName(Configuration config, Field field, ValidationOutput problems) {
String value = config.getString(field);
if (value == null || SourceVersion.isName(value)) return 0;
diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchema.java b/debezium-core/src/main/java/io/debezium/relational/TableSchema.java
index 778fd0d..092bda9 100644
--- a/debezium-core/src/main/java/io/debezium/relational/TableSchema.java
+++ b/debezium-core/src/main/java/io/debezium/relational/TableSchema.java
@@ -47,6 +47,7 @@
@Immutable
public class TableSchema {
+ private final String envelopeSchemaName;
private final Schema keySchema;
private final Schema valueSchema;
private final Function<Object[], Object> keyGenerator;
@@ -55,7 +56,8 @@
/**
* Create an instance with the specified {@link Schema}s for the keys and values, and the functions that generate the
* key and value for a given row of data.
- *
+ *
+ * @param envelopeSchemaName the name of the schema; may be null
* @param keySchema the schema for the primary key; may be null
* @param keyGenerator the function that converts a row into a single key object for Kafka Connect; may not be null but may
* return nulls
@@ -63,8 +65,9 @@
* @param valueGenerator the function that converts a row into a single value object for Kafka Connect; may not be null but
* may return nulls
*/
- public TableSchema(Schema keySchema, Function<Object[], Object> keyGenerator,
+ public TableSchema(String envelopeSchemaName, Schema keySchema, Function<Object[], Object> keyGenerator,
Schema valueSchema, Function<Object[], Struct> valueGenerator) {
+ this.envelopeSchemaName = envelopeSchemaName;
this.keySchema = keySchema;
this.valueSchema = valueSchema;
this.keyGenerator = keyGenerator != null ? keyGenerator : (row) -> null;
@@ -72,6 +75,14 @@ public TableSchema(Schema keySchema, Function<Object[], Object> keyGenerator,
}
/**
+ * Get the name of the envelope schema for both the key and value.
+ * @return the envelope schema name; may be null
+ */
+ public String getEnvelopeSchemaName() {
+ return envelopeSchemaName;
+ }
+
+ /**
* Get the {@link Schema} that represents the table's columns, excluding those that make up the {@link #keySchema()}.
*
* @return the Schema describing the columns in the table; never null
diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java
index 27345e4..a7172cb 100644
--- a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java
+++ b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java
@@ -10,11 +10,13 @@
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
+import io.debezium.relational.topic.TopicMappers;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -29,6 +31,7 @@
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.mapping.ColumnMapper;
import io.debezium.relational.mapping.ColumnMappers;
+import io.debezium.relational.topic.TopicMapper;
/**
* Builder that constructs {@link TableSchema} instances for {@link Table} definitions.
@@ -49,17 +52,21 @@
private static final Logger LOGGER = LoggerFactory.getLogger(TableSchemaBuilder.class);
+ private final TopicMappers topicMappers;
private final Function<String, String> schemaNameValidator;
private final ValueConverterProvider valueConverterProvider;
/**
* Create a new instance of the builder.
- *
+ *
+ * @param topicMappers the TopicMapper for each table; may not be null
* @param valueConverterProvider the provider for obtaining {@link ValueConverter}s and {@link SchemaBuilder}s; may not be
* null
* @param schemaNameValidator the validation function for schema names; may not be null
*/
- public TableSchemaBuilder(ValueConverterProvider valueConverterProvider, Function<String, String> schemaNameValidator) {
+ public TableSchemaBuilder(TopicMappers topicMappers, ValueConverterProvider valueConverterProvider,
+ Function<String, String> schemaNameValidator) {
+ this.topicMappers = topicMappers;
this.schemaNameValidator = schemaNameValidator;
this.valueConverterProvider = valueConverterProvider;
}
@@ -89,7 +96,7 @@ public TableSchema create(ResultSet resultSet, String name) throws SQLException
Function<Object[], Struct> valueGenerator = createValueGenerator(valueSchema, id, columns, null, null);
// Finally create our result object with no primary key or key generator ...
- return new TableSchema(null, null, valueSchema, valueGenerator);
+ return new TableSchema(schemaName, null, null, valueSchema, valueGenerator);
}
/**
@@ -129,11 +136,14 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId>
if (schemaPrefix == null) schemaPrefix = "";
// Build the schemas ...
final TableId tableId = table.id();
- final String tableIdStr = tableId.toString();
- final String schemaNamePrefix = schemaPrefix + tableIdStr;
- LOGGER.debug("Mapping table '{}' to schemas under '{}'", tableId, schemaNamePrefix);
- SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(schemaNameValidator.apply(schemaNamePrefix + ".Value"));
- SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(schemaNameValidator.apply(schemaNamePrefix + ".Key"));
+ final TopicMapper topicMapper = topicMappers.getTopicMapperToUse(schemaPrefix, table);
+ final String topicName = topicMapper.getTopicName(schemaPrefix, table);
+ final String keySchemaName = schemaNameValidator.apply(topicName + ".Key");
+ final String valueSchemaName = schemaNameValidator.apply(topicName + ".Value");
+ final String envelopeSchemaName = topicName; // not run through schemaNameValidator: RecordMakers uses this as the Kafka topic name, which may contain chars (e.g. '-') that schema-name validation may rewrite
+ LOGGER.debug("Mapping table '{}' to key schema '{}' and value schema '{}'", tableId, keySchemaName, valueSchemaName);
+ SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(valueSchemaName);
+ SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(keySchemaName);
AtomicBoolean hasPrimaryKey = new AtomicBoolean(false);
table.columns().forEach(column -> {
if (table.isPrimaryKeyColumn(column.name())) {
@@ -147,6 +157,9 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId>
addField(valSchemaBuilder, column, mapper);
}
});
+ // Enhance the key schema if necessary ...
+ topicMapper.enhanceKeySchema(keySchemaBuilder);
+ // Create the schemas ...
Schema valSchema = valSchemaBuilder.optional().build();
Schema keySchema = hasPrimaryKey.get() ? keySchemaBuilder.build() : null;
@@ -156,11 +169,11 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId>
}
// Create the generators ...
- Function<Object[], Object> keyGenerator = createKeyGenerator(keySchema, tableId, table.primaryKeyColumns());
+ Function<Object[], Object> keyGenerator = createKeyGenerator(keySchema, tableId, table, topicMapper, schemaPrefix);
Function<Object[], Struct> valueGenerator = createValueGenerator(valSchema, tableId, table.columns(), filter, mappers);
// And the table schema ...
- return new TableSchema(keySchema, keyGenerator, valSchema, valueGenerator);
+ return new TableSchema(envelopeSchemaName, keySchema, keyGenerator, valSchema, valueGenerator);
}
/**
@@ -169,15 +182,21 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId>
* @param schema the Kafka Connect schema for the key; may be null if there is no known schema, in which case the generator
* will be null
* @param columnSetName the name for the set of columns, used in error messages; may not be null
- * @param columns the column definitions for the table that defines the row; may not be null
+ * @param table the table for the row of data
+ * @param topicMapper the TopicMapper for the table whose key we are generating; may not be null
+ * @param schemaPrefix the prefix added to the table identifier to construct the schema names; may be null if there is no
+ * prefix
* @return the key-generating function, or null if there is no key schema
*/
- protected Function<Object[], Object> createKeyGenerator(Schema schema, TableId columnSetName, List<Column> columns) {
+ protected Function<Object[], Object> createKeyGenerator(Schema schema, TableId columnSetName, Table table,
+ TopicMapper topicMapper, String schemaPrefix) {
if (schema != null) {
+ List<Column> columns = table.primaryKeyColumns();
int[] recordIndexes = indexesForColumns(columns);
Field[] fields = fieldsForColumns(schema, columns);
int numFields = recordIndexes.length;
ValueConverter[] converters = convertersForColumns(schema, columnSetName, columns, null, null);
+ Map<String, Object> nonRowFieldsToAddToKey = topicMapper.getNonRowFieldsToAddToKey(schema, schemaPrefix, table);
return (row) -> {
Struct result = new Struct(schema);
for (int i = 0; i != numFields; ++i) {
@@ -194,6 +213,13 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId>
}
}
}
+
+ if (nonRowFieldsToAddToKey != null) {
+ for (Map.Entry<String, Object> nonRowField : nonRowFieldsToAddToKey.entrySet()) {
+ result.put(nonRowField.getKey(), nonRowField.getValue());
+ }
+ }
+
return result;
};
}
diff --git a/debezium-core/src/main/java/io/debezium/relational/topic/ByLogicalTableTopicMapper.java b/debezium-core/src/main/java/io/debezium/relational/topic/ByLogicalTableTopicMapper.java
new file mode 100644
index 0000000..d014a95
--- /dev/null
+++ b/debezium-core/src/main/java/io/debezium/relational/topic/ByLogicalTableTopicMapper.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.relational.topic;
+
+import io.debezium.config.Field;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+
+import io.debezium.relational.Table;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A logical table consists of one or more physical tables with the same schema. A common use case is sharding -- the
+ * two physical tables {@code db_shard1.my_table} and {@code db_shard2.my_table} together form one logical table.
+ * <p>
+ * This TopicMapper sends the change events of all physical tables matching the configured logical table regex to one
+ * topic, and adds a {@code __dbz__physicalTableIdentifier} field to every record key so that rows from different
+ * physical tables that share a primary key value remain distinguishable.
+ *
+ * @author David Leibovic
+ */
+public class ByLogicalTableTopicMapper extends TopicMapper {
+
+    /** Name of the extra key field that identifies the physical table a change event came from. */
+    private static final String PHYSICAL_TABLE_IDENTIFIER_FIELD = "__dbz__physicalTableIdentifier";
+
+    // Example: "^.*?(?=\\..+\\..+)\\.(?<logicalDb>etsy_.+?(?=(_\\d+\\.)|\\.))(_\\d+)?\\.(?<table>.+)$"
+    private static final Field LOGICAL_TABLE_REGEX = Field.create("logical.table.regex")
+            .withDisplayName("Logical table regex")
+            .withType(ConfigDef.Type.STRING)
+            .withWidth(ConfigDef.Width.LONG)
+            .withImportance(ConfigDef.Importance.LOW)
+            .withValidation(Field::isRegex)
+            .withDescription("The regular expression that must match the whole of a table's fully-qualified name "
+                    + "(the topic prefix followed by the table identifier) for the table to be mapped to a logical table topic");
+
+    // Example: "${logicalDb}.${table}"
+    private static final Field LOGICAL_TABLE_REPLACEMENT = Field.create("logical.table.replacement")
+            .withDisplayName("Logical table replacement")
+            .withType(ConfigDef.Type.STRING)
+            .withWidth(ConfigDef.Width.LONG)
+            .withImportance(ConfigDef.Importance.LOW)
+            .withDescription("The replacement string, which may reference groups captured by the logical table regex, "
+                    + "used to produce the topic name for a matching table");
+
+    // Example: "^.*?(?=\\..+\\..+)\\.(?<logicalDb>etsy_.+?(?=\\.))\\.(?<table>.+)$"
+    private static final Field PHYSICAL_TABLE_REGEX = Field.create("physical.table.regex")
+            .withDisplayName("Physical table regex")
+            .withType(ConfigDef.Type.STRING)
+            .withWidth(ConfigDef.Width.LONG)
+            .withImportance(ConfigDef.Importance.LOW)
+            .withValidation(Field::isRegex)
+            .withDescription("The regular expression matched against the whole of a table's fully-qualified name "
+                    + "to compute the physical table identifier that is added to each record key");
+
+    // Example: "${logicalDb}"
+    private static final Field PHYSICAL_TABLE_REPLACEMENT = Field.create("physical.table.replacement")
+            .withDisplayName("Physical table replacement")
+            .withType(ConfigDef.Type.STRING)
+            .withWidth(ConfigDef.Width.LONG)
+            .withImportance(ConfigDef.Importance.LOW)
+            .withDescription("The replacement string, which may reference groups captured by the physical table regex, "
+                    + "used to produce the physical table identifier; when the regex is unset or does not match, "
+                    + "the fully-qualified table name is used instead");
+
+    /**
+     * @return all configuration fields read by this mapper
+     */
+    public Field.Set configFields() {
+        return Field.setOf(LOGICAL_TABLE_REGEX, LOGICAL_TABLE_REPLACEMENT, PHYSICAL_TABLE_REGEX, PHYSICAL_TABLE_REPLACEMENT);
+    }
+
+    /**
+     * Map a physical table to the topic of its logical table.
+     *
+     * @param topicPrefix the prefix placed before the table identifier
+     * @param table the table whose topic is requested
+     * @return the result of applying the logical table replacement, or null when the logical table regex is not
+     *         configured or does not match the table's fully-qualified name
+     */
+    @Override
+    public String getTopicName(String topicPrefix, Table table) {
+        final String logicalTableRegex = config.getString(LOGICAL_TABLE_REGEX);
+        if (logicalTableRegex == null) {
+            // Not configured: treat exactly like a non-matching table rather than failing in Pattern.compile(null).
+            return null;
+        }
+        final Matcher logicalTableMatcher = Pattern.compile(logicalTableRegex)
+                .matcher(composeFullyQualifiedTableName(topicPrefix, table));
+        if (logicalTableMatcher.matches()) {
+            return logicalTableMatcher.replaceAll(config.getString(LOGICAL_TABLE_REPLACEMENT));
+        }
+        return null;
+    }
+
+    /**
+     * Add the physical table identifier field to the key schema. Because several physical tables share one topic,
+     * their primary key fields alone are not guaranteed to be unique within that topic.
+     */
+    @Override
+    public void enhanceKeySchema(SchemaBuilder keySchemaBuilder) {
+        keySchemaBuilder.field(PHYSICAL_TABLE_IDENTIFIER_FIELD, Schema.STRING_SCHEMA);
+    }
+
+    /**
+     * Compute the physical table identifier added to every key of the given table.
+     *
+     * @return a new map holding only the physical table identifier field; never null
+     */
+    @Override
+    public Map<String, Object> getNonRowFieldsToAddToKey(Schema schema, String topicPrefix, Table table) {
+        final String fullyQualifiedTableName = composeFullyQualifiedTableName(topicPrefix, table);
+        String physicalTableIdentifier = fullyQualifiedTableName;
+        final String physicalTableRegex = config.getString(PHYSICAL_TABLE_REGEX);
+        if (physicalTableRegex != null) {
+            final Matcher physicalTableMatcher = Pattern.compile(physicalTableRegex).matcher(fullyQualifiedTableName);
+            if (physicalTableMatcher.matches()) {
+                physicalTableIdentifier = physicalTableMatcher.replaceAll(config.getString(PHYSICAL_TABLE_REPLACEMENT));
+            }
+        }
+        Map<String, Object> nonRowFields = new HashMap<>();
+        nonRowFields.put(PHYSICAL_TABLE_IDENTIFIER_FIELD, physicalTableIdentifier);
+        return nonRowFields;
+    }
+
+    /** Build the name the regexes are matched against: the topic prefix followed by the table identifier. */
+    private String composeFullyQualifiedTableName(String topicPrefix, Table table) {
+        return topicPrefix + table.id().toString();
+    }
+
+}
diff --git a/debezium-core/src/main/java/io/debezium/relational/topic/ByTableTopicMapper.java b/debezium-core/src/main/java/io/debezium/relational/topic/ByTableTopicMapper.java
new file mode 100644
index 0000000..c0f7390
--- /dev/null
+++ b/debezium-core/src/main/java/io/debezium/relational/topic/ByTableTopicMapper.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.relational.topic;
+
+import io.debezium.config.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+
+import io.debezium.relational.Table;
+
+import java.util.Map;
+
+/**
+ * The default {@link TopicMapper}: sends each table's change events to a topic named after that table (the topic
+ * prefix immediately followed by the table id). With one table per topic, event keys need no extra fields.
+ *
+ * @author David Leibovic
+ */
+public class ByTableTopicMapper extends TopicMapper {
+
+ /**
+ * This mapper has no settings of its own.
+ *
+ * @return always {@code null}, i.e. there is nothing to validate
+ */
+ @Override
+ public Field.Set configFields() {
+ return null;
+ }
+
+ /**
+ * Name the topic after the table: the topic prefix immediately followed by the table id.
+ *
+ * @param topicPrefix prefix for the topic
+ * @param table the table that we are getting the topic name for
+ * @return the topic name; never null, so this mapper can always serve as the fallback
+ */
+ @Override
+ public String getTopicName(String topicPrefix, Table table) {
+ return topicPrefix + table.id().toString();
+ }
+
+ /**
+ * Leave the key schema untouched: since each topic carries a single table, the table's primary/unique key
+ * already distinguishes the records within a topic.
+ *
+ * @param keySchemaBuilder the {@link SchemaBuilder} for the key; not modified
+ */
+ @Override
+ public void enhanceKeySchema(SchemaBuilder keySchemaBuilder) {
+ // do nothing ...
+ }
+
+ /**
+ * No extra key values are needed, because {@link #enhanceKeySchema(SchemaBuilder)} adds no key fields.
+ *
+ * @return always {@code null}
+ */
+ @Override
+ public Map<String, Object> getNonRowFieldsToAddToKey(Schema schema, String topicPrefix, Table table) {
+ return null;
+ }
+
+}
diff --git a/debezium-core/src/main/java/io/debezium/relational/topic/TopicMapper.java b/debezium-core/src/main/java/io/debezium/relational/topic/TopicMapper.java
new file mode 100644
index 0000000..f52a149
--- /dev/null
+++ b/debezium-core/src/main/java/io/debezium/relational/topic/TopicMapper.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.relational.topic;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+
+import io.debezium.config.Configuration;
+import io.debezium.config.Field;
+import io.debezium.relational.Table;
+
+import java.util.Map;
+
+/**
+ * Strategy that decides which topic receives a table's change events and, because several tables may end up
+ * sharing one topic, how the event keys must be extended to keep records from different tables distinct.
+ *
+ * @author David Leibovic
+ */
+public abstract class TopicMapper {
+
+ /** This mapper's configuration; {@code null} until {@link #setConfig(Configuration)} has been called. */
+ protected Configuration config;
+
+ /**
+ * Set the configuration from which this mapper reads its settings.
+ *
+ * @param config the mapper's configuration
+ * @return this mapper, so that calls can be chained
+ */
+ public TopicMapper setConfig(Configuration config) {
+ this.config = config;
+ return this;
+ }
+
+ /**
+ * Get the name of the topic for the given table.
+ *
+ * @param topicPrefix prefix for the topic
+ * @param table the table that we are getting the topic name for
+ * @return the topic name; may be null if this strategy could not be applied
+ */
+ public abstract String getTopicName(String topicPrefix, Table table);
+
+ /**
+ * Get the configuration fields this mapper understands, so that its configuration can be validated.
+ *
+ * @return the fields to validate against, or null if there is nothing to validate
+ */
+ public abstract Field.Set configFields();
+
+ /**
+ * Depending on your TopicMapper implementation and which rows in a database may occupy the same topic,
+ * it may be necessary to enhance the key schema for the events to ensure each distinct record in the topic
+ * has a unique key.
+ *
+ * @param keySchemaBuilder the {@link SchemaBuilder} for the key, pre-populated with the table's primary/unique key
+ */
+ public abstract void enhanceKeySchema(SchemaBuilder keySchemaBuilder);
+
+ /**
+ * Get the extra key-value pairs necessary to add to the event's key: the values of any fields that
+ * {@link #enhanceKeySchema(SchemaBuilder)} added, keyed by field name.
+ *
+ * @param schema the schema for the key; never null
+ * @param topicPrefix prefix for the topic
+ * @param table the table the event is for
+ * @return the extra key-value pairs, or null if none are necessary.
+ */
+ public abstract Map<String, Object> getNonRowFieldsToAddToKey(Schema schema, String topicPrefix, Table table);
+
+}
diff --git a/debezium-core/src/main/java/io/debezium/relational/topic/TopicMappers.java b/debezium-core/src/main/java/io/debezium/relational/topic/TopicMappers.java
new file mode 100644
index 0000000..5cdfcab
--- /dev/null
+++ b/debezium-core/src/main/java/io/debezium/relational/topic/TopicMappers.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.relational.topic;
+
+import io.debezium.config.Configuration;
+import io.debezium.config.Field;
+import io.debezium.relational.Table;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The ordered chain of {@link TopicMapper}s used to route tables to topics. The mappers named by the configured
+ * aliases are consulted first, in the given order, and a {@link ByTableTopicMapper} is always appended last as
+ * the fallback.
+ *
+ * @author David Leibovic
+ */
+public class TopicMappers {
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+ private final List<TopicMapper> topicMappers = new ArrayList<>();
+
+ /**
+ * Instantiate and configure the mapper for each alias, then append the default {@link ByTableTopicMapper}.
+ * The mapper class for an alias is read from {@code topic.mappers.<alias>.type}, and the mapper is configured
+ * with the {@code topic.mappers.<alias>.} subset of {@code config}.
+ *
+ * @param config the connector configuration; only read when {@code topicMapperAliases} is non-null
+ * @param topicMapperAliases the aliases of the mappers to use, in priority order; may be null
+ * @throws ConnectException if a mapper's configuration fails validation
+ */
+ public TopicMappers(Configuration config, String[] topicMapperAliases) {
+ if (topicMapperAliases != null) {
+ for (String alias : topicMapperAliases) {
+ final String typeProperty = "topic.mappers." + alias + ".type";
+ final TopicMapper topicMapper = config.getInstance(typeProperty, TopicMapper.class);
+ if (topicMapper == null) {
+ // Most likely a misconfiguration: report it rather than dropping the alias without a trace.
+ logger.warn("Ignoring topic mapper alias '{}': no TopicMapper was obtained from property '{}'",
+ alias, typeProperty);
+ continue;
+ }
+ configureAndValidate(topicMapper, alias, config);
+ topicMappers.add(topicMapper);
+ }
+ }
+ // The by-table mapper always yields a topic name, so it ends the chain; it has no settings to configure.
+ topicMappers.add(new ByTableTopicMapper());
+ }
+
+ /**
+ * Get the topic mapper to use for the given table: the first mapper in the chain that yields a topic name.
+ *
+ * @param topicPrefix prefix for the topic
+ * @param table the table that we are getting the topic name for
+ * @return the TopicMapper; never null
+ */
+ public TopicMapper getTopicMapperToUse(String topicPrefix, Table table) {
+ for (TopicMapper topicMapper : topicMappers) {
+ if (topicMapper.getTopicName(topicPrefix, table) != null) {
+ return topicMapper;
+ }
+ }
+ // Not reached while the trailing ByTableTopicMapper always yields a name; kept as a defensive fallback.
+ return topicMappers.get(topicMappers.size() - 1);
+ }
+
+ /**
+ * Give the mapper its own {@code topic.mappers.<alias>.} subset of the configuration and, if the mapper declares
+ * config fields, validate that subset against them, reporting problems through {@code logger::error}.
+ *
+ * @throws ConnectException if validation fails
+ */
+ private void configureAndValidate(TopicMapper topicMapper, String alias, Configuration config) {
+ Configuration topicMapperConfig = config.subset("topic.mappers." + alias + ".", true);
+ topicMapper.setConfig(topicMapperConfig);
+ Field.Set topicMapperConfigFields = topicMapper.configFields();
+ if (topicMapperConfigFields != null && !topicMapperConfig.validateAndRecord(topicMapperConfigFields, logger::error)) {
+ throw new ConnectException("Unable to validate config for topic mapper: " + topicMapper.getClass() +
+ " with alias " + alias + ".");
+ }
+ }
+
+}
diff --git a/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java b/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java
index 57316eb..0363b09 100644
--- a/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java
+++ b/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java
@@ -19,6 +19,7 @@
import static org.fest.assertions.Assertions.assertThat;
import io.debezium.jdbc.JdbcValueConverters;
+import io.debezium.relational.topic.TopicMappers;
import io.debezium.time.Date;
import io.debezium.util.AvroValidator;
@@ -34,9 +35,11 @@
private Column c4;
private TableSchema schema;
private AvroValidator validator;
+ private TopicMappers topicMappers;
@Before
public void beforeEach() {
+ topicMappers = new TopicMappers(null, null);
validator = AvroValidator.create((original,replacement, conflict)->{
fail("Should not have come across an invalid schema name");
});
@@ -78,19 +81,19 @@ public void checkPreconditions() {
@Test(expected = NullPointerException.class)
public void shouldFailToBuildTableSchemaFromNullTable() {
- new TableSchemaBuilder(new JdbcValueConverters(),validator::validate).create(prefix,null);
+ new TableSchemaBuilder(topicMappers, new JdbcValueConverters(),validator::validate).create(prefix,null); // a null table must raise the NullPointerException expected by @Test
}
@Test
public void shouldBuildTableSchemaFromTable() {
- schema = new TableSchemaBuilder(new JdbcValueConverters(),validator::validate).create(prefix,table);
+ schema = new TableSchemaBuilder(topicMappers, new JdbcValueConverters(),validator::validate).create(prefix,table); // topicMappers = new TopicMappers(null, null) from beforeEach(): only the default by-table mapper
assertThat(schema).isNotNull();
}
@Test
public void shouldBuildTableSchemaFromTableWithoutPrimaryKey() {
table = table.edit().setPrimaryKeyNames().create();
- schema = new TableSchemaBuilder(new JdbcValueConverters(),validator::validate).create(prefix,table);
+ schema = new TableSchemaBuilder(topicMappers, new JdbcValueConverters(),validator::validate).create(prefix,table);
assertThat(schema).isNotNull();
// Check the keys ...
assertThat(schema.keySchema()).isNull();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment