Created
April 12, 2017 20:58
-
-
Save dasl-/483d2361feb9296d387b1762e2384be1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java | |
index 6caa6e8..d9f4788 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java | |
@@ -18,12 +18,23 @@ | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.function.Predicate; | |
-import com.github.shyiko.mysql.binlog.event.*; | |
import org.apache.kafka.connect.errors.ConnectException; | |
import org.apache.kafka.connect.source.SourceRecord; | |
import com.github.shyiko.mysql.binlog.BinaryLogClient; | |
import com.github.shyiko.mysql.binlog.BinaryLogClient.LifecycleListener; | |
+import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData; | |
+import com.github.shyiko.mysql.binlog.event.Event; | |
+import com.github.shyiko.mysql.binlog.event.EventData; | |
+import com.github.shyiko.mysql.binlog.event.EventHeader; | |
+import com.github.shyiko.mysql.binlog.event.EventHeaderV4; | |
+import com.github.shyiko.mysql.binlog.event.EventType; | |
+import com.github.shyiko.mysql.binlog.event.GtidEventData; | |
+import com.github.shyiko.mysql.binlog.event.QueryEventData; | |
+import com.github.shyiko.mysql.binlog.event.RotateEventData; | |
+import com.github.shyiko.mysql.binlog.event.TableMapEventData; | |
+import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData; | |
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; | |
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; | |
import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer; | |
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java | |
index 840ea75..e98442f 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java | |
@@ -28,7 +28,6 @@ | |
import io.debezium.relational.TableId; | |
import io.debezium.relational.history.DatabaseHistory; | |
import io.debezium.relational.history.KafkaDatabaseHistory; | |
-import io.debezium.relational.topic.ByTableTopicMapper; | |
/** | |
* The configuration properties. | |
@@ -633,16 +632,6 @@ public static SecureConnectionMode parse(String value, String defaultValue) { | |
.withDescription("When enabled, Debezium will track schemas based on TABLE_METADATA events") | |
.withDefault(false); | |
- public static final Field TOPIC_MAPPER = Field.create("topic.mapper") | |
- .withDisplayName("Topic mapper") | |
- .withType(Type.CLASS) | |
- .withWidth(Width.LONG) | |
- .withImportance(Importance.LOW) | |
- .withDescription("The name of the TopicMapper class that should be used to determine how change events for tables should be mapped into topics. " | |
- + "Built in options include '" + ByTableTopicMapper.class.getName() | |
- + "' (the default).") | |
- .withDefault(ByTableTopicMapper.class.getName()); | |
- | |
public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode") | |
.withDisplayName("Snapshot mode") | |
.withEnum(SnapshotMode.class, SnapshotMode.INITIAL) | |
@@ -730,7 +719,7 @@ public static final Field MASK_COLUMN(int length) { | |
TABLE_WHITELIST, TABLE_BLACKLIST, TABLES_IGNORE_BUILTIN, | |
DATABASE_WHITELIST, DATABASE_BLACKLIST, | |
COLUMN_BLACKLIST, SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING, | |
- TOPIC_MAPPER, GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, | |
+ GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, | |
GTID_SOURCE_FILTER_DML_EVENTS, | |
TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE, | |
SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD, | |
@@ -755,7 +744,7 @@ protected static ConfigDef configDef() { | |
KafkaDatabaseHistory.TOPIC, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS, | |
KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, DATABASE_HISTORY); | |
Field.group(config, "Events", INCLUDE_SCHEMA_CHANGES, TABLES_IGNORE_BUILTIN, DATABASE_WHITELIST, TABLE_WHITELIST, | |
- COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST, TOPIC_MAPPER, | |
+ COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST, | |
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, GTID_SOURCE_FILTER_DML_EVENTS); | |
Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, MAX_QUEUE_SIZE, MAX_BATCH_SIZE, POLL_INTERVAL_MS, | |
SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE); | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java | |
index 9786c13..2bbf988 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java | |
@@ -35,7 +35,10 @@ | |
private volatile ChainedReader readers; | |
/** | |
- * Create an instance of the log reader that uses Kafka to store database schema history and DDL changes. | |
+ * Create an instance of the log reader that uses Kafka to store database schema history and the | |
+ * {@link TopicSelector#defaultSelector(String) default topic selector} of "{@code <serverName>.<databaseName>.<tableName>}" | |
+ * for | |
+ * data and "{@code <serverName>}" for metadata. | |
*/ | |
public MySqlConnectorTask() { | |
} | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java | |
index 3490317..32a0a7c 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java | |
@@ -6,14 +6,17 @@ | |
package io.debezium.connector.mysql; | |
import java.sql.SQLException; | |
-import java.sql.Types; | |
-import java.util.*; | |
+import java.util.ArrayList; | |
+import java.util.HashMap; | |
+import java.util.List; | |
+import java.util.Map; | |
+import java.util.Set; | |
import java.util.concurrent.Callable; | |
import java.util.function.Predicate; | |
import com.github.shyiko.mysql.binlog.event.ColumnDescriptor; | |
import com.github.shyiko.mysql.binlog.event.TableMetadataEventData; | |
-import io.debezium.relational.*; | |
+ | |
import org.apache.kafka.connect.data.Schema; | |
import org.apache.kafka.connect.errors.ConnectException; | |
import org.slf4j.Logger; | |
@@ -27,12 +30,12 @@ | |
import io.debezium.document.Document; | |
import io.debezium.jdbc.JdbcConnection; | |
import io.debezium.jdbc.JdbcValueConverters.DecimalMode; | |
+import io.debezium.relational.Column; | |
import io.debezium.relational.Table; | |
import io.debezium.relational.TableId; | |
import io.debezium.relational.TableSchema; | |
import io.debezium.relational.TableSchemaBuilder; | |
import io.debezium.relational.Tables; | |
-import io.debezium.relational.topic.TopicMapper; | |
import io.debezium.relational.ddl.DdlChanges; | |
import io.debezium.relational.ddl.DdlChanges.DatabaseStatementStringConsumer; | |
import io.debezium.relational.history.DatabaseHistory; | |
@@ -92,9 +95,6 @@ public MySqlSchema(Configuration config, String serverName, Predicate<String> gt | |
this.ddlChanges = new DdlChanges(this.ddlParser.terminator()); | |
this.ddlParser.addListener(ddlChanges); | |
- // Set up the topic mapper ... | |
- TopicMapper topicMapper = config.getInstance(MySqlConnectorConfig.TOPIC_MAPPER, TopicMapper.class); | |
- | |
// Use MySQL-specific converters and schemas for values ... | |
String timePrecisionModeStr = config.getString(MySqlConnectorConfig.TIME_PRECISION_MODE); | |
TemporalPrecisionMode timePrecisionMode = TemporalPrecisionMode.parse(timePrecisionModeStr); | |
@@ -103,7 +103,7 @@ public MySqlSchema(Configuration config, String serverName, Predicate<String> gt | |
DecimalHandlingMode decimalHandlingMode = DecimalHandlingMode.parse(decimalHandlingModeStr); | |
DecimalMode decimalMode = decimalHandlingMode.asDecimalMode(); | |
MySqlValueConverters valueConverters = new MySqlValueConverters(decimalMode, adaptiveTimePrecision); | |
- this.schemaBuilder = new TableSchemaBuilder(topicMapper, valueConverters, schemaNameValidator::validate); | |
+ this.schemaBuilder = new TableSchemaBuilder(valueConverters, schemaNameValidator::validate); | |
// Set up the server name and schema prefix ... | |
if (serverName != null) serverName = serverName.trim(); | |
@@ -247,8 +247,6 @@ public TableSchema schemaFromMetadata(TableId id, TableMetadataEventData metadat | |
return schema; | |
} | |
- | |
- | |
/** | |
* Get the information about where the DDL statement history is recorded. | |
* | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java | |
index 07dafb8..78e9576 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java | |
@@ -29,6 +29,7 @@ | |
private final SourceInfo source; | |
private final MySqlSchema dbSchema; | |
+ private final TopicSelector topicSelector; | |
private final RecordMakers recordProcessor; | |
private final Predicate<String> gtidSourceFilter; | |
private final Clock clock = Clock.system(); | |
@@ -36,6 +37,9 @@ | |
public MySqlTaskContext(Configuration config) { | |
super(config); | |
+ // Set up the topic selector ... | |
+ this.topicSelector = TopicSelector.defaultSelector(serverName()); | |
+ | |
// Set up the source information ... | |
this.source = new SourceInfo(); | |
this.source.setServerName(serverName()); | |
@@ -43,20 +47,24 @@ public MySqlTaskContext(Configuration config) { | |
// Set up the GTID filter ... | |
String gtidSetIncludes = config.getString(MySqlConnectorConfig.GTID_SOURCE_INCLUDES); | |
String gtidSetExcludes = config.getString(MySqlConnectorConfig.GTID_SOURCE_EXCLUDES); | |
- this.gtidSourceFilter = gtidSetIncludes != null ? Predicates.includes(gtidSetIncludes) | |
- : (gtidSetExcludes != null ? Predicates.excludes(gtidSetExcludes) : null); | |
+ this.gtidSourceFilter = gtidSetIncludes != null ? Predicates.includesUuids(gtidSetIncludes) | |
+ : (gtidSetExcludes != null ? Predicates.excludesUuids(gtidSetExcludes) : null); | |
// Set up the MySQL schema ... | |
this.dbSchema = new MySqlSchema(config, serverName(), this.gtidSourceFilter); | |
// Set up the record processor ... | |
- this.recordProcessor = new RecordMakers(dbSchema, source, serverName()); | |
+ this.recordProcessor = new RecordMakers(dbSchema, source, topicSelector); | |
} | |
public String connectorName() { | |
return config.getString("name"); | |
} | |
+ public TopicSelector topicSelector() { | |
+ return topicSelector; | |
+ } | |
+ | |
public SourceInfo source() { | |
return source; | |
} | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java | |
index 420b030..e0cbeee 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java | |
@@ -38,7 +38,7 @@ | |
private final Logger logger = LoggerFactory.getLogger(getClass()); | |
private final MySqlSchema schema; | |
private final SourceInfo source; | |
- private final String ddlChangeTopicName; | |
+ private final TopicSelector topicSelector; | |
private final Map<Long, Converter> convertersByTableNumber = new HashMap<>(); | |
private final Map<TableId, Long> tableNumbersByTableId = new HashMap<>(); | |
private final Schema schemaChangeKeySchema; | |
@@ -50,12 +50,12 @@ | |
* | |
* @param schema the schema information about the MySQL server databases; may not be null | |
* @param source the connector's source information; may not be null | |
- * @param ddlChangeTopicName the name of the topic to which DDL changes are written; may not be null | |
+ * @param topicSelector the selector for topic names; may not be null | |
*/ | |
- public RecordMakers(MySqlSchema schema, SourceInfo source, String ddlChangeTopicName) { | |
+ public RecordMakers(MySqlSchema schema, SourceInfo source, TopicSelector topicSelector) { | |
this.schema = schema; | |
this.source = source; | |
- this.ddlChangeTopicName = ddlChangeTopicName; | |
+ this.topicSelector = topicSelector; | |
this.schemaChangeKeySchema = SchemaBuilder.struct() | |
.name(schemaNameValidator.validate("io.debezium.connector.mysql.SchemaChangeKey")) | |
.field(Fields.DATABASE_NAME, Schema.STRING_SCHEMA) | |
@@ -118,12 +118,12 @@ public RecordsForTable forTable(long tableNumber, BitSet includedColumns, Blocki | |
* @return the number of records produced; will be 0 or more | |
*/ | |
public int schemaChanges(String databaseName, String ddlStatements, BlockingConsumer<SourceRecord> consumer) { | |
+ String topicName = topicSelector.getPrimaryTopic(); | |
Integer partition = 0; | |
Struct key = schemaChangeRecordKey(databaseName); | |
Struct value = schemaChangeRecordValue(databaseName, ddlStatements); | |
SourceRecord record = new SourceRecord(source.partition(), source.offset(), | |
- ddlChangeTopicName, partition, | |
- schemaChangeKeySchema, key, schemaChangeValueSchema, value); | |
+ topicName, partition, schemaChangeKeySchema, key, schemaChangeValueSchema, value); | |
try { | |
consumer.accept(record); | |
return 1; | |
@@ -181,7 +181,7 @@ public boolean assign(long tableNumber, TableId id, TableMetadataEventData metad | |
} | |
if (tableSchema == null) return false; | |
- String topicName = tableSchema.getEnvelopeSchemaName(); | |
+ String topicName = topicSelector.getTopic(id); | |
Envelope envelope = Envelope.defineSchema() | |
.withName(schemaNameValidator.validate(topicName + ".Envelope")) | |
.withRecord(tableSchema.valueSchema()) | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java | |
new file mode 100644 | |
index 0000000..16574d2 | |
--- /dev/null | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java | |
@@ -0,0 +1,91 @@ | |
+/* | |
+ * Copyright Debezium Authors. | |
+ * | |
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 | |
+ */ | |
+package io.debezium.connector.mysql; | |
+ | |
+import io.debezium.annotation.ThreadSafe; | |
+import io.debezium.relational.TableId; | |
+ | |
+/** | |
+ * A function that determines the name of topics for data and metadata. | |
+ * | |
+ * @author Randall Hauch | |
+ */ | |
+@ThreadSafe | |
+public interface TopicSelector { | |
+ /** | |
+ * Get the default topic selector logic, which uses a '.' delimiter character when needed. | |
+ * | |
+ * @param prefix the name of the prefix to be used for all topics; may not be null and must not terminate in the | |
+ * {@code delimiter} | |
+ * @return the topic selector; never null | |
+ */ | |
+ static TopicSelector defaultSelector(String prefix) { | |
+ return defaultSelector(prefix,"."); | |
+ } | |
+ | |
+ /** | |
+ * Get the default topic selector logic, which uses the supplied delimiter character when needed. | |
+ * | |
+ * @param prefix the name of the prefix to be used for all topics; may not be null and must not terminate in the | |
+ * {@code delimiter} | |
+ * @param delimiter the string delineating the server, database, and table names; may not be null | |
+ * @return the topic selector; never null | |
+ */ | |
+ static TopicSelector defaultSelector(String prefix, String delimiter) { | |
+ return new TopicSelector() { | |
+ /** | |
+ * Get the name of the topic for the given server, database, and table names. This method returns | |
+ * "{@code <serverName>}". | |
+ * | |
+ * @return the topic name; never null | |
+ */ | |
+ @Override | |
+ public String getPrimaryTopic() { | |
+ return prefix; | |
+ } | |
+ | |
+ /** | |
+ * Get the name of the topic for the given server name. This method returns | |
+ * "{@code <prefix>.<databaseName>.<tableName>}". | |
+ * | |
+ * @param databaseName the name of the database; may not be null | |
+ * @param tableName the name of the table; may not be null | |
+ * @return the topic name; never null | |
+ */ | |
+ @Override | |
+ public String getTopic(String databaseName, String tableName) { | |
+ return String.join(delimiter, prefix, databaseName, tableName); | |
+ } | |
+ | |
+ }; | |
+ } | |
+ | |
+ /** | |
+ * Get the name of the topic for the given server name. | |
+ * | |
+ * @param tableId the identifier of the table; may not be null | |
+ * @return the topic name; never null | |
+ */ | |
+ default String getTopic(TableId tableId) { | |
+ return getTopic(tableId.catalog(),tableId.table()); | |
+ } | |
+ | |
+ /** | |
+ * Get the name of the topic for the given server name. | |
+ * | |
+ * @param databaseName the name of the database; may not be null | |
+ * @param tableName the name of the table; may not be null | |
+ * @return the topic name; never null | |
+ */ | |
+ String getTopic(String databaseName, String tableName); | |
+ | |
+ /** | |
+ * Get the name of the primary topic. | |
+ * | |
+ * @return the topic name; never null | |
+ */ | |
+ String getPrimaryTopic(); | |
+} | |
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorTest.java | |
index f612f50..8241387 100644 | |
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorTest.java | |
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorTest.java | |
@@ -34,8 +34,7 @@ protected static void assertConfigDefIsValid(Connector connector, io.debezium.co | |
assertThat(key.importance).isEqualTo(expected.importance()); | |
assertThat(key.documentation).isEqualTo(expected.description()); | |
assertThat(key.type).isEqualTo(expected.type()); | |
- if (expected.equals(MySqlConnectorConfig.DATABASE_HISTORY) || expected.equals(MySqlConnectorConfig.JDBC_DRIVER) || | |
- expected.equals(MySqlConnectorConfig.TOPIC_MAPPER)) { | |
+ if (expected.equals(MySqlConnectorConfig.DATABASE_HISTORY) || expected.equals(MySqlConnectorConfig.JDBC_DRIVER)) { | |
assertThat(((Class<?>) key.defaultValue).getName()).isEqualTo((String) expected.defaultValue()); | |
} else if (!expected.equals(MySqlConnectorConfig.SERVER_ID)) { | |
assertThat(key.defaultValue).isEqualTo(expected.defaultValue()); | |
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java | |
index 6c5cc72..1ffd20a 100644 | |
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java | |
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java | |
@@ -31,6 +31,7 @@ public void shouldCreateTaskFromConfiguration() throws Exception { | |
assertThat(context.logger()).isNotNull(); | |
assertThat(context.makeRecord()).isNotNull(); | |
assertThat(context.source()).isNotNull(); | |
+ assertThat(context.topicSelector()).isNotNull(); | |
assertThat(context.hostname()).isEqualTo(hostname); | |
assertThat(context.port()).isEqualTo(port); | |
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java | |
index 2d8adef..a5188b6 100644 | |
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java | |
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java | |
@@ -27,7 +27,6 @@ | |
import io.debezium.relational.TableSchema; | |
import io.debezium.relational.TableSchemaBuilder; | |
import io.debezium.relational.Tables; | |
-import io.debezium.relational.topic.ByTableTopicMapper; | |
import io.debezium.util.AvroValidator; | |
/** | |
@@ -64,7 +63,7 @@ protected PostgresSchema(PostgresConnectorConfig config) { | |
PostgresValueConverter valueConverter = new PostgresValueConverter(config.adaptiveTimePrecision(), ZoneOffset.UTC); | |
this.schemaNameValidator = AvroValidator.create(LOGGER)::validate; | |
- this.schemaBuilder = new TableSchemaBuilder(new ByTableTopicMapper(), valueConverter, this.schemaNameValidator); | |
+ this.schemaBuilder = new TableSchemaBuilder(valueConverter, this.schemaNameValidator); | |
// Set up the server name and schema prefix ... | |
String serverName = config.serverName(); | |
diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchema.java b/debezium-core/src/main/java/io/debezium/relational/TableSchema.java | |
index 54dd6d4..79d555e 100644 | |
--- a/debezium-core/src/main/java/io/debezium/relational/TableSchema.java | |
+++ b/debezium-core/src/main/java/io/debezium/relational/TableSchema.java | |
@@ -47,7 +47,6 @@ | |
@Immutable | |
public class TableSchema { | |
- private final String envelopeSchemaName; | |
private final Schema keySchema; | |
private final Schema valueSchema; | |
private final Function<Object[], Object> keyGenerator; | |
@@ -57,8 +56,7 @@ | |
/** | |
* Create an instance with the specified {@link Schema}s for the keys and values, and the functions that generate the | |
* key and value for a given row of data. | |
- * | |
- * @param envelopeSchemaName the name of the schema; may be null | |
+ * | |
* @param keySchema the schema for the primary key; may be null | |
* @param keyGenerator the function that converts a row into a single key object for Kafka Connect; may not be null but may | |
* return nulls | |
@@ -66,9 +64,8 @@ | |
* @param valueGenerator the function that converts a row into a single value object for Kafka Connect; may not be null but | |
* may return nulls | |
*/ | |
- public TableSchema(String envelopeSchemaName, Schema keySchema, Function<Object[], Object> keyGenerator, | |
+ public TableSchema(Schema keySchema, Function<Object[], Object> keyGenerator, | |
Schema valueSchema, Function<Object[], Struct> valueGenerator) { | |
- this.envelopeSchemaName = envelopeSchemaName; | |
this.keySchema = keySchema; | |
this.valueSchema = valueSchema; | |
this.keyGenerator = keyGenerator != null ? keyGenerator : (row) -> null; | |
@@ -76,14 +73,6 @@ public TableSchema(String envelopeSchemaName, Schema keySchema, Function<Object[ | |
} | |
/** | |
- * Get the name of the envelope schema for both the key and value. | |
- * @return the envelope schema name; may be null | |
- */ | |
- public String getEnvelopeSchemaName() { | |
- return envelopeSchemaName; | |
- } | |
- | |
- /** | |
* Get the {@link Schema} that represents the table's columns, excluding those that make up the {@link #keySchema()}. | |
* | |
* @return the Schema describing the columns in the table; never null | |
diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java | |
index 3cb5efa..00a7c96 100644 | |
--- a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java | |
+++ b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java | |
@@ -10,7 +10,6 @@ | |
import java.sql.Types; | |
import java.util.ArrayList; | |
import java.util.List; | |
-import java.util.Map; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.function.Function; | |
@@ -30,7 +29,6 @@ | |
import io.debezium.jdbc.JdbcConnection; | |
import io.debezium.relational.mapping.ColumnMapper; | |
import io.debezium.relational.mapping.ColumnMappers; | |
-import io.debezium.relational.topic.TopicMapper; | |
/** | |
* Builder that constructs {@link TableSchema} instances for {@link Table} definitions. | |
@@ -51,21 +49,17 @@ | |
private static final Logger LOGGER = LoggerFactory.getLogger(TableSchemaBuilder.class); | |
- private final TopicMapper topicMapper; | |
private final Function<String, String> schemaNameValidator; | |
private final ValueConverterProvider valueConverterProvider; | |
/** | |
* Create a new instance of the builder. | |
- * | |
- * @param topicMapper the TopicMapper for each table; may not be null | |
+ * | |
* @param valueConverterProvider the provider for obtaining {@link ValueConverter}s and {@link SchemaBuilder}s; may not be | |
* null | |
* @param schemaNameValidator the validation function for schema names; may not be null | |
*/ | |
- public TableSchemaBuilder(TopicMapper topicMapper, ValueConverterProvider valueConverterProvider, | |
- Function<String, String> schemaNameValidator) { | |
- this.topicMapper = topicMapper; | |
+ public TableSchemaBuilder(ValueConverterProvider valueConverterProvider, Function<String, String> schemaNameValidator) { | |
this.schemaNameValidator = schemaNameValidator; | |
this.valueConverterProvider = valueConverterProvider; | |
} | |
@@ -95,7 +89,7 @@ public TableSchema create(ResultSet resultSet, String name) throws SQLException | |
Function<Object[], Struct> valueGenerator = createValueGenerator(valueSchema, id, columns, null, null); | |
// Finally create our result object with no primary key or key generator ... | |
- return new TableSchema(schemaName, null, null, valueSchema, valueGenerator); | |
+ return new TableSchema(null, null, valueSchema, valueGenerator); | |
} | |
/** | |
@@ -135,13 +129,11 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId> | |
if (schemaPrefix == null) schemaPrefix = ""; | |
// Build the schemas ... | |
final TableId tableId = table.id(); | |
- final String topicName = topicMapper.getTopicName(schemaPrefix, table); | |
- final String keySchemaName = schemaNameValidator.apply(topicName + ".Key"); | |
- final String valueSchemaName = schemaNameValidator.apply(topicName + ".Value"); | |
- final String envelopeSchemaName = schemaNameValidator.apply(topicName); | |
- LOGGER.debug("Mapping table '{}' to key schemas '{}' and value schema '{}'", tableId, keySchemaName, valueSchemaName); | |
- SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(valueSchemaName); | |
- SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(keySchemaName); | |
+ final String tableIdStr = tableId.toString(); | |
+ final String schemaNamePrefix = schemaPrefix + tableIdStr; | |
+ LOGGER.debug("Mapping table '{}' to schemas under '{}'", tableId, schemaNamePrefix); | |
+ SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(schemaNameValidator.apply(schemaNamePrefix + ".Value")); | |
+ SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(schemaNameValidator.apply(schemaNamePrefix + ".Key")); | |
AtomicBoolean hasPrimaryKey = new AtomicBoolean(false); | |
table.columns().forEach(column -> { | |
if (table.isPrimaryKeyColumn(column.name())) { | |
@@ -155,9 +147,6 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId> | |
addField(valSchemaBuilder, column, mapper); | |
} | |
}); | |
- // Enhance the key schema if necessary ... | |
- topicMapper.enhanceKeySchema(keySchemaBuilder); | |
- // Create the schemas ... | |
Schema valSchema = valSchemaBuilder.optional().build(); | |
Schema keySchema = hasPrimaryKey.get() ? keySchemaBuilder.build() : null; | |
@@ -167,11 +156,11 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId> | |
} | |
// Create the generators ... | |
- Function<Object[], Object> keyGenerator = createKeyGenerator(keySchema, tableId, table, topicMapper, schemaPrefix); | |
+ Function<Object[], Object> keyGenerator = createKeyGenerator(keySchema, tableId, table.primaryKeyColumns()); | |
Function<Object[], Struct> valueGenerator = createValueGenerator(valSchema, tableId, table.columns(), filter, mappers); | |
// And the table schema ... | |
- return new TableSchema(envelopeSchemaName, keySchema, keyGenerator, valSchema, valueGenerator); | |
+ return new TableSchema(keySchema, keyGenerator, valSchema, valueGenerator); | |
} | |
/** | |
@@ -180,21 +169,15 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId> | |
* @param schema the Kafka Connect schema for the key; may be null if there is no known schema, in which case the generator | |
* will be null | |
* @param columnSetName the name for the set of columns, used in error messages; may not be null | |
- * @param table the table for the row of data | |
- * @param topicMapper the TopicMapper for the table whose key we are generating; may not be null | |
- * @param schemaPrefix the prefix added to the table identifier to construct the schema names; may be null if there is no | |
- * prefix | |
+ * @param columns the column definitions for the table that defines the row; may not be null | |
* @return the key-generating function, or null if there is no key schema | |
*/ | |
- protected Function<Object[], Object> createKeyGenerator(Schema schema, TableId columnSetName, Table table, | |
- TopicMapper topicMapper, String schemaPrefix) { | |
+ protected Function<Object[], Object> createKeyGenerator(Schema schema, TableId columnSetName, List<Column> columns) { | |
if (schema != null) { | |
- List<Column> columns = table.primaryKeyColumns(); | |
int[] recordIndexes = indexesForColumns(columns); | |
Field[] fields = fieldsForColumns(schema, columns); | |
int numFields = recordIndexes.length; | |
ValueConverter[] converters = convertersForColumns(schema, columnSetName, columns, null, null); | |
- Map<String, Object> nonRowFieldsToAddToKey = topicMapper.getNonRowFieldsToAddToKey(schema, schemaPrefix, table); | |
return (row) -> { | |
Struct result = new Struct(schema); | |
for (int i = 0; i != numFields; ++i) { | |
@@ -212,12 +195,6 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId> | |
} | |
} | |
- if (nonRowFieldsToAddToKey != null) { | |
- for (Map.Entry<String, Object> nonRowField : nonRowFieldsToAddToKey.entrySet()) { | |
- result.put(nonRowField.getKey(), nonRowField.getValue()); | |
- } | |
- } | |
- | |
return result; | |
}; | |
} | |
diff --git a/debezium-core/src/main/java/io/debezium/relational/topic/ByLogicalTableTopicMapper.java b/debezium-core/src/main/java/io/debezium/relational/topic/ByLogicalTableTopicMapper.java | |
deleted file mode 100644 | |
index 8f74809..0000000 | |
--- a/debezium-core/src/main/java/io/debezium/relational/topic/ByLogicalTableTopicMapper.java | |
+++ /dev/null | |
@@ -1,66 +0,0 @@ | |
-/* | |
- * Copyright Debezium Authors. | |
- * | |
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 | |
- */ | |
-package io.debezium.relational.topic; | |
- | |
-import org.apache.kafka.connect.data.Schema; | |
-import org.apache.kafka.connect.data.SchemaBuilder; | |
- | |
-import io.debezium.relational.Table; | |
- | |
-import java.util.HashMap; | |
-import java.util.Map; | |
-import java.util.regex.Matcher; | |
-import java.util.regex.Pattern; | |
- | |
-/** | |
- * A logical table consists of one or more physical tables with the same schema. A common use case is sharding -- the | |
- * two physical tables `db_shard1.my_table` and `db_shard2.my_table` together form one logical table. | |
- * | |
- * This TopicMapper allows us to send change events from both physical tables to one topic. | |
- * | |
- * @author David Leibovic | |
- */ | |
-public class ByLogicalTableTopicMapper extends TopicMapper { | |
- | |
- public String getTopicName(String topicPrefix, Table table) { | |
- final String fullyQualifiedTableName = composeFullyQualifiedTableName(topicPrefix, table); | |
- Pattern logicalTablePattern = Pattern.compile("^.*?(?=\\..+\\..+)\\.(?<logicalDb>etsy_.+?(?=(_\\d+\\.)|\\.))(_\\d+)?\\.(?<table>.+)$"); | |
- Matcher logicalTableMatcher = logicalTablePattern.matcher(fullyQualifiedTableName); | |
- if (logicalTableMatcher.matches()) { | |
- return logicalTableMatcher.replaceAll("${logicalDb}.${table}"); | |
- } | |
- return fullyQualifiedTableName; | |
- } | |
- | |
- public void enhanceKeySchema(SchemaBuilder keySchemaBuilder) { | |
- // Now that multiple physical tables can share a topic, the Key Schema can no longer consist of solely the | |
- // record's primary / unique key fields, since they are not guaranteed to be unique across tables. We need some | |
- // identifier added to the key that distinguishes the different physical tables. | |
- keySchemaBuilder.field("__dbz__physicalTableIdentifier", Schema.STRING_SCHEMA); | |
- } | |
- | |
- public Map<String, Object> getNonRowFieldsToAddToKey(Schema schema, String topicPrefix, Table table) { | |
- final String fullyQualifiedTableName = composeFullyQualifiedTableName(topicPrefix, table); | |
- Pattern physicalTableIdentifierPattern = Pattern.compile("^.*?(?=\\..+\\..+)\\.(?<logicalDb>etsy_.+?(?=\\.))\\.(?<table>.+)$"); | |
- Matcher physicalTableIdentifierMatcher = physicalTableIdentifierPattern.matcher(fullyQualifiedTableName); | |
- | |
- final String physicalTableIdentifier; | |
- if (physicalTableIdentifierMatcher.matches()) { | |
- physicalTableIdentifier = physicalTableIdentifierMatcher.replaceAll("${logicalDb}"); | |
- } else { | |
- physicalTableIdentifier = fullyQualifiedTableName; | |
- } | |
- | |
- Map<String, Object> nonRowFields = new HashMap<>(); | |
- nonRowFields.put("__dbz__physicalTableIdentifier", physicalTableIdentifier); | |
- return nonRowFields; | |
- } | |
- | |
- private String composeFullyQualifiedTableName(String topicPrefix, Table table) { | |
- return topicPrefix + table.id().toString(); | |
- } | |
- | |
-} | |
diff --git a/debezium-core/src/main/java/io/debezium/relational/topic/ByTableTopicMapper.java b/debezium-core/src/main/java/io/debezium/relational/topic/ByTableTopicMapper.java | |
deleted file mode 100644 | |
index 1791b04..0000000 | |
--- a/debezium-core/src/main/java/io/debezium/relational/topic/ByTableTopicMapper.java | |
+++ /dev/null | |
@@ -1,33 +0,0 @@ | |
-/* | |
- * Copyright Debezium Authors. | |
- * | |
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 | |
- */ | |
-package io.debezium.relational.topic; | |
- | |
-import org.apache.kafka.connect.data.Schema; | |
-import org.apache.kafka.connect.data.SchemaBuilder; | |
- | |
-import io.debezium.relational.Table; | |
- | |
-import java.util.Map; | |
- | |
-/** | |
- * @author David Leibovic | |
- */ | |
-public class ByTableTopicMapper extends TopicMapper { | |
- | |
- public String getTopicName(String topicPrefix, Table table) { | |
- return topicPrefix + table.id().toString(); | |
- } | |
- | |
- public void enhanceKeySchema(SchemaBuilder keySchemaBuilder) { | |
- // do nothing ... | |
- } | |
- | |
- public Map<String, Object> getNonRowFieldsToAddToKey(Schema schema, String topicPrefix, Table table) { | |
- // do nothing ... | |
- return null; | |
- } | |
- | |
-} | |
diff --git a/debezium-core/src/main/java/io/debezium/relational/topic/TopicMapper.java b/debezium-core/src/main/java/io/debezium/relational/topic/TopicMapper.java | |
deleted file mode 100644 | |
index fd25233..0000000 | |
--- a/debezium-core/src/main/java/io/debezium/relational/topic/TopicMapper.java | |
+++ /dev/null | |
@@ -1,47 +0,0 @@ | |
-/* | |
- * Copyright Debezium Authors. | |
- * | |
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 | |
- */ | |
-package io.debezium.relational.topic; | |
- | |
-import org.apache.kafka.connect.data.Schema; | |
-import org.apache.kafka.connect.data.SchemaBuilder; | |
- | |
-import io.debezium.relational.Table; | |
- | |
-import java.util.Map; | |
- | |
-/** | |
- * @author David Leibovic | |
- */ | |
-public abstract class TopicMapper { | |
- | |
- /** | |
- * Get the name of the topic given for the table. | |
- * | |
- * @param topicPrefix prefix for the topic | |
- * @param table the table that we are getting the topic name for | |
- * @return the topic name; may be null if this strategy could not be applied | |
- */ | |
- abstract public String getTopicName(String topicPrefix, Table table); | |
- | |
- /** | |
- * Depending on your TopicMapper implementation and which rows in a database may occupy the same topic, | |
- * it may be necessary to enhance the key schema for the events to ensure each distinct record in the topic | |
- * has a unique key. | |
- * | |
- * @param keySchemaBuilder the {@link SchemaBuilder} for the key, pre-populated with the table's primary/unique key | |
- */ | |
- abstract public void enhanceKeySchema(SchemaBuilder keySchemaBuilder); | |
- | |
- /** | |
- * Get the extra key-value pairs necessary to add to the event's key. | |
- * @param schema the schema for the key; never null | |
- * @param topicPrefix prefix for the topic | |
- * @param table the table the event is for | |
- * @return the extra key-value pairs, or null if none are necessary. | |
- */ | |
- abstract public Map<String, Object> getNonRowFieldsToAddToKey(Schema schema, String topicPrefix, Table table); | |
- | |
-} | |
diff --git a/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java b/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java | |
index d566e10..57316eb 100644 | |
--- a/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java | |
+++ b/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java | |
@@ -20,8 +20,6 @@ | |
import io.debezium.jdbc.JdbcValueConverters; | |
import io.debezium.time.Date; | |
-import io.debezium.relational.topic.TopicMapper; | |
-import io.debezium.relational.topic.ByTableTopicMapper; | |
import io.debezium.util.AvroValidator; | |
public class TableSchemaBuilderTest { | |
@@ -36,11 +34,9 @@ | |
private Column c4; | |
private TableSchema schema; | |
private AvroValidator validator; | |
- private TopicMapper topicMapper; | |
@Before | |
public void beforeEach() { | |
- topicMapper = new ByTableTopicMapper(); | |
validator = AvroValidator.create((original,replacement, conflict)->{ | |
fail("Should not have come across an invalid schema name"); | |
}); | |
@@ -82,19 +78,19 @@ public void checkPreconditions() { | |
@Test(expected = NullPointerException.class) | |
public void shouldFailToBuildTableSchemaFromNullTable() { | |
- new TableSchemaBuilder(topicMapper, new JdbcValueConverters(),validator::validate).create(prefix,null); | |
+ new TableSchemaBuilder(new JdbcValueConverters(),validator::validate).create(prefix,null); | |
} | |
@Test | |
public void shouldBuildTableSchemaFromTable() { | |
- schema = new TableSchemaBuilder(topicMapper, new JdbcValueConverters(),validator::validate).create(prefix,table); | |
+ schema = new TableSchemaBuilder(new JdbcValueConverters(),validator::validate).create(prefix,table); | |
assertThat(schema).isNotNull(); | |
} | |
@Test | |
public void shouldBuildTableSchemaFromTableWithoutPrimaryKey() { | |
table = table.edit().setPrimaryKeyNames().create(); | |
- schema = new TableSchemaBuilder(topicMapper, new JdbcValueConverters(),validator::validate).create(prefix,table); | |
+ schema = new TableSchemaBuilder(new JdbcValueConverters(),validator::validate).create(prefix,table); | |
assertThat(schema).isNotNull(); | |
// Check the keys ... | |
assertThat(schema.keySchema()).isNull(); | |
diff --git a/pom.xml b/pom.xml | |
index f67d833..a777f33 100644 | |
--- a/pom.xml | |
+++ b/pom.xml | |
@@ -591,13 +591,13 @@ | |
<includeTestSourceDirectory>true</includeTestSourceDirectory> | |
</configuration> | |
<executions> | |
- <!--<execution> | |
+ <execution> | |
<id>check-style</id> | |
<phase>verify</phase> | |
<goals> | |
<goal>checkstyle</goal> | |
</goals> | |
- </execution>--> | |
+ </execution> | |
</executions> | |
</plugin> | |
</plugins> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment