Created March 21, 2017 21:44
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java | |
index def91ed..9b7040a 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java | |
@@ -28,6 +28,7 @@ | |
import io.debezium.relational.TableId; | |
import io.debezium.relational.history.DatabaseHistory; | |
import io.debezium.relational.history.KafkaDatabaseHistory; | |
+import io.debezium.relational.topic.ByTableTopicMapper; | |
/** | |
* The configuration properties. | |
@@ -614,6 +615,15 @@ public static SecureConnectionMode parse(String value, String defaultValue) { | |
+ "The default is 'true'. This is independent of how the connector internally records database history.") | |
.withDefault(true); | |
+ public static final Field TOPIC_MAPPERS = Field.create("topic.mappers") | |
+ .withDisplayName("Topic mapper") | |
+ .withType(Type.LIST) | |
+ .withWidth(Width.LONG) | |
+ .withImportance(Importance.LOW) | |
+ .withDescription("The name of the TopicMapper class that should be used to determine how change events for tables should be mapped into topics. " | |
+ + "Built in options include '" + ByTableTopicMapper.class.getName() | |
+ + "' (the default)."); | |
+ | |
public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode") | |
.withDisplayName("Snapshot mode") | |
.withEnum(SnapshotMode.class, SnapshotMode.INITIAL) | |
@@ -701,7 +711,7 @@ public static final Field MASK_COLUMN(int length) { | |
TABLE_WHITELIST, TABLE_BLACKLIST, TABLES_IGNORE_BUILTIN, | |
DATABASE_WHITELIST, DATABASE_BLACKLIST, | |
COLUMN_BLACKLIST, SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING, | |
- GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, | |
+ TOPIC_MAPPERS, GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, | |
GTID_SOURCE_FILTER_DML_EVENTS, | |
TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE, | |
SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD, | |
@@ -726,7 +736,7 @@ protected static ConfigDef configDef() { | |
KafkaDatabaseHistory.TOPIC, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS, | |
KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, DATABASE_HISTORY); | |
Field.group(config, "Events", INCLUDE_SCHEMA_CHANGES, TABLES_IGNORE_BUILTIN, DATABASE_WHITELIST, TABLE_WHITELIST, | |
- COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST, | |
+ COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST, TOPIC_MAPPERS, | |
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, GTID_SOURCE_FILTER_DML_EVENTS); | |
Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, MAX_QUEUE_SIZE, MAX_BATCH_SIZE, POLL_INTERVAL_MS, | |
SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE); | |
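
An example of how the new 'topic.mappers' option might be wired into a connector configuration. This is a minimal sketch: the alias "logical" and the property values are made up for illustration, while the 'topic.mappers.<alias>.type' convention is the one read by the TopicMappers class introduced later in this patch.

import io.debezium.config.Configuration;

public class ConnectorTopicMapperConfigExample {
    public static void main(String[] args) {
        // "logical" is a made-up alias; each alias listed in "topic.mappers" is resolved to a
        // TopicMapper implementation via its "topic.mappers.<alias>.type" property.
        Configuration connectorConfig = Configuration.create()
                .with("database.server.name", "server1")
                .with("topic.mappers", "logical")
                .with("topic.mappers.logical.type",
                        "io.debezium.relational.topic.ByLogicalTableTopicMapper")
                .build();
        System.out.println(connectorConfig.getString("topic.mappers")); // prints: logical
    }
}

Everything under 'topic.mappers.<alias>.' forms that mapper's own configuration namespace and is handed to the mapper with the prefix stripped.
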
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java | |
index 9482e24..2214692 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java | |
@@ -35,10 +35,7 @@ | |
private volatile ChainedReader readers; | |
/** | |
- * Create an instance of the log reader that uses Kafka to store database schema history and the | |
- * {@link TopicSelector#defaultSelector(String) default topic selector} of "{@code <serverName>.<databaseName>.<tableName>}" | |
- * for | |
- * data and "{@code <serverName>}" for metadata. | |
+ * Create an instance of the log reader that uses Kafka to store database schema history and DDL changes. | |
*/ | |
public MySqlConnectorTask() { | |
} | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java | |
index 990bf1e..2de526f 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java | |
@@ -12,6 +12,7 @@ | |
import java.util.concurrent.Callable; | |
import java.util.function.Predicate; | |
+import io.debezium.relational.topic.TopicMappers; | |
import org.apache.kafka.connect.data.Schema; | |
import org.apache.kafka.connect.errors.ConnectException; | |
import org.slf4j.Logger; | |
@@ -89,6 +90,16 @@ public MySqlSchema(Configuration config, String serverName, Predicate<String> gt | |
this.ddlChanges = new DdlChanges(this.ddlParser.terminator()); | |
this.ddlParser.addListener(ddlChanges); | |
+ // Set up the topic mapper ... | |
+ final String topicMapperAliasesString = config.getString(MySqlConnectorConfig.TOPIC_MAPPERS); | |
+ final String[] topicMapperAliases; | |
+ if (topicMapperAliasesString == null || topicMapperAliasesString.trim().isEmpty()) { | |
+ topicMapperAliases = null; | |
+ } else { | |
+ topicMapperAliases = topicMapperAliasesString.trim().split(","); | |
+ } | |
+ TopicMappers topicMappers = new TopicMappers(config, topicMapperAliases); | |
+ | |
// Use MySQL-specific converters and schemas for values ... | |
String timePrecisionModeStr = config.getString(MySqlConnectorConfig.TIME_PRECISION_MODE); | |
TemporalPrecisionMode timePrecisionMode = TemporalPrecisionMode.parse(timePrecisionModeStr); | |
@@ -97,7 +108,7 @@ public MySqlSchema(Configuration config, String serverName, Predicate<String> gt | |
DecimalHandlingMode decimalHandlingMode = DecimalHandlingMode.parse(decimalHandlingModeStr); | |
DecimalMode decimalMode = decimalHandlingMode.asDecimalMode(); | |
MySqlValueConverters valueConverters = new MySqlValueConverters(decimalMode, adaptiveTimePrecision); | |
- this.schemaBuilder = new TableSchemaBuilder(valueConverters, schemaNameValidator::validate); | |
+ this.schemaBuilder = new TableSchemaBuilder(topicMappers, valueConverters, schemaNameValidator::validate); | |
// Set up the server name and schema prefix ... | |
if (serverName != null) serverName = serverName.trim(); | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java | |
index 9ea86f5..33013d5 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java | |
@@ -29,7 +29,6 @@ | |
private final SourceInfo source; | |
private final MySqlSchema dbSchema; | |
- private final TopicSelector topicSelector; | |
private final RecordMakers recordProcessor; | |
private final Predicate<String> gtidSourceFilter; | |
private final Clock clock = Clock.system(); | |
@@ -37,9 +36,6 @@ | |
public MySqlTaskContext(Configuration config) { | |
super(config); | |
- // Set up the topic selector ... | |
- this.topicSelector = TopicSelector.defaultSelector(serverName()); | |
- | |
// Set up the source information ... | |
this.source = new SourceInfo(); | |
this.source.setServerName(serverName()); | |
@@ -54,17 +50,13 @@ public MySqlTaskContext(Configuration config) { | |
this.dbSchema = new MySqlSchema(config, serverName(), this.gtidSourceFilter); | |
// Set up the record processor ... | |
- this.recordProcessor = new RecordMakers(dbSchema, source, topicSelector); | |
+ this.recordProcessor = new RecordMakers(dbSchema, source, serverName()); | |
} | |
public String connectorName() { | |
return config.getString("name"); | |
} | |
- public TopicSelector topicSelector() { | |
- return topicSelector; | |
- } | |
- | |
public SourceInfo source() { | |
return source; | |
} | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java | |
index 0f5da85..e73eac5 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java | |
@@ -37,7 +37,7 @@ | |
private final Logger logger = LoggerFactory.getLogger(getClass()); | |
private final MySqlSchema schema; | |
private final SourceInfo source; | |
- private final TopicSelector topicSelector; | |
+ private final String ddlChangeTopicName; | |
private final Map<Long, Converter> convertersByTableNumber = new HashMap<>(); | |
private final Map<TableId, Long> tableNumbersByTableId = new HashMap<>(); | |
private final Schema schemaChangeKeySchema; | |
@@ -49,12 +49,12 @@ | |
* | |
* @param schema the schema information about the MySQL server databases; may not be null | |
* @param source the connector's source information; may not be null | |
- * @param topicSelector the selector for topic names; may not be null | |
+ * @param ddlChangeTopicName the name of the topic to which DDL changes are written; may not be null | |
*/ | |
- public RecordMakers(MySqlSchema schema, SourceInfo source, TopicSelector topicSelector) { | |
+ public RecordMakers(MySqlSchema schema, SourceInfo source, String ddlChangeTopicName) { | |
this.schema = schema; | |
this.source = source; | |
- this.topicSelector = topicSelector; | |
+ this.ddlChangeTopicName = ddlChangeTopicName; | |
this.schemaChangeKeySchema = SchemaBuilder.struct() | |
.name(schemaNameValidator.validate("io.debezium.connector.mysql.SchemaChangeKey")) | |
.field(Fields.DATABASE_NAME, Schema.STRING_SCHEMA) | |
@@ -117,12 +117,12 @@ public RecordsForTable forTable(long tableNumber, BitSet includedColumns, Blocki | |
* @return the number of records produced; will be 0 or more | |
*/ | |
public int schemaChanges(String databaseName, String ddlStatements, BlockingConsumer<SourceRecord> consumer) { | |
- String topicName = topicSelector.getPrimaryTopic(); | |
Integer partition = 0; | |
Struct key = schemaChangeRecordKey(databaseName); | |
Struct value = schemaChangeRecordValue(databaseName, ddlStatements); | |
SourceRecord record = new SourceRecord(source.partition(), source.offset(), | |
- topicName, partition, schemaChangeKeySchema, key, schemaChangeValueSchema, value); | |
+ ddlChangeTopicName, partition, | |
+ schemaChangeKeySchema, key, schemaChangeValueSchema, value); | |
try { | |
consumer.accept(record); | |
return 1; | |
@@ -173,7 +173,7 @@ public boolean assign(long tableNumber, TableId id) { | |
TableSchema tableSchema = schema.schemaFor(id); | |
if (tableSchema == null) return false; | |
- String topicName = topicSelector.getTopic(id); | |
+ String topicName = tableSchema.getEnvelopeSchemaName(); | |
Envelope envelope = Envelope.defineSchema() | |
.withName(schemaNameValidator.validate(topicName + ".Envelope")) | |
.withRecord(schema.schemaFor(id).valueSchema()) | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java | |
deleted file mode 100644 | |
index 16574d2..0000000 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java | |
+++ /dev/null | |
@@ -1,91 +0,0 @@ | |
-/* | |
- * Copyright Debezium Authors. | |
- * | |
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 | |
- */ | |
-package io.debezium.connector.mysql; | |
- | |
-import io.debezium.annotation.ThreadSafe; | |
-import io.debezium.relational.TableId; | |
- | |
-/** | |
- * A function that determines the name of topics for data and metadata. | |
- * | |
- * @author Randall Hauch | |
- */ | |
-@ThreadSafe | |
-public interface TopicSelector { | |
- /** | |
- * Get the default topic selector logic, which uses a '.' delimiter character when needed. | |
- * | |
- * @param prefix the name of the prefix to be used for all topics; may not be null and must not terminate in the | |
- * {@code delimiter} | |
- * @return the topic selector; never null | |
- */ | |
- static TopicSelector defaultSelector(String prefix) { | |
- return defaultSelector(prefix,"."); | |
- } | |
- | |
- /** | |
- * Get the default topic selector logic, which uses the supplied delimiter character when needed. | |
- * | |
- * @param prefix the name of the prefix to be used for all topics; may not be null and must not terminate in the | |
- * {@code delimiter} | |
- * @param delimiter the string delineating the server, database, and table names; may not be null | |
- * @return the topic selector; never null | |
- */ | |
- static TopicSelector defaultSelector(String prefix, String delimiter) { | |
- return new TopicSelector() { | |
- /** | |
- * Get the name of the topic for the given server, database, and table names. This method returns | |
- * "{@code <serverName>}". | |
- * | |
- * @return the topic name; never null | |
- */ | |
- @Override | |
- public String getPrimaryTopic() { | |
- return prefix; | |
- } | |
- | |
- /** | |
- * Get the name of the topic for the given server name. This method returns | |
- * "{@code <prefix>.<databaseName>.<tableName>}". | |
- * | |
- * @param databaseName the name of the database; may not be null | |
- * @param tableName the name of the table; may not be null | |
- * @return the topic name; never null | |
- */ | |
- @Override | |
- public String getTopic(String databaseName, String tableName) { | |
- return String.join(delimiter, prefix, databaseName, tableName); | |
- } | |
- | |
- }; | |
- } | |
- | |
- /** | |
- * Get the name of the topic for the given server name. | |
- * | |
- * @param tableId the identifier of the table; may not be null | |
- * @return the topic name; never null | |
- */ | |
- default String getTopic(TableId tableId) { | |
- return getTopic(tableId.catalog(),tableId.table()); | |
- } | |
- | |
- /** | |
- * Get the name of the topic for the given server name. | |
- * | |
- * @param databaseName the name of the database; may not be null | |
- * @param tableName the name of the table; may not be null | |
- * @return the topic name; never null | |
- */ | |
- String getTopic(String databaseName, String tableName); | |
- | |
- /** | |
- * Get the name of the primary topic. | |
- * | |
- * @return the topic name; never null | |
- */ | |
- String getPrimaryTopic(); | |
-} | |
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java | |
index 1ffd20a..6c5cc72 100644 | |
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java | |
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java | |
@@ -31,7 +31,6 @@ public void shouldCreateTaskFromConfiguration() throws Exception { | |
assertThat(context.logger()).isNotNull(); | |
assertThat(context.makeRecord()).isNotNull(); | |
assertThat(context.source()).isNotNull(); | |
- assertThat(context.topicSelector()).isNotNull(); | |
assertThat(context.hostname()).isEqualTo(hostname); | |
assertThat(context.port()).isEqualTo(port); | |
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java | |
index a5188b6..07e2db8 100644 | |
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java | |
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java | |
@@ -27,6 +27,7 @@ | |
import io.debezium.relational.TableSchema; | |
import io.debezium.relational.TableSchemaBuilder; | |
import io.debezium.relational.Tables; | |
+import io.debezium.relational.topic.TopicMappers; | |
import io.debezium.util.AvroValidator; | |
/** | |
@@ -63,7 +64,7 @@ protected PostgresSchema(PostgresConnectorConfig config) { | |
PostgresValueConverter valueConverter = new PostgresValueConverter(config.adaptiveTimePrecision(), ZoneOffset.UTC); | |
this.schemaNameValidator = AvroValidator.create(LOGGER)::validate; | |
- this.schemaBuilder = new TableSchemaBuilder(valueConverter, this.schemaNameValidator); | |
+ this.schemaBuilder = new TableSchemaBuilder(new TopicMappers(null, null), valueConverter, this.schemaNameValidator); | |
// Set up the server name and schema prefix ... | |
String serverName = config.serverName(); | |
diff --git a/debezium-core/src/main/java/io/debezium/config/Field.java b/debezium-core/src/main/java/io/debezium/config/Field.java | |
index 00031d5..ad82065 100644 | |
--- a/debezium-core/src/main/java/io/debezium/config/Field.java | |
+++ b/debezium-core/src/main/java/io/debezium/config/Field.java | |
@@ -1020,7 +1020,7 @@ public static int isListOfRegex(Configuration config, Field field, ValidationOut | |
int errors = 0; | |
if (value != null) { | |
try { | |
- Strings.listOfRegex(value.toString(), Pattern.CASE_INSENSITIVE); | |
+ Strings.listOfRegex(value, Pattern.CASE_INSENSITIVE); | |
} catch (PatternSyntaxException e) { | |
problems.accept(field, value, "A comma-separated list of valid regular expressions is expected, but " + e.getMessage()); | |
++errors; | |
@@ -1029,6 +1029,20 @@ public static int isListOfRegex(Configuration config, Field field, ValidationOut | |
return errors; | |
} | |
+ public static int isRegex(Configuration config, Field field, ValidationOutput problems) { | |
+ String value = config.getString(field); | |
+ int errors = 0; | |
+ if (value != null) { | |
+ try { | |
+ Pattern.compile(value, Pattern.CASE_INSENSITIVE); | |
+ } catch (PatternSyntaxException e) { | |
+ problems.accept(field, value, "A valid regular expressions is expected, but " + e.getMessage()); | |
+ ++errors; | |
+ } | |
+ } | |
+ return errors; | |
+ } | |
+ | |
public static int isClassName(Configuration config, Field field, ValidationOutput problems) { | |
String value = config.getString(field); | |
if (value == null || SourceVersion.isName(value)) return 0; | |
diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchema.java b/debezium-core/src/main/java/io/debezium/relational/TableSchema.java | |
index 778fd0d..092bda9 100644 | |
--- a/debezium-core/src/main/java/io/debezium/relational/TableSchema.java | |
+++ b/debezium-core/src/main/java/io/debezium/relational/TableSchema.java | |
@@ -47,6 +47,7 @@ | |
@Immutable | |
public class TableSchema { | |
+ private final String envelopeSchemaName; | |
private final Schema keySchema; | |
private final Schema valueSchema; | |
private final Function<Object[], Object> keyGenerator; | |
@@ -55,7 +56,8 @@ | |
/** | |
* Create an instance with the specified {@link Schema}s for the keys and values, and the functions that generate the | |
* key and value for a given row of data. | |
- * | |
+ * | |
+ * @param envelopeSchemaName the name of the envelope schema; may be null | |
* @param keySchema the schema for the primary key; may be null | |
* @param keyGenerator the function that converts a row into a single key object for Kafka Connect; may not be null but may | |
* return nulls | |
@@ -63,8 +65,9 @@ | |
* @param valueGenerator the function that converts a row into a single value object for Kafka Connect; may not be null but | |
* may return nulls | |
*/ | |
- public TableSchema(Schema keySchema, Function<Object[], Object> keyGenerator, | |
+ public TableSchema(String envelopeSchemaName, Schema keySchema, Function<Object[], Object> keyGenerator, | |
Schema valueSchema, Function<Object[], Struct> valueGenerator) { | |
+ this.envelopeSchemaName = envelopeSchemaName; | |
this.keySchema = keySchema; | |
this.valueSchema = valueSchema; | |
this.keyGenerator = keyGenerator != null ? keyGenerator : (row) -> null; | |
@@ -72,6 +75,14 @@ public TableSchema(Schema keySchema, Function<Object[], Object> keyGenerator, | |
} | |
/** | |
+ * Get the name of the envelope schema for both the key and value. | |
+ * @return the envelope schema name; may be null | |
+ */ | |
+ public String getEnvelopeSchemaName() { | |
+ return envelopeSchemaName; | |
+ } | |
+ | |
+ /** | |
* Get the {@link Schema} that represents the table's columns, excluding those that make up the {@link #keySchema()}. | |
* | |
* @return the Schema describing the columns in the table; never null | |
diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java | |
index 27345e4..a7172cb 100644 | |
--- a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java | |
+++ b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java | |
@@ -10,11 +10,13 @@ | |
import java.sql.Types; | |
import java.util.ArrayList; | |
import java.util.List; | |
+import java.util.Map; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.function.Function; | |
import java.util.function.Predicate; | |
+import io.debezium.relational.topic.TopicMappers; | |
import org.apache.kafka.connect.data.Field; | |
import org.apache.kafka.connect.data.Schema; | |
import org.apache.kafka.connect.data.SchemaBuilder; | |
@@ -29,6 +31,7 @@ | |
import io.debezium.jdbc.JdbcConnection; | |
import io.debezium.relational.mapping.ColumnMapper; | |
import io.debezium.relational.mapping.ColumnMappers; | |
+import io.debezium.relational.topic.TopicMapper; | |
/** | |
* Builder that constructs {@link TableSchema} instances for {@link Table} definitions. | |
@@ -49,17 +52,21 @@ | |
private static final Logger LOGGER = LoggerFactory.getLogger(TableSchemaBuilder.class); | |
+ private final TopicMappers topicMappers; | |
private final Function<String, String> schemaNameValidator; | |
private final ValueConverterProvider valueConverterProvider; | |
/** | |
* Create a new instance of the builder. | |
- * | |
+ * | |
+ * @param topicMappers the topic mappers used to determine the topic and schema names for each table; may not be null | |
* @param valueConverterProvider the provider for obtaining {@link ValueConverter}s and {@link SchemaBuilder}s; may not be | |
* null | |
* @param schemaNameValidator the validation function for schema names; may not be null | |
*/ | |
- public TableSchemaBuilder(ValueConverterProvider valueConverterProvider, Function<String, String> schemaNameValidator) { | |
+ public TableSchemaBuilder(TopicMappers topicMappers, ValueConverterProvider valueConverterProvider, | |
+ Function<String, String> schemaNameValidator) { | |
+ this.topicMappers = topicMappers; | |
this.schemaNameValidator = schemaNameValidator; | |
this.valueConverterProvider = valueConverterProvider; | |
} | |
@@ -89,7 +96,7 @@ public TableSchema create(ResultSet resultSet, String name) throws SQLException | |
Function<Object[], Struct> valueGenerator = createValueGenerator(valueSchema, id, columns, null, null); | |
// Finally create our result object with no primary key or key generator ... | |
- return new TableSchema(null, null, valueSchema, valueGenerator); | |
+ return new TableSchema(schemaName, null, null, valueSchema, valueGenerator); | |
} | |
/** | |
@@ -129,11 +136,14 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId> | |
if (schemaPrefix == null) schemaPrefix = ""; | |
// Build the schemas ... | |
final TableId tableId = table.id(); | |
- final String tableIdStr = tableId.toString(); | |
- final String schemaNamePrefix = schemaPrefix + tableIdStr; | |
- LOGGER.debug("Mapping table '{}' to schemas under '{}'", tableId, schemaNamePrefix); | |
- SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(schemaNameValidator.apply(schemaNamePrefix + ".Value")); | |
- SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(schemaNameValidator.apply(schemaNamePrefix + ".Key")); | |
+ final TopicMapper topicMapper = topicMappers.getTopicMapperToUse(schemaPrefix, table); | |
+ final String topicName = topicMapper.getTopicName(schemaPrefix, table); | |
+ final String keySchemaName = schemaNameValidator.apply(topicName + ".Key"); | |
+ final String valueSchemaName = schemaNameValidator.apply(topicName + ".Value"); | |
+ final String envelopeSchemaName = schemaNameValidator.apply(topicName); // TODO: better name | |
+ LOGGER.debug("Mapping table '{}' to key schemas '{}' and value schema '{}'", tableId, keySchemaName, valueSchemaName); | |
+ SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(valueSchemaName); | |
+ SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(keySchemaName); | |
AtomicBoolean hasPrimaryKey = new AtomicBoolean(false); | |
table.columns().forEach(column -> { | |
if (table.isPrimaryKeyColumn(column.name())) { | |
@@ -147,6 +157,9 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId> | |
addField(valSchemaBuilder, column, mapper); | |
} | |
}); | |
+ // Enhance the key schema if necessary ... | |
+ topicMapper.enhanceKeySchema(keySchemaBuilder); | |
+ // Create the schemas ... | |
Schema valSchema = valSchemaBuilder.optional().build(); | |
Schema keySchema = hasPrimaryKey.get() ? keySchemaBuilder.build() : null; | |
@@ -156,11 +169,11 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId> | |
} | |
// Create the generators ... | |
- Function<Object[], Object> keyGenerator = createKeyGenerator(keySchema, tableId, table.primaryKeyColumns()); | |
+ Function<Object[], Object> keyGenerator = createKeyGenerator(keySchema, tableId, table, topicMapper, schemaPrefix); | |
Function<Object[], Struct> valueGenerator = createValueGenerator(valSchema, tableId, table.columns(), filter, mappers); | |
// And the table schema ... | |
- return new TableSchema(keySchema, keyGenerator, valSchema, valueGenerator); | |
+ return new TableSchema(envelopeSchemaName, keySchema, keyGenerator, valSchema, valueGenerator); | |
} | |
/** | |
@@ -169,15 +182,21 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId> | |
* @param schema the Kafka Connect schema for the key; may be null if there is no known schema, in which case the generator | |
* will be null | |
* @param columnSetName the name for the set of columns, used in error messages; may not be null | |
- * @param columns the column definitions for the table that defines the row; may not be null | |
+ * @param table the table for the row of data | |
+ * @param topicMapper the TopicMapper for the table whose key we are generating; may not be null | |
+ * @param schemaPrefix the prefix added to the table identifier to construct the schema names; may be null if there is no | |
+ * prefix | |
* @return the key-generating function, or null if there is no key schema | |
*/ | |
- protected Function<Object[], Object> createKeyGenerator(Schema schema, TableId columnSetName, List<Column> columns) { | |
+ protected Function<Object[], Object> createKeyGenerator(Schema schema, TableId columnSetName, Table table, | |
+ TopicMapper topicMapper, String schemaPrefix) { | |
if (schema != null) { | |
+ List<Column> columns = table.primaryKeyColumns(); | |
int[] recordIndexes = indexesForColumns(columns); | |
Field[] fields = fieldsForColumns(schema, columns); | |
int numFields = recordIndexes.length; | |
ValueConverter[] converters = convertersForColumns(schema, columnSetName, columns, null, null); | |
+ Map<String, Object> nonRowFieldsToAddToKey = topicMapper.getNonRowFieldsToAddToKey(schema, schemaPrefix, table); | |
return (row) -> { | |
Struct result = new Struct(schema); | |
for (int i = 0; i != numFields; ++i) { | |
@@ -194,6 +213,13 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId> | |
} | |
} | |
} | |
+ | |
+ if (nonRowFieldsToAddToKey != null) { | |
+ for (Map.Entry<String, Object> nonRowField : nonRowFieldsToAddToKey.entrySet()) { | |
+ result.put(nonRowField.getKey(), nonRowField.getValue()); | |
+ } | |
+ } | |
+ | |
return result; | |
}; | |
} | |
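
To make the key-generator change concrete, here is a small sketch of the key Struct the generated function now produces, assuming a table whose primary key is a single 'id' column and a mapper that adds the '__dbz__physicalTableIdentifier' field, as ByLogicalTableTopicMapper below does; the schema name and values are hypothetical.

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class EnhancedKeyExample {
    public static void main(String[] args) {
        // The key schema now carries the mapper-supplied field alongside the primary-key column,
        // and the generated key function fills both: the row value plus the non-row field.
        Schema keySchema = SchemaBuilder.struct().name("server1.db_shard.my_table.Key")
                .field("id", Schema.INT64_SCHEMA)
                .field("__dbz__physicalTableIdentifier", Schema.STRING_SCHEMA)
                .build();
        Struct key = new Struct(keySchema)
                .put("id", 42L)
                .put("__dbz__physicalTableIdentifier", "server1.db_shard1.my_table");
        System.out.println(key);
    }
}
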
diff --git a/debezium-core/src/main/java/io/debezium/relational/topic/ByLogicalTableTopicMapper.java b/debezium-core/src/main/java/io/debezium/relational/topic/ByLogicalTableTopicMapper.java | |
new file mode 100644 | |
index 0000000..d014a95 | |
--- /dev/null | |
+++ b/debezium-core/src/main/java/io/debezium/relational/topic/ByLogicalTableTopicMapper.java | |
@@ -0,0 +1,106 @@ | |
+/* | |
+ * Copyright Debezium Authors. | |
+ * | |
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 | |
+ */ | |
+package io.debezium.relational.topic; | |
+ | |
+import io.debezium.config.Field; | |
+import org.apache.kafka.common.config.ConfigDef; | |
+import org.apache.kafka.connect.data.Schema; | |
+import org.apache.kafka.connect.data.SchemaBuilder; | |
+ | |
+import io.debezium.relational.Table; | |
+ | |
+import java.util.HashMap; | |
+import java.util.Map; | |
+import java.util.regex.Matcher; | |
+import java.util.regex.Pattern; | |
+ | |
+/** | |
+ * A logical table consists of one or more physical tables with the same schema. A common use case is sharding -- the | |
+ * two physical tables `db_shard1.my_table` and `db_shard2.my_table` together form one logical table. | |
+ * | |
+ * This TopicMapper allows us to send change events from both physical tables to one topic. | |
+ * | |
+ * @author David Leibovic | |
+ */ | |
+public class ByLogicalTableTopicMapper extends TopicMapper { | |
+ | |
+ // "^.*?(?=\\..+\\..+)\\.(?<logicalDb>etsy_.+?(?=(_\\d+\\.)|\\.))(_\\d+)?\\.(?<table>.+)$" | |
+ private static final Field LOGICAL_TABLE_REGEX = Field.create("logical.table.regex") | |
+ .withDisplayName("Logical table regex") | |
+ .withType(ConfigDef.Type.STRING) | |
+ .withWidth(ConfigDef.Width.LONG) | |
+ .withImportance(ConfigDef.Importance.LOW) | |
+ .withValidation(Field::isRegex) | |
+ .withDescription("The tables for which changes are to be captured"); | |
+ | |
+ // "${logicalDb}.${table}" | |
+ private static final Field LOGICAL_TABLE_REPLACEMENT = Field.create("logical.table.replacement") | |
+ .withDisplayName("Logical table replacement") | |
+ .withType(ConfigDef.Type.STRING) | |
+ .withWidth(ConfigDef.Width.LONG) | |
+ .withImportance(ConfigDef.Importance.LOW) | |
+ .withDescription("The tables for which changes are to be captured"); | |
+ | |
+ // "^.*?(?=\\..+\\..+)\\.(?<logicalDb>etsy_.+?(?=\\.))\\.(?<table>.+)$" | |
+ private static final Field PHYSICAL_TABLE_REGEX = Field.create("physical.table.regex") | |
+ .withDisplayName("Physical table regex") | |
+ .withType(ConfigDef.Type.STRING) | |
+ .withWidth(ConfigDef.Width.LONG) | |
+ .withImportance(ConfigDef.Importance.LOW) | |
+ .withValidation(Field::isRegex) | |
+ .withDescription("The tables for which changes are to be captured"); | |
+ | |
+ // "${logicalDb}" | |
+ private static final Field PHYSICAL_TABLE_REPLACEMENT = Field.create("physical.table.replacement") | |
+ .withDisplayName("Physical table replacement") | |
+ .withType(ConfigDef.Type.STRING) | |
+ .withWidth(ConfigDef.Width.LONG) | |
+ .withImportance(ConfigDef.Importance.LOW) | |
+ .withDescription("The tables for which changes are to be captured"); | |
+ | |
+ public Field.Set configFields() { | |
+ return Field.setOf(LOGICAL_TABLE_REGEX, PHYSICAL_TABLE_REGEX); | |
+ } | |
+ | |
+ public String getTopicName(String topicPrefix, Table table) { | |
+ final String fullyQualifiedTableName = composeFullyQualifiedTableName(topicPrefix, table); | |
+ Pattern logicalTablePattern = Pattern.compile(config.getString(LOGICAL_TABLE_REGEX)); | |
+ Matcher logicalTableMatcher = logicalTablePattern.matcher(fullyQualifiedTableName); | |
+ if (logicalTableMatcher.matches()) { | |
+ return logicalTableMatcher.replaceAll(config.getString(LOGICAL_TABLE_REPLACEMENT)); | |
+ } | |
+ return null; | |
+ } | |
+ | |
+ public void enhanceKeySchema(SchemaBuilder keySchemaBuilder) { | |
+ // Now that multiple physical tables can share a topic, the Key Schema can no longer consist of solely the | |
+ // record's primary / unique key fields, since they are not guaranteed to be unique across tables. We need some | |
+ // identifier added to the key that distinguishes the different physical tables. | |
+ keySchemaBuilder.field("__dbz__physicalTableIdentifier", Schema.STRING_SCHEMA); | |
+ } | |
+ | |
+ public Map<String, Object> getNonRowFieldsToAddToKey(Schema schema, String topicPrefix, Table table) { | |
+ final String fullyQualifiedTableName = composeFullyQualifiedTableName(topicPrefix, table); | |
+ Pattern physicalTableIdentifierPattern = Pattern.compile(config.getString(PHYSICAL_TABLE_REGEX)); | |
+ Matcher physicalTableIdentifierMatcher = physicalTableIdentifierPattern.matcher(fullyQualifiedTableName); | |
+ | |
+ final String physicalTableIdentifier; | |
+ if (physicalTableIdentifierMatcher.matches()) { | |
+ physicalTableIdentifier = physicalTableIdentifierMatcher.replaceAll(config.getString(PHYSICAL_TABLE_REPLACEMENT)); | |
+ } else { | |
+ physicalTableIdentifier = fullyQualifiedTableName; | |
+ } | |
+ | |
+ Map<String, Object> nonRowFields = new HashMap<>(); | |
+ nonRowFields.put("__dbz__physicalTableIdentifier", physicalTableIdentifier); | |
+ return nonRowFields; | |
+ } | |
+ | |
+ private String composeFullyQualifiedTableName(String topicPrefix, Table table) { | |
+ return topicPrefix + table.id().toString(); | |
+ } | |
+ | |
+} | |
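
A quick illustration of how getTopicName() collapses shards onto one topic. The regex, replacement, and table names here are hypothetical and simpler than the commented examples above, but they are applied with the same Pattern/Matcher.replaceAll() calls the mapper uses.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogicalTableRegexExample {
    public static void main(String[] args) {
        // Hypothetical regex/replacement pair; getTopicName() applies the configured
        // values with exactly these calls.
        Pattern logicalTablePattern = Pattern.compile("(?<serverName>.+)\\.db_shard\\d+\\.(?<table>.+)");
        String replacement = "${serverName}.db_shard.${table}";

        for (String fullyQualifiedTableName : new String[] {
                "server1.db_shard1.my_table", "server1.db_shard2.my_table" }) {
            Matcher matcher = logicalTablePattern.matcher(fullyQualifiedTableName);
            if (matcher.matches()) {
                System.out.println(fullyQualifiedTableName + " -> " + matcher.replaceAll(replacement));
            }
        }
        // Prints:
        //   server1.db_shard1.my_table -> server1.db_shard.my_table
        //   server1.db_shard2.my_table -> server1.db_shard.my_table
    }
}
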
diff --git a/debezium-core/src/main/java/io/debezium/relational/topic/ByTableTopicMapper.java b/debezium-core/src/main/java/io/debezium/relational/topic/ByTableTopicMapper.java | |
new file mode 100644 | |
index 0000000..c0f7390 | |
--- /dev/null | |
+++ b/debezium-core/src/main/java/io/debezium/relational/topic/ByTableTopicMapper.java | |
@@ -0,0 +1,38 @@ | |
+/* | |
+ * Copyright Debezium Authors. | |
+ * | |
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 | |
+ */ | |
+package io.debezium.relational.topic; | |
+ | |
+import io.debezium.config.Field; | |
+import org.apache.kafka.connect.data.Schema; | |
+import org.apache.kafka.connect.data.SchemaBuilder; | |
+ | |
+import io.debezium.relational.Table; | |
+ | |
+import java.util.Map; | |
+ | |
+/** | |
+ * @author David Leibovic | |
+ */ | |
+public class ByTableTopicMapper extends TopicMapper { | |
+ | |
+ public Field.Set configFields() { | |
+ return null; | |
+ } | |
+ | |
+ public String getTopicName(String topicPrefix, Table table) { | |
+ return topicPrefix + table.id().toString(); | |
+ } | |
+ | |
+ public void enhanceKeySchema(SchemaBuilder keySchemaBuilder) { | |
+ // do nothing ... | |
+ } | |
+ | |
+ public Map<String, Object> getNonRowFieldsToAddToKey(Schema schema, String topicPrefix, Table table) { | |
+ // do nothing ... | |
+ return null; | |
+ } | |
+ | |
+} | |
diff --git a/debezium-core/src/main/java/io/debezium/relational/topic/TopicMapper.java b/debezium-core/src/main/java/io/debezium/relational/topic/TopicMapper.java | |
new file mode 100644 | |
index 0000000..f52a149 | |
--- /dev/null | |
+++ b/debezium-core/src/main/java/io/debezium/relational/topic/TopicMapper.java | |
@@ -0,0 +1,58 @@ | |
+/* | |
+ * Copyright Debezium Authors. | |
+ * | |
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 | |
+ */ | |
+package io.debezium.relational.topic; | |
+ | |
+import org.apache.kafka.connect.data.Schema; | |
+import org.apache.kafka.connect.data.SchemaBuilder; | |
+ | |
+import io.debezium.config.Configuration; | |
+import io.debezium.config.Field; | |
+import io.debezium.relational.Table; | |
+ | |
+import java.util.Map; | |
+ | |
+/** | |
+ * @author David Leibovic | |
+ */ | |
+public abstract class TopicMapper { | |
+ | |
+ protected Configuration config; | |
+ | |
+ public TopicMapper setConfig(Configuration config) { | |
+ this.config = config; | |
+ return this; | |
+ } | |
+ | |
+ /** | |
+ * Get the name of the topic for the given table. | |
+ * | |
+ * @param topicPrefix prefix for the topic | |
+ * @param table the table that we are getting the topic name for | |
+ * @return the topic name; may be null if this strategy could not be applied | |
+ */ | |
+ abstract public String getTopicName(String topicPrefix, Table table); | |
+ | |
+ abstract public Field.Set configFields(); | |
+ | |
+ /** | |
+ * Depending on your TopicMapper implementation, rows from several tables may end up in the same topic, | |
+ * so it may be necessary to enhance the key schema for the events to ensure each distinct record in the topic | |
+ * has a unique key. | |
+ * | |
+ * @param keySchemaBuilder the {@link SchemaBuilder} for the key, pre-populated with the table's primary/unique key | |
+ */ | |
+ abstract public void enhanceKeySchema(SchemaBuilder keySchemaBuilder); | |
+ | |
+ /** | |
+ * Get the extra key-value pairs necessary to add to the event's key. | |
+ * @param schema the schema for the key; never null | |
+ * @param topicPrefix prefix for the topic | |
+ * @param table the table the event is for | |
+ * @return the extra key-value pairs, or null if none are necessary. | |
+ */ | |
+ abstract public Map<String, Object> getNonRowFieldsToAddToKey(Schema schema, String topicPrefix, Table table); | |
+ | |
+} | |
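
For readers implementing their own strategy, here is a hypothetical subclass (illustration only, not part of this patch) showing what the abstract contract asks for: a topic name per table, optional extra config fields, and key enhancement when several tables share a topic. The class name and routing rule are invented.

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.config.Field;
import io.debezium.relational.Table;
import io.debezium.relational.topic.TopicMapper;

/**
 * Hypothetical mapper that routes every table of a database to a single topic named
 * "<prefix><databaseName>".
 */
public class ByDatabaseTopicMapper extends TopicMapper {

    public Field.Set configFields() {
        return null; // no extra configuration to validate
    }

    public String getTopicName(String topicPrefix, Table table) {
        // One topic per database (catalog), regardless of the table name.
        return topicPrefix + table.id().catalog();
    }

    public void enhanceKeySchema(SchemaBuilder keySchemaBuilder) {
        // Several tables share a topic, so add a discriminator to keep keys unique across tables.
        keySchemaBuilder.field("__dbz__tableName", Schema.STRING_SCHEMA);
    }

    public Map<String, Object> getNonRowFieldsToAddToKey(Schema schema, String topicPrefix, Table table) {
        return Collections.singletonMap("__dbz__tableName", table.id().table());
    }
}
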
diff --git a/debezium-core/src/main/java/io/debezium/relational/topic/TopicMappers.java b/debezium-core/src/main/java/io/debezium/relational/topic/TopicMappers.java | |
new file mode 100644 | |
index 0000000..5cdfcab | |
--- /dev/null | |
+++ b/debezium-core/src/main/java/io/debezium/relational/topic/TopicMappers.java | |
@@ -0,0 +1,71 @@ | |
+/* | |
+ * Copyright Debezium Authors. | |
+ * | |
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 | |
+ */ | |
+package io.debezium.relational.topic; | |
+ | |
+import io.debezium.config.Configuration; | |
+import io.debezium.config.Field; | |
+import io.debezium.relational.Table; | |
+import org.apache.kafka.connect.errors.ConnectException; | |
+import org.slf4j.Logger; | |
+import org.slf4j.LoggerFactory; | |
+ | |
+import java.util.ArrayList; | |
+import java.util.List; | |
+ | |
+/** | |
+ * @author David Leibovic | |
+ */ | |
+public class TopicMappers { | |
+ | |
+ private final Logger logger = LoggerFactory.getLogger(getClass()); | |
+ private List<TopicMapper> topicMappers; | |
+ | |
+ public TopicMappers(Configuration config, String[] topicMapperAliases) { | |
+ topicMappers = new ArrayList<>(); | |
+ if (topicMapperAliases != null) { | |
+ for (String alias : topicMapperAliases) { | |
+ TopicMapper topicMapper = config.getInstance("topic.mappers." + alias + ".type", TopicMapper.class); | |
+ if (topicMapper != null) { | |
+ topicMappers.add(topicMapper); | |
+ validateTopicMapperConfig(topicMapper, alias, config); | |
+ } | |
+ } | |
+ } | |
+ TopicMapper defaultTopicMapper = new ByTableTopicMapper(); | |
+ topicMappers.add(defaultTopicMapper); | |
+ validateTopicMapperConfig(defaultTopicMapper, null, config); | |
+ } | |
+ | |
+ /** | |
+ * Get the topic mapper to use for the given table. | |
+ * | |
+ * @param topicPrefix prefix for the topic | |
+ * @param table the table that we are getting the topic name for | |
+ * @return the TopicMapper; never null | |
+ */ | |
+ public TopicMapper getTopicMapperToUse(String topicPrefix, Table table) { | |
+ for (TopicMapper topicMapper : topicMappers) { | |
+ if (topicMapper.getTopicName(topicPrefix, table) != null) { | |
+ return topicMapper; | |
+ } | |
+ } | |
+ return topicMappers.get(topicMappers.size() - 1); | |
+ } | |
+ | |
+ private void validateTopicMapperConfig(TopicMapper topicMapper, String alias, Configuration config) { | |
+ if (alias == null) { | |
+ return; | |
+ } | |
+ Configuration topicMapperConfig = config.subset("topic.mappers." + alias + ".", true); | |
+ topicMapper.setConfig(topicMapperConfig); | |
+ Field.Set topicMapperConfigFields = topicMapper.configFields(); | |
+ if (topicMapperConfigFields != null && !topicMapperConfig.validateAndRecord(topicMapperConfigFields, logger::error)) { | |
+ throw new ConnectException("Unable to validate config for topic mapper: " + topicMapper.getClass() + | |
+ " with alias " + alias + "."); | |
+ } | |
+ } | |
+ | |
+} | |
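
A sketch of how alias resolution and fallback behave, assuming a single made-up alias "logical" and hypothetical regex values. getTopicMapperToUse() asks each configured mapper in order and returns the first one whose getTopicName() is non-null; any table that matches none of them is handled by the built-in ByTableTopicMapper appended in the constructor.

import io.debezium.config.Configuration;
import io.debezium.relational.topic.TopicMappers;

public class TopicMapperResolutionExample {
    public static void main(String[] args) {
        // Per-alias properties: everything under "topic.mappers.logical." is handed to the
        // mapper with the prefix stripped, so the mapper itself reads "logical.table.regex" etc.
        Configuration config = Configuration.create()
                .with("topic.mappers.logical.type",
                        "io.debezium.relational.topic.ByLogicalTableTopicMapper")
                .with("topic.mappers.logical.logical.table.regex",
                        "(?<serverName>.+)\\.db_shard\\d+\\.(?<table>.+)")
                .with("topic.mappers.logical.logical.table.replacement",
                        "${serverName}.db_shard.${table}")
                .with("topic.mappers.logical.physical.table.regex",
                        "(?<serverName>.+)\\.(?<db>db_shard\\d+)\\.(?<table>.+)")
                .with("topic.mappers.logical.physical.table.replacement",
                        "${serverName}.${db}.${table}")
                .build();

        // Sharded tables match the logical-table regex and are claimed by the configured mapper;
        // every other table falls through to the built-in ByTableTopicMapper added last.
        TopicMappers topicMappers = new TopicMappers(config, new String[] { "logical" });
    }
}
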
diff --git a/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java b/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java | |
index 57316eb..0363b09 100644 | |
--- a/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java | |
+++ b/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java | |
@@ -19,6 +19,7 @@ | |
import static org.fest.assertions.Assertions.assertThat; | |
import io.debezium.jdbc.JdbcValueConverters; | |
+import io.debezium.relational.topic.TopicMappers; | |
import io.debezium.time.Date; | |
import io.debezium.util.AvroValidator; | |
@@ -34,9 +35,11 @@ | |
private Column c4; | |
private TableSchema schema; | |
private AvroValidator validator; | |
+ private TopicMappers topicMappers; | |
@Before | |
public void beforeEach() { | |
+ topicMappers = new TopicMappers(null, null); | |
validator = AvroValidator.create((original,replacement, conflict)->{ | |
fail("Should not have come across an invalid schema name"); | |
}); | |
@@ -78,19 +81,19 @@ public void checkPreconditions() { | |
@Test(expected = NullPointerException.class) | |
public void shouldFailToBuildTableSchemaFromNullTable() { | |
- new TableSchemaBuilder(new JdbcValueConverters(),validator::validate).create(prefix,null); | |
+ new TableSchemaBuilder(topicMappers, new JdbcValueConverters(),validator::validate).create(prefix,null); | |
} | |
@Test | |
public void shouldBuildTableSchemaFromTable() { | |
- schema = new TableSchemaBuilder(new JdbcValueConverters(),validator::validate).create(prefix,table); | |
+ schema = new TableSchemaBuilder(topicMappers, new JdbcValueConverters(),validator::validate).create(prefix,table); | |
assertThat(schema).isNotNull(); | |
} | |
@Test | |
public void shouldBuildTableSchemaFromTableWithoutPrimaryKey() { | |
table = table.edit().setPrimaryKeyNames().create(); | |
- schema = new TableSchemaBuilder(new JdbcValueConverters(),validator::validate).create(prefix,table); | |
+ schema = new TableSchemaBuilder(topicMappers, new JdbcValueConverters(),validator::validate).create(prefix,table); | |
assertThat(schema).isNotNull(); | |
// Check the keys ... | |
assertThat(schema.keySchema()).isNull(); |