@dasl-
Created April 11, 2017 21:25
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
index 2e85cde..6caa6e8 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
@@ -18,23 +18,12 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
+import com.github.shyiko.mysql.binlog.event.*;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.BinaryLogClient.LifecycleListener;
-import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
-import com.github.shyiko.mysql.binlog.event.Event;
-import com.github.shyiko.mysql.binlog.event.EventData;
-import com.github.shyiko.mysql.binlog.event.EventHeader;
-import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
-import com.github.shyiko.mysql.binlog.event.EventType;
-import com.github.shyiko.mysql.binlog.event.GtidEventData;
-import com.github.shyiko.mysql.binlog.event.QueryEventData;
-import com.github.shyiko.mysql.binlog.event.RotateEventData;
-import com.github.shyiko.mysql.binlog.event.TableMapEventData;
-import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
-import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
@@ -118,6 +107,12 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
TableMapEventData tableMapEvent = event.getData();
tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
+ } else if (event.getHeader().getEventType() == EventType.TABLE_METADATA) {
+ TableMetadataEventData tableMetadataEvent = event.getData();
+ TableMapEventData tableMapEvent = tableMapEventByTableId.get(tableMetadataEvent.getTableId());
+ if (tableMapEvent != null) {
+ tableMapEvent.setMetadataEventData(tableMetadataEvent, 0);
+ }
}
return event;
}
@@ -154,7 +149,7 @@ protected void doStart() {
eventHandlers.put(EventType.HEARTBEAT, this::handleServerHeartbeat);
eventHandlers.put(EventType.INCIDENT, this::handleServerIncident);
eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent);
- eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata);
+ eventHandlers.put(EventType.TABLE_MAP, this::handleTableMap);
eventHandlers.put(EventType.QUERY, this::handleQueryEvent);
eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert);
eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate);
@@ -164,6 +159,7 @@ protected void doStart() {
eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete);
eventHandlers.put(EventType.VIEW_CHANGE, this::viewChange);
eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction);
+ eventHandlers.put(EventType.TABLE_METADATA, this::handleTableMetadata);
// Get the current GtidSet from MySQL so we can get a filtered/merged GtidSet based off of the last Debezium checkpoint.
String availableServerGtidStr = context.knownGtidSet();
@@ -479,16 +477,37 @@ protected void handleQueryEvent(Event event) throws InterruptedException {
*
* @param event the update event; never null
*/
- protected void handleUpdateTableMetadata(Event event) {
- TableMapEventData metadata = unwrapData(event);
- long tableNumber = metadata.getTableId();
- String databaseName = metadata.getDatabase();
- String tableName = metadata.getTable();
+ protected void handleTableMap(Event event) {
+ if (context.config().getBoolean(MySqlConnectorConfig.EXPECT_METADATA_EVENTS)) {
+ return;
+ }
+ handleTableUpdate(unwrapData(event), null);
+ }
+
+ protected void handleTableMetadata(Event event) {
+ if (!context.config().getBoolean(MySqlConnectorConfig.EXPECT_METADATA_EVENTS)) {
+ return;
+ }
+ TableMetadataEventData meta = unwrapData(event);
+ handleTableUpdate(meta.getTableMapEventData(), meta);
+ }
+
+ private void handleTableUpdate(TableMapEventData map, TableMetadataEventData meta) {
+ if (map == null) {
+ logger.debug("Skipping table update (missing TABLE_MAP): {}", meta);
+ return;
+ } else if (meta == null && context.config().getBoolean(MySqlConnectorConfig.EXPECT_METADATA_EVENTS)) {
+ logger.debug("Skipping table update (missing TABLE_METADATA): {}", map);
+ return;
+ }
+ long tableNumber = map.getTableId();
+ String databaseName = map.getDatabase();
+ String tableName = map.getTable();
TableId tableId = new TableId(databaseName, null, tableName);
- if (recordMakers.assign(tableNumber, tableId)) {
- logger.debug("Received update table metadata event: {}", event);
+ if (recordMakers.assign(tableNumber, tableId, meta)) {
+ logger.debug("Received table update map({}) meta({})", map, meta);
} else {
- logger.debug("Skipping update table metadata event: {}", event);
+ logger.debug("Skipped table update map({}) meta({})", map, meta);
}
}
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java
index def91ed..840ea75 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java
@@ -28,6 +28,7 @@
import io.debezium.relational.TableId;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.KafkaDatabaseHistory;
+import io.debezium.relational.topic.ByTableTopicMapper;
/**
* The configuration properties.
@@ -185,7 +186,17 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
/**
* Perform a snapshot and then stop before attempting to read the binlog.
*/
- INITIAL_ONLY("initial_only");
+ INITIAL_ONLY("initial_only"),
+
+ /**
+ * 1) Never performs a snapshot (schema information comes from the Twitter-patch TABLE_METADATA events).
+ * 2) The first time Debezium connects in this mode, it sets Debezium's executed GTID set to match that of
+ * the MySQL server it is connecting to. This is necessary to allow Debezium to reconnect later.
+ * 3) The first time Debezium connects in this mode, it starts reading the binlog at the GTID corresponding to
+ * the last GTID in the executed GTID set we marked in step 2 (i.e. starts reading at the binlog's current
+ * position).
+ */
+ TWITTER_PATCH("twitter_patch");
private final String value;
@@ -614,6 +625,24 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
+ "The default is 'true'. This is independent of how the connector internally records database history.")
.withDefault(true);
+ public static final Field EXPECT_METADATA_EVENTS = Field.create("database.mysql.expect_metadata_events")
+ .withDisplayName("Expect metadata events")
+ .withType(Type.BOOLEAN)
+ .withWidth(Width.SHORT)
+ .withImportance(Importance.MEDIUM)
+ .withDescription("When enabled, Debezium tracks table schemas using the TABLE_METADATA binlog events provided by a patched MySQL server, rather than by parsing DDL")
+ .withDefault(false);
+
+ public static final Field TOPIC_MAPPER = Field.create("topic.mapper")
+ .withDisplayName("Topic mapper")
+ .withType(Type.CLASS)
+ .withWidth(Width.LONG)
+ .withImportance(Importance.LOW)
+ .withDescription("The name of the TopicMapper class that should be used to determine how change events for tables should be mapped into topics. "
+ + "Built in options include '" + ByTableTopicMapper.class.getName()
+ + "' (the default).")
+ .withDefault(ByTableTopicMapper.class.getName());
+
public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode")
.withDisplayName("Snapshot mode")
.withEnum(SnapshotMode.class, SnapshotMode.INITIAL)
@@ -701,7 +730,7 @@ public static final Field MASK_COLUMN(int length) {
TABLE_WHITELIST, TABLE_BLACKLIST, TABLES_IGNORE_BUILTIN,
DATABASE_WHITELIST, DATABASE_BLACKLIST,
COLUMN_BLACKLIST, SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING,
- GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES,
+ TOPIC_MAPPER, GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES,
GTID_SOURCE_FILTER_DML_EVENTS,
TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE,
SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD,
@@ -726,7 +755,7 @@ protected static ConfigDef configDef() {
KafkaDatabaseHistory.TOPIC, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS,
KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, DATABASE_HISTORY);
Field.group(config, "Events", INCLUDE_SCHEMA_CHANGES, TABLES_IGNORE_BUILTIN, DATABASE_WHITELIST, TABLE_WHITELIST,
- COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST,
+ COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST, TOPIC_MAPPER,
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, GTID_SOURCE_FILTER_DML_EVENTS);
Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, MAX_QUEUE_SIZE, MAX_BATCH_SIZE, POLL_INTERVAL_MS,
SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE);
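
For reference, a minimal sketch of how the new options fit together using Debezium's Configuration builder. The option values come from the fields added above; the builder overloads are assumed from io.debezium.config.Configuration.

    import io.debezium.config.Configuration;

    Configuration config = Configuration.create()
            .with(MySqlConnectorConfig.SNAPSHOT_MODE, "twitter_patch")      // new snapshot mode
            .with(MySqlConnectorConfig.EXPECT_METADATA_EVENTS, true)        // schemas come from TABLE_METADATA events
            .with(MySqlConnectorConfig.TOPIC_MAPPER,
                  "io.debezium.relational.topic.ByLogicalTableTopicMapper") // optional: logical-table topic routing
            .build();
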
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
index 9482e24..9786c13 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
@@ -35,10 +35,7 @@
private volatile ChainedReader readers;
/**
- * Create an instance of the log reader that uses Kafka to store database schema history and the
- * {@link TopicSelector#defaultSelector(String) default topic selector} of "{@code <serverName>.<databaseName>.<tableName>}"
- * for
- * data and "{@code <serverName>}" for metadata.
+ * Create an instance of the log reader that uses Kafka to store database schema history and DDL changes.
*/
public MySqlConnectorTask() {
}
@@ -108,19 +105,43 @@ public synchronized void start(Map<String, String> props) {
} else {
// We have no recorded offsets ...
if (taskContext.isSnapshotNeverAllowed()) {
- // We're not allowed to take a snapshot, so instead we have to assume that the binlog contains the
- // full history of the database.
- logger.info("Found no existing offset and snapshots disallowed, so starting at beginning of binlog");
- source.setBinlogStartPoint("", 0L);// start from the beginning of the binlog
- taskContext.initializeHistory();
-
- // Look to see what the first available binlog file is called, and whether it looks like binlog files have
- // been purged. If so, then output a warning ...
- String earliestBinlogFilename = earliestBinlogFilename();
- if (earliestBinlogFilename == null) {
- logger.warn("No binlog appears to be available. Ensure that the MySQL row-level binlog is enabled.");
- } else if (!earliestBinlogFilename.endsWith("00001")) {
- logger.warn("It is possible the server has purged some binlogs. If this is the case, then using snapshot mode may be required.");
+ if (taskContext.isSnapshotModeTwitterPatch()) {
+ logger.info("Found no existing offset and we're in SnapshotMode.TWITTER_PATCH.");
+ taskContext.jdbc().query("SHOW MASTER STATUS", rs -> {
+ if (rs.next()) {
+ String binlogFilename = rs.getString(1);
+ long binlogPosition = rs.getLong(2);
+ source.setBinlogStartPoint(binlogFilename, binlogPosition);
+ if (rs.getMetaData().getColumnCount() > 4) {
+ // This column exists only in MySQL 5.6.5 or later ...
+ String gtidSet = rs.getString(5);// GTID set, may be null, blank, or contain a GTID set
+ source.setCompletedGtidSet(gtidSet);
+ logger.info("\t using binlog '{}' at position '{}' and gtid '{}'", binlogFilename, binlogPosition,
+ gtidSet);
+ } else {
+ logger.info("\t using binlog '{}' at position '{}'", binlogFilename, binlogPosition);
+ }
+ } else {
+ throw new IllegalStateException("Cannot read the binlog filename and position via 'SHOW MASTER STATUS'."
+ + " Make sure your server is correctly configured.");
+ }
+ });
+ taskContext.initializeHistory();
+ } else { // We must be in SnapshotMode.NEVER
+ // We're not allowed to take a snapshot, so instead we have to assume that the binlog contains the
+ // full history of the database.
+ logger.info("Found no existing offset and snapshots disallowed, so starting at beginning of binlog");
+ source.setBinlogStartPoint("", 0L);// start from the beginning of the binlog
+ taskContext.initializeHistory();
+
+ // Look to see what the first available binlog file is called, and whether it looks like binlog files have
+ // been purged. If so, then output a warning ...
+ String earliestBinlogFilename = earliestBinlogFilename();
+ if (earliestBinlogFilename == null) {
+ logger.warn("No binlog appears to be available. Ensure that the MySQL row-level binlog is enabled.");
+ } else if (!earliestBinlogFilename.endsWith("00001")) {
+ logger.warn("It is possible the server has purged some binlogs. If this is the case, then using snapshot mode may be required.");
+ }
}
} else {
// We are allowed to use snapshots, and that is the best way to start ...
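
For context on the rs.getString(5) call above: on MySQL 5.6.5 and later, SHOW MASTER STATUS returns five columns, the fifth being the executed GTID set, which is why the GTID set is read only when more than four columns are present. Illustrative output (values are made up):

    mysql> SHOW MASTER STATUS;
    File              Position  Binlog_Do_DB  Binlog_Ignore_DB  Executed_Gtid_Set
    mysql-bin.000042  154       ...           ...               3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5
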
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java
index 990bf1e..3490317 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java
@@ -6,12 +6,14 @@
package io.debezium.connector.mysql;
import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
+import java.sql.Types;
+import java.util.*;
import java.util.concurrent.Callable;
import java.util.function.Predicate;
+import com.github.shyiko.mysql.binlog.event.ColumnDescriptor;
+import com.github.shyiko.mysql.binlog.event.TableMetadataEventData;
+import io.debezium.relational.*;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
@@ -30,6 +32,7 @@
import io.debezium.relational.TableSchema;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables;
+import io.debezium.relational.topic.TopicMapper;
import io.debezium.relational.ddl.DdlChanges;
import io.debezium.relational.ddl.DdlChanges.DatabaseStatementStringConsumer;
import io.debezium.relational.history.DatabaseHistory;
@@ -89,6 +92,9 @@ public MySqlSchema(Configuration config, String serverName, Predicate<String> gt
this.ddlChanges = new DdlChanges(this.ddlParser.terminator());
this.ddlParser.addListener(ddlChanges);
+ // Set up the topic mapper ...
+ TopicMapper topicMapper = config.getInstance(MySqlConnectorConfig.TOPIC_MAPPER, TopicMapper.class);
+
// Use MySQL-specific converters and schemas for values ...
String timePrecisionModeStr = config.getString(MySqlConnectorConfig.TIME_PRECISION_MODE);
TemporalPrecisionMode timePrecisionMode = TemporalPrecisionMode.parse(timePrecisionModeStr);
@@ -97,7 +103,7 @@ public MySqlSchema(Configuration config, String serverName, Predicate<String> gt
DecimalHandlingMode decimalHandlingMode = DecimalHandlingMode.parse(decimalHandlingModeStr);
DecimalMode decimalMode = decimalHandlingMode.asDecimalMode();
MySqlValueConverters valueConverters = new MySqlValueConverters(decimalMode, adaptiveTimePrecision);
- this.schemaBuilder = new TableSchemaBuilder(valueConverters, schemaNameValidator::validate);
+ this.schemaBuilder = new TableSchemaBuilder(topicMapper, valueConverters, schemaNameValidator::validate);
// Set up the server name and schema prefix ...
if (serverName != null) serverName = serverName.trim();
@@ -196,6 +202,53 @@ public TableSchema schemaFor(TableId id) {
return filters.tableFilter().test(id) ? tableSchemaByTableId.get(id) : null;
}
+ public TableSchema schemaFromMetadata(TableId id, TableMetadataEventData metadata) {
+ if (!filters.tableFilter().test(id)) {
+ return null;
+ }
+ TableSchema schema = tableSchemaByTableId.get(id);
+
+ if (schema != null && schema.getTableVersionId() == metadata.getTableId()) {
+ return schema;
+ }
+
+ List<Column> columnDefs = new ArrayList<Column>();
+ List<String> pkNames = new ArrayList<String>();
+ int pos = 1;
+ for (ColumnDescriptor descriptor : metadata.getColumnDescriptors()) {
+ // See https://github.com/percona/percona-server/blob/5.6/include/mysql_com.h#L95 for bitmasks
+ boolean isPkey = (descriptor.getFlags() & 2) == 2;
+ boolean isAutoIncr = (descriptor.getFlags() & 512) == 512;
+ boolean hasDefault = (descriptor.getFlags() & 4096) != 4096;
+ boolean isNullable = (descriptor.getFlags() & 1) != 1;
+ boolean isOptional = !isPkey && (hasDefault || isNullable);
+ Column col = Column.editor()
+ .name(descriptor.getName())
+ .position(pos)
+ .jdbcType(MysqlToJdbc.mysqlToJdbcType(descriptor.getType())) // TODO test
+ .type(descriptor.getTypeName())
+ .charsetName(null) // TODO test (descriptor.getCharacterSet())
+ .length(descriptor.getLength())
+ .scale(descriptor.getScale())
+ .optional(isOptional) // TODO test
+ .generated(false) // TODO test
+ .autoIncremented(isAutoIncr)
+ .create();
+ columnDefs.add(col);
+ if (isPkey) {
+ pkNames.add(descriptor.getName());
+ }
+ pos++;
+ }
+ tables.overwriteTable(id, columnDefs, pkNames, null); // TODO test null charset
+ schema = schemaBuilder.create(schemaPrefix, tables.forTable(id), filters.columnFilter(), filters.columnMappers());
+ schema.setTableVersionId(metadata.getTableId());
+ tableSchemaByTableId.put(id, schema);
+ return schema;
+ }
+
+
+
/**
* Get the information about where the DDL statement history is recorded.
*
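
The bitmask tests in schemaFromMetadata() above use raw values of the MySQL column flags defined in mysql_com.h. The same checks with named constants, as a readability sketch (the flag values are the standard MySQL ones):

    // Column flags from MySQL's include/mysql_com.h
    static final int NOT_NULL_FLAG         = 1;    // column is NOT NULL
    static final int PRI_KEY_FLAG          = 2;    // column is part of the primary key
    static final int AUTO_INCREMENT_FLAG   = 512;  // column auto-increments
    static final int NO_DEFAULT_VALUE_FLAG = 4096; // column has no default value

    int flags = descriptor.getFlags();
    boolean isPkey     = (flags & PRI_KEY_FLAG) != 0;
    boolean isAutoIncr = (flags & AUTO_INCREMENT_FLAG) != 0;
    boolean hasDefault = (flags & NO_DEFAULT_VALUE_FLAG) == 0;
    boolean isNullable = (flags & NOT_NULL_FLAG) == 0;
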
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java
index 9ea86f5..07dafb8 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java
@@ -29,7 +29,6 @@
private final SourceInfo source;
private final MySqlSchema dbSchema;
- private final TopicSelector topicSelector;
private final RecordMakers recordProcessor;
private final Predicate<String> gtidSourceFilter;
private final Clock clock = Clock.system();
@@ -37,9 +36,6 @@
public MySqlTaskContext(Configuration config) {
super(config);
- // Set up the topic selector ...
- this.topicSelector = TopicSelector.defaultSelector(serverName());
-
// Set up the source information ...
this.source = new SourceInfo();
this.source.setServerName(serverName());
@@ -47,24 +43,20 @@ public MySqlTaskContext(Configuration config) {
// Set up the GTID filter ...
String gtidSetIncludes = config.getString(MySqlConnectorConfig.GTID_SOURCE_INCLUDES);
String gtidSetExcludes = config.getString(MySqlConnectorConfig.GTID_SOURCE_EXCLUDES);
- this.gtidSourceFilter = gtidSetIncludes != null ? Predicates.includesUuids(gtidSetIncludes)
- : (gtidSetExcludes != null ? Predicates.excludesUuids(gtidSetExcludes) : null);
+ this.gtidSourceFilter = gtidSetIncludes != null ? Predicates.includes(gtidSetIncludes)
+ : (gtidSetExcludes != null ? Predicates.excludes(gtidSetExcludes) : null);
// Set up the MySQL schema ...
this.dbSchema = new MySqlSchema(config, serverName(), this.gtidSourceFilter);
// Set up the record processor ...
- this.recordProcessor = new RecordMakers(dbSchema, source, topicSelector);
+ this.recordProcessor = new RecordMakers(dbSchema, source, serverName());
}
public String connectorName() {
return config.getString("name");
}
- public TopicSelector topicSelector() {
- return topicSelector;
- }
-
public SourceInfo source() {
return source;
}
@@ -173,7 +165,11 @@ public boolean isSnapshotAllowedWhenNeeded() {
}
public boolean isSnapshotNeverAllowed() {
- return snapshotMode() == SnapshotMode.NEVER;
+ return snapshotMode() == SnapshotMode.NEVER || snapshotMode() == SnapshotMode.TWITTER_PATCH;
+ }
+
+ public boolean isSnapshotModeTwitterPatch() {
+ return snapshotMode() == SnapshotMode.TWITTER_PATCH;
}
public boolean isInitialSnapshotOnly() {
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MysqlToJdbc.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MysqlToJdbc.java
new file mode 100644
index 0000000..2305566
--- /dev/null
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MysqlToJdbc.java
@@ -0,0 +1,78 @@
+package io.debezium.connector.mysql;
+
+import java.sql.Types;
+
+public class MysqlToJdbc {
+
+ public static int mysqlToJdbcType(int mysqlType) {
+ // See https://github.com/percona/percona-server/blob/1e2f003a5bd48763c27e37542d97cd8f59d98eaa/libbinlogevents/export/binary_log_types.h#L38
+ // See https://github.com/debezium/debezium/blob/v0.3.6/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java#L75
+ /*
+ MYSQL_TYPE_DECIMAL, // 0
+ MYSQL_TYPE_TINY, // 1
+ MYSQL_TYPE_SHORT, // 2
+ MYSQL_TYPE_LONG, // 3
+ MYSQL_TYPE_FLOAT, // 4
+ MYSQL_TYPE_DOUBLE, // 5
+ MYSQL_TYPE_NULL, // 6
+ MYSQL_TYPE_TIMESTAMP, // 7
+ MYSQL_TYPE_LONGLONG, // 8
+ MYSQL_TYPE_INT24, // 9
+ MYSQL_TYPE_DATE, // 10
+ MYSQL_TYPE_TIME, // 11
+ MYSQL_TYPE_DATETIME, // 12
+ MYSQL_TYPE_YEAR, // 13
+ MYSQL_TYPE_NEWDATE, // 14
+ MYSQL_TYPE_VARCHAR, // 15
+ MYSQL_TYPE_BIT, // 16
+ MYSQL_TYPE_TIMESTAMP2, // 17
+ MYSQL_TYPE_DATETIME2, // 18
+ MYSQL_TYPE_TIME2, // 19
+ MYSQL_TYPE_JSON=245,
+ MYSQL_TYPE_NEWDECIMAL=246,
+ MYSQL_TYPE_ENUM=247,
+ MYSQL_TYPE_SET=248,
+ MYSQL_TYPE_TINY_BLOB=249,
+ MYSQL_TYPE_MEDIUM_BLOB=250,
+ MYSQL_TYPE_LONG_BLOB=251,
+ MYSQL_TYPE_BLOB=252,
+ MYSQL_TYPE_VAR_STRING=253,
+ MYSQL_TYPE_STRING=254,
+ MYSQL_TYPE_GEOMETRY=255
+ */
+ switch (mysqlType) {
+ case 0: return Types.DECIMAL;
+ case 1: return Types.SMALLINT;
+ case 2: return Types.SMALLINT;
+ case 3: return Types.BIGINT;
+ case 4: return Types.FLOAT;
+ case 5: return Types.DOUBLE;
+ case 6: return Types.NULL;
+ case 7: return Types.TIMESTAMP;
+ case 8: return Types.BIGINT;
+ case 9: return Types.INTEGER;
+ case 10: return Types.DATE;
+ case 11: return Types.TIME;
+ case 12: return Types.TIMESTAMP;
+ case 13: return Types.INTEGER;
+ case 14: return Types.DATE;
+ case 15: return Types.VARCHAR;
+ case 16: return Types.BIT;
+ case 17: return Types.TIMESTAMP;
+ case 18: return Types.TIMESTAMP;
+ case 19: return Types.TIME;
+ case 245: return Types.OTHER;
+ case 246: return Types.DECIMAL;
+ case 247: return Types.CHAR;
+ case 248: return Types.CHAR;
+ case 249: return Types.BLOB;
+ case 250: return Types.BLOB;
+ case 251: return Types.BLOB;
+ case 252: return Types.BLOB;
+ case 253: return Types.VARCHAR;
+ case 254: return Types.CHAR;
+ case 255: return Types.OTHER;
+ default: return Types.OTHER;
+ }
+ }
+}
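
A quick usage sketch, using type codes from the listing above:

    int varcharType = MysqlToJdbc.mysqlToJdbcType(15);  // MYSQL_TYPE_VARCHAR -> java.sql.Types.VARCHAR
    int blobType    = MysqlToJdbc.mysqlToJdbcType(252); // MYSQL_TYPE_BLOB    -> java.sql.Types.BLOB
    int unknownType = MysqlToJdbc.mysqlToJdbcType(99);  // unmapped code      -> java.sql.Types.OTHER
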
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java
index 0f5da85..420b030 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java
@@ -12,6 +12,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import com.github.shyiko.mysql.binlog.event.TableMetadataEventData;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
@@ -37,7 +38,7 @@
private final Logger logger = LoggerFactory.getLogger(getClass());
private final MySqlSchema schema;
private final SourceInfo source;
- private final TopicSelector topicSelector;
+ private final String ddlChangeTopicName;
private final Map<Long, Converter> convertersByTableNumber = new HashMap<>();
private final Map<TableId, Long> tableNumbersByTableId = new HashMap<>();
private final Schema schemaChangeKeySchema;
@@ -49,12 +50,12 @@
*
* @param schema the schema information about the MySQL server databases; may not be null
* @param source the connector's source information; may not be null
- * @param topicSelector the selector for topic names; may not be null
+ * @param ddlChangeTopicName the name of the topic to which DDL changes are written; may not be null
*/
- public RecordMakers(MySqlSchema schema, SourceInfo source, TopicSelector topicSelector) {
+ public RecordMakers(MySqlSchema schema, SourceInfo source, String ddlChangeTopicName) {
this.schema = schema;
this.source = source;
- this.topicSelector = topicSelector;
+ this.ddlChangeTopicName = ddlChangeTopicName;
this.schemaChangeKeySchema = SchemaBuilder.struct()
.name(schemaNameValidator.validate("io.debezium.connector.mysql.SchemaChangeKey"))
.field(Fields.DATABASE_NAME, Schema.STRING_SCHEMA)
@@ -117,12 +118,12 @@ public RecordsForTable forTable(long tableNumber, BitSet includedColumns, Blocki
* @return the number of records produced; will be 0 or more
*/
public int schemaChanges(String databaseName, String ddlStatements, BlockingConsumer<SourceRecord> consumer) {
- String topicName = topicSelector.getPrimaryTopic();
Integer partition = 0;
Struct key = schemaChangeRecordKey(databaseName);
Struct value = schemaChangeRecordValue(databaseName, ddlStatements);
SourceRecord record = new SourceRecord(source.partition(), source.offset(),
- topicName, partition, schemaChangeKeySchema, key, schemaChangeValueSchema, value);
+ ddlChangeTopicName, partition,
+ schemaChangeKeySchema, key, schemaChangeValueSchema, value);
try {
consumer.accept(record);
return 1;
@@ -151,7 +152,7 @@ public void regenerate() {
Set<TableId> tableIds = schema.tables().tableIds();
logger.debug("Regenerating converters for {} tables", tableIds.size());
tableIds.forEach(id -> {
- assign(nextTableNumber.incrementAndGet(), id);
+ assign(nextTableNumber.incrementAndGet(), id, null);
});
}
@@ -163,20 +164,27 @@ public void regenerate() {
* @return {@code true} if the assignment was successful, or {@code false} if the table is currently excluded in the
* connector's configuration
*/
- public boolean assign(long tableNumber, TableId id) {
+ public boolean assign(long tableNumber, TableId id, TableMetadataEventData metadata) {
Long existingTableNumber = tableNumbersByTableId.get(id);
if (existingTableNumber != null && existingTableNumber.longValue() == tableNumber
&& convertersByTableNumber.containsKey(tableNumber)) {
// This is the exact same table number for the same table, so do nothing ...
return true;
}
- TableSchema tableSchema = schema.schemaFor(id);
+
+ // Get schema from either registry or metadata event
+ TableSchema tableSchema;
+ if (metadata != null) {
+ tableSchema = schema.schemaFromMetadata(id, metadata);
+ } else {
+ tableSchema = schema.schemaFor(id);
+ }
if (tableSchema == null) return false;
- String topicName = topicSelector.getTopic(id);
+ String topicName = tableSchema.getEnvelopeSchemaName();
Envelope envelope = Envelope.defineSchema()
.withName(schemaNameValidator.validate(topicName + ".Envelope"))
- .withRecord(schema.schemaFor(id).valueSchema())
+ .withRecord(tableSchema.valueSchema())
.withSource(SourceInfo.SCHEMA)
.build();
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java
deleted file mode 100644
index 16574d2..0000000
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.debezium.connector.mysql;
-
-import io.debezium.annotation.ThreadSafe;
-import io.debezium.relational.TableId;
-
-/**
- * A function that determines the name of topics for data and metadata.
- *
- * @author Randall Hauch
- */
-@ThreadSafe
-public interface TopicSelector {
- /**
- * Get the default topic selector logic, which uses a '.' delimiter character when needed.
- *
- * @param prefix the name of the prefix to be used for all topics; may not be null and must not terminate in the
- * {@code delimiter}
- * @return the topic selector; never null
- */
- static TopicSelector defaultSelector(String prefix) {
- return defaultSelector(prefix,".");
- }
-
- /**
- * Get the default topic selector logic, which uses the supplied delimiter character when needed.
- *
- * @param prefix the name of the prefix to be used for all topics; may not be null and must not terminate in the
- * {@code delimiter}
- * @param delimiter the string delineating the server, database, and table names; may not be null
- * @return the topic selector; never null
- */
- static TopicSelector defaultSelector(String prefix, String delimiter) {
- return new TopicSelector() {
- /**
- * Get the name of the topic for the given server, database, and table names. This method returns
- * "{@code <serverName>}".
- *
- * @return the topic name; never null
- */
- @Override
- public String getPrimaryTopic() {
- return prefix;
- }
-
- /**
- * Get the name of the topic for the given server name. This method returns
- * "{@code <prefix>.<databaseName>.<tableName>}".
- *
- * @param databaseName the name of the database; may not be null
- * @param tableName the name of the table; may not be null
- * @return the topic name; never null
- */
- @Override
- public String getTopic(String databaseName, String tableName) {
- return String.join(delimiter, prefix, databaseName, tableName);
- }
-
- };
- }
-
- /**
- * Get the name of the topic for the given server name.
- *
- * @param tableId the identifier of the table; may not be null
- * @return the topic name; never null
- */
- default String getTopic(TableId tableId) {
- return getTopic(tableId.catalog(),tableId.table());
- }
-
- /**
- * Get the name of the topic for the given server name.
- *
- * @param databaseName the name of the database; may not be null
- * @param tableName the name of the table; may not be null
- * @return the topic name; never null
- */
- String getTopic(String databaseName, String tableName);
-
- /**
- * Get the name of the primary topic.
- *
- * @return the topic name; never null
- */
- String getPrimaryTopic();
-}
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorTest.java
index 8241387..f612f50 100644
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorTest.java
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorTest.java
@@ -34,7 +34,8 @@ protected static void assertConfigDefIsValid(Connector connector, io.debezium.co
assertThat(key.importance).isEqualTo(expected.importance());
assertThat(key.documentation).isEqualTo(expected.description());
assertThat(key.type).isEqualTo(expected.type());
- if (expected.equals(MySqlConnectorConfig.DATABASE_HISTORY) || expected.equals(MySqlConnectorConfig.JDBC_DRIVER)) {
+ if (expected.equals(MySqlConnectorConfig.DATABASE_HISTORY) || expected.equals(MySqlConnectorConfig.JDBC_DRIVER) ||
+ expected.equals(MySqlConnectorConfig.TOPIC_MAPPER)) {
assertThat(((Class<?>) key.defaultValue).getName()).isEqualTo((String) expected.defaultValue());
} else if (!expected.equals(MySqlConnectorConfig.SERVER_ID)) {
assertThat(key.defaultValue).isEqualTo(expected.defaultValue());
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java
index 1ffd20a..6c5cc72 100644
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java
@@ -31,7 +31,6 @@ public void shouldCreateTaskFromConfiguration() throws Exception {
assertThat(context.logger()).isNotNull();
assertThat(context.makeRecord()).isNotNull();
assertThat(context.source()).isNotNull();
- assertThat(context.topicSelector()).isNotNull();
assertThat(context.hostname()).isEqualTo(hostname);
assertThat(context.port()).isEqualTo(port);
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java
index a5188b6..2d8adef 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java
@@ -27,6 +27,7 @@
import io.debezium.relational.TableSchema;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables;
+import io.debezium.relational.topic.ByTableTopicMapper;
import io.debezium.util.AvroValidator;
/**
@@ -63,7 +64,7 @@ protected PostgresSchema(PostgresConnectorConfig config) {
PostgresValueConverter valueConverter = new PostgresValueConverter(config.adaptiveTimePrecision(), ZoneOffset.UTC);
this.schemaNameValidator = AvroValidator.create(LOGGER)::validate;
- this.schemaBuilder = new TableSchemaBuilder(valueConverter, this.schemaNameValidator);
+ this.schemaBuilder = new TableSchemaBuilder(new ByTableTopicMapper(), valueConverter, this.schemaNameValidator);
// Set up the server name and schema prefix ...
String serverName = config.serverName();
diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchema.java b/debezium-core/src/main/java/io/debezium/relational/TableSchema.java
index 778fd0d..54dd6d4 100644
--- a/debezium-core/src/main/java/io/debezium/relational/TableSchema.java
+++ b/debezium-core/src/main/java/io/debezium/relational/TableSchema.java
@@ -47,15 +47,18 @@
@Immutable
public class TableSchema {
+ private final String envelopeSchemaName;
private final Schema keySchema;
private final Schema valueSchema;
private final Function<Object[], Object> keyGenerator;
private final Function<Object[], Struct> valueGenerator;
+ private long tableVersionId = -1;
/**
* Create an instance with the specified {@link Schema}s for the keys and values, and the functions that generate the
* key and value for a given row of data.
- *
+ *
+ * @param envelopeSchemaName the name of the envelope schema; may be null
* @param keySchema the schema for the primary key; may be null
* @param keyGenerator the function that converts a row into a single key object for Kafka Connect; may not be null but may
* return nulls
@@ -63,8 +66,9 @@
* @param valueGenerator the function that converts a row into a single value object for Kafka Connect; may not be null but
* may return nulls
*/
- public TableSchema(Schema keySchema, Function<Object[], Object> keyGenerator,
+ public TableSchema(String envelopeSchemaName, Schema keySchema, Function<Object[], Object> keyGenerator,
Schema valueSchema, Function<Object[], Struct> valueGenerator) {
+ this.envelopeSchemaName = envelopeSchemaName;
this.keySchema = keySchema;
this.valueSchema = valueSchema;
this.keyGenerator = keyGenerator != null ? keyGenerator : (row) -> null;
@@ -72,6 +76,14 @@ public TableSchema(Schema keySchema, Function<Object[], Object> keyGenerator,
}
/**
+ * Get the name of the envelope schema for both the key and value.
+ * @return the envelope schema name; may be null
+ */
+ public String getEnvelopeSchemaName() {
+ return envelopeSchemaName;
+ }
+
+ /**
* Get the {@link Schema} that represents the table's columns, excluding those that make up the {@link #keySchema()}.
*
* @return the Schema describing the columns in the table; never null
@@ -110,7 +122,15 @@ public Object keyFromColumnData(Object[] columnData) {
public Struct valueFromColumnData(Object[] columnData) {
return columnData == null ? null : valueGenerator.apply(columnData);
}
-
+
+ public void setTableVersionId(long id) {
+ tableVersionId = id;
+ }
+
+ public long getTableVersionId() {
+ return tableVersionId;
+ }
+
@Override
public int hashCode() {
return valueSchema().hashCode();
diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java
index 27345e4..3cb5efa 100644
--- a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java
+++ b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java
@@ -10,6 +10,7 @@
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@@ -29,6 +30,7 @@
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.mapping.ColumnMapper;
import io.debezium.relational.mapping.ColumnMappers;
+import io.debezium.relational.topic.TopicMapper;
/**
* Builder that constructs {@link TableSchema} instances for {@link Table} definitions.
@@ -49,17 +51,21 @@
private static final Logger LOGGER = LoggerFactory.getLogger(TableSchemaBuilder.class);
+ private final TopicMapper topicMapper;
private final Function<String, String> schemaNameValidator;
private final ValueConverterProvider valueConverterProvider;
/**
* Create a new instance of the builder.
- *
+ *
+ * @param topicMapper the TopicMapper for each table; may not be null
* @param valueConverterProvider the provider for obtaining {@link ValueConverter}s and {@link SchemaBuilder}s; may not be
* null
* @param schemaNameValidator the validation function for schema names; may not be null
*/
- public TableSchemaBuilder(ValueConverterProvider valueConverterProvider, Function<String, String> schemaNameValidator) {
+ public TableSchemaBuilder(TopicMapper topicMapper, ValueConverterProvider valueConverterProvider,
+ Function<String, String> schemaNameValidator) {
+ this.topicMapper = topicMapper;
this.schemaNameValidator = schemaNameValidator;
this.valueConverterProvider = valueConverterProvider;
}
@@ -89,7 +95,7 @@ public TableSchema create(ResultSet resultSet, String name) throws SQLException
Function<Object[], Struct> valueGenerator = createValueGenerator(valueSchema, id, columns, null, null);
// Finally create our result object with no primary key or key generator ...
- return new TableSchema(null, null, valueSchema, valueGenerator);
+ return new TableSchema(schemaName, null, null, valueSchema, valueGenerator);
}
/**
@@ -129,11 +135,13 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId>
if (schemaPrefix == null) schemaPrefix = "";
// Build the schemas ...
final TableId tableId = table.id();
- final String tableIdStr = tableId.toString();
- final String schemaNamePrefix = schemaPrefix + tableIdStr;
- LOGGER.debug("Mapping table '{}' to schemas under '{}'", tableId, schemaNamePrefix);
- SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(schemaNameValidator.apply(schemaNamePrefix + ".Value"));
- SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(schemaNameValidator.apply(schemaNamePrefix + ".Key"));
+ final String topicName = topicMapper.getTopicName(schemaPrefix, table);
+ final String keySchemaName = schemaNameValidator.apply(topicName + ".Key");
+ final String valueSchemaName = schemaNameValidator.apply(topicName + ".Value");
+ final String envelopeSchemaName = schemaNameValidator.apply(topicName);
+ LOGGER.debug("Mapping table '{}' to key schema '{}' and value schema '{}'", tableId, keySchemaName, valueSchemaName);
+ SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(valueSchemaName);
+ SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(keySchemaName);
AtomicBoolean hasPrimaryKey = new AtomicBoolean(false);
table.columns().forEach(column -> {
if (table.isPrimaryKeyColumn(column.name())) {
@@ -147,6 +155,9 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId>
addField(valSchemaBuilder, column, mapper);
}
});
+ // Enhance the key schema if necessary ...
+ topicMapper.enhanceKeySchema(keySchemaBuilder);
+ // Create the schemas ...
Schema valSchema = valSchemaBuilder.optional().build();
Schema keySchema = hasPrimaryKey.get() ? keySchemaBuilder.build() : null;
@@ -156,11 +167,11 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId>
}
// Create the generators ...
- Function<Object[], Object> keyGenerator = createKeyGenerator(keySchema, tableId, table.primaryKeyColumns());
+ Function<Object[], Object> keyGenerator = createKeyGenerator(keySchema, tableId, table, topicMapper, schemaPrefix);
Function<Object[], Struct> valueGenerator = createValueGenerator(valSchema, tableId, table.columns(), filter, mappers);
// And the table schema ...
- return new TableSchema(keySchema, keyGenerator, valSchema, valueGenerator);
+ return new TableSchema(envelopeSchemaName, keySchema, keyGenerator, valSchema, valueGenerator);
}
/**
@@ -169,15 +180,21 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId>
* @param schema the Kafka Connect schema for the key; may be null if there is no known schema, in which case the generator
* will be null
* @param columnSetName the name for the set of columns, used in error messages; may not be null
- * @param columns the column definitions for the table that defines the row; may not be null
+ * @param table the table for the row of data
+ * @param topicMapper the TopicMapper for the table whose key we are generating; may not be null
+ * @param schemaPrefix the prefix added to the table identifier to construct the schema names; may be null if there is no
+ * prefix
* @return the key-generating function, or null if there is no key schema
*/
- protected Function<Object[], Object> createKeyGenerator(Schema schema, TableId columnSetName, List<Column> columns) {
+ protected Function<Object[], Object> createKeyGenerator(Schema schema, TableId columnSetName, Table table,
+ TopicMapper topicMapper, String schemaPrefix) {
if (schema != null) {
+ List<Column> columns = table.primaryKeyColumns();
int[] recordIndexes = indexesForColumns(columns);
Field[] fields = fieldsForColumns(schema, columns);
int numFields = recordIndexes.length;
ValueConverter[] converters = convertersForColumns(schema, columnSetName, columns, null, null);
+ Map<String, Object> nonRowFieldsToAddToKey = topicMapper.getNonRowFieldsToAddToKey(schema, schemaPrefix, table);
return (row) -> {
Struct result = new Struct(schema);
for (int i = 0; i != numFields; ++i) {
@@ -194,6 +211,13 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId>
}
}
}
+
+ if (nonRowFieldsToAddToKey != null) {
+ for (Map.Entry<String, Object> nonRowField : nonRowFieldsToAddToKey.entrySet()) {
+ result.put(nonRowField.getKey(), nonRowField.getValue());
+ }
+ }
+
return result;
};
}
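
To make the effect of the non-row key fields concrete: with a topic mapper that adds a physical-table identifier (as ByLogicalTableTopicMapper below does), a row whose primary key is id=42 would yield a key Struct along these lines. The shard name is hypothetical:

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;

    Schema keySchema = SchemaBuilder.struct()
            .field("id", Schema.INT32_SCHEMA)                              // from the table's primary key
            .field("__dbz__physicalTableIdentifier", Schema.STRING_SCHEMA) // added by enhanceKeySchema()
            .build();
    Struct key = new Struct(keySchema)
            .put("id", 42)                                                 // row data
            .put("__dbz__physicalTableIdentifier", "etsy_listings_1");     // non-row field from the mapper
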
diff --git a/debezium-core/src/main/java/io/debezium/relational/topic/ByLogicalTableTopicMapper.java b/debezium-core/src/main/java/io/debezium/relational/topic/ByLogicalTableTopicMapper.java
new file mode 100644
index 0000000..8f74809
--- /dev/null
+++ b/debezium-core/src/main/java/io/debezium/relational/topic/ByLogicalTableTopicMapper.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.relational.topic;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+
+import io.debezium.relational.Table;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A logical table consists of one or more physical tables with the same schema. A common use case is sharding -- the
+ * two physical tables `db_shard1.my_table` and `db_shard2.my_table` together form one logical table.
+ *
+ * This TopicMapper allows us to send change events from both physical tables to one topic.
+ *
+ * @author David Leibovic
+ */
+public class ByLogicalTableTopicMapper extends TopicMapper {
+
+ public String getTopicName(String topicPrefix, Table table) {
+ final String fullyQualifiedTableName = composeFullyQualifiedTableName(topicPrefix, table);
+ Pattern logicalTablePattern = Pattern.compile("^.*?(?=\\..+\\..+)\\.(?<logicalDb>etsy_.+?(?=(_\\d+\\.)|\\.))(_\\d+)?\\.(?<table>.+)$");
+ Matcher logicalTableMatcher = logicalTablePattern.matcher(fullyQualifiedTableName);
+ if (logicalTableMatcher.matches()) {
+ return logicalTableMatcher.replaceAll("${logicalDb}.${table}");
+ }
+ return fullyQualifiedTableName;
+ }
+
+ public void enhanceKeySchema(SchemaBuilder keySchemaBuilder) {
+ // Now that multiple physical tables can share a topic, the Key Schema can no longer consist of solely the
+ // record's primary / unique key fields, since they are not guaranteed to be unique across tables. We need some
+ // identifier added to the key that distinguishes the different physical tables.
+ keySchemaBuilder.field("__dbz__physicalTableIdentifier", Schema.STRING_SCHEMA);
+ }
+
+ public Map<String, Object> getNonRowFieldsToAddToKey(Schema schema, String topicPrefix, Table table) {
+ final String fullyQualifiedTableName = composeFullyQualifiedTableName(topicPrefix, table);
+ Pattern physicalTableIdentifierPattern = Pattern.compile("^.*?(?=\\..+\\..+)\\.(?<logicalDb>etsy_.+?(?=\\.))\\.(?<table>.+)$");
+ Matcher physicalTableIdentifierMatcher = physicalTableIdentifierPattern.matcher(fullyQualifiedTableName);
+
+ final String physicalTableIdentifier;
+ if (physicalTableIdentifierMatcher.matches()) {
+ physicalTableIdentifier = physicalTableIdentifierMatcher.replaceAll("${logicalDb}");
+ } else {
+ physicalTableIdentifier = fullyQualifiedTableName;
+ }
+
+ Map<String, Object> nonRowFields = new HashMap<>();
+ nonRowFields.put("__dbz__physicalTableIdentifier", physicalTableIdentifier);
+ return nonRowFields;
+ }
+
+ private String composeFullyQualifiedTableName(String topicPrefix, Table table) {
+ return topicPrefix + table.id().toString();
+ }
+
+}
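
A self-contained sketch of what the logical-table regex above produces, with a hypothetical server name "myserver" and shard databases "etsy_listings_1" / "etsy_listings_2":

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class LogicalTopicDemo {
        public static void main(String[] args) {
            // Same pattern as ByLogicalTableTopicMapper.getTopicName() above.
            Pattern p = Pattern.compile(
                    "^.*?(?=\\..+\\..+)\\.(?<logicalDb>etsy_.+?(?=(_\\d+\\.)|\\.))(_\\d+)?\\.(?<table>.+)$");
            for (String fqn : new String[] {
                    "myserver.etsy_listings_1.listing",   // shard 1
                    "myserver.etsy_listings_2.listing",   // shard 2
                    "myserver.other_db.listing" }) {      // non-matching table
                Matcher m = p.matcher(fqn);
                // Shards collapse to one logical topic; anything else passes through unchanged.
                System.out.println(fqn + " -> " + (m.matches() ? m.replaceAll("${logicalDb}.${table}") : fqn));
            }
        }
    }

Both shard tables map to the topic "etsy_listings.listing", while getNonRowFieldsToAddToKey() keeps their record keys distinct by adding the physical database name ("etsy_listings_1" or "etsy_listings_2") to each key.
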
diff --git a/debezium-core/src/main/java/io/debezium/relational/topic/ByTableTopicMapper.java b/debezium-core/src/main/java/io/debezium/relational/topic/ByTableTopicMapper.java
new file mode 100644
index 0000000..1791b04
--- /dev/null
+++ b/debezium-core/src/main/java/io/debezium/relational/topic/ByTableTopicMapper.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.relational.topic;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+
+import io.debezium.relational.Table;
+
+import java.util.Map;
+
+/**
+ * @author David Leibovic
+ */
+public class ByTableTopicMapper extends TopicMapper {
+
+ public String getTopicName(String topicPrefix, Table table) {
+ return topicPrefix + table.id().toString();
+ }
+
+ public void enhanceKeySchema(SchemaBuilder keySchemaBuilder) {
+ // do nothing ...
+ }
+
+ public Map<String, Object> getNonRowFieldsToAddToKey(Schema schema, String topicPrefix, Table table) {
+ // do nothing ...
+ return null;
+ }
+
+}
diff --git a/debezium-core/src/main/java/io/debezium/relational/topic/TopicMapper.java b/debezium-core/src/main/java/io/debezium/relational/topic/TopicMapper.java
new file mode 100644
index 0000000..fd25233
--- /dev/null
+++ b/debezium-core/src/main/java/io/debezium/relational/topic/TopicMapper.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.relational.topic;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+
+import io.debezium.relational.Table;
+
+import java.util.Map;
+
+/**
+ * @author David Leibovic
+ */
+public abstract class TopicMapper {
+
+ /**
+ * Get the name of the topic for the given table.
+ *
+ * @param topicPrefix prefix for the topic
+ * @param table the table that we are getting the topic name for
+ * @return the topic name; may be null if this strategy could not be applied
+ */
+ abstract public String getTopicName(String topicPrefix, Table table);
+
+ /**
+ * Depending on your TopicMapper implementation and which rows in a database may occupy the same topic,
+ * it may be necessary to enhance the key schema for the events to ensure each distinct record in the topic
+ * has a unique key.
+ *
+ * @param keySchemaBuilder the {@link SchemaBuilder} for the key, pre-populated with the table's primary/unique key
+ */
+ abstract public void enhanceKeySchema(SchemaBuilder keySchemaBuilder);
+
+ /**
+ * Get the extra key-value pairs necessary to add to the event's key.
+ * @param schema the schema for the key; never null
+ * @param topicPrefix prefix for the topic
+ * @param table the table the event is for
+ * @return the extra key-value pairs, or null if none are necessary.
+ */
+ abstract public Map<String, Object> getNonRowFieldsToAddToKey(Schema schema, String topicPrefix, Table table);
+
+}
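
Because TopicMapper is an abstract class loaded through the topic.mapper option, custom routing strategies can be plugged in. A hypothetical example (ByDatabaseTopicMapper is not part of this patch): one topic per database, with the table name folded into the key so records stay distinguishable:

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;

    import io.debezium.relational.Table;
    import io.debezium.relational.topic.TopicMapper;

    public class ByDatabaseTopicMapper extends TopicMapper {

        @Override
        public String getTopicName(String topicPrefix, Table table) {
            return topicPrefix + table.id().catalog();
        }

        @Override
        public void enhanceKeySchema(SchemaBuilder keySchemaBuilder) {
            keySchemaBuilder.field("__dbz__tableName", Schema.STRING_SCHEMA);
        }

        @Override
        public Map<String, Object> getNonRowFieldsToAddToKey(Schema schema, String topicPrefix, Table table) {
            return Collections.singletonMap("__dbz__tableName", table.id().table());
        }
    }

Enabling it would be a matter of setting topic.mapper to the class's fully-qualified name.
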
diff --git a/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java b/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java
index 57316eb..d566e10 100644
--- a/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java
+++ b/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java
@@ -20,6 +20,8 @@
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.time.Date;
+import io.debezium.relational.topic.TopicMapper;
+import io.debezium.relational.topic.ByTableTopicMapper;
import io.debezium.util.AvroValidator;
public class TableSchemaBuilderTest {
@@ -34,9 +36,11 @@
private Column c4;
private TableSchema schema;
private AvroValidator validator;
+ private TopicMapper topicMapper;
@Before
public void beforeEach() {
+ topicMapper = new ByTableTopicMapper();
validator = AvroValidator.create((original,replacement, conflict)->{
fail("Should not have come across an invalid schema name");
});
@@ -78,19 +82,19 @@ public void checkPreconditions() {
@Test(expected = NullPointerException.class)
public void shouldFailToBuildTableSchemaFromNullTable() {
- new TableSchemaBuilder(new JdbcValueConverters(),validator::validate).create(prefix,null);
+ new TableSchemaBuilder(topicMapper, new JdbcValueConverters(),validator::validate).create(prefix,null);
}
@Test
public void shouldBuildTableSchemaFromTable() {
- schema = new TableSchemaBuilder(new JdbcValueConverters(),validator::validate).create(prefix,table);
+ schema = new TableSchemaBuilder(topicMapper, new JdbcValueConverters(),validator::validate).create(prefix,table);
assertThat(schema).isNotNull();
}
@Test
public void shouldBuildTableSchemaFromTableWithoutPrimaryKey() {
table = table.edit().setPrimaryKeyNames().create();
- schema = new TableSchemaBuilder(new JdbcValueConverters(),validator::validate).create(prefix,table);
+ schema = new TableSchemaBuilder(topicMapper, new JdbcValueConverters(),validator::validate).create(prefix,table);
assertThat(schema).isNotNull();
// Check the keys ...
assertThat(schema.keySchema()).isNull();
diff --git a/pom.xml b/pom.xml
index 595be01..f67d833 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,10 +59,10 @@
<version.confluent.platform>3.2.0</version.confluent.platform>
<!-- Databases -->
- <version.postgresql.driver>42.0.0-SNAPSHOT</version.postgresql.driver>
- <version.mysql.server>5.7</version.mysql.server>
+ <version.postgresql.driver>42.0.0</version.postgresql.driver>
+ <version.mysql.server>5.6</version.mysql.server>
<version.mysql.driver>5.1.40</version.mysql.driver>
- <version.mysql.binlog>0.9.0</version.mysql.binlog>
+ <version.mysql.binlog>0.11.0</version.mysql.binlog>
<version.mongo.server>3.2.12</version.mongo.server>
<version.mongo.driver>3.4.2</version.mongo.driver>
@@ -591,13 +591,13 @@
<includeTestSourceDirectory>true</includeTestSourceDirectory>
</configuration>
<executions>
- <execution>
+ <!--<execution>
<id>check-style</id>
<phase>verify</phase>
<goals>
<goal>checkstyle</goal>
</goals>
- </execution>
+ </execution>-->
</executions>
</plugin>
</plugins>