Skip to content

Instantly share code, notes, and snippets.

@adsr
Created January 23, 2018 18:27
Show Gist options
  • Select an option

  • Save adsr/ace7699b7a97b42046e739bf96f4d02f to your computer and use it in GitHub Desktop.

Select an option

Save adsr/ace7699b7a97b42046e739bf96f4d02f to your computer and use it in GitHub Desktop.
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
index f3fc710..e9cb546 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
@@ -60,41 +60,43 @@ public synchronized void start(Map<String, String> props) {
throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
}
// Create and start the task context ...
this.taskContext = new MySqlTaskContext(config);
PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task");
try {
this.taskContext.start();
// Get the offsets for our partition ...
boolean startWithSnapshot = false;
boolean snapshotEventsAreInserts = true;
final SourceInfo source = taskContext.source();
Map<String, ?> offsets = context.offsetStorageReader().offset(taskContext.source().partition());
if (offsets != null) {
// Set the position in our source info ...
source.setOffset(offsets);
logger.info("Found existing offset: {}", offsets);
// Before anything else, recover the database history to the specified binlog coordinates ...
- taskContext.loadHistory(source);
+ if (!taskContext.isSnapshotModeTwitterPatch()) {
+ taskContext.loadHistory(source);
+ }
if (source.isSnapshotInEffect()) {
// The last offset was an incomplete snapshot that we cannot recover from...
if (taskContext.isSnapshotNeverAllowed()) {
// No snapshots are allowed
String msg = "The connector previously stopped while taking a snapshot, but now the connector is configured "
+ "to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.";
throw new ConnectException(msg);
}
// Otherwise, restart a new snapshot ...
startWithSnapshot = true;
logger.info("Prior execution was an incomplete snapshot, so starting new snapshot");
} else {
// No snapshot was in effect, so we should just start reading from the binlog ...
startWithSnapshot = false;
// But check to see if the server still has those binlog coordinates ...
if (!isBinlogAvailable()) {
if (!taskContext.isSnapshotAllowedWhenNeeded()) {
String msg = "The connector is trying to read binlog starting at " + source + ", but this is no longer "
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment