Created
April 11, 2017 21:25
-
-
Save dasl-/e96c76dde05dcdc6694fd926f3b4286f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java | |
index 2e85cde..6caa6e8 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java | |
@@ -18,23 +18,12 @@ | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.function.Predicate; | |
+import com.github.shyiko.mysql.binlog.event.*; | |
import org.apache.kafka.connect.errors.ConnectException; | |
import org.apache.kafka.connect.source.SourceRecord; | |
import com.github.shyiko.mysql.binlog.BinaryLogClient; | |
import com.github.shyiko.mysql.binlog.BinaryLogClient.LifecycleListener; | |
-import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData; | |
-import com.github.shyiko.mysql.binlog.event.Event; | |
-import com.github.shyiko.mysql.binlog.event.EventData; | |
-import com.github.shyiko.mysql.binlog.event.EventHeader; | |
-import com.github.shyiko.mysql.binlog.event.EventHeaderV4; | |
-import com.github.shyiko.mysql.binlog.event.EventType; | |
-import com.github.shyiko.mysql.binlog.event.GtidEventData; | |
-import com.github.shyiko.mysql.binlog.event.QueryEventData; | |
-import com.github.shyiko.mysql.binlog.event.RotateEventData; | |
-import com.github.shyiko.mysql.binlog.event.TableMapEventData; | |
-import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData; | |
-import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; | |
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; | |
import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer; | |
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; | |
@@ -118,6 +107,12 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException { | |
if (event.getHeader().getEventType() == EventType.TABLE_MAP) { | |
TableMapEventData tableMapEvent = event.getData(); | |
tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent); | |
+ } else if (event.getHeader().getEventType() == EventType.TABLE_METADATA) { | |
+ TableMetadataEventData tableMetadataEvent = event.getData(); | |
+ TableMapEventData tableMapEvent = tableMapEventByTableId.get(tableMetadataEvent.getTableId()); | |
+ if (tableMapEvent != null) { | |
+ tableMapEvent.setMetadataEventData(tableMetadataEvent, 0); | |
+ } | |
} | |
return event; | |
} | |
@@ -154,7 +149,7 @@ protected void doStart() { | |
eventHandlers.put(EventType.HEARTBEAT, this::handleServerHeartbeat); | |
eventHandlers.put(EventType.INCIDENT, this::handleServerIncident); | |
eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent); | |
- eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata); | |
+ eventHandlers.put(EventType.TABLE_MAP, this::handleTableMap); | |
eventHandlers.put(EventType.QUERY, this::handleQueryEvent); | |
eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert); | |
eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate); | |
@@ -164,6 +159,8 @@ protected void doStart() { | |
eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete); | |
eventHandlers.put(EventType.VIEW_CHANGE, this::viewChange); | |
eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction); | |
+ // VIEW_CHANGE and XA_PREPARE are already registered above; only the TABLE_METADATA handler is new. | |
+ eventHandlers.put(EventType.TABLE_METADATA, this::handleTableMetadata); | |
// Get the current GtidSet from MySQL so we can get a filtered/merged GtidSet based off of the last Debezium checkpoint. | |
String availableServerGtidStr = context.knownGtidSet(); | |
@@ -479,16 +477,37 @@ protected void handleQueryEvent(Event event) throws InterruptedException { | |
* | |
* @param event the update event; never null | |
*/ | |
- protected void handleUpdateTableMetadata(Event event) { | |
- TableMapEventData metadata = unwrapData(event); | |
- long tableNumber = metadata.getTableId(); | |
- String databaseName = metadata.getDatabase(); | |
- String tableName = metadata.getTable(); | |
+ protected void handleTableMap(Event event) { | |
+     // When metadata events are expected, tables are registered by handleTableMetadata instead. | |
+     final boolean metadataDriven = context.config().getBoolean(MySqlConnectorConfig.EXPECT_METADATA_EVENTS); | |
+     if (!metadataDriven) { | |
+         handleTableUpdate(unwrapData(event), null); | |
+     } | |
+ } | |
+ | |
+ /** | |
+  * Handle a TABLE_METADATA event by registering the table it describes. The event is ignored unless | |
+  * {@link MySqlConnectorConfig#EXPECT_METADATA_EVENTS} is enabled. | |
+  * | |
+  * @param event the table metadata event to process | |
+  */ | |
+ protected void handleTableMetadata(Event event) { | |
+     final boolean metadataDriven = context.config().getBoolean(MySqlConnectorConfig.EXPECT_METADATA_EVENTS); | |
+     if (metadataDriven) { | |
+         TableMetadataEventData metadata = unwrapData(event); | |
+         handleTableUpdate(metadata.getTableMapEventData(), metadata); | |
+     } | |
+ } | |
+ | |
+ /** | |
+ * Register the table named by a TABLE_MAP event with the record makers, passing along the | |
+ * TABLE_METADATA event data when one is supplied. | |
+ * | |
+ * @param map the TABLE_MAP data naming the table; when null the update is skipped | |
+ * @param meta the TABLE_METADATA data for the table; may be null, in which case the update is skipped | |
+ * if {@link MySqlConnectorConfig#EXPECT_METADATA_EVENTS} is enabled | |
+ */ | |
+ private void handleTableUpdate(TableMapEventData map, TableMetadataEventData meta) { | |
+ if (map == null) { | |
+ logger.debug("Skipping table update (missing TABLE_MAP): {}", meta); | |
+ return; | |
+ } else if (meta == null && context.config().getBoolean(MySqlConnectorConfig.EXPECT_METADATA_EVENTS)) { | |
+ logger.debug("Skipping table update (missing TABLE_METADATA): {}", map); | |
+ return; | |
+ } | |
+ long tableNumber = map.getTableId(); | |
+ String databaseName = map.getDatabase(); | |
+ String tableName = map.getTable(); | |
TableId tableId = new TableId(databaseName, null, tableName); | |
- if (recordMakers.assign(tableNumber, tableId)) { | |
- logger.debug("Received update table metadata event: {}", event); | |
+ // The result of assign(...) only decides which debug message is logged. | |
+ if (recordMakers.assign(tableNumber, tableId, meta)) { | |
+ logger.debug("Received table update map({}) meta({})", map, meta); | |
} else { | |
- logger.debug("Skipping update table metadata event: {}", event); | |
+ logger.debug("Skipped table update map({}) meta({})", map, meta); | |
} | |
} | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java | |
index def91ed..840ea75 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java | |
@@ -28,6 +28,7 @@ | |
import io.debezium.relational.TableId; | |
import io.debezium.relational.history.DatabaseHistory; | |
import io.debezium.relational.history.KafkaDatabaseHistory; | |
+import io.debezium.relational.topic.ByTableTopicMapper; | |
/** | |
* The configuration properties. | |
@@ -185,7 +186,17 @@ public static DecimalHandlingMode parse(String value, String defaultValue) { | |
/** | |
* Perform a snapshot and then stop before attempting to read the binlog. | |
*/ | |
- INITIAL_ONLY("initial_only"); | |
+ INITIAL_ONLY("initial_only"), | |
+ | |
+ /** | |
+ * 1) Never performs a snapshot (gets schema info from twitter patch) | |
+ * 2) The first time Debezium connects in this mode, it sets Debezium's executed GTID set to be the same as | |
+ * that of MySQL server it is connecting to. This is necessary to allow Debezium to reconnect later. | |
+ * 3) The first time Debezium connects in this mode, it starts reading the binlog at the GTID corresponding to | |
+ * the last GTID in the executed GTID set we marked in step 2 (i.e. starts reading at the binlog's current | |
+ * position). | |
+ */ | |
+ TWITTER_PATCH("twitter_patch"); | |
private final String value; | |
@@ -614,6 +625,24 @@ public static SecureConnectionMode parse(String value, String defaultValue) { | |
+ "The default is 'true'. This is independent of how the connector internally records database history.") | |
.withDefault(true); | |
+ /** | |
+ * Boolean option {@code database.mysql.expect_metadata_events} (default {@code false}) that makes the | |
+ * connector track table schemas from TABLE_METADATA events. | |
+ */ | |
+ public static final Field EXPECT_METADATA_EVENTS = Field.create("database.mysql.expect_metadata_events") | |
+ .withDisplayName("Expect metadata events") | |
+ .withType(Type.BOOLEAN) | |
+ .withWidth(Width.SHORT) | |
+ .withImportance(Importance.MEDIUM) | |
+ .withDescription("When enabled, Debezium will track schemas based on TABLE_METADATA events") | |
+ .withDefault(false); | |
+ | |
+ /** | |
+ * Class option {@code topic.mapper} naming the TopicMapper implementation used to map table change | |
+ * events onto topics; defaults to {@link ByTableTopicMapper}. | |
+ */ | |
+ public static final Field TOPIC_MAPPER = Field.create("topic.mapper") | |
+ .withDisplayName("Topic mapper") | |
+ .withType(Type.CLASS) | |
+ .withWidth(Width.LONG) | |
+ .withImportance(Importance.LOW) | |
+ .withDescription("The name of the TopicMapper class that should be used to determine how change events for tables should be mapped into topics. " | |
+ + "Built in options include '" + ByTableTopicMapper.class.getName() | |
+ + "' (the default).") | |
+ .withDefault(ByTableTopicMapper.class.getName()); | |
+ | |
public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode") | |
.withDisplayName("Snapshot mode") | |
.withEnum(SnapshotMode.class, SnapshotMode.INITIAL) | |
@@ -701,7 +730,7 @@ public static final Field MASK_COLUMN(int length) { | |
TABLE_WHITELIST, TABLE_BLACKLIST, TABLES_IGNORE_BUILTIN, | |
DATABASE_WHITELIST, DATABASE_BLACKLIST, | |
COLUMN_BLACKLIST, SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING, | |
- GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, | |
+ TOPIC_MAPPER, EXPECT_METADATA_EVENTS, GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, | |
GTID_SOURCE_FILTER_DML_EVENTS, | |
TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE, | |
SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD, | |
@@ -726,7 +755,7 @@ protected static ConfigDef configDef() { | |
KafkaDatabaseHistory.TOPIC, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS, | |
KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, DATABASE_HISTORY); | |
Field.group(config, "Events", INCLUDE_SCHEMA_CHANGES, TABLES_IGNORE_BUILTIN, DATABASE_WHITELIST, TABLE_WHITELIST, | |
- COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST, | |
+ COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST, TOPIC_MAPPER, EXPECT_METADATA_EVENTS, | |
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, GTID_SOURCE_FILTER_DML_EVENTS); | |
Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, MAX_QUEUE_SIZE, MAX_BATCH_SIZE, POLL_INTERVAL_MS, | |
SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE); | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java | |
index 9482e24..9786c13 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java | |
@@ -35,10 +35,7 @@ | |
private volatile ChainedReader readers; | |
/** | |
- * Create an instance of the log reader that uses Kafka to store database schema history and the | |
- * {@link TopicSelector#defaultSelector(String) default topic selector} of "{@code <serverName>.<databaseName>.<tableName>}" | |
- * for | |
- * data and "{@code <serverName>}" for metadata. | |
+ * Create an instance of the log reader that uses Kafka to store database schema history and DDL changes. | |
*/ | |
public MySqlConnectorTask() { | |
} | |
@@ -108,19 +105,43 @@ public synchronized void start(Map<String, String> props) { | |
} else { | |
// We have no recorded offsets ... | |
if (taskContext.isSnapshotNeverAllowed()) { | |
- // We're not allowed to take a snapshot, so instead we have to assume that the binlog contains the | |
- // full history of the database. | |
- logger.info("Found no existing offset and snapshots disallowed, so starting at beginning of binlog"); | |
- source.setBinlogStartPoint("", 0L);// start from the beginning of the binlog | |
- taskContext.initializeHistory(); | |
- | |
- // Look to see what the first available binlog file is called, and whether it looks like binlog files have | |
- // been purged. If so, then output a warning ... | |
- String earliestBinlogFilename = earliestBinlogFilename(); | |
- if (earliestBinlogFilename == null) { | |
- logger.warn("No binlog appears to be available. Ensure that the MySQL row-level binlog is enabled."); | |
- } else if (!earliestBinlogFilename.endsWith("00001")) { | |
- logger.warn("It is possible the server has purged some binlogs. If this is the case, then using snapshot mode may be required."); | |
+ if (taskContext.isSnapshotModeTwitterPatch()) { | |
+ logger.info("Found no existing offset and we're in SnapshotMode.TWITTER_PATCH."); | |
+ taskContext.jdbc().query("SHOW MASTER STATUS", rs -> { | |
+ if (rs.next()) { | |
+ String binlogFilename = rs.getString(1); | |
+ long binlogPosition = rs.getLong(2); | |
+ source.setBinlogStartPoint(binlogFilename, binlogPosition); | |
+ if (rs.getMetaData().getColumnCount() > 4) { | |
+ // This column exists only in MySQL 5.6.5 or later ... | |
+ String gtidSet = rs.getString(5);// GTID set, may be null, blank, or contain a GTID set | |
+ source.setCompletedGtidSet(gtidSet); | |
+ logger.info("\t using binlog '{}' at position '{}' and gtid '{}'", binlogFilename, binlogPosition, | |
+ gtidSet); | |
+ } else { | |
+ logger.info("\t using binlog '{}' at position '{}'", binlogFilename, binlogPosition); | |
+ } | |
+ } else { | |
+ throw new IllegalStateException("Cannot read the binlog filename and position via 'SHOW MASTER STATUS'." | |
+ + " Make sure your server is correctly configured"); | |
+ } | |
+ }); | |
+ taskContext.initializeHistory(); | |
+ } else { // We must be in SnapshotMode.NEVER | |
+ // We're not allowed to take a snapshot, so instead we have to assume that the binlog contains the | |
+ // full history of the database. | |
+ logger.info("Found no existing offset and snapshots disallowed, so starting at beginning of binlog"); | |
+ source.setBinlogStartPoint("", 0L);// start from the beginning of the binlog | |
+ taskContext.initializeHistory(); | |
+ | |
+ // Look to see what the first available binlog file is called, and whether it looks like binlog files have | |
+ // been purged. If so, then output a warning ... | |
+ String earliestBinlogFilename = earliestBinlogFilename(); | |
+ if (earliestBinlogFilename == null) { | |
+ logger.warn("No binlog appears to be available. Ensure that the MySQL row-level binlog is enabled."); | |
+ } else if (!earliestBinlogFilename.endsWith("00001")) { | |
+ logger.warn("It is possible the server has purged some binlogs. If this is the case, then using snapshot mode may be required."); | |
+ } | |
} | |
} else { | |
// We are allowed to use snapshots, and that is the best way to start ... | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java | |
index 990bf1e..3490317 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java | |
@@ -6,12 +6,14 @@ | |
package io.debezium.connector.mysql; | |
import java.sql.SQLException; | |
-import java.util.HashMap; | |
-import java.util.Map; | |
-import java.util.Set; | |
+import java.sql.Types; | |
+import java.util.*; | |
import java.util.concurrent.Callable; | |
import java.util.function.Predicate; | |
+import com.github.shyiko.mysql.binlog.event.ColumnDescriptor; | |
+import com.github.shyiko.mysql.binlog.event.TableMetadataEventData; | |
+import io.debezium.relational.*; | |
import org.apache.kafka.connect.data.Schema; | |
import org.apache.kafka.connect.errors.ConnectException; | |
import org.slf4j.Logger; | |
@@ -30,6 +32,7 @@ | |
import io.debezium.relational.TableSchema; | |
import io.debezium.relational.TableSchemaBuilder; | |
import io.debezium.relational.Tables; | |
+import io.debezium.relational.topic.TopicMapper; | |
import io.debezium.relational.ddl.DdlChanges; | |
import io.debezium.relational.ddl.DdlChanges.DatabaseStatementStringConsumer; | |
import io.debezium.relational.history.DatabaseHistory; | |
@@ -89,6 +92,9 @@ public MySqlSchema(Configuration config, String serverName, Predicate<String> gt | |
this.ddlChanges = new DdlChanges(this.ddlParser.terminator()); | |
this.ddlParser.addListener(ddlChanges); | |
+ // Set up the topic mapper ... | |
+ TopicMapper topicMapper = config.getInstance(MySqlConnectorConfig.TOPIC_MAPPER, TopicMapper.class); | |
+ | |
// Use MySQL-specific converters and schemas for values ... | |
String timePrecisionModeStr = config.getString(MySqlConnectorConfig.TIME_PRECISION_MODE); | |
TemporalPrecisionMode timePrecisionMode = TemporalPrecisionMode.parse(timePrecisionModeStr); | |
@@ -97,7 +103,7 @@ public MySqlSchema(Configuration config, String serverName, Predicate<String> gt | |
DecimalHandlingMode decimalHandlingMode = DecimalHandlingMode.parse(decimalHandlingModeStr); | |
DecimalMode decimalMode = decimalHandlingMode.asDecimalMode(); | |
MySqlValueConverters valueConverters = new MySqlValueConverters(decimalMode, adaptiveTimePrecision); | |
- this.schemaBuilder = new TableSchemaBuilder(valueConverters, schemaNameValidator::validate); | |
+ this.schemaBuilder = new TableSchemaBuilder(topicMapper, valueConverters, schemaNameValidator::validate); | |
// Set up the server name and schema prefix ... | |
if (serverName != null) serverName = serverName.trim(); | |
@@ -196,6 +202,53 @@ public TableSchema schemaFor(TableId id) { | |
return filters.tableFilter().test(id) ? tableSchemaByTableId.get(id) : null; | |
} | |
+ /** | |
+  * Obtain the schema for a table using a TABLE_METADATA event. The cached schema is returned when its | |
+  * table version id equals the event's table id; otherwise the table definition is overwritten from the | |
+  * event's column descriptors and a new schema is built and cached. | |
+  * | |
+  * @param id the identifier of the table | |
+  * @param metadata the TABLE_METADATA event data describing the table's columns | |
+  * @return the table schema, or null if the table filter excludes the table | |
+  */ | |
+ public TableSchema schemaFromMetadata(TableId id, TableMetadataEventData metadata) { | |
+     if (!filters.tableFilter().test(id)) { | |
+         return null; | |
+     } | |
+ | |
+     final TableSchema cached = tableSchemaByTableId.get(id); | |
+     if (cached != null && cached.getTableVersionId() == metadata.getTableId()) { | |
+         return cached; | |
+     } | |
+ | |
+     // Column flag bitmasks; see https://github.com/percona/percona-server/blob/5.6/include/mysql_com.h#L95 | |
+     final int notNullFlag = 1; | |
+     final int primaryKeyFlag = 2; | |
+     final int autoIncrementFlag = 512; | |
+     final int noDefaultValueFlag = 4096; | |
+ | |
+     final List<Column> columns = new ArrayList<Column>(); | |
+     final List<String> primaryKeyNames = new ArrayList<String>(); | |
+     int position = 0; | |
+     for (ColumnDescriptor descriptor : metadata.getColumnDescriptors()) { | |
+         position++; | |
+         final boolean primaryKey = (descriptor.getFlags() & primaryKeyFlag) != 0; | |
+         final boolean autoIncremented = (descriptor.getFlags() & autoIncrementFlag) != 0; | |
+         final boolean hasDefault = (descriptor.getFlags() & noDefaultValueFlag) == 0; | |
+         final boolean nullable = (descriptor.getFlags() & notNullFlag) == 0; | |
+         // Primary key columns are always required; others are optional if nullable or defaulted. | |
+         final boolean optional = !primaryKey && (hasDefault || nullable); | |
+         final Column column = Column.editor() | |
+                 .name(descriptor.getName()) | |
+                 .position(position) | |
+                 .jdbcType(MysqlToJdbc.mysqlToJdbcType(descriptor.getType())) // TODO test | |
+                 .type(descriptor.getTypeName()) | |
+                 .charsetName(null) // TODO test (descriptor.getCharacterSet()) | |
+                 .length(descriptor.getLength()) | |
+                 .scale(descriptor.getScale()) | |
+                 .optional(optional) // TODO test | |
+                 .generated(false) // TODO test | |
+                 .autoIncremented(autoIncremented) | |
+                 .create(); | |
+         columns.add(column); | |
+         if (primaryKey) { | |
+             primaryKeyNames.add(descriptor.getName()); | |
+         } | |
+     } | |
+ | |
+     tables.overwriteTable(id, columns, primaryKeyNames, null); // TODO test null charset | |
+     final TableSchema rebuilt = schemaBuilder.create(schemaPrefix, tables.forTable(id), filters.columnFilter(), | |
+             filters.columnMappers()); | |
+     rebuilt.setTableVersionId(metadata.getTableId()); | |
+     tableSchemaByTableId.put(id, rebuilt); | |
+     return rebuilt; | |
+ } | |
+ | |
+ | |
+ | |
/** | |
* Get the information about where the DDL statement history is recorded. | |
* | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java | |
index 9ea86f5..07dafb8 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java | |
@@ -29,7 +29,6 @@ | |
private final SourceInfo source; | |
private final MySqlSchema dbSchema; | |
- private final TopicSelector topicSelector; | |
private final RecordMakers recordProcessor; | |
private final Predicate<String> gtidSourceFilter; | |
private final Clock clock = Clock.system(); | |
@@ -37,9 +36,6 @@ | |
public MySqlTaskContext(Configuration config) { | |
super(config); | |
- // Set up the topic selector ... | |
- this.topicSelector = TopicSelector.defaultSelector(serverName()); | |
- | |
// Set up the source information ... | |
this.source = new SourceInfo(); | |
this.source.setServerName(serverName()); | |
@@ -47,24 +43,20 @@ public MySqlTaskContext(Configuration config) { | |
// Set up the GTID filter ... | |
String gtidSetIncludes = config.getString(MySqlConnectorConfig.GTID_SOURCE_INCLUDES); | |
String gtidSetExcludes = config.getString(MySqlConnectorConfig.GTID_SOURCE_EXCLUDES); | |
- this.gtidSourceFilter = gtidSetIncludes != null ? Predicates.includesUuids(gtidSetIncludes) | |
- : (gtidSetExcludes != null ? Predicates.excludesUuids(gtidSetExcludes) : null); | |
+ this.gtidSourceFilter = gtidSetIncludes != null ? Predicates.includes(gtidSetIncludes) | |
+ : (gtidSetExcludes != null ? Predicates.excludes(gtidSetExcludes) : null); | |
// Set up the MySQL schema ... | |
this.dbSchema = new MySqlSchema(config, serverName(), this.gtidSourceFilter); | |
// Set up the record processor ... | |
- this.recordProcessor = new RecordMakers(dbSchema, source, topicSelector); | |
+ this.recordProcessor = new RecordMakers(dbSchema, source, serverName()); | |
} | |
public String connectorName() { | |
return config.getString("name"); | |
} | |
- public TopicSelector topicSelector() { | |
- return topicSelector; | |
- } | |
- | |
public SourceInfo source() { | |
return source; | |
} | |
@@ -173,7 +165,11 @@ public boolean isSnapshotAllowedWhenNeeded() { | |
} | |
+ /** | |
+ * Report whether the configured snapshot mode is NEVER or TWITTER_PATCH. | |
+ */ | |
public boolean isSnapshotNeverAllowed() { | |
- return snapshotMode() == SnapshotMode.NEVER; | |
+ return snapshotMode() == SnapshotMode.NEVER || snapshotMode() == SnapshotMode.TWITTER_PATCH; | |
+ } | |
+ | |
+ /** | |
+ * Report whether the configured snapshot mode is TWITTER_PATCH. | |
+ */ | |
+ public boolean isSnapshotModeTwitterPatch() { | |
+ return snapshotMode() == SnapshotMode.TWITTER_PATCH; | |
} | |
public boolean isInitialSnapshotOnly() { | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MysqlToJdbc.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MysqlToJdbc.java | |
new file mode 100644 | |
index 0000000..2305566 | |
--- /dev/null | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MysqlToJdbc.java | |
@@ -0,0 +1,74 @@ | |
+package io.debezium.connector.mysql; | |
+ | |
+import java.sql.Types; | |
+ | |
+/** | |
+ * Translates MySQL column type codes (the {@code MYSQL_TYPE_*} values) into {@link java.sql.Types} constants. | |
+ */ | |
+public final class MysqlToJdbc { | |
+ | |
+    private MysqlToJdbc() { | |
+        // Static utility class; not meant to be instantiated. | |
+    } | |
+ | |
+    /** | |
+     * Get the JDBC type that corresponds to a MySQL column type code. | |
+     * | |
+     * @param mysqlType the numeric {@code MYSQL_TYPE_*} code of the column | |
+     * @return the matching {@link Types} constant; {@link Types#OTHER} when the code has no specific mapping | |
+     */ | |
+    public static int mysqlToJdbcType(int mysqlType) { | |
+        // See https://github.com/percona/percona-server/blob/1e2f003a5bd48763c27e37542d97cd8f59d98eaa/libbinlogevents/export/binary_log_types.h#L38 | |
+        // See https://github.com/debezium/debezium/blob/v0.3.6/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java#L75 | |
+        switch (mysqlType) { | |
+            case 0: // MYSQL_TYPE_DECIMAL | |
+            case 246: // MYSQL_TYPE_NEWDECIMAL | |
+                return Types.DECIMAL; | |
+            case 1: // MYSQL_TYPE_TINY | |
+            case 2: // MYSQL_TYPE_SHORT | |
+                return Types.SMALLINT; | |
+            // NOTE(review): MYSQL_TYPE_LONG is presumably the 4-byte INT; confirm that BIGINT rather than INTEGER is intended. | |
+            case 3: // MYSQL_TYPE_LONG | |
+            case 8: // MYSQL_TYPE_LONGLONG | |
+                return Types.BIGINT; | |
+            case 4: // MYSQL_TYPE_FLOAT | |
+                return Types.FLOAT; | |
+            case 5: // MYSQL_TYPE_DOUBLE | |
+                return Types.DOUBLE; | |
+            case 6: // MYSQL_TYPE_NULL | |
+                return Types.NULL; | |
+            case 7: // MYSQL_TYPE_TIMESTAMP | |
+            case 12: // MYSQL_TYPE_DATETIME | |
+            case 17: // MYSQL_TYPE_TIMESTAMP2 | |
+            case 18: // MYSQL_TYPE_DATETIME2 | |
+                return Types.TIMESTAMP; | |
+            case 9: // MYSQL_TYPE_INT24 | |
+            case 13: // MYSQL_TYPE_YEAR | |
+                return Types.INTEGER; | |
+            case 10: // MYSQL_TYPE_DATE | |
+            case 14: // MYSQL_TYPE_NEWDATE | |
+                return Types.DATE; | |
+            case 11: // MYSQL_TYPE_TIME | |
+            case 19: // MYSQL_TYPE_TIME2 | |
+                return Types.TIME; | |
+            case 15: // MYSQL_TYPE_VARCHAR | |
+            case 253: // MYSQL_TYPE_VAR_STRING | |
+                return Types.VARCHAR; | |
+            case 16: // MYSQL_TYPE_BIT | |
+                return Types.BIT; | |
+            case 247: // MYSQL_TYPE_ENUM | |
+            case 248: // MYSQL_TYPE_SET | |
+            case 254: // MYSQL_TYPE_STRING | |
+                return Types.CHAR; | |
+            case 249: // MYSQL_TYPE_TINY_BLOB | |
+            case 250: // MYSQL_TYPE_MEDIUM_BLOB | |
+            case 251: // MYSQL_TYPE_LONG_BLOB | |
+            case 252: // MYSQL_TYPE_BLOB | |
+                return Types.BLOB; | |
+            case 245: // MYSQL_TYPE_JSON | |
+            case 255: // MYSQL_TYPE_GEOMETRY | |
+            default: | |
+                return Types.OTHER; | |
+        } | |
+    } | |
+} | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java | |
index 0f5da85..420b030 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java | |
@@ -12,6 +12,7 @@ | |
import java.util.Set; | |
import java.util.concurrent.atomic.AtomicInteger; | |
+import com.github.shyiko.mysql.binlog.event.TableMetadataEventData; | |
import org.apache.kafka.connect.data.Schema; | |
import org.apache.kafka.connect.data.SchemaBuilder; | |
import org.apache.kafka.connect.data.Struct; | |
@@ -37,7 +38,7 @@ | |
private final Logger logger = LoggerFactory.getLogger(getClass()); | |
private final MySqlSchema schema; | |
private final SourceInfo source; | |
- private final TopicSelector topicSelector; | |
+ private final String ddlChangeTopicName; | |
private final Map<Long, Converter> convertersByTableNumber = new HashMap<>(); | |
private final Map<TableId, Long> tableNumbersByTableId = new HashMap<>(); | |
private final Schema schemaChangeKeySchema; | |
@@ -49,12 +50,12 @@ | |
* | |
* @param schema the schema information about the MySQL server databases; may not be null | |
* @param source the connector's source information; may not be null | |
- * @param topicSelector the selector for topic names; may not be null | |
+ * @param ddlChangeTopicName the name of the topic to which DDL changes are written; may not be null | |
*/ | |
- public RecordMakers(MySqlSchema schema, SourceInfo source, TopicSelector topicSelector) { | |
+ public RecordMakers(MySqlSchema schema, SourceInfo source, String ddlChangeTopicName) { | |
this.schema = schema; | |
this.source = source; | |
- this.topicSelector = topicSelector; | |
+ this.ddlChangeTopicName = ddlChangeTopicName; | |
this.schemaChangeKeySchema = SchemaBuilder.struct() | |
.name(schemaNameValidator.validate("io.debezium.connector.mysql.SchemaChangeKey")) | |
.field(Fields.DATABASE_NAME, Schema.STRING_SCHEMA) | |
@@ -117,12 +118,12 @@ public RecordsForTable forTable(long tableNumber, BitSet includedColumns, Blocki | |
* @return the number of records produced; will be 0 or more | |
*/ | |
public int schemaChanges(String databaseName, String ddlStatements, BlockingConsumer<SourceRecord> consumer) { | |
- String topicName = topicSelector.getPrimaryTopic(); | |
Integer partition = 0; | |
Struct key = schemaChangeRecordKey(databaseName); | |
Struct value = schemaChangeRecordValue(databaseName, ddlStatements); | |
SourceRecord record = new SourceRecord(source.partition(), source.offset(), | |
- topicName, partition, schemaChangeKeySchema, key, schemaChangeValueSchema, value); | |
+ ddlChangeTopicName, partition, | |
+ schemaChangeKeySchema, key, schemaChangeValueSchema, value); | |
try { | |
consumer.accept(record); | |
return 1; | |
@@ -151,7 +152,7 @@ public void regenerate() { | |
Set<TableId> tableIds = schema.tables().tableIds(); | |
logger.debug("Regenerating converters for {} tables", tableIds.size()); | |
tableIds.forEach(id -> { | |
- assign(nextTableNumber.incrementAndGet(), id); | |
+ assign(nextTableNumber.incrementAndGet(), id, null); | |
}); | |
} | |
@@ -163,20 +164,27 @@ public void regenerate() { | |
* @return {@code true} if the assignment was successful, or {@code false} if the table is currently excluded in the | |
* connector's configuration | |
*/ | |
- public boolean assign(long tableNumber, TableId id) { | |
+ public boolean assign(long tableNumber, TableId id, TableMetadataEventData metadata) { | |
Long existingTableNumber = tableNumbersByTableId.get(id); | |
if (existingTableNumber != null && existingTableNumber.longValue() == tableNumber | |
&& convertersByTableNumber.containsKey(tableNumber)) { | |
// This is the exact same table number for the same table, so do nothing ... | |
return true; | |
} | |
- TableSchema tableSchema = schema.schemaFor(id); | |
+ | |
+ // Get schema from either registry or metadata event | |
+ TableSchema tableSchema; | |
+ if (metadata != null) { | |
+ tableSchema = schema.schemaFromMetadata(id, metadata); | |
+ } else { | |
+ tableSchema = schema.schemaFor(id); | |
+ } | |
if (tableSchema == null) return false; | |
- String topicName = topicSelector.getTopic(id); | |
+ String topicName = tableSchema.getEnvelopeSchemaName(); | |
Envelope envelope = Envelope.defineSchema() | |
.withName(schemaNameValidator.validate(topicName + ".Envelope")) | |
- .withRecord(schema.schemaFor(id).valueSchema()) | |
+ .withRecord(tableSchema.valueSchema()) | |
.withSource(SourceInfo.SCHEMA) | |
.build(); | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java | |
deleted file mode 100644 | |
index 16574d2..0000000 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TopicSelector.java | |
+++ /dev/null | |
@@ -1,91 +0,0 @@ | |
-/* | |
- * Copyright Debezium Authors. | |
- * | |
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 | |
- */ | |
-package io.debezium.connector.mysql; | |
- | |
-import io.debezium.annotation.ThreadSafe; | |
-import io.debezium.relational.TableId; | |
- | |
-/** | |
- * A function that determines the name of topics for data and metadata. | |
- * | |
- * @author Randall Hauch | |
- */ | |
-@ThreadSafe | |
-public interface TopicSelector { | |
- /** | |
- * Get the default topic selector logic, which uses a '.' delimiter character when needed. | |
- * | |
- * @param prefix the name of the prefix to be used for all topics; may not be null and must not terminate in the | |
- * {@code delimiter} | |
- * @return the topic selector; never null | |
- */ | |
- static TopicSelector defaultSelector(String prefix) { | |
- return defaultSelector(prefix,"."); | |
- } | |
- | |
- /** | |
- * Get the default topic selector logic, which uses the supplied delimiter character when needed. | |
- * | |
- * @param prefix the name of the prefix to be used for all topics; may not be null and must not terminate in the | |
- * {@code delimiter} | |
- * @param delimiter the string delineating the server, database, and table names; may not be null | |
- * @return the topic selector; never null | |
- */ | |
- static TopicSelector defaultSelector(String prefix, String delimiter) { | |
- return new TopicSelector() { | |
- /** | |
- * Get the name of the topic for the given server, database, and table names. This method returns | |
- * "{@code <serverName>}". | |
- * | |
- * @return the topic name; never null | |
- */ | |
- @Override | |
- public String getPrimaryTopic() { | |
- return prefix; | |
- } | |
- | |
- /** | |
- * Get the name of the topic for the given server name. This method returns | |
- * "{@code <prefix>.<databaseName>.<tableName>}". | |
- * | |
- * @param databaseName the name of the database; may not be null | |
- * @param tableName the name of the table; may not be null | |
- * @return the topic name; never null | |
- */ | |
- @Override | |
- public String getTopic(String databaseName, String tableName) { | |
- return String.join(delimiter, prefix, databaseName, tableName); | |
- } | |
- | |
- }; | |
- } | |
- | |
- /** | |
- * Get the name of the topic for the given server name. | |
- * | |
- * @param tableId the identifier of the table; may not be null | |
- * @return the topic name; never null | |
- */ | |
- default String getTopic(TableId tableId) { | |
- return getTopic(tableId.catalog(),tableId.table()); | |
- } | |
- | |
- /** | |
- * Get the name of the topic for the given server name. | |
- * | |
- * @param databaseName the name of the database; may not be null | |
- * @param tableName the name of the table; may not be null | |
- * @return the topic name; never null | |
- */ | |
- String getTopic(String databaseName, String tableName); | |
- | |
- /** | |
- * Get the name of the primary topic. | |
- * | |
- * @return the topic name; never null | |
- */ | |
- String getPrimaryTopic(); | |
-} | |
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorTest.java | |
index 8241387..f612f50 100644 | |
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorTest.java | |
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorTest.java | |
@@ -34,7 +34,8 @@ protected static void assertConfigDefIsValid(Connector connector, io.debezium.co | |
assertThat(key.importance).isEqualTo(expected.importance()); | |
assertThat(key.documentation).isEqualTo(expected.description()); | |
assertThat(key.type).isEqualTo(expected.type()); | |
- if (expected.equals(MySqlConnectorConfig.DATABASE_HISTORY) || expected.equals(MySqlConnectorConfig.JDBC_DRIVER)) { | |
+ if (expected.equals(MySqlConnectorConfig.DATABASE_HISTORY) || expected.equals(MySqlConnectorConfig.JDBC_DRIVER) || | |
+ expected.equals(MySqlConnectorConfig.TOPIC_MAPPER)) { | |
assertThat(((Class<?>) key.defaultValue).getName()).isEqualTo((String) expected.defaultValue()); | |
} else if (!expected.equals(MySqlConnectorConfig.SERVER_ID)) { | |
assertThat(key.defaultValue).isEqualTo(expected.defaultValue()); | |
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java | |
index 1ffd20a..6c5cc72 100644 | |
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java | |
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java | |
@@ -31,7 +31,6 @@ public void shouldCreateTaskFromConfiguration() throws Exception { | |
assertThat(context.logger()).isNotNull(); | |
assertThat(context.makeRecord()).isNotNull(); | |
assertThat(context.source()).isNotNull(); | |
- assertThat(context.topicSelector()).isNotNull(); | |
assertThat(context.hostname()).isEqualTo(hostname); | |
assertThat(context.port()).isEqualTo(port); | |
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java | |
index a5188b6..2d8adef 100644 | |
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java | |
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java | |
@@ -27,6 +27,7 @@ | |
import io.debezium.relational.TableSchema; | |
import io.debezium.relational.TableSchemaBuilder; | |
import io.debezium.relational.Tables; | |
+import io.debezium.relational.topic.ByTableTopicMapper; | |
import io.debezium.util.AvroValidator; | |
/** | |
@@ -63,7 +64,7 @@ protected PostgresSchema(PostgresConnectorConfig config) { | |
PostgresValueConverter valueConverter = new PostgresValueConverter(config.adaptiveTimePrecision(), ZoneOffset.UTC); | |
this.schemaNameValidator = AvroValidator.create(LOGGER)::validate; | |
- this.schemaBuilder = new TableSchemaBuilder(valueConverter, this.schemaNameValidator); | |
+ this.schemaBuilder = new TableSchemaBuilder(new ByTableTopicMapper(), valueConverter, this.schemaNameValidator); | |
// Set up the server name and schema prefix ... | |
String serverName = config.serverName(); | |
diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchema.java b/debezium-core/src/main/java/io/debezium/relational/TableSchema.java | |
index 778fd0d..54dd6d4 100644 | |
--- a/debezium-core/src/main/java/io/debezium/relational/TableSchema.java | |
+++ b/debezium-core/src/main/java/io/debezium/relational/TableSchema.java | |
@@ -47,15 +47,18 @@ | |
@Immutable | |
public class TableSchema { | |
+ private final String envelopeSchemaName; | |
private final Schema keySchema; | |
private final Schema valueSchema; | |
private final Function<Object[], Object> keyGenerator; | |
private final Function<Object[], Struct> valueGenerator; | |
+ private long tableVersionId = -1; | |
/** | |
* Create an instance with the specified {@link Schema}s for the keys and values, and the functions that generate the | |
* key and value for a given row of data. | |
- * | |
+ * | |
+ * @param envelopeSchemaName the base name used for the envelope schema; may be null | |
* @param keySchema the schema for the primary key; may be null | |
* @param keyGenerator the function that converts a row into a single key object for Kafka Connect; may not be null but may | |
* return nulls | |
@@ -63,8 +66,9 @@ | |
* @param valueGenerator the function that converts a row into a single value object for Kafka Connect; may not be null but | |
* may return nulls | |
*/ | |
- public TableSchema(Schema keySchema, Function<Object[], Object> keyGenerator, | |
+ public TableSchema(String envelopeSchemaName, Schema keySchema, Function<Object[], Object> keyGenerator, | |
Schema valueSchema, Function<Object[], Struct> valueGenerator) { | |
+ this.envelopeSchemaName = envelopeSchemaName; | |
this.keySchema = keySchema; | |
this.valueSchema = valueSchema; | |
this.keyGenerator = keyGenerator != null ? keyGenerator : (row) -> null; | |
@@ -72,6 +76,14 @@ public TableSchema(Schema keySchema, Function<Object[], Object> keyGenerator, | |
} | |
/** | |
+ * Get the base name used for the envelope schema of this table. | |
+ * @return the envelope schema name; may be null | |
+ */ | |
+ public String getEnvelopeSchemaName() { | |
+ return envelopeSchemaName; | |
+ } | |
+ | |
+ /** | |
* Get the {@link Schema} that represents the table's columns, excluding those that make up the {@link #keySchema()}. | |
* | |
* @return the Schema describing the columns in the table; never null | |
@@ -110,7 +122,15 @@ public Object keyFromColumnData(Object[] columnData) { | |
public Struct valueFromColumnData(Object[] columnData) { | |
return columnData == null ? null : valueGenerator.apply(columnData); | |
} | |
- | |
+ | |
+ public void setTableVersionId(long id) { | |
+ tableVersionId = id; | |
+ } | |
+ | |
+ public long getTableVersionId() { | |
+ return tableVersionId; | |
+ } | |
+ | |
@Override | |
public int hashCode() { | |
return valueSchema().hashCode(); | |
diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java | |
index 27345e4..3cb5efa 100644 | |
--- a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java | |
+++ b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java | |
@@ -10,6 +10,7 @@ | |
import java.sql.Types; | |
import java.util.ArrayList; | |
import java.util.List; | |
+import java.util.Map; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.function.Function; | |
@@ -29,6 +30,7 @@ | |
import io.debezium.jdbc.JdbcConnection; | |
import io.debezium.relational.mapping.ColumnMapper; | |
import io.debezium.relational.mapping.ColumnMappers; | |
+import io.debezium.relational.topic.TopicMapper; | |
/** | |
* Builder that constructs {@link TableSchema} instances for {@link Table} definitions. | |
@@ -49,17 +51,21 @@ | |
private static final Logger LOGGER = LoggerFactory.getLogger(TableSchemaBuilder.class); | |
+ private final TopicMapper topicMapper; | |
private final Function<String, String> schemaNameValidator; | |
private final ValueConverterProvider valueConverterProvider; | |
/** | |
* Create a new instance of the builder. | |
- * | |
+ * | |
+ * @param topicMapper the TopicMapper for each table; may not be null | |
* @param valueConverterProvider the provider for obtaining {@link ValueConverter}s and {@link SchemaBuilder}s; may not be | |
* null | |
* @param schemaNameValidator the validation function for schema names; may not be null | |
*/ | |
- public TableSchemaBuilder(ValueConverterProvider valueConverterProvider, Function<String, String> schemaNameValidator) { | |
+ public TableSchemaBuilder(TopicMapper topicMapper, ValueConverterProvider valueConverterProvider, | |
+ Function<String, String> schemaNameValidator) { | |
+ this.topicMapper = topicMapper; | |
this.schemaNameValidator = schemaNameValidator; | |
this.valueConverterProvider = valueConverterProvider; | |
} | |
@@ -89,7 +95,7 @@ public TableSchema create(ResultSet resultSet, String name) throws SQLException | |
Function<Object[], Struct> valueGenerator = createValueGenerator(valueSchema, id, columns, null, null); | |
// Finally create our result object with no primary key or key generator ... | |
- return new TableSchema(null, null, valueSchema, valueGenerator); | |
+ return new TableSchema(schemaName, null, null, valueSchema, valueGenerator); | |
} | |
/** | |
@@ -129,11 +135,13 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId> | |
if (schemaPrefix == null) schemaPrefix = ""; | |
// Build the schemas ... | |
final TableId tableId = table.id(); | |
- final String tableIdStr = tableId.toString(); | |
- final String schemaNamePrefix = schemaPrefix + tableIdStr; | |
- LOGGER.debug("Mapping table '{}' to schemas under '{}'", tableId, schemaNamePrefix); | |
- SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(schemaNameValidator.apply(schemaNamePrefix + ".Value")); | |
- SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(schemaNameValidator.apply(schemaNamePrefix + ".Key")); | |
+ final String topicName = topicMapper.getTopicName(schemaPrefix, table); | |
+ final String keySchemaName = schemaNameValidator.apply(topicName + ".Key"); | |
+ final String valueSchemaName = schemaNameValidator.apply(topicName + ".Value"); | |
+ final String envelopeSchemaName = schemaNameValidator.apply(topicName); | |
+ LOGGER.debug("Mapping table '{}' to key schemas '{}' and value schema '{}'", tableId, keySchemaName, valueSchemaName); | |
+ SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(valueSchemaName); | |
+ SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(keySchemaName); | |
AtomicBoolean hasPrimaryKey = new AtomicBoolean(false); | |
table.columns().forEach(column -> { | |
if (table.isPrimaryKeyColumn(column.name())) { | |
@@ -147,6 +155,9 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId> | |
addField(valSchemaBuilder, column, mapper); | |
} | |
}); | |
+ // Enhance the key schema if necessary ... | |
+ topicMapper.enhanceKeySchema(keySchemaBuilder); | |
+ // Create the schemas ... | |
Schema valSchema = valSchemaBuilder.optional().build(); | |
Schema keySchema = hasPrimaryKey.get() ? keySchemaBuilder.build() : null; | |
@@ -156,11 +167,11 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId> | |
} | |
// Create the generators ... | |
- Function<Object[], Object> keyGenerator = createKeyGenerator(keySchema, tableId, table.primaryKeyColumns()); | |
+ Function<Object[], Object> keyGenerator = createKeyGenerator(keySchema, tableId, table, topicMapper, schemaPrefix); | |
Function<Object[], Struct> valueGenerator = createValueGenerator(valSchema, tableId, table.columns(), filter, mappers); | |
// And the table schema ... | |
- return new TableSchema(keySchema, keyGenerator, valSchema, valueGenerator); | |
+ return new TableSchema(envelopeSchemaName, keySchema, keyGenerator, valSchema, valueGenerator); | |
} | |
/** | |
@@ -169,15 +180,21 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId> | |
* @param schema the Kafka Connect schema for the key; may be null if there is no known schema, in which case the generator | |
* will be null | |
* @param columnSetName the name for the set of columns, used in error messages; may not be null | |
- * @param columns the column definitions for the table that defines the row; may not be null | |
+ * @param table the table for the row of data | |
+ * @param topicMapper the TopicMapper for the table whose key we are generating; may not be null | |
+ * @param schemaPrefix the prefix added to the table identifier to construct the schema names; may be null if there is no | |
+ * prefix | |
* @return the key-generating function, or null if there is no key schema | |
*/ | |
- protected Function<Object[], Object> createKeyGenerator(Schema schema, TableId columnSetName, List<Column> columns) { | |
+ protected Function<Object[], Object> createKeyGenerator(Schema schema, TableId columnSetName, Table table, | |
+ TopicMapper topicMapper, String schemaPrefix) { | |
if (schema != null) { | |
+ List<Column> columns = table.primaryKeyColumns(); | |
int[] recordIndexes = indexesForColumns(columns); | |
Field[] fields = fieldsForColumns(schema, columns); | |
int numFields = recordIndexes.length; | |
ValueConverter[] converters = convertersForColumns(schema, columnSetName, columns, null, null); | |
+ Map<String, Object> nonRowFieldsToAddToKey = topicMapper.getNonRowFieldsToAddToKey(schema, schemaPrefix, table); | |
return (row) -> { | |
Struct result = new Struct(schema); | |
for (int i = 0; i != numFields; ++i) { | |
@@ -194,6 +211,13 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId> | |
} | |
} | |
} | |
+ | |
+ if (nonRowFieldsToAddToKey != null) { | |
+ for (Map.Entry<String, Object> nonRowField : nonRowFieldsToAddToKey.entrySet()) { | |
+ result.put(nonRowField.getKey(), nonRowField.getValue()); | |
+ } | |
+ } | |
+ | |
return result; | |
}; | |
} | |
diff --git a/debezium-core/src/main/java/io/debezium/relational/topic/ByLogicalTableTopicMapper.java b/debezium-core/src/main/java/io/debezium/relational/topic/ByLogicalTableTopicMapper.java | |
new file mode 100644 | |
index 0000000..8f74809 | |
--- /dev/null | |
+++ b/debezium-core/src/main/java/io/debezium/relational/topic/ByLogicalTableTopicMapper.java | |
@@ -0,0 +1,66 @@ | |
+/* | |
+ * Copyright Debezium Authors. | |
+ * | |
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 | |
+ */ | |
+package io.debezium.relational.topic; | |
+ | |
+import org.apache.kafka.connect.data.Schema; | |
+import org.apache.kafka.connect.data.SchemaBuilder; | |
+ | |
+import io.debezium.relational.Table; | |
+ | |
+import java.util.HashMap; | |
+import java.util.Map; | |
+import java.util.regex.Matcher; | |
+import java.util.regex.Pattern; | |
+ | |
+/** | |
+ * A logical table consists of one or more physical tables with the same schema. A common use case is sharding -- the | |
+ * two physical tables `db_shard1.my_table` and `db_shard2.my_table` together form one logical table. | |
+ * | |
+ * This TopicMapper allows us to send change events from both physical tables to one topic. | |
+ * | |
+ * @author David Leibovic | |
+ */ | |
+public class ByLogicalTableTopicMapper extends TopicMapper { | |
+ | |
+ public String getTopicName(String topicPrefix, Table table) { | |
+ final String fullyQualifiedTableName = composeFullyQualifiedTableName(topicPrefix, table); | |
+ Pattern logicalTablePattern = Pattern.compile("^.*?(?=\\..+\\..+)\\.(?<logicalDb>etsy_.+?(?=(_\\d+\\.)|\\.))(_\\d+)?\\.(?<table>.+)$"); | |
+ Matcher logicalTableMatcher = logicalTablePattern.matcher(fullyQualifiedTableName); | |
+ if (logicalTableMatcher.matches()) { | |
+ return logicalTableMatcher.replaceAll("${logicalDb}.${table}"); | |
+ } | |
+ return fullyQualifiedTableName; | |
+ } | |
+ | |
+ public void enhanceKeySchema(SchemaBuilder keySchemaBuilder) { | |
+ // Now that multiple physical tables can share a topic, the Key Schema can no longer consist of solely the | |
+ // record's primary / unique key fields, since they are not guaranteed to be unique across tables. We need some | |
+ // identifier added to the key that distinguishes the different physical tables. | |
+ keySchemaBuilder.field("__dbz__physicalTableIdentifier", Schema.STRING_SCHEMA); | |
+ } | |
+ | |
+ public Map<String, Object> getNonRowFieldsToAddToKey(Schema schema, String topicPrefix, Table table) { | |
+ final String fullyQualifiedTableName = composeFullyQualifiedTableName(topicPrefix, table); | |
+ Pattern physicalTableIdentifierPattern = Pattern.compile("^.*?(?=\\..+\\..+)\\.(?<logicalDb>etsy_.+?(?=\\.))\\.(?<table>.+)$"); | |
+ Matcher physicalTableIdentifierMatcher = physicalTableIdentifierPattern.matcher(fullyQualifiedTableName); | |
+ | |
+ final String physicalTableIdentifier; | |
+ if (physicalTableIdentifierMatcher.matches()) { | |
+ physicalTableIdentifier = physicalTableIdentifierMatcher.replaceAll("${logicalDb}"); | |
+ } else { | |
+ physicalTableIdentifier = fullyQualifiedTableName; | |
+ } | |
+ | |
+ Map<String, Object> nonRowFields = new HashMap<>(); | |
+ nonRowFields.put("__dbz__physicalTableIdentifier", physicalTableIdentifier); | |
+ return nonRowFields; | |
+ } | |
+ | |
+ private String composeFullyQualifiedTableName(String topicPrefix, Table table) { | |
+ return topicPrefix + table.id().toString(); | |
+ } | |
+ | |
+} | |
diff --git a/debezium-core/src/main/java/io/debezium/relational/topic/ByTableTopicMapper.java b/debezium-core/src/main/java/io/debezium/relational/topic/ByTableTopicMapper.java | |
new file mode 100644 | |
index 0000000..1791b04 | |
--- /dev/null | |
+++ b/debezium-core/src/main/java/io/debezium/relational/topic/ByTableTopicMapper.java | |
@@ -0,0 +1,33 @@ | |
+/* | |
+ * Copyright Debezium Authors. | |
+ * | |
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 | |
+ */ | |
+package io.debezium.relational.topic; | |
+ | |
+import org.apache.kafka.connect.data.Schema; | |
+import org.apache.kafka.connect.data.SchemaBuilder; | |
+ | |
+import io.debezium.relational.Table; | |
+ | |
+import java.util.Map; | |
+ | |
+/** | |
+ * @author David Leibovic | |
+ */ | |
+public class ByTableTopicMapper extends TopicMapper { | |
+ | |
+ public String getTopicName(String topicPrefix, Table table) { | |
+ return topicPrefix + table.id().toString(); | |
+ } | |
+ | |
+ public void enhanceKeySchema(SchemaBuilder keySchemaBuilder) { | |
+ // do nothing ... | |
+ } | |
+ | |
+ public Map<String, Object> getNonRowFieldsToAddToKey(Schema schema, String topicPrefix, Table table) { | |
+ // no extra key fields are necessary ... | |
+ return null; | |
+ } | |
+ | |
+} | |
diff --git a/debezium-core/src/main/java/io/debezium/relational/topic/TopicMapper.java b/debezium-core/src/main/java/io/debezium/relational/topic/TopicMapper.java | |
new file mode 100644 | |
index 0000000..fd25233 | |
--- /dev/null | |
+++ b/debezium-core/src/main/java/io/debezium/relational/topic/TopicMapper.java | |
@@ -0,0 +1,47 @@ | |
+/* | |
+ * Copyright Debezium Authors. | |
+ * | |
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 | |
+ */ | |
+package io.debezium.relational.topic; | |
+ | |
+import org.apache.kafka.connect.data.Schema; | |
+import org.apache.kafka.connect.data.SchemaBuilder; | |
+ | |
+import io.debezium.relational.Table; | |
+ | |
+import java.util.Map; | |
+ | |
+/** | |
+ * @author David Leibovic | |
+ */ | |
+public abstract class TopicMapper { | |
+ | |
+ /** | |
+ * Get the name of the topic for the given table. | |
+ * | |
+ * @param topicPrefix prefix for the topic | |
+ * @param table the table that we are getting the topic name for | |
+ * @return the topic name; may be null if this strategy could not be applied | |
+ */ | |
+ abstract public String getTopicName(String topicPrefix, Table table); | |
+ | |
+ /** | |
+ * Depending on your TopicMapper implementation and which rows in a database may occupy the same topic, | |
+ * it may be necessary to enhance the key schema for the events to ensure each distinct record in the topic | |
+ * has a unique key. | |
+ * | |
+ * @param keySchemaBuilder the {@link SchemaBuilder} for the key, pre-populated with the table's primary/unique key | |
+ */ | |
+ abstract public void enhanceKeySchema(SchemaBuilder keySchemaBuilder); | |
+ | |
+ /** | |
+ * Get the extra key-value pairs necessary to add to the event's key. | |
+ * @param schema the schema for the key; never null | |
+ * @param topicPrefix prefix for the topic | |
+ * @param table the table the event is for | |
+ * @return the extra key-value pairs, or null if none are necessary. | |
+ */ | |
+ abstract public Map<String, Object> getNonRowFieldsToAddToKey(Schema schema, String topicPrefix, Table table); | |
+ | |
+} | |
diff --git a/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java b/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java | |
index 57316eb..d566e10 100644 | |
--- a/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java | |
+++ b/debezium-core/src/test/java/io/debezium/relational/TableSchemaBuilderTest.java | |
@@ -20,6 +20,8 @@ | |
import io.debezium.jdbc.JdbcValueConverters; | |
import io.debezium.time.Date; | |
+import io.debezium.relational.topic.TopicMapper; | |
+import io.debezium.relational.topic.ByTableTopicMapper; | |
import io.debezium.util.AvroValidator; | |
public class TableSchemaBuilderTest { | |
@@ -34,9 +36,11 @@ | |
private Column c4; | |
private TableSchema schema; | |
private AvroValidator validator; | |
+ private TopicMapper topicMapper; | |
@Before | |
public void beforeEach() { | |
+ topicMapper = new ByTableTopicMapper(); | |
validator = AvroValidator.create((original,replacement, conflict)->{ | |
fail("Should not have come across an invalid schema name"); | |
}); | |
@@ -78,19 +82,19 @@ public void checkPreconditions() { | |
@Test(expected = NullPointerException.class) | |
public void shouldFailToBuildTableSchemaFromNullTable() { | |
- new TableSchemaBuilder(new JdbcValueConverters(),validator::validate).create(prefix,null); | |
+ new TableSchemaBuilder(topicMapper, new JdbcValueConverters(),validator::validate).create(prefix,null); | |
} | |
@Test | |
public void shouldBuildTableSchemaFromTable() { | |
- schema = new TableSchemaBuilder(new JdbcValueConverters(),validator::validate).create(prefix,table); | |
+ schema = new TableSchemaBuilder(topicMapper, new JdbcValueConverters(),validator::validate).create(prefix,table); | |
assertThat(schema).isNotNull(); | |
} | |
@Test | |
public void shouldBuildTableSchemaFromTableWithoutPrimaryKey() { | |
table = table.edit().setPrimaryKeyNames().create(); | |
- schema = new TableSchemaBuilder(new JdbcValueConverters(),validator::validate).create(prefix,table); | |
+ schema = new TableSchemaBuilder(topicMapper, new JdbcValueConverters(),validator::validate).create(prefix,table); | |
assertThat(schema).isNotNull(); | |
// Check the keys ... | |
assertThat(schema.keySchema()).isNull(); | |
diff --git a/pom.xml b/pom.xml | |
index 595be01..f67d833 100644 | |
--- a/pom.xml | |
+++ b/pom.xml | |
@@ -59,10 +59,10 @@ | |
<version.confluent.platform>3.2.0</version.confluent.platform> | |
<!-- Databases --> | |
- <version.postgresql.driver>42.0.0-SNAPSHOT</version.postgresql.driver> | |
- <version.mysql.server>5.7</version.mysql.server> | |
+ <version.postgresql.driver>42.0.0</version.postgresql.driver> | |
+ <version.mysql.server>5.6</version.mysql.server> | |
<version.mysql.driver>5.1.40</version.mysql.driver> | |
- <version.mysql.binlog>0.9.0</version.mysql.binlog> | |
+ <version.mysql.binlog>0.11.0</version.mysql.binlog> | |
<version.mongo.server>3.2.12</version.mongo.server> | |
<version.mongo.driver>3.4.2</version.mongo.driver> | |
@@ -591,13 +591,13 @@ | |
<includeTestSourceDirectory>true</includeTestSourceDirectory> | |
</configuration> | |
<executions> | |
- <execution> | |
+ <!--<execution> | |
<id>check-style</id> | |
<phase>verify</phase> | |
<goals> | |
<goal>checkstyle</goal> | |
</goals> | |
- </execution> | |
+ </execution>--> | |
</executions> | |
</plugin> | |
</plugins> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment