Skip to content

Instantly share code, notes, and snippets.

@dasl-
Created April 12, 2017 20:39
Show Gist options
  • Save dasl-/7ff171997b7dbc91ad4ea5abf0c2b31f to your computer and use it in GitHub Desktop.
Save dasl-/7ff171997b7dbc91ad4ea5abf0c2b31f to your computer and use it in GitHub Desktop.
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
index 2e85cde..d9f4788 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
@@ -118,6 +118,12 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
TableMapEventData tableMapEvent = event.getData();
tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
+ } else if (event.getHeader().getEventType() == EventType.TABLE_METADATA) {
+ TableMetadataEventData tableMetadataEvent = event.getData();
+ TableMapEventData tableMapEvent = tableMapEventByTableId.get(tableMetadataEvent.getTableId());
+ if (tableMapEvent != null) {
+ tableMapEvent.setMetadataEventData(tableMetadataEvent, 0);
+ }
}
return event;
}
@@ -154,7 +160,7 @@ protected void doStart() {
eventHandlers.put(EventType.HEARTBEAT, this::handleServerHeartbeat);
eventHandlers.put(EventType.INCIDENT, this::handleServerIncident);
eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent);
- eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata);
+ eventHandlers.put(EventType.TABLE_MAP, this::handleTableMap);
eventHandlers.put(EventType.QUERY, this::handleQueryEvent);
eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert);
eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate);
@@ -164,6 +170,8 @@ protected void doStart() {
eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete);
eventHandlers.put(EventType.VIEW_CHANGE, this::viewChange);
eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction);
+ // Route TABLE_METADATA events to the metadata-driven table update handler.
+ eventHandlers.put(EventType.TABLE_METADATA, this::handleTableMetadata);
// Get the current GtidSet from MySQL so we can get a filtered/merged GtidSet based off of the last Debezium checkpoint.
String availableServerGtidStr = context.knownGtidSet();
@@ -479,16 +488,37 @@ protected void handleQueryEvent(Event event) throws InterruptedException {
*
* @param event the update event; never null
*/
- protected void handleUpdateTableMetadata(Event event) {
- TableMapEventData metadata = unwrapData(event);
- long tableNumber = metadata.getTableId();
- String databaseName = metadata.getDatabase();
- String tableName = metadata.getTable();
+ protected void handleTableMap(Event event) {
+ // TABLE_MAP events only trigger a table update when TABLE_METADATA events are not expected.
+ if (!context.config().getBoolean(MySqlConnectorConfig.EXPECT_METADATA_EVENTS)) {
+ handleTableUpdate(unwrapData(event), null);
+ }
+ }
+
+ protected void handleTableMetadata(Event event) {
+ // Only act on TABLE_METADATA events when the connector is configured to expect them.
+ if (context.config().getBoolean(MySqlConnectorConfig.EXPECT_METADATA_EVENTS)) {
+ final TableMetadataEventData metadataEvent = unwrapData(event);
+ handleTableUpdate(metadataEvent.getTableMapEventData(), metadataEvent);
+ }
+ }
+
+ private void handleTableUpdate(TableMapEventData map, TableMetadataEventData meta) { // shared by the TABLE_MAP and TABLE_METADATA handlers; assigns the table to recordMakers
+ if (map == null) { // the table id, database and table name are all read from the TABLE_MAP data
+ logger.debug("Skipping table update (missing TABLE_MAP): {}", meta);
+ return;
+ } else if (meta == null && context.config().getBoolean(MySqlConnectorConfig.EXPECT_METADATA_EVENTS)) { // metadata events are expected but none was supplied
+ logger.debug("Skipping table update (missing TABLE_METADATA): {}", map);
+ return;
+ }
+ long tableNumber = map.getTableId();
+ String databaseName = map.getDatabase();
+ String tableName = map.getTable();
TableId tableId = new TableId(databaseName, null, tableName);
- if (recordMakers.assign(tableNumber, tableId)) {
- logger.debug("Received update table metadata event: {}", event);
+ if (recordMakers.assign(tableNumber, tableId, meta)) { // meta is null when called from the TABLE_MAP handler
+ logger.debug("Received table update map({}) meta({})", map, meta);
} else {
- logger.debug("Skipping update table metadata event: {}", event);
+ logger.debug("Skipped table update map({}) meta({})", map, meta);
}
}
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java
index def91ed..e98442f 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java
@@ -185,7 +185,17 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
/**
* Perform a snapshot and then stop before attempting to read the binlog.
*/
- INITIAL_ONLY("initial_only");
+ INITIAL_ONLY("initial_only"),
+
+ /**
+ * 1) Never performs a snapshot (gets schema info from twitter patch)
+ * 2) The first time Debezium connects in this mode, it sets Debezium's executed GTID set to be the same as
+ * that of MySQL server it is connecting to. This is necessary to allow Debezium to reconnect later.
+ * 3) The first time Debezium connects in this mode, it starts reading the binlog at the GTID corresponding to
+ * the last GTID in the executed GTID set we marked in step 2 (i.e. starts reading at the binlog's current
+ * position).
+ */
+ TWITTER_PATCH("twitter_patch");
private final String value;
@@ -614,6 +624,14 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
+ "The default is 'true'. This is independent of how the connector internally records database history.")
.withDefault(true);
+ public static final Field EXPECT_METADATA_EVENTS = Field.create("database.mysql.expect_metadata_events") // read by BinlogReader to choose whether TABLE_MAP or TABLE_METADATA events trigger table updates
+ .withDisplayName("Expect metadata events")
+ .withType(Type.BOOLEAN)
+ .withWidth(Width.SHORT)
+ .withImportance(Importance.MEDIUM)
+ .withDescription("When enabled, Debezium will track schemas based on TABLE_METADATA events")
+ .withDefault(false); // disabled unless explicitly configured
+
public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode")
.withDisplayName("Snapshot mode")
.withEnum(SnapshotMode.class, SnapshotMode.INITIAL)
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
index 9482e24..2bbf988 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
@@ -108,19 +108,43 @@ public synchronized void start(Map<String, String> props) {
} else {
// We have no recorded offsets ...
if (taskContext.isSnapshotNeverAllowed()) {
- // We're not allowed to take a snapshot, so instead we have to assume that the binlog contains the
- // full history of the database.
- logger.info("Found no existing offset and snapshots disallowed, so starting at beginning of binlog");
- source.setBinlogStartPoint("", 0L);// start from the beginning of the binlog
- taskContext.initializeHistory();
-
- // Look to see what the first available binlog file is called, and whether it looks like binlog files have
- // been purged. If so, then output a warning ...
- String earliestBinlogFilename = earliestBinlogFilename();
- if (earliestBinlogFilename == null) {
- logger.warn("No binlog appears to be available. Ensure that the MySQL row-level binlog is enabled.");
- } else if (!earliestBinlogFilename.endsWith("00001")) {
- logger.warn("It is possible the server has purged some binlogs. If this is the case, then using snapshot mode may be required.");
+ if (taskContext.isSnapshotModeTwitterPatch()) {
+ logger.info("Found no existing offset and we're in SnapshotMode.TWITTER_PATCH.");
+ taskContext.jdbc().query("SHOW MASTER STATUS", rs -> {
+ if (rs.next()) {
+ String binlogFilename = rs.getString(1);
+ long binlogPosition = rs.getLong(2);
+ source.setBinlogStartPoint(binlogFilename, binlogPosition);
+ if (rs.getMetaData().getColumnCount() > 4) {
+ // This column exists only in MySQL 5.6.5 or later ...
+ String gtidSet = rs.getString(5);// GTID set, may be null, blank, or contain a GTID set
+ source.setCompletedGtidSet(gtidSet);
+ logger.info("\t using binlog '{}' at position '{}' and gtid '{}'", binlogFilename, binlogPosition,
+ gtidSet);
+ } else {
+ logger.info("\t using binlog '{}' at position '{}'", binlogFilename, binlogPosition);
+ }
+ } else {
+ throw new IllegalStateException("Cannot read the binlog filename and position via 'SHOW MASTER STATUS'."
+ + " Make sure your server is correctly configured");
+ }
+ });
+ taskContext.initializeHistory();
+ } else { // We must be in SnapshotMode.NEVER
+ // We're not allowed to take a snapshot, so instead we have to assume that the binlog contains the
+ // full history of the database.
+ logger.info("Found no existing offset and snapshots disallowed, so starting at beginning of binlog");
+ source.setBinlogStartPoint("", 0L);// start from the beginning of the binlog
+ taskContext.initializeHistory();
+
+ // Look to see what the first available binlog file is called, and whether it looks like binlog files have
+ // been purged. If so, then output a warning ...
+ String earliestBinlogFilename = earliestBinlogFilename();
+ if (earliestBinlogFilename == null) {
+ logger.warn("No binlog appears to be available. Ensure that the MySQL row-level binlog is enabled.");
+ } else if (!earliestBinlogFilename.endsWith("00001")) {
+ logger.warn("It is possible the server has purged some binlogs. If this is the case, then using snapshot mode may be required.");
+ }
}
} else {
// We are allowed to use snapshots, and that is the best way to start ...
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java
index 990bf1e..32a0a7c 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java
@@ -6,12 +6,17 @@
package io.debezium.connector.mysql;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.Predicate;
+import com.github.shyiko.mysql.binlog.event.ColumnDescriptor;
+import com.github.shyiko.mysql.binlog.event.TableMetadataEventData;
+
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
@@ -25,6 +30,7 @@
import io.debezium.document.Document;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
+import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
@@ -196,6 +202,63 @@ public TableSchema schemaFor(TableId id) {
return filters.tableFilter().test(id) ? tableSchemaByTableId.get(id) : null;
}
+ /**
+ * Get the schema for the given table, rebuilding it from the supplied TABLE_METADATA event data unless a
+ * cached schema already carries a table version id equal to the event's table id. A rebuild overwrites the
+ * table's definition and replaces the cached schema.
+ *
+ * @param id the identifier of the table
+ * @param metadata the TABLE_METADATA event data describing the table's columns; must not be null
+ * @return the cached or rebuilt schema, or null if the table filter excludes the table
+ */
+ public TableSchema schemaFromMetadata(TableId id, TableMetadataEventData metadata) {
+ if (!filters.tableFilter().test(id)) {
+ return null;
+ }
+
+ // Reuse the cached schema when it was built for the same table id as this event.
+ final TableSchema cached = tableSchemaByTableId.get(id);
+ if (cached != null && cached.getTableVersionId() == metadata.getTableId()) {
+ return cached;
+ }
+
+ // Column flag bitmasks; see https://github.com/percona/percona-server/blob/5.6/include/mysql_com.h#L95
+ final int notNullFlag = 1;
+ final int primaryKeyFlag = 2;
+ final int autoIncrementFlag = 512;
+ final int noDefaultValueFlag = 4096;
+
+ final List<Column> columns = new ArrayList<>();
+ final List<String> primaryKeyNames = new ArrayList<>();
+ for (ColumnDescriptor descriptor : metadata.getColumnDescriptors()) {
+ final boolean primaryKey = (descriptor.getFlags() & primaryKeyFlag) != 0;
+ final boolean autoIncremented = (descriptor.getFlags() & autoIncrementFlag) != 0;
+ final boolean hasDefault = (descriptor.getFlags() & noDefaultValueFlag) == 0;
+ final boolean nullable = (descriptor.getFlags() & notNullFlag) == 0;
+ final boolean optional = !primaryKey && (hasDefault || nullable);
+ columns.add(Column.editor()
+ .name(descriptor.getName())
+ .position(columns.size() + 1) // 1-based, in descriptor order
+ .jdbcType(MysqlToJdbc.mysqlToJdbcType(descriptor.getType())) // TODO test
+ .type(descriptor.getTypeName())
+ .charsetName(null) // TODO test (descriptor.getCharacterSet())
+ .length(descriptor.getLength())
+ .scale(descriptor.getScale())
+ .optional(optional) // TODO test
+ .generated(false) // TODO test
+ .autoIncremented(autoIncremented)
+ .create());
+ if (primaryKey) {
+ primaryKeyNames.add(descriptor.getName());
+ }
+ }
+ tables.overwriteTable(id, columns, primaryKeyNames, null); // TODO test null charset
+ final TableSchema rebuilt = schemaBuilder.create(schemaPrefix, tables.forTable(id), filters.columnFilter(), filters.columnMappers());
+ rebuilt.setTableVersionId(metadata.getTableId());
+ tableSchemaByTableId.put(id, rebuilt);
+ return rebuilt;
+ }
+
/**
* Get the information about where the DDL statement history is recorded.
*
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java
index 9ea86f5..78e9576 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java
@@ -173,7 +173,11 @@ public boolean isSnapshotAllowedWhenNeeded() {
}
public boolean isSnapshotNeverAllowed() {
- return snapshotMode() == SnapshotMode.NEVER;
+ return snapshotMode() == SnapshotMode.NEVER || isSnapshotModeTwitterPatch(); // TWITTER_PATCH mode never snapshots either
+ }
+
+ public boolean isSnapshotModeTwitterPatch() {
+ return SnapshotMode.TWITTER_PATCH == snapshotMode();
}
public boolean isInitialSnapshotOnly() {
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MysqlToJdbc.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MysqlToJdbc.java
new file mode 100644
index 0000000..2305566
--- /dev/null
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MysqlToJdbc.java
@@ -0,0 +1,73 @@
+package io.debezium.connector.mysql;
+
+import java.sql.Types;
+
+/**
+ * Maps numeric MySQL column type codes ({@code MYSQL_TYPE_*}) to {@link java.sql.Types} constants.
+ */
+public final class MysqlToJdbc {
+
+ private MysqlToJdbc() {
+ // Static utility class; not meant to be instantiated.
+ }
+
+ /**
+ * Translate a MySQL column type code into a JDBC type code.
+ *
+ * @param mysqlType the numeric {@code MYSQL_TYPE_*} code
+ * @return the corresponding {@link Types} constant; {@link Types#OTHER} for JSON, GEOMETRY and any unlisted code
+ */
+ public static int mysqlToJdbcType(int mysqlType) {
+ // See https://github.com/percona/percona-server/blob/1e2f003a5bd48763c27e37542d97cd8f59d98eaa/libbinlogevents/export/binary_log_types.h#L38
+ // See https://github.com/debezium/debezium/blob/v0.3.6/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java#L75
+ switch (mysqlType) {
+ case 0: // MYSQL_TYPE_DECIMAL
+ case 246: // MYSQL_TYPE_NEWDECIMAL
+ return Types.DECIMAL;
+ case 1: // MYSQL_TYPE_TINY
+ case 2: // MYSQL_TYPE_SHORT
+ return Types.SMALLINT;
+ case 3: // MYSQL_TYPE_LONG -- NOTE(review): MySQL INT is 4 bytes yet is widened to BIGINT here; confirm this is intentional
+ case 8: // MYSQL_TYPE_LONGLONG
+ return Types.BIGINT;
+ case 4: // MYSQL_TYPE_FLOAT
+ return Types.FLOAT;
+ case 5: // MYSQL_TYPE_DOUBLE
+ return Types.DOUBLE;
+ case 6: // MYSQL_TYPE_NULL
+ return Types.NULL;
+ case 7: // MYSQL_TYPE_TIMESTAMP
+ case 12: // MYSQL_TYPE_DATETIME
+ case 17: // MYSQL_TYPE_TIMESTAMP2
+ case 18: // MYSQL_TYPE_DATETIME2
+ return Types.TIMESTAMP;
+ case 9: // MYSQL_TYPE_INT24
+ case 13: // MYSQL_TYPE_YEAR
+ return Types.INTEGER;
+ case 10: // MYSQL_TYPE_DATE
+ case 14: // MYSQL_TYPE_NEWDATE
+ return Types.DATE;
+ case 11: // MYSQL_TYPE_TIME
+ case 19: // MYSQL_TYPE_TIME2
+ return Types.TIME;
+ case 15: // MYSQL_TYPE_VARCHAR
+ case 253: // MYSQL_TYPE_VAR_STRING
+ return Types.VARCHAR;
+ case 16: // MYSQL_TYPE_BIT
+ return Types.BIT;
+ case 247: // MYSQL_TYPE_ENUM
+ case 248: // MYSQL_TYPE_SET
+ case 254: // MYSQL_TYPE_STRING
+ return Types.CHAR;
+ case 249: // MYSQL_TYPE_TINY_BLOB
+ case 250: // MYSQL_TYPE_MEDIUM_BLOB
+ case 251: // MYSQL_TYPE_LONG_BLOB
+ case 252: // MYSQL_TYPE_BLOB
+ return Types.BLOB;
+ case 245: // MYSQL_TYPE_JSON
+ case 255: // MYSQL_TYPE_GEOMETRY
+ default:
+ return Types.OTHER;
+ }
+ }
+}
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java
index 0f5da85..e0cbeee 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java
@@ -12,6 +12,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import com.github.shyiko.mysql.binlog.event.TableMetadataEventData;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
@@ -151,7 +152,7 @@ public void regenerate() {
Set<TableId> tableIds = schema.tables().tableIds();
logger.debug("Regenerating converters for {} tables", tableIds.size());
tableIds.forEach(id -> {
- assign(nextTableNumber.incrementAndGet(), id);
+ assign(nextTableNumber.incrementAndGet(), id, null);
});
}
@@ -163,20 +164,27 @@ public void regenerate() {
* @return {@code true} if the assignment was successful, or {@code false} if the table is currently excluded in the
* connector's configuration
*/
- public boolean assign(long tableNumber, TableId id) {
+ public boolean assign(long tableNumber, TableId id, TableMetadataEventData metadata) {
Long existingTableNumber = tableNumbersByTableId.get(id);
if (existingTableNumber != null && existingTableNumber.longValue() == tableNumber
&& convertersByTableNumber.containsKey(tableNumber)) {
// This is the exact same table number for the same table, so do nothing ...
return true;
}
- TableSchema tableSchema = schema.schemaFor(id);
+
+ // Get schema from either registry or metadata event
+ TableSchema tableSchema;
+ if (metadata != null) {
+ tableSchema = schema.schemaFromMetadata(id, metadata);
+ } else {
+ tableSchema = schema.schemaFor(id);
+ }
if (tableSchema == null) return false;
String topicName = topicSelector.getTopic(id);
Envelope envelope = Envelope.defineSchema()
.withName(schemaNameValidator.validate(topicName + ".Envelope"))
- .withRecord(schema.schemaFor(id).valueSchema())
+ .withRecord(tableSchema.valueSchema())
.withSource(SourceInfo.SCHEMA)
.build();
diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchema.java b/debezium-core/src/main/java/io/debezium/relational/TableSchema.java
index 778fd0d..79d555e 100644
--- a/debezium-core/src/main/java/io/debezium/relational/TableSchema.java
+++ b/debezium-core/src/main/java/io/debezium/relational/TableSchema.java
@@ -51,6 +51,7 @@
private final Schema valueSchema;
private final Function<Object[], Object> keyGenerator;
private final Function<Object[], Struct> valueGenerator;
+ private long tableVersionId = -1;
/**
* Create an instance with the specified {@link Schema}s for the keys and values, and the functions that generate the
@@ -110,7 +111,15 @@ public Object keyFromColumnData(Object[] columnData) {
public Struct valueFromColumnData(Object[] columnData) {
return columnData == null ? null : valueGenerator.apply(columnData);
}
-
+
+ /** Sets the table version id stored on this schema. */
+ public void setTableVersionId(long id) { this.tableVersionId = id; }
+
+ /**
+ * Returns the id given to {@link #setTableVersionId(long)}, or {@code -1} if none has been set.
+ */
+ public long getTableVersionId() { return this.tableVersionId; }
+
@Override
public int hashCode() {
return valueSchema().hashCode();
diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java
index 27345e4..00a7c96 100644
--- a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java
+++ b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java
@@ -194,6 +194,7 @@ public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId>
}
}
}
+
return result;
};
}
diff --git a/pom.xml b/pom.xml
index 595be01..a777f33 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,10 +59,10 @@
<version.confluent.platform>3.2.0</version.confluent.platform>
<!-- Databases -->
- <version.postgresql.driver>42.0.0-SNAPSHOT</version.postgresql.driver>
- <version.mysql.server>5.7</version.mysql.server>
+ <version.postgresql.driver>42.0.0</version.postgresql.driver>
+ <version.mysql.server>5.6</version.mysql.server>
<version.mysql.driver>5.1.40</version.mysql.driver>
- <version.mysql.binlog>0.9.0</version.mysql.binlog>
+ <version.mysql.binlog>0.11.0</version.mysql.binlog>
<version.mongo.server>3.2.12</version.mongo.server>
<version.mongo.driver>3.4.2</version.mongo.driver>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment