Created
April 12, 2017 20:39
-
-
Save dasl-/7ff171997b7dbc91ad4ea5abf0c2b31f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java | |
index 2e85cde..d9f4788 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java | |
@@ -118,6 +118,12 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException { | |
if (event.getHeader().getEventType() == EventType.TABLE_MAP) { | |
TableMapEventData tableMapEvent = event.getData(); | |
tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent); | |
+ } else if (event.getHeader().getEventType() == EventType.TABLE_METADATA) { | |
+ TableMetadataEventData tableMetadataEvent = event.getData(); | |
+ TableMapEventData tableMapEvent = tableMapEventByTableId.get(tableMetadataEvent.getTableId()); | |
+ if (tableMapEvent != null) { | |
+ tableMapEvent.setMetadataEventData(tableMetadataEvent, 0); | |
+ } | |
} | |
return event; | |
} | |
@@ -154,7 +160,7 @@ protected void doStart() { | |
eventHandlers.put(EventType.HEARTBEAT, this::handleServerHeartbeat); | |
eventHandlers.put(EventType.INCIDENT, this::handleServerIncident); | |
eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent); | |
- eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata); | |
+ eventHandlers.put(EventType.TABLE_MAP, this::handleTableMap); | |
eventHandlers.put(EventType.QUERY, this::handleQueryEvent); | |
eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert); | |
eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate); | |
@@ -164,6 +170,9 @@ protected void doStart() { | |
eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete); | |
eventHandlers.put(EventType.VIEW_CHANGE, this::viewChange); | |
eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction); | |
+ eventHandlers.put(EventType.TABLE_METADATA, this::handleTableMetadata); | |
+ eventHandlers.put(EventType.VIEW_CHANGE, this::viewChange); | |
+ eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction); | |
// Get the current GtidSet from MySQL so we can get a filtered/merged GtidSet based off of the last Debezium checkpoint. | |
String availableServerGtidStr = context.knownGtidSet(); | |
@@ -479,16 +488,37 @@ protected void handleQueryEvent(Event event) throws InterruptedException { | |
* | |
* @param event the update event; never null | |
*/ | |
- protected void handleUpdateTableMetadata(Event event) { | |
- TableMapEventData metadata = unwrapData(event); | |
- long tableNumber = metadata.getTableId(); | |
- String databaseName = metadata.getDatabase(); | |
- String tableName = metadata.getTable(); | |
+ protected void handleTableMap(Event event) { | |
+ if (context.config().getBoolean(MySqlConnectorConfig.EXPECT_METADATA_EVENTS)) { | |
+ return; | |
+ } | |
+ handleTableUpdate(unwrapData(event), null); | |
+ } | |
+ | |
+ protected void handleTableMetadata(Event event) { | |
+ if (!context.config().getBoolean(MySqlConnectorConfig.EXPECT_METADATA_EVENTS)) { | |
+ return; | |
+ } | |
+ TableMetadataEventData meta = unwrapData(event); | |
+ handleTableUpdate(meta.getTableMapEventData(), meta); | |
+ } | |
+ | |
+ private void handleTableUpdate(TableMapEventData map, TableMetadataEventData meta) { | |
+ if (map == null) { | |
+ logger.debug("Skipping table update (missing TABLE_MAP): {}", meta); | |
+ return; | |
+ } else if (meta == null && context.config().getBoolean(MySqlConnectorConfig.EXPECT_METADATA_EVENTS)) { | |
+ logger.debug("Skipping table update (missing TABLE_METADATA): {}", map); | |
+ return; | |
+ } | |
+ long tableNumber = map.getTableId(); | |
+ String databaseName = map.getDatabase(); | |
+ String tableName = map.getTable(); | |
TableId tableId = new TableId(databaseName, null, tableName); | |
- if (recordMakers.assign(tableNumber, tableId)) { | |
- logger.debug("Received update table metadata event: {}", event); | |
+ if (recordMakers.assign(tableNumber, tableId, meta)) { | |
+ logger.debug("Received table update map({}) meta({})", map, meta); | |
} else { | |
- logger.debug("Skipping update table metadata event: {}", event); | |
+ logger.debug("Skipped table update map({}) meta({})", map, meta); | |
} | |
} | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java | |
index def91ed..e98442f 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java | |
@@ -185,7 +185,17 @@ public static DecimalHandlingMode parse(String value, String defaultValue) { | |
/** | |
* Perform a snapshot and then stop before attempting to read the binlog. | |
*/ | |
- INITIAL_ONLY("initial_only"); | |
+ INITIAL_ONLY("initial_only"), | |
+ | |
+ /** | |
+ * 1) Never performs a snapshot (gets schema info from twitter patch) | |
+ * 2) The first time Debezium connects in this mode, it sets Debezium's executed GTID set to be the same as | |
+ * that of MySQL server it is connecting to. This is necessary to allow Debezium to reconnect later. | |
+ * 3) The first time Debezium connects in this mode, it starts reading the binlog at the GTID corresponding to | |
+ * the last GTID in the executed GTID set we marked in step 2 (i.e. starts reading at the binlog's current | |
+ * position). | |
+ */ | |
+ TWITTER_PATCH("twitter_patch"); | |
private final String value; | |
@@ -614,6 +624,14 @@ public static SecureConnectionMode parse(String value, String defaultValue) { | |
+ "The default is 'true'. This is independent of how the connector internally records database history.") | |
.withDefault(true); | |
+ public static final Field EXPECT_METADATA_EVENTS = Field.create("database.mysql.expect_metadata_events") | |
+ .withDisplayName("Expect metadata events") | |
+ .withType(Type.BOOLEAN) | |
+ .withWidth(Width.SHORT) | |
+ .withImportance(Importance.MEDIUM) | |
+ .withDescription("When enabled, Debezium will track schemas based on TABLE_METADATA events") | |
+ .withDefault(false); | |
+ | |
public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode") | |
.withDisplayName("Snapshot mode") | |
.withEnum(SnapshotMode.class, SnapshotMode.INITIAL) | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java | |
index 9482e24..2bbf988 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java | |
@@ -108,19 +108,43 @@ public synchronized void start(Map<String, String> props) { | |
} else { | |
// We have no recorded offsets ... | |
if (taskContext.isSnapshotNeverAllowed()) { | |
- // We're not allowed to take a snapshot, so instead we have to assume that the binlog contains the | |
- // full history of the database. | |
- logger.info("Found no existing offset and snapshots disallowed, so starting at beginning of binlog"); | |
- source.setBinlogStartPoint("", 0L);// start from the beginning of the binlog | |
- taskContext.initializeHistory(); | |
- | |
- // Look to see what the first available binlog file is called, and whether it looks like binlog files have | |
- // been purged. If so, then output a warning ... | |
- String earliestBinlogFilename = earliestBinlogFilename(); | |
- if (earliestBinlogFilename == null) { | |
- logger.warn("No binlog appears to be available. Ensure that the MySQL row-level binlog is enabled."); | |
- } else if (!earliestBinlogFilename.endsWith("00001")) { | |
- logger.warn("It is possible the server has purged some binlogs. If this is the case, then using snapshot mode may be required."); | |
+ if (taskContext.isSnapshotModeTwitterPatch()) { | |
+ logger.info("Found no existing offset and we're in SnapshotMode.TWITTER_PATCH."); | |
+ taskContext.jdbc().query("SHOW MASTER STATUS", rs -> { | |
+ if (rs.next()) { | |
+ String binlogFilename = rs.getString(1); | |
+ long binlogPosition = rs.getLong(2); | |
+ source.setBinlogStartPoint(binlogFilename, binlogPosition); | |
+ if (rs.getMetaData().getColumnCount() > 4) { | |
+ // This column exists only in MySQL 5.6.5 or later ... | |
+ String gtidSet = rs.getString(5);// GTID set, may be null, blank, or contain a GTID set | |
+ source.setCompletedGtidSet(gtidSet); | |
+ logger.info("\t using binlog '{}' at position '{}' and gtid '{}'", binlogFilename, binlogPosition, | |
+ gtidSet); | |
+ } else { | |
+ logger.info("\t using binlog '{}' at position '{}'", binlogFilename, binlogPosition); | |
+ } | |
+ } else { | |
+ throw new IllegalStateException("Cannot read the binlog filename and position via 'SHOW MASTER STATUS'." | |
+ + " Make sure your server is correctly configured"); | |
+ } | |
+ }); | |
+ taskContext.initializeHistory(); | |
+ } else { // We must be in SnapshotMode.NEVER | |
+ // We're not allowed to take a snapshot, so instead we have to assume that the binlog contains the | |
+ // full history of the database. | |
+ logger.info("Found no existing offset and snapshots disallowed, so starting at beginning of binlog"); | |
+ source.setBinlogStartPoint("", 0L);// start from the beginning of the binlog | |
+ taskContext.initializeHistory(); | |
+ | |
+ // Look to see what the first available binlog file is called, and whether it looks like binlog files have | |
+ // been purged. If so, then output a warning ... | |
+ String earliestBinlogFilename = earliestBinlogFilename(); | |
+ if (earliestBinlogFilename == null) { | |
+ logger.warn("No binlog appears to be available. Ensure that the MySQL row-level binlog is enabled."); | |
+ } else if (!earliestBinlogFilename.endsWith("00001")) { | |
+ logger.warn("It is possible the server has purged some binlogs. If this is the case, then using snapshot mode may be required."); | |
+ } | |
} | |
} else { | |
// We are allowed to use snapshots, and that is the best way to start ... | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java | |
index 990bf1e..32a0a7c 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java | |
@@ -6,12 +6,17 @@ | |
package io.debezium.connector.mysql; | |
import java.sql.SQLException; | |
+import java.util.ArrayList; | |
import java.util.HashMap; | |
+import java.util.List; | |
import java.util.Map; | |
import java.util.Set; | |
import java.util.concurrent.Callable; | |
import java.util.function.Predicate; | |
+import com.github.shyiko.mysql.binlog.event.ColumnDescriptor; | |
+import com.github.shyiko.mysql.binlog.event.TableMetadataEventData; | |
+ | |
import org.apache.kafka.connect.data.Schema; | |
import org.apache.kafka.connect.errors.ConnectException; | |
import org.slf4j.Logger; | |
@@ -25,6 +30,7 @@ | |
import io.debezium.document.Document; | |
import io.debezium.jdbc.JdbcConnection; | |
import io.debezium.jdbc.JdbcValueConverters.DecimalMode; | |
+import io.debezium.relational.Column; | |
import io.debezium.relational.Table; | |
import io.debezium.relational.TableId; | |
import io.debezium.relational.TableSchema; | |
@@ -196,6 +202,51 @@ public TableSchema schemaFor(TableId id) { | |
return filters.tableFilter().test(id) ? tableSchemaByTableId.get(id) : null; | |
} | |
+ public TableSchema schemaFromMetadata(TableId id, TableMetadataEventData metadata) { | |
+ if (!filters.tableFilter().test(id)) { | |
+ return null; | |
+ } | |
+ TableSchema schema = tableSchemaByTableId.get(id); | |
+ | |
+ if (schema != null && schema.getTableVersionId() == metadata.getTableId()) { | |
+ return schema; | |
+ } | |
+ | |
+ List<Column> columnDefs = new ArrayList<Column>(); | |
+ List<String> pkNames = new ArrayList<String>(); | |
+ int pos = 1; | |
+ for (ColumnDescriptor descriptor : metadata.getColumnDescriptors()) { | |
+ // See https://github.com/percona/percona-server/blob/5.6/include/mysql_com.h#L95 for bitmasks | |
+ boolean isPkey = (descriptor.getFlags() & 2) == 2; | |
+ boolean isAutoIncr = (descriptor.getFlags() & 512) == 512; | |
+ boolean hasDefault = (descriptor.getFlags() & 4096) != 4096; | |
+ boolean isNullable = (descriptor.getFlags() & 1) != 1; | |
+ boolean isOptional = !isPkey && (hasDefault || isNullable); | |
+ Column col = Column.editor() | |
+ .name(descriptor.getName()) | |
+ .position(pos) | |
+ .jdbcType(MysqlToJdbc.mysqlToJdbcType(descriptor.getType())) // TODO test | |
+ .type(descriptor.getTypeName()) | |
+ .charsetName(null) // TODO test (descriptor.getCharacterSet()) | |
+ .length(descriptor.getLength()) | |
+ .scale(descriptor.getScale()) | |
+ .optional(isOptional) // TODO test | |
+ .generated(false) // TODO test | |
+ .autoIncremented(isAutoIncr) | |
+ .create(); | |
+ columnDefs.add(col); | |
+ if (isPkey) { | |
+ pkNames.add(descriptor.getName()); | |
+ } | |
+ pos++; | |
+ } | |
+ tables.overwriteTable(id, columnDefs, pkNames, null); // TODO test null charset | |
+ schema = schemaBuilder.create(schemaPrefix, tables.forTable(id), filters.columnFilter(), filters.columnMappers()); | |
+ schema.setTableVersionId(metadata.getTableId()); | |
+ tableSchemaByTableId.put(id, schema); | |
+ return schema; | |
+ } | |
+ | |
/** | |
* Get the information about where the DDL statement history is recorded. | |
* | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java | |
index 9ea86f5..78e9576 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java | |
@@ -173,7 +173,11 @@ public boolean isSnapshotAllowedWhenNeeded() { | |
} | |
public boolean isSnapshotNeverAllowed() { | |
- return snapshotMode() == SnapshotMode.NEVER; | |
+ return snapshotMode() == SnapshotMode.NEVER || snapshotMode() == SnapshotMode.TWITTER_PATCH; | |
+ } | |
+ | |
+ public boolean isSnapshotModeTwitterPatch() { | |
+ return snapshotMode() == SnapshotMode.TWITTER_PATCH; | |
} | |
public boolean isInitialSnapshotOnly() { | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MysqlToJdbc.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MysqlToJdbc.java | |
new file mode 100644 | |
index 0000000..2305566 | |
--- /dev/null | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MysqlToJdbc.java | |
@@ -0,0 +1,78 @@ | |
+package io.debezium.connector.mysql; | |
+ | |
+import java.sql.Types; | |
+ | |
+public class MysqlToJdbc { | |
+ | |
+ public static int mysqlToJdbcType(int mysqlType) { | |
+ // See https://github.com/percona/percona-server/blob/1e2f003a5bd48763c27e37542d97cd8f59d98eaa/libbinlogevents/export/binary_log_types.h#L38 | |
+ // See https://github.com/debezium/debezium/blob/v0.3.6/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java#L75 | |
+ /* | |
+ MYSQL_TYPE_DECIMAL, // 0 | |
+ MYSQL_TYPE_TINY, // 1 | |
+ MYSQL_TYPE_SHORT, // 2 | |
+ MYSQL_TYPE_LONG, // 3 | |
+ MYSQL_TYPE_FLOAT, // 4 | |
+ MYSQL_TYPE_DOUBLE, // 5 | |
+ MYSQL_TYPE_NULL, // 6 | |
+ MYSQL_TYPE_TIMESTAMP, // 7 | |
+ MYSQL_TYPE_LONGLONG, // 8 | |
+ MYSQL_TYPE_INT24, // 9 | |
+ MYSQL_TYPE_DATE, // 10 | |
+ MYSQL_TYPE_TIME, // 11 | |
+ MYSQL_TYPE_DATETIME, // 12 | |
+ MYSQL_TYPE_YEAR, // 13 | |
+ MYSQL_TYPE_NEWDATE, // 14 | |
+ MYSQL_TYPE_VARCHAR, // 15 | |
+ MYSQL_TYPE_BIT, // 16 | |
+ MYSQL_TYPE_TIMESTAMP2, // 17 | |
+ MYSQL_TYPE_DATETIME2, // 18 | |
+ MYSQL_TYPE_TIME2, // 19 | |
+ MYSQL_TYPE_JSON=245, | |
+ MYSQL_TYPE_NEWDECIMAL=246, | |
+ MYSQL_TYPE_ENUM=247, | |
+ MYSQL_TYPE_SET=248, | |
+ MYSQL_TYPE_TINY_BLOB=249, | |
+ MYSQL_TYPE_MEDIUM_BLOB=250, | |
+ MYSQL_TYPE_LONG_BLOB=251, | |
+ MYSQL_TYPE_BLOB=252, | |
+ MYSQL_TYPE_VAR_STRING=253, | |
+ MYSQL_TYPE_STRING=254, | |
+ MYSQL_TYPE_GEOMETRY=255 | |
+ */ | |
+ switch (mysqlType) { | |
+ case 0: return Types.DECIMAL; | |
+ case 1: return Types.SMALLINT; | |
+ case 2: return Types.SMALLINT; | |
+ case 3: return Types.BIGINT; | |
+ case 4: return Types.FLOAT; | |
+ case 5: return Types.DOUBLE; | |
+ case 6: return Types.NULL; | |
+ case 7: return Types.TIMESTAMP; | |
+ case 8: return Types.BIGINT; | |
+ case 9: return Types.INTEGER; | |
+ case 10: return Types.DATE; | |
+ case 11: return Types.TIME; | |
+ case 12: return Types.TIMESTAMP; | |
+ case 13: return Types.INTEGER; | |
+ case 14: return Types.DATE; | |
+ case 15: return Types.VARCHAR; | |
+ case 16: return Types.BIT; | |
+ case 17: return Types.TIMESTAMP; | |
+ case 18: return Types.TIMESTAMP; | |
+ case 19: return Types.TIME; | |
+ case 245: return Types.OTHER; | |
+ case 246: return Types.DECIMAL; | |
+ case 247: return Types.CHAR; | |
+ case 248: return Types.CHAR; | |
+ case 249: return Types.BLOB; | |
+ case 250: return Types.BLOB; | |
+ case 251: return Types.BLOB; | |
+ case 252: return Types.BLOB; | |
+ case 253: return Types.VARCHAR; | |
+ case 254: return Types.CHAR; | |
+ case 255: return Types.OTHER; | |
+ default: return Types.OTHER; | |
+ } | |
+ } | |
+} | |
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java | |
index 0f5da85..e0cbeee 100644 | |
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java | |
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java | |
@@ -12,6 +12,7 @@ | |
import java.util.Set; | |
import java.util.concurrent.atomic.AtomicInteger; | |
+import com.github.shyiko.mysql.binlog.event.TableMetadataEventData; | |
import org.apache.kafka.connect.data.Schema; | |
import org.apache.kafka.connect.data.SchemaBuilder; | |
import org.apache.kafka.connect.data.Struct; | |
@@ -151,7 +152,7 @@ public void regenerate() { | |
Set<TableId> tableIds = schema.tables().tableIds(); | |
logger.debug("Regenerating converters for {} tables", tableIds.size()); | |
tableIds.forEach(id -> { | |
- assign(nextTableNumber.incrementAndGet(), id); | |
+ assign(nextTableNumber.incrementAndGet(), id, null); | |
}); | |
} | |
@@ -163,20 +164,27 @@ public void regenerate() { | |
* @return {@code true} if the assignment was successful, or {@code false} if the table is currently excluded in the | |
* connector's configuration | |
*/ | |
- public boolean assign(long tableNumber, TableId id) { | |
+ public boolean assign(long tableNumber, TableId id, TableMetadataEventData metadata) { | |
Long existingTableNumber = tableNumbersByTableId.get(id); | |
if (existingTableNumber != null && existingTableNumber.longValue() == tableNumber | |
&& convertersByTableNumber.containsKey(tableNumber)) { | |
// This is the exact same table number for the same table, so do nothing ... | |
return true; | |
} | |
- TableSchema tableSchema = schema.schemaFor(id); | |
+ | |
+ // Get schema from either registry or metadata event | |
+ TableSchema tableSchema; | |
+ if (metadata != null) { | |
+ tableSchema = schema.schemaFromMetadata(id, metadata); | |
+ } else { | |
+ tableSchema = schema.schemaFor(id); | |
+ } | |
if (tableSchema == null) return false; | |
String topicName = topicSelector.getTopic(id); | |
Envelope envelope = Envelope.defineSchema() | |
.withName(schemaNameValidator.validate(topicName + ".Envelope")) | |
- .withRecord(schema.schemaFor(id).valueSchema()) | |
+ .withRecord(tableSchema.valueSchema()) | |
.withSource(SourceInfo.SCHEMA) | |
.build(); | |
diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchema.java b/debezium-core/src/main/java/io/debezium/relational/TableSchema.java | |
index 778fd0d..79d555e 100644 | |
--- a/debezium-core/src/main/java/io/debezium/relational/TableSchema.java | |
+++ b/debezium-core/src/main/java/io/debezium/relational/TableSchema.java | |
@@ -51,6 +51,7 @@ | |
private final Schema valueSchema; | |
private final Function<Object[], Object> keyGenerator; | |
private final Function<Object[], Struct> valueGenerator; | |
+ private long tableVersionId = -1; | |
/** | |
* Create an instance with the specified {@link Schema}s for the keys and values, and the functions that generate the | |
@@ -110,7 +111,15 @@ public Object keyFromColumnData(Object[] columnData) { | |
public Struct valueFromColumnData(Object[] columnData) { | |
return columnData == null ? null : valueGenerator.apply(columnData); | |
} | |
- | |
+ | |
+ public void setTableVersionId(long id) { | |
+ tableVersionId = id; | |
+ } | |
+ | |
+ public long getTableVersionId() { | |
+ return tableVersionId; | |
+ } | |
+ | |
@Override | |
public int hashCode() { | |
return valueSchema().hashCode(); | |
diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java | |
index 27345e4..00a7c96 100644 | |
--- a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java | |
+++ b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java | |
@@ -194,6 +194,7 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId> | |
} | |
} | |
} | |
+ | |
return result; | |
}; | |
} | |
diff --git a/pom.xml b/pom.xml | |
index 595be01..a777f33 100644 | |
--- a/pom.xml | |
+++ b/pom.xml | |
@@ -59,10 +59,10 @@ | |
<version.confluent.platform>3.2.0</version.confluent.platform> | |
<!-- Databases --> | |
- <version.postgresql.driver>42.0.0-SNAPSHOT</version.postgresql.driver> | |
- <version.mysql.server>5.7</version.mysql.server> | |
+ <version.postgresql.driver>42.0.0</version.postgresql.driver> | |
+ <version.mysql.server>5.6</version.mysql.server> | |
<version.mysql.driver>5.1.40</version.mysql.driver> | |
- <version.mysql.binlog>0.9.0</version.mysql.binlog> | |
+ <version.mysql.binlog>0.11.0</version.mysql.binlog> | |
<version.mongo.server>3.2.12</version.mongo.server> | |
<version.mongo.driver>3.4.2</version.mongo.driver> | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment