Created
January 23, 2018 18:27
-
-
Save adsr/ace7699b7a97b42046e739bf96f4d02f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java | |
| index f3fc710..e9cb546 100644 | |
| --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java | |
| +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java | |
| @@ -60,41 +60,43 @@ public synchronized void start(Map<String, String> props) { | |
| throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details"); | |
| } | |
| // Create and start the task context ... | |
| this.taskContext = new MySqlTaskContext(config); | |
| PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task"); | |
| try { | |
| this.taskContext.start(); | |
| // Get the offsets for our partition ... | |
| boolean startWithSnapshot = false; | |
| boolean snapshotEventsAreInserts = true; | |
| final SourceInfo source = taskContext.source(); | |
| Map<String, ?> offsets = context.offsetStorageReader().offset(taskContext.source().partition()); | |
| if (offsets != null) { | |
| // Set the position in our source info ... | |
| source.setOffset(offsets); | |
| logger.info("Found existing offset: {}", offsets); | |
| // Before anything else, recover the database history to the specified binlog coordinates ... | |
| - taskContext.loadHistory(source); | |
| + if (!taskContext.isSnapshotModeTwitterPatch()) { | |
| + taskContext.loadHistory(source); | |
| + } | |
| if (source.isSnapshotInEffect()) { | |
| // The last offset was an incomplete snapshot that we cannot recover from... | |
| if (taskContext.isSnapshotNeverAllowed()) { | |
| // No snapshots are allowed | |
| String msg = "The connector previously stopped while taking a snapshot, but now the connector is configured " | |
| + "to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed."; | |
| throw new ConnectException(msg); | |
| } | |
| // Otherwise, restart a new snapshot ... | |
| startWithSnapshot = true; | |
| logger.info("Prior execution was an incomplete snapshot, so starting new snapshot"); | |
| } else { | |
| // No snapshot was in effect, so we should just start reading from the binlog ... | |
| startWithSnapshot = false; | |
| // But check to see if the server still has those binlog coordinates ... | |
| if (!isBinlogAvailable()) { | |
| if (!taskContext.isSnapshotAllowedWhenNeeded()) { | |
| String msg = "The connector is trying to read binlog starting at " + source + ", but this is no longer " |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment