Skip to content

Instantly share code, notes, and snippets.

@dasl-
Created April 12, 2017 20:58
Show Gist options
  • Save dasl-/483d2361feb9296d387b1762e2384be1 to your computer and use it in GitHub Desktop.
Save dasl-/483d2361feb9296d387b1762e2384be1 to your computer and use it in GitHub Desktop.
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
index 6caa6e8..d9f4788 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
@@ -18,12 +18,23 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
-import com.github.shyiko.mysql.binlog.event.*;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.BinaryLogClient.LifecycleListener;
+import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
+import com.github.shyiko.mysql.binlog.event.Event;
+import com.github.shyiko.mysql.binlog.event.EventData;
+import com.github.shyiko.mysql.binlog.event.EventHeader;
+import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
+import com.github.shyiko.mysql.binlog.event.EventType;
+import com.github.shyiko.mysql.binlog.event.GtidEventData;
+import com.github.shyiko.mysql.binlog.event.QueryEventData;
+import com.github.shyiko.mysql.binlog.event.RotateEventData;
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java
index 840ea75..e98442f 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java
@@ -28,7 +28,6 @@
import io.debezium.relational.TableId;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.KafkaDatabaseHistory;
-import io.debezium.relational.topic.ByTableTopicMapper;
/**
* The configuration properties.
@@ -633,16 +632,6 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
.withDescription("When enabled, Debezium will track schemas based on TABLE_METADATA events")
.withDefault(false);
- public static final Field TOPIC_MAPPER = Field.create("topic.mapper")
- .withDisplayName("Topic mapper")
- .withType(Type.CLASS)
- .withWidth(Width.LONG)
- .withImportance(Importance.LOW)
- .withDescription("The name of the TopicMapper class that should be used to determine how change events for tables should be mapped into topics. "
- + "Built in options include '" + ByTableTopicMapper.class.getName()
- + "' (the default).")
- .withDefault(ByTableTopicMapper.class.getName());
-
public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode")
.withDisplayName("Snapshot mode")
.withEnum(SnapshotMode.class, SnapshotMode.INITIAL)
@@ -730,7 +719,7 @@ public static final Field MASK_COLUMN(int length) {
TABLE_WHITELIST, TABLE_BLACKLIST, TABLES_IGNORE_BUILTIN,
DATABASE_WHITELIST, DATABASE_BLACKLIST,
COLUMN_BLACKLIST, SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING,
- TOPIC_MAPPER, GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES,
+ GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES,
GTID_SOURCE_FILTER_DML_EVENTS,
TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE,
SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD,
@@ -755,7 +744,7 @@ protected static ConfigDef configDef() {
KafkaDatabaseHistory.TOPIC, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS,
KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, DATABASE_HISTORY);
Field.group(config, "Events", INCLUDE_SCHEMA_CHANGES, TABLES_IGNORE_BUILTIN, DATABASE_WHITELIST, TABLE_WHITELIST,
- COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST, TOPIC_MAPPER,
+ COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST,
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, GTID_SOURCE_FILTER_DML_EVENTS);
Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, MAX_QUEUE_SIZE, MAX_BATCH_SIZE, POLL_INTERVAL_MS,
SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE);
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
index 9786c13..2bbf988 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
@@ -35,7 +35,10 @@
private volatile ChainedReader readers;
/**
- * Create an instance of the log reader that uses Kafka to store database schema history and DDL changes.
+ * Create an instance of the log reader that uses Kafka to store database schema history and the
+ * {@link TopicSelector#defaultSelector(String) default topic selector}, which names data topics
+ * "{@code <serverName>.<databaseName>.<tableName>}" and the metadata topic
+ * "{@code <serverName>}".
*/
public MySqlConnectorTask() {
}
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java
index 3490317..32a0a7c 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java
@@ -6,14 +6,17 @@
package io.debezium.connector.mysql;
import java.sql.SQLException;
-import java.sql.Types;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.Predicate;
import com.github.shyiko.mysql.binlog.event.ColumnDescriptor;
import com.github.shyiko.mysql.binlog.event.TableMetadataEventData;
-import io.debezium.relational.*;
+
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
@@ -27,12 +30,12 @@
import io.debezium.document.Document;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
+import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables;
-import io.debezium.relational.topic.TopicMapper;
import io.debezium.relational.ddl.DdlChanges;
import io.debezium.relational.ddl.DdlChanges.DatabaseStatementStringConsumer;
import io.debezium.relational.history.DatabaseHistory;
@@ -92,9 +95,6 @@ public MySqlSchema(Configuration config, String serverName, Predicate<String> gt
this.ddlChanges = new DdlChanges(this.ddlParser.terminator());
this.ddlParser.addListener(ddlChanges);
- // Set up the topic mapper ...
- TopicMapper topicMapper = config.getInstance(MySqlConnectorConfig.TOPIC_MAPPER, TopicMapper.class);
-
// Use MySQL-specific converters and schemas for values ...
String timePrecisionModeStr = config.getString(MySqlConnectorConfig.TIME_PRECISION_MODE);
TemporalPrecisionMode timePrecisionMode = TemporalPrecisionMode.parse(timePrecisionModeStr);
@@ -103,7 +103,7 @@ public MySqlSchema(Configuration config, String serverName, Predicate<String> gt
DecimalHandlingMode decimalHandlingMode = DecimalHandlingMode.parse(decimalHandlingModeStr);
DecimalMode decimalMode = decimalHandlingMode.asDecimalMode();
MySqlValueConverters valueConverters = new MySqlValueConverters(decimalMode, adaptiveTimePrecision);
- this.schemaBuilder = new TableSchemaBuilder(topicMapper, valueConverters, schemaNameValidator::validate);
+ this.schemaBuilder = new TableSchemaBuilder(valueConverters, schemaNameValidator::validate);
// Set up the server name and schema prefix ...
if (serverName != null) serverName = serverName.trim();
@@ -247,8 +247,6 @@ public TableSchema schemaFromMetadata(TableId id, TableMetadataEventData metadat
return schema;
}
-
-
/**
* Get the information about where the DDL statement history is recorded.
*
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java
index 07dafb8..78e9576 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java
@@ -29,6 +29,7 @@
private final SourceInfo source;
private final MySqlSchema dbSchema;
+ private final TopicSelector topicSelector;
private final RecordMakers recordProcessor;
private final Predicate<String> gtidSourceFilter;
private final Clock clock = Clock.system();
@@ -36,6 +37,9 @@
public MySqlTaskContext(Configuration config) {
super(config);
+ // Set up the topic selector ...
+ this.topicSelector = TopicSelector.defaultSelector(serverName());
+
// Set up the source information ...
this.source = new SourceInfo();
this.source.setServerName(serverName());
@@ -43,20 +47,24 @@ public MySqlTaskContext(Configuration config) {
// Set up the GTID filter ...
String gtidSetIncludes = config.getString(MySqlConnectorConfig.GTID_SOURCE_INCLUDES);
String gtidSetExcludes = config.getString(MySqlConnectorConfig.GTID_SOURCE_EXCLUDES);
- this.gtidSourceFilter = gtidSetIncludes != null ? Predicates.includes(gtidSetIncludes)
- : (gtidSetExcludes != null ? Predicates.excludes(gtidSetExcludes) : null);
+ this.gtidSourceFilter = gtidSetIncludes != null ? Predicates.includesUuids(gtidSetIncludes)
+ : (gtidSetExcludes != null ? Predicates.excludesUuids(gtidSetExcludes) : null);
// Set up the MySQL schema ...
this.dbSchema = new MySqlSchema(config, serverName(), this.gtidSourceFilter);
// Set up the record processor ...
- this.recordProcessor = new RecordMakers(dbSchema, source, serverName());
+ this.recordProcessor = new RecordMakers(dbSchema, source, topicSelector);
}
public String connectorName() {
return config.getString("name");
}
+ public TopicSelector topicSelector() {
+ return topicSelector;
+ }
+
public SourceInfo source() {
return source;
}
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java
index 420b030..e0cbeee 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java
@@ -38,7 +38,7 @@
private final Logger logger = LoggerFactory.getLogger(getClass());
private final MySqlSchema schema;
private final SourceInfo source;
- private final String ddlChangeTopicName;
+ private final TopicSelector topicSelector;
private final Map<Long, Converter> convertersByTableNumber = new HashMap<>();
private final Map<TableId, Long> tableNumbersByTableId = new HashMap<>();
private final Schema schemaChangeKeySchema;
@@ -50,12 +50,12 @@
*
* @param schema the schema information about the MySQL server databases; may not be null
* @param source the connector's source information; may not be null
- * @param ddlChangeTopicName the name of the topic to which DDL changes are written; may not be null
+ * @param topicSelector the selector for topic names; may not be null
*/
- public RecordMakers(MySqlSchema schema, SourceInfo source, String ddlChangeTopicName) {
+ public RecordMakers(MySqlSchema schema, SourceInfo source, TopicSelector topicSelector) {
this.schema = schema;
this.source = source;
- this.ddlChangeTopicName = ddlChangeTopicName;
+ this.topicSelector = topicSelector;
this.schemaChangeKeySchema = SchemaBuilder.struct()
.name(schemaNameValidator.validate("io.debezium.connector.mysql.SchemaChangeKey"))
.field(Fields.DATABASE_NAME, Schema.STRING_SCHEMA)
@@ -118,12 +118,12 @@ public RecordsForTable forTable(long tableNumber, BitSet includedColumns, Blocki
* @return the number of records produced; will be 0 or more
*/
public int schemaChanges(String databaseName, String ddlStatements, BlockingConsumer<SourceRecord> consumer) {
+ String topicName = topicSelector.getPrimaryTopic();
Integer partition = 0;
Struct key = schemaChangeRecordKey(databaseName);
Struct value = schemaChangeRecordValue(databaseName, ddlStatements);
SourceRecord record = new SourceRecord(source.partition(), source.offset(),
- ddlChangeTopicName, partition,
- schemaChangeKeySchema, key, schemaChangeValueSchema, value);
+ topicName, partition, schemaChangeKeySchema, key, schemaChangeValueSchema, value);
try {
consumer.accept(record);
return 1;
@@ -181,7 +181,7 @@ public boolean assign(long tableNumber, TableId id, TableMetadataEventData metad
}
if (tableSchema == null) return false;
- String topicName = tableSchema.getEnvelopeSchemaName();
+ String topicName = topicSelector.getTopic(id);
Envelope envelope = Envelope.defineSchema()
.withName(schemaNameValidator.validate(topicName + ".Envelope"))
.withRecord(tableSchema.valueSchema())
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java
new file mode 100644
index 0000000..16574d2
--- /dev/null
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.connector.mysql;
+
+import io.debezium.annotation.ThreadSafe;
+import io.debezium.relational.TableId;
+
+/**
+ * A function that determines the name of topics for data and metadata.
+ *
+ * @author Randall Hauch
+ */
+@ThreadSafe
+public interface TopicSelector {
+ /**
+ * Get the default topic selector logic, which uses a '.' delimiter character when needed.
+ *
+ * @param prefix the name of the prefix to be used for all topics; may not be null and must not terminate in the
+ * '.' delimiter
+ * @return the topic selector; never null
+ */
+ static TopicSelector defaultSelector(String prefix) {
+ return defaultSelector(prefix,".");
+ }
+
+ /**
+ * Get the default topic selector logic, which uses the supplied delimiter character when needed.
+ *
+ * @param prefix the name of the prefix to be used for all topics; may not be null and must not terminate in the
+ * {@code delimiter}
+ * @param delimiter the string separating the prefix, database, and table names; may not be null
+ * @return the topic selector; never null
+ */
+ static TopicSelector defaultSelector(String prefix, String delimiter) {
+ return new TopicSelector() {
+ /**
+ * Get the name of the primary topic, which is used for schema change records. This method returns
+ * "{@code <prefix>}".
+ *
+ * @return the topic name; never null
+ */
+ @Override
+ public String getPrimaryTopic() {
+ return prefix;
+ }
+
+ /**
+ * Get the name of the topic for the given database and table names. This method returns
+ * "{@code <prefix><delimiter><databaseName><delimiter><tableName>}".
+ *
+ * @param databaseName the name of the database; may not be null
+ * @param tableName the name of the table; may not be null
+ * @return the topic name; never null
+ */
+ @Override
+ public String getTopic(String databaseName, String tableName) {
+ return String.join(delimiter, prefix, databaseName, tableName);
+ }
+
+ };
+ }
+
+ /**
+ * Get the name of the topic for the table with the given identifier, using its catalog as the database name.
+ *
+ * @param tableId the identifier of the table; may not be null
+ * @return the topic name; never null
+ */
+ default String getTopic(TableId tableId) {
+ return getTopic(tableId.catalog(),tableId.table());
+ }
+
+ /**
+ * Get the name of the topic for the given database and table names.
+ *
+ * @param databaseName the name of the database; may not be null
+ * @param tableName the name of the table; may not be null
+ * @return the topic name; never null
+ */
+ String getTopic(String databaseName, String tableName);
+
+ /**
+ * Get the name of the primary topic.
+ *
+ * @return the topic name; never null
+ */
+ String getPrimaryTopic();
+}
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorTest.java
index f612f50..8241387 100644
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorTest.java
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorTest.java
@@ -34,8 +34,7 @@ protected static void assertConfigDefIsValid(Connector connector, io.debezium.co
assertThat(key.importance).isEqualTo(expected.importance());
assertThat(key.documentation).isEqualTo(expected.description());
assertThat(key.type).isEqualTo(expected.type());
- if (expected.equals(MySqlConnectorConfig.DATABASE_HISTORY) || expected.equals(MySqlConnectorConfig.JDBC_DRIVER) ||
- expected.equals(MySqlConnectorConfig.TOPIC_MAPPER)) {
+ if (expected.equals(MySqlConnectorConfig.DATABASE_HISTORY) || expected.equals(MySqlConnectorConfig.JDBC_DRIVER)) {
assertThat(((Class<?>) key.defaultValue).getName()).isEqualTo((String) expected.defaultValue());
} else if (!expected.equals(MySqlConnectorConfig.SERVER_ID)) {
assertThat(key.defaultValue).isEqualTo(expected.defaultValue());
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java
index 6c5cc72..1ffd20a 100644
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java
@@ -31,6 +31,7 @@ public void shouldCreateTaskFromConfiguration() throws Exception {
assertThat(context.logger()).isNotNull();
assertThat(context.makeRecord()).isNotNull();
assertThat(context.source()).isNotNull();
+ assertThat(context.topicSelector()).isNotNull();
assertThat(context.hostname()).isEqualTo(hostname);
assertThat(context.port()).isEqualTo(port);
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java
index 2d8adef..a5188b6 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java
@@ -27,7 +27,6 @@
import io.debezium.relational.TableSchema;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables;
-import io.debezium.relational.topic.ByTableTopicMapper;
import io.debezium.util.AvroValidator;
/**
@@ -64,7 +63,7 @@ protected PostgresSchema(PostgresConnectorConfig config) {
PostgresValueConverter valueConverter = new PostgresValueConverter(config.adaptiveTimePrecision(), ZoneOffset.UTC);
this.schemaNameValidator = AvroValidator.create(LOGGER)::validate;
- this.schemaBuilder = new TableSchemaBuilder(new ByTableTopicMapper(), valueConverter, this.schemaNameValidator);
+ this.schemaBuilder = new TableSchemaBuilder(valueConverter, this.schemaNameValidator);
// Set up the server name and schema prefix ...
String serverName = config.serverName();
diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchema.java b/debezium-core/src/main/java/io/debezium/relational/TableSchema.java
index 54dd6d4..79d555e 100644
--- a/debezium-core/src/main/java/io/debezium/relational/TableSchema.java
+++ b/debezium-core/src/main/java/io/debezium/relational/TableSchema.java
@@ -47,7 +47,6 @@
@Immutable
public class TableSchema {
- private final String envelopeSchemaName;
private final Schema keySchema;
private final Schema valueSchema;
private final Function<Object[], Object> keyGenerator;
@@ -57,8 +56,7 @@
/**
* Create an instance with the specified {@link Schema}s for the keys and values, and the functions that generate the
* key and value for a given row of data.
- *
- * @param envelopeSchemaName the name of the schema; may be null
+ *
* @param keySchema the schema for the primary key; may be null
* @param keyGenerator the function that converts a row into a single key object for Kafka Connect; may not be null but may
* return nulls
@@ -66,9 +64,8 @@
* @param valueGenerator the function that converts a row into a single value object for Kafka Connect; may not be null but
* may return nulls
*/
- public TableSchema(String envelopeSchemaName, Schema keySchema, Function<Object[], Object> keyGenerator,
+ public TableSchema(Schema keySchema, Function<Object[], Object> keyGenerator,
Schema valueSchema, Function<Object[], Struct> valueGenerator) {
- this.envelopeSchemaName = envelopeSchemaName;
this.keySchema = keySchema;
this.valueSchema = valueSchema;
this.keyGenerator = keyGenerator != null ? keyGenerator : (row) -> null;
@@ -76,14 +73,6 @@ public TableSchema(String envelopeSchemaName, Schema keySchema, Function<Object[
}
/**
- * Get the name of the envelope schema for both the key and value.
- * @return the envelope schema name; may be null
- */
- public String getEnvelopeSchemaName() {
- return envelopeSchemaName;
- }
-
- /**
* Get the {@link Schema} that represents the table's columns, excluding those that make up the {@link #keySchema()}.
*
* @return the Schema describing the columns in the table; never null
diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java
index 3cb5efa..00a7c96 100644
--- a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java
+++ b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java
@@ -10,7 +10,6 @@
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@@ -30,7 +29,6 @@
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.mapping.ColumnMapper;
import io.debezium.relational.mapping.ColumnMappers;
-import io.debezium.relational.topic.TopicMapper;
/**
* Builder that constructs {@link TableSchema} instances for {@link Table} definitions.
@@ -51,21 +49,17 @@
private static final Logger LOGGER = LoggerFactory.getLogger(TableSchemaBuilder.class);
- private final TopicMapper topicMapper;
private final Function<String, String> schemaNameValidator;
private final ValueConverterProvider valueConverterProvider;
/**
* Create a new instance of the builder.
- *
- * @param topicMapper the TopicMapper for each table; may not be null
+ *
* @param valueConverterProvider the provider for obtaining {@link ValueConverter}s and {@link SchemaBuilder}s; may not be
* null
* @param schemaNameValidator the validation function for schema names; may not be null
*/
- public TableSchemaBuilder(TopicMapper topicMapper, ValueConverterProvider valueConverterProvider,
- Function<String, String> schemaNameValidator) {
- this.topicMapper = topicMapper;
+ public TableSchemaBuilder(ValueConverterProvider valueConverterProvider, Function<String, String> schemaNameValidator) {
this.schemaNameValidator = schemaNameValidator;
this.valueConverterProvider = valueConverterProvider;
}
@@ -95,7 +89,7 @@ public TableSchema create(ResultSet resultSet, String name) throws SQLException
Function<Object[], Struct> valueGenerator = createValueGenerator(valueSchema, id, columns, null, null);
// Finally create our result object with no primary key or key generator ...
- return new TableSchema(schemaName, null, null, valueSchema, valueGenerator);
+ return new TableSchema(null, null, valueSchema, valueGenerator);
}
/**
@@ -135,13 +129,11 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId>
if (schemaPrefix == null) schemaPrefix = "";
// Build the schemas ...
final TableId tableId = table.id();
- final String topicName = topicMapper.getTopicName(schemaPrefix, table);
- final String keySchemaName = schemaNameValidator.apply(topicName + ".Key");
- final String valueSchemaName = schemaNameValidator.apply(topicName + ".Value");
- final String envelopeSchemaName = schemaNameValidator.apply(topicName);
- LOGGER.debug("Mapping table '{}' to key schemas '{}' and value schema '{}'", tableId, keySchemaName, valueSchemaName);
- SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(valueSchemaName);
- SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(keySchemaName);
+ final String tableIdStr = tableId.toString();
+ final String schemaNamePrefix = schemaPrefix + tableIdStr;
+ LOGGER.debug("Mapping table '{}' to schemas under '{}'", tableId, schemaNamePrefix);
+ SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(schemaNameValidator.apply(schemaNamePrefix + ".Value"));
+ SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(schemaNameValidator.apply(schemaNamePrefix + ".Key"));
AtomicBoolean hasPrimaryKey = new AtomicBoolean(false);
table.columns().forEach(column -> {
if (table.isPrimaryKeyColumn(column.name())) {
@@ -155,9 +147,6 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId>
addField(valSchemaBuilder, column, mapper);
}
});
- // Enhance the key schema if necessary ...
- topicMapper.enhanceKeySchema(keySchemaBuilder);
- // Create the schemas ...
Schema valSchema = valSchemaBuilder.optional().build();
Schema keySchema = hasPrimaryKey.get() ? keySchemaBuilder.build() : null;
@@ -167,11 +156,11 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId>
}
// Create the generators ...
- Function<Object[], Object> keyGenerator = createKeyGenerator(keySchema, tableId, table, topicMapper, schemaPrefix);
+ Function<Object[], Object> keyGenerator = createKeyGenerator(keySchema, tableId, table.primaryKeyColumns());
Function<Object[], Struct> valueGenerator = createValueGenerator(valSchema, tableId, table.columns(), filter, mappers);
// And the table schema ...
- return new TableSchema(envelopeSchemaName, keySchema, keyGenerator, valSchema, valueGenerator);
+ return new TableSchema(keySchema, keyGenerator, valSchema, valueGenerator);
}
/**
@@ -180,21 +169,15 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId>
* @param schema the Kafka Connect schema for the key; may be null if there is no known schema, in which case the generator
* will be null
* @param columnSetName the name for the set of columns, used in error messages; may not be null
- * @param table the table for the row of data
- * @param topicMapper the TopicMapper for the table whose key we are generating; may not be null
- * @param schemaPrefix the prefix added to the table identifier to construct the schema names; may be null if there is no
- * prefix
+ * @param columns the column definitions for the table that defines the row; may not be null
* @return the key-generating function, or null if there is no key schema
*/
- protected Function<Object[], Object> createKeyGenerator(Schema schema, TableId columnSetName, Table table,
- TopicMapper topicMapper, String schemaPrefix) {
+ protected Function<Object[], Object> createKeyGenerator(Schema schema, TableId columnSetName, List<Column> columns) {
if (schema != null) {
- List<Column> columns = table.primaryKeyColumns();
int[] recordIndexes = indexesForColumns(columns);
Field[] fields = fieldsForColumns(schema, columns);
int numFields = recordIndexes.length;
ValueConverter[] converters = convertersForColumns(schema, columnSetName, columns, null, null);
- Map<String, Object> nonRowFieldsToAddToKey = topicMapper.getNonRowFieldsToAddToKey(schema, schemaPrefix, table);
return (row) -> {
Struct result = new Struct(schema);
for (int i = 0; i != numFields; ++i) {
@@ -212,12 +195,6 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId>
}
}
- if (nonRowFieldsToAddToKey != null) {
- for (Map.Entry<String, Object> nonRowField : nonRowFieldsToAddToKey.entrySet()) {
- result.put(nonRowField.getKey(), nonRowField.getValue());
- }
- }
-
return result;
};
}
diff --git a/debezium-core/src/main/java/io/debezium/relational/topic/ByLogicalTableTopicMapper.java b/debezium-core/src/main/java/io/debezium/relational/topic/ByLogicalTableTopicMapper.java
deleted file mode 100644
index 8f74809..0000000
--- a/debezium-core/src/main/java/io/debezium/relational/topic/ByLogicalTableTopicMapper.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.debezium.relational.topic;
-
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.SchemaBuilder;
-
-import io.debezium.relational.Table;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * A logical table consists of one or more physical tables with the same schema. A common use case is sharding -- the
- * two physical tables `db_shard1.my_table` and `db_shard2.my_table` together form one logical table.
- *
- * This TopicMapper allows us to send change events from both physical tables to one topic.
- *
- * @author David Leibovic
- */
-public class ByLogicalTableTopicMapper extends TopicMapper {
-
- public String getTopicName(String topicPrefix, Table table) {
- final String fullyQualifiedTableName = composeFullyQualifiedTableName(topicPrefix, table);
- Pattern logicalTablePattern = Pattern.compile("^.*?(?=\\..+\\..+)\\.(?<logicalDb>etsy_.+?(?=(_\\d+\\.)|\\.))(_\\d+)?\\.(?<table>.+)$");
- Matcher logicalTableMatcher = logicalTablePattern.matcher(fullyQualifiedTableName);
- if (logicalTableMatcher.matches()) {
- return logicalTableMatcher.replaceAll("${logicalDb}.${table}");
- }
- return fullyQualifiedTableName;
- }
-
- public void enhanceKeySchema(SchemaBuilder keySchemaBuilder) {
- // Now that multiple physical tables can share a topic, the Key Schema can no longer consist of solely the
- // record's primary / unique key fields, since they are not guaranteed to be unique across tables. We need some
- // identifier added to the key that distinguishes the different physical tables.
- keySchemaBuilder.field("__dbz__physicalTableIdentifier", Schema.STRING_SCHEMA);
- }
-
- public Map<String, Object> getNonRowFieldsToAddToKey(Schema schema, String topicPrefix, Table table) {
- final String fullyQualifiedTableName = composeFullyQualifiedTableName(topicPrefix, table);
- Pattern physicalTableIdentifierPattern = Pattern.compile("^.*?(?=\\..+\\..+)\\.(?<logicalDb>etsy_.+?(?=\\.))\\.(?<table>.+)$");
- Matcher physicalTableIdentifierMatcher = physicalTableIdentifierPattern.matcher(fullyQualifiedTableName);
-
- final String physicalTableIdentifier;
- if (physicalTableIdentifierMatcher.matches()) {
- physicalTableIdentifier = physicalTableIdentifierMatcher.replaceAll("${logicalDb}");
- } else {
- physicalTableIdentifier = fullyQualifiedTableName;
- }
-
- Map<String, Object> nonRowFields = new HashMap<>();
- nonRowFields.put("__dbz__physicalTableIdentifier", physicalTableIdentifier);
- return nonRowFields;
- }
-
- private String composeFullyQualifiedTableName(String topicPrefix, Table table) {
- return topicPrefix + table.id().toString();
- }
-
-}
diff --git a/debezium-core/src/main/java/io/debezium/relational/topic/ByTableTopicMapper.java b/debezium-core/src/main/java/io/debezium/relational/topic/ByTableTopicMapper.java
deleted file mode 100644
index 1791b04..0000000
--- a/debezium-core/src/main/java/io/debezium/relational/topic/ByTableTopicMapper.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.debezium.relational.topic;
-
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.SchemaBuilder;
-
-import io.debezium.relational.Table;
-
-import java.util.Map;
-
-/**
- * @author David Leibovic
- */
-public class ByTableTopicMapper extends TopicMapper {
-
- public String getTopicName(String topicPrefix, Table table) {
- return topicPrefix + table.id().toString();
- }
-
- public void enhanceKeySchema(SchemaBuilder keySchemaBuilder) {
- // do nothing ...
- }
-
- public Map<String, Object> getNonRowFieldsToAddToKey(Schema schema, String topicPrefix, Table table) {
- // do nothing ...
- return null;
- }
-
-}
diff --git a/debezium-core/src/main/java/io/debezium/relational/topic/TopicMapper.java b/debezium-core/src/main/java/io/debezium/relational/topic/TopicMapper.java
deleted file mode 100644
index fd25233..0000000
--- a/debezium-core/src/main/java/io/debezium/relational/topic/TopicMapper.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.debezium.relational.topic;
-
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.SchemaBuilder;
-
-import io.debezium.relational.Table;
-
-import java.util.Map;
-
-/**
- * @author David Leibovic
- */
-public abstract class TopicMapper {
-
- /**
- * Get the name of the topic given for the table.
- *
- * @param topicPrefix prefix for the topic
- * @param table the table that we are getting the topic name for
- * @return the topic name; may be null if this strategy could not be applied
- */
- abstract public String getTopicName(String topicPrefix, Table table);
-
- /**
- * Depending on your TopicMapper implementation and which rows in a database may occupy the same topic,
- * it may be necessary to enhance the key schema for the events to ensure each distinct record in the topic
- * has a unique key.
- *
- * @param keySchemaBuilder the {@link SchemaBuilder} for the key, pre-populated with the table's primary/unique key
- */
- abstract public void enhanceKeySchema(SchemaBuilder keySchemaBuilder);
-
- /**
- * Get the extra key-value pairs necessary to add to the event's key.
- * @param schema the schema for the key; never null
- * @param topicPrefix prefix for the topic
- * @param table the table the event is for
- * @return the extra key-value pairs, or null if none are necessary.
- */
- abstract public Map<String, Object> getNonRowFieldsToAddToKey(Schema schema, String topicPrefix, Table table);
-
-}
diff --git a/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java b/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java
index d566e10..57316eb 100644
--- a/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java
+++ b/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java
@@ -20,8 +20,6 @@
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.time.Date;
-import io.debezium.relational.topic.TopicMapper;
-import io.debezium.relational.topic.ByTableTopicMapper;
import io.debezium.util.AvroValidator;
public class TableSchemaBuilderTest {
@@ -36,11 +34,9 @@
private Column c4;
private TableSchema schema;
private AvroValidator validator;
- private TopicMapper topicMapper;
@Before
public void beforeEach() {
- topicMapper = new ByTableTopicMapper();
validator = AvroValidator.create((original,replacement, conflict)->{
fail("Should not have come across an invalid schema name");
});
@@ -82,19 +78,19 @@ public void checkPreconditions() {
@Test(expected = NullPointerException.class)
public void shouldFailToBuildTableSchemaFromNullTable() {
- new TableSchemaBuilder(topicMapper, new JdbcValueConverters(),validator::validate).create(prefix,null);
+ new TableSchemaBuilder(new JdbcValueConverters(),validator::validate).create(prefix,null);
}
@Test
public void shouldBuildTableSchemaFromTable() {
- schema = new TableSchemaBuilder(topicMapper, new JdbcValueConverters(),validator::validate).create(prefix,table);
+ schema = new TableSchemaBuilder(new JdbcValueConverters(),validator::validate).create(prefix,table);
assertThat(schema).isNotNull();
}
@Test
public void shouldBuildTableSchemaFromTableWithoutPrimaryKey() {
table = table.edit().setPrimaryKeyNames().create();
- schema = new TableSchemaBuilder(topicMapper, new JdbcValueConverters(),validator::validate).create(prefix,table);
+ schema = new TableSchemaBuilder(new JdbcValueConverters(),validator::validate).create(prefix,table);
assertThat(schema).isNotNull();
// Check the keys ...
assertThat(schema.keySchema()).isNull();
diff --git a/pom.xml b/pom.xml
index f67d833..a777f33 100644
--- a/pom.xml
+++ b/pom.xml
@@ -591,13 +591,13 @@
<includeTestSourceDirectory>true</includeTestSourceDirectory>
</configuration>
<executions>
- <!--<execution>
+ <execution>
<id>check-style</id>
<phase>verify</phase>
<goals>
<goal>checkstyle</goal>
</goals>
- </execution>-->
+ </execution>
</executions>
</plugin>
</plugins>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment